From 160354b7cac0be5eb4929bc07e93eb489cf93357 Mon Sep 17 00:00:00 2001 From: Sebastian Rabenhorst Date: Fri, 17 May 2024 19:24:36 +0200 Subject: [PATCH] Use OutOfOrderTimeWindow --- cmd/prometheus/main.go | 6 +-- docs/command-line/prometheus.md | 1 - tsdb/agent/db.go | 41 ++++++++++++----- tsdb/agent/db_test.go | 79 ++++++++++++++++++++++----------- 4 files changed, 84 insertions(+), 43 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 52c038238679..2d2df7630331 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -397,9 +397,6 @@ func main() { agentOnlyFlag(a, "storage.agent.no-lockfile", "Do not create lockfile in data directory."). Default("false").BoolVar(&cfg.agent.NoLockfile) - agentOnlyFlag(a, "storage.agent.reject-out-of-order-samples", "Reject out-of-order samples."). - Default("false").BoolVar(&cfg.agent.RejectOOOSamples) - a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload."). Default("1m").PlaceHolder("").SetValue(&cfg.RemoteFlushDeadline) @@ -1183,6 +1180,7 @@ func main() { if agentMode { // WAL storage. opts := cfg.agent.ToAgentOptions() + opts.SetOutOfOrderTimeWindow(cfg.tsdb.OutOfOrderTimeWindow) cancel := make(chan struct{}) g.Add( func() error { @@ -1712,6 +1710,7 @@ type agentOptions struct { MinWALTime, MaxWALTime model.Duration NoLockfile bool RejectOOOSamples bool + OutOfOrderTimeWindow int64 } func (opts agentOptions) ToAgentOptions() agent.Options { @@ -1723,7 +1722,6 @@ func (opts agentOptions) ToAgentOptions() agent.Options { MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)), MaxWALTime: durationToInt64Millis(time.Duration(opts.MaxWALTime)), NoLockfile: opts.NoLockfile, - RejectOOOSamples: opts.RejectOOOSamples, } } diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index a5c9ac9daaf4..93eaf251d0b7 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -41,7 +41,6 @@ The Prometheus monitoring server | --storage.agent.retention.min-time | Minimum age samples may be before being considered for deletion when the WAL is truncated Use with agent mode only. | | | --storage.agent.retention.max-time | Maximum age samples may be before being forcibly deleted when the WAL is truncated Use with agent mode only. | | | --storage.agent.no-lockfile | Do not create lockfile in data directory. Use with agent mode only. | `false` | -| --storage.agent.reject-out-of-order-samples | Reject out-of-order samples. Use with agent mode only. | `false` | | --storage.remote.flush-deadline | How long to wait flushing sample on shutdown or config reload. | `1m` | | --storage.remote.read-sample-limit | Maximum overall number of samples to return via the remote read interface, in a single query. 0 means no limit. This limit is ignored for streamed response types. Use with server mode only. | `5e7` | | --storage.remote.read-concurrent-limit | Maximum number of concurrent remote read calls. 0 means no limit. Use with server mode only. | `10` | diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 41bac430ee69..1fc7b3b42536 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -82,22 +82,29 @@ type Options struct { // NoLockfile disables creation and consideration of a lock file. NoLockfile bool - // RejectOOOSamples enables rejecting out of order samples. - RejectOOOSamples bool + // OutOfOrderTimeWindow specifies how much out of order is allowed, if any. + OutOfOrderTimeWindow int64 +} + +func (o *Options) SetOutOfOrderTimeWindow(outOfOrderTimeWindow int64) { + if outOfOrderTimeWindow < 0 { + return + } + o.OutOfOrderTimeWindow = outOfOrderTimeWindow } // DefaultOptions used for the WAL storage. They are reasonable for setups using // millisecond-precision timestamps. func DefaultOptions() *Options { return &Options{ - WALSegmentSize: wlog.DefaultSegmentSize, - WALCompression: wlog.CompressionNone, - StripeSize: tsdb.DefaultStripeSize, - TruncateFrequency: DefaultTruncateFrequency, - MinWALTime: DefaultMinWALTime, - MaxWALTime: DefaultMaxWALTime, - NoLockfile: false, - RejectOOOSamples: false, + WALSegmentSize: wlog.DefaultSegmentSize, + WALCompression: wlog.CompressionNone, + StripeSize: tsdb.DefaultStripeSize, + TruncateFrequency: DefaultTruncateFrequency, + MinWALTime: DefaultMinWALTime, + MaxWALTime: DefaultMaxWALTime, + NoLockfile: false, + OutOfOrderTimeWindow: 0, } } @@ -816,7 +823,7 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo series.Lock() defer series.Unlock() - if a.opts.RejectOOOSamples && t < series.lastTs { + if t <= a.minTs(series.lastTs) { a.metrics.totalOutOfOrderSamples.Inc() return 0, storage.ErrOutOfOrderSample } @@ -944,7 +951,7 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int series.Lock() defer series.Unlock() - if a.opts.RejectOOOSamples && t < series.lastTs { + if t <= a.minTs(series.lastTs) { a.metrics.totalOutOfOrderSamples.Inc() return 0, storage.ErrOutOfOrderSample } @@ -1117,3 +1124,13 @@ func (a *appender) logSeries() error { return nil } + +// mintTs returns the minimum timestamp that a sample can have +// and is needed for preventing underflow. +func (a *appender) minTs(lastTs int64) int64 { + if lastTs < math.MinInt64+a.opts.OutOfOrderTimeWindow { + return math.MinInt64 + } + + return lastTs - a.opts.OutOfOrderTimeWindow +} diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index 1029bcea7c54..267ae63b3706 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -16,6 +16,7 @@ package agent import ( "context" "fmt" + "math" "path/filepath" "strconv" "testing" @@ -761,7 +762,9 @@ func TestDBAllowOOOSamples(t *testing.T) { ) reg := prometheus.NewRegistry() - s := createTestAgentDB(t, reg, DefaultOptions()) + opts := DefaultOptions() + opts.SetOutOfOrderTimeWindow(math.MaxInt64) + s := createTestAgentDB(t, reg, opts) app := s.Appender(context.TODO()) // Let's add some samples in the [offset, offset+numDatapoints) range. @@ -880,34 +883,58 @@ func TestDBAllowOOOSamples(t *testing.T) { } func TestDBRejectOOOSamples(t *testing.T) { - reg := prometheus.NewRegistry() - opts := DefaultOptions() - opts.RejectOOOSamples = true - s := createTestAgentDB(t, reg, opts) - app := s.Appender(context.TODO()) + tc := []struct { + outOfOrderTimeWindow, firstTs, secondTs int64 + reject bool + }{ + {0, 100, 101, false}, + {0, 100, 100, true}, + {0, 100, 99, true}, + {100, 100, 1, false}, + {100, 100, 0, true}, + } + + for _, c := range tc { + t.Run(fmt.Sprintf("outOfOrderTimeWindow=%d, firstTs=%d, secondTs=%d, reject=%t", c.outOfOrderTimeWindow, c.firstTs, c.secondTs, c.reject), func(t *testing.T) { + reg := prometheus.NewRegistry() + opts := DefaultOptions() + opts.SetOutOfOrderTimeWindow(c.outOfOrderTimeWindow) + s := createTestAgentDB(t, reg, opts) + app := s.Appender(context.TODO()) + + var expectedErr error + if c.reject { + expectedErr = storage.ErrOutOfOrderSample + } - lbls := labelsForTest(t.Name()+"_histogram", 1) - lset := labels.New(lbls[0]...) - _, err := app.AppendHistogram(0, lset, 1, tsdbutil.GenerateTestHistograms(1)[0], nil) - require.NoError(t, err) - err = app.Commit() - require.NoError(t, err) - _, err = app.AppendHistogram(0, lset, 0, tsdbutil.GenerateTestHistograms(1)[0], nil) - require.ErrorIs(t, err, storage.ErrOutOfOrderSample, "should reject OOO samples") + lbls := labelsForTest(t.Name()+"_histogram", 1) + lset := labels.New(lbls[0]...) + _, err := app.AppendHistogram(0, lset, c.firstTs, tsdbutil.GenerateTestHistograms(1)[0], nil) + require.NoError(t, err) + err = app.Commit() + require.NoError(t, err) + _, err = app.AppendHistogram(0, lset, c.secondTs, tsdbutil.GenerateTestHistograms(1)[0], nil) + require.ErrorIs(t, err, expectedErr) - lbls = labelsForTest(t.Name()+"_histogram", 1) - lset = labels.New(lbls[0]...) - _, err = app.Append(0, lset, 1, 0) - require.NoError(t, err) - err = app.Commit() - require.NoError(t, err) - _, err = app.Append(0, lset, 0, 0) - require.ErrorIs(t, err, storage.ErrOutOfOrderSample, "should reject OOO samples") + lbls = labelsForTest(t.Name(), 1) + lset = labels.New(lbls[0]...) + _, err = app.Append(0, lset, c.firstTs, 0) + require.NoError(t, err) + err = app.Commit() + require.NoError(t, err) + _, err = app.Append(0, lset, c.secondTs, 0) + require.ErrorIs(t, err, expectedErr) - m := gatherFamily(t, reg, "prometheus_agent_samples_appended_total") - require.Equal(t, float64(1), m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples") - require.Equal(t, float64(1), m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms") - require.NoError(t, s.Close()) + expectedAppendedSamples := float64(2) + if c.reject { + expectedAppendedSamples = 1 + } + m := gatherFamily(t, reg, "prometheus_agent_samples_appended_total") + require.Equal(t, expectedAppendedSamples, m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples") + require.Equal(t, expectedAppendedSamples, m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms") + require.NoError(t, s.Close()) + }) + } } func BenchmarkCreateSeries(b *testing.B) {