From a2414ed74b09746f4e8d7854e7a43d197715da4d Mon Sep 17 00:00:00 2001 From: Sebastian Rabenhorst Date: Mon, 13 May 2024 18:44:10 +0200 Subject: [PATCH] Made reject ooo samples in agent db configurable Signed-off-by: Sebastian Rabenhorst --- cmd/prometheus/main.go | 6 ++++++ docs/command-line/prometheus.md | 1 + tsdb/agent/db.go | 14 ++++++++++++++ tsdb/agent/db_test.go | 29 +++++++++++++++++++++++++++++ 4 files changed, 50 insertions(+) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 8218ffb18d4b..52c038238679 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -397,6 +397,9 @@ func main() { agentOnlyFlag(a, "storage.agent.no-lockfile", "Do not create lockfile in data directory."). Default("false").BoolVar(&cfg.agent.NoLockfile) + agentOnlyFlag(a, "storage.agent.reject-out-of-order-samples", "Reject out-of-order samples."). + Default("false").BoolVar(&cfg.agent.RejectOOOSamples) + a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload."). Default("1m").PlaceHolder("").SetValue(&cfg.RemoteFlushDeadline) @@ -1215,6 +1218,7 @@ func main() { "TruncateFrequency", cfg.agent.TruncateFrequency, "MinWALTime", cfg.agent.MinWALTime, "MaxWALTime", cfg.agent.MaxWALTime, + "RejectOOOSamples", cfg.agent.RejectOOOSamples, ) localStorage.Set(db, 0) @@ -1707,6 +1711,7 @@ type agentOptions struct { TruncateFrequency model.Duration MinWALTime, MaxWALTime model.Duration NoLockfile bool + RejectOOOSamples bool } func (opts agentOptions) ToAgentOptions() agent.Options { @@ -1718,6 +1723,7 @@ func (opts agentOptions) ToAgentOptions() agent.Options { MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)), MaxWALTime: durationToInt64Millis(time.Duration(opts.MaxWALTime)), NoLockfile: opts.NoLockfile, + RejectOOOSamples: opts.RejectOOOSamples, } } diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index 93eaf251d0b7..a5c9ac9daaf4 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -41,6 +41,7 @@ The Prometheus monitoring server | --storage.agent.retention.min-time | Minimum age samples may be before being considered for deletion when the WAL is truncated Use with agent mode only. | | | --storage.agent.retention.max-time | Maximum age samples may be before being forcibly deleted when the WAL is truncated Use with agent mode only. | | | --storage.agent.no-lockfile | Do not create lockfile in data directory. Use with agent mode only. | `false` | +| --storage.agent.reject-out-of-order-samples | Reject out-of-order samples. Use with agent mode only. | `false` | | --storage.remote.flush-deadline | How long to wait flushing sample on shutdown or config reload. | `1m` | | --storage.remote.read-sample-limit | Maximum overall number of samples to return via the remote read interface, in a single query. 0 means no limit. This limit is ignored for streamed response types. Use with server mode only. | `5e7` | | --storage.remote.read-concurrent-limit | Maximum number of concurrent remote read calls. 0 means no limit. Use with server mode only. | `10` | diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 513c2ed5a339..41bac430ee69 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -81,6 +81,9 @@ type Options struct { // NoLockfile disables creation and consideration of a lock file. NoLockfile bool + + // RejectOOOSamples enables rejecting out of order samples. + RejectOOOSamples bool } // DefaultOptions used for the WAL storage. They are reasonable for setups using @@ -94,6 +97,7 @@ func DefaultOptions() *Options { MinWALTime: DefaultMinWALTime, MaxWALTime: DefaultMaxWALTime, NoLockfile: false, + RejectOOOSamples: false, } } @@ -812,6 +816,11 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo series.Lock() defer series.Unlock() + if a.opts.RejectOOOSamples && t < series.lastTs { + a.metrics.totalOutOfOrderSamples.Inc() + return 0, storage.ErrOutOfOrderSample + } + // NOTE: always modify pendingSamples and sampleSeries together. a.pendingSamples = append(a.pendingSamples, record.RefSample{ Ref: series.ref, @@ -935,6 +944,11 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int series.Lock() defer series.Unlock() + if a.opts.RejectOOOSamples && t < series.lastTs { + a.metrics.totalOutOfOrderSamples.Inc() + return 0, storage.ErrOutOfOrderSample + } + switch { case h != nil: // NOTE: always modify pendingHistograms and histogramSeries together diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index a7dae322085a..c37728bb4e90 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -879,6 +879,35 @@ func TestDBAllowOOOSamples(t *testing.T) { require.NoError(t, db.Close()) } +func TestDBRejectOOOSamples(t *testing.T) { + reg := prometheus.NewRegistry() + opts := DefaultOptions() + opts.RejectOOOSamples = true + s := createTestAgentDB(t, reg, opts) + app := s.Appender(context.TODO()) + + lbls := labelsForTest(t.Name()+"_histogram", 1) + lset := labels.New(lbls[0]...) + _, err := app.AppendHistogram(0, lset, 1, tsdbutil.GenerateTestHistograms(1)[0], nil) + err = app.Commit() + require.NoError(t, err) + _, err = app.AppendHistogram(0, lset, 0, tsdbutil.GenerateTestHistograms(1)[0], nil) + require.ErrorIs(t, err, storage.ErrOutOfOrderSample, "should reject OOO samples") + + lbls = labelsForTest(t.Name()+"_histogram", 1) + lset = labels.New(lbls[0]...) + _, err = app.Append(0, lset, 1, 0) + err = app.Commit() + require.NoError(t, err) + _, err = app.Append(0, lset, 0, 0) + require.ErrorIs(t, err, storage.ErrOutOfOrderSample, "should reject OOO samples") + + m := gatherFamily(t, reg, "prometheus_agent_samples_appended_total") + require.Equal(t, float64(1), m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples") + require.Equal(t, float64(1), m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms") + require.NoError(t, s.Close()) +} + func BenchmarkCreateSeries(b *testing.B) { s := createTestAgentDB(b, nil, DefaultOptions()) defer s.Close()