Skip to content

Commit

Permalink
Made reject ooo samples in agent db configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Sebastian Rabenhorst <[email protected]>
  • Loading branch information
rabenhorst committed May 13, 2024
1 parent 3b8b577 commit 58eb977
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 0 deletions.
6 changes: 6 additions & 0 deletions cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,9 @@ func main() {
agentOnlyFlag(a, "storage.agent.no-lockfile", "Do not create lockfile in data directory.").
Default("false").BoolVar(&cfg.agent.NoLockfile)

agentOnlyFlag(a, "storage.agent.reject-out-of-order-samples", "Reject out-of-order samples.").
Default("false").BoolVar(&cfg.agent.RejectOOOSamples)

a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload.").
Default("1m").PlaceHolder("<duration>").SetValue(&cfg.RemoteFlushDeadline)

Expand Down Expand Up @@ -1215,6 +1218,7 @@ func main() {
"TruncateFrequency", cfg.agent.TruncateFrequency,
"MinWALTime", cfg.agent.MinWALTime,
"MaxWALTime", cfg.agent.MaxWALTime,
"RejectOOOSamples", cfg.agent.RejectOOOSamples,
)

localStorage.Set(db, 0)
Expand Down Expand Up @@ -1707,6 +1711,7 @@ type agentOptions struct {
TruncateFrequency model.Duration
MinWALTime, MaxWALTime model.Duration
NoLockfile bool
RejectOOOSamples bool
}

func (opts agentOptions) ToAgentOptions() agent.Options {
Expand All @@ -1718,6 +1723,7 @@ func (opts agentOptions) ToAgentOptions() agent.Options {
MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)),
MaxWALTime: durationToInt64Millis(time.Duration(opts.MaxWALTime)),
NoLockfile: opts.NoLockfile,
RejectOOOSamples: opts.RejectOOOSamples,
}
}

Expand Down
1 change: 1 addition & 0 deletions docs/command-line/prometheus.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ The Prometheus monitoring server
| <code class="text-nowrap">--storage.agent.retention.min-time</code> | Minimum age samples may be before being considered for deletion when the WAL is truncated Use with agent mode only. | |
| <code class="text-nowrap">--storage.agent.retention.max-time</code> | Maximum age samples may be before being forcibly deleted when the WAL is truncated Use with agent mode only. | |
| <code class="text-nowrap">--storage.agent.no-lockfile</code> | Do not create lockfile in data directory. Use with agent mode only. | `false` |
| <code class="text-nowrap">--storage.agent.reject-out-of-order-samples</code> | Reject out-of-order samples. Use with agent mode only. | `false` |
| <code class="text-nowrap">--storage.remote.flush-deadline</code> | How long to wait flushing sample on shutdown or config reload. | `1m` |
| <code class="text-nowrap">--storage.remote.read-sample-limit</code> | Maximum overall number of samples to return via the remote read interface, in a single query. 0 means no limit. This limit is ignored for streamed response types. Use with server mode only. | `5e7` |
| <code class="text-nowrap">--storage.remote.read-concurrent-limit</code> | Maximum number of concurrent remote read calls. 0 means no limit. Use with server mode only. | `10` |
Expand Down
14 changes: 14 additions & 0 deletions tsdb/agent/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ type Options struct {

// NoLockfile disables creation and consideration of a lock file.
NoLockfile bool

// RejectOOOSamples enables rejecting out of order samples.
RejectOOOSamples bool
}

// DefaultOptions used for the WAL storage. They are reasonable for setups using
Expand All @@ -94,6 +97,7 @@ func DefaultOptions() *Options {
MinWALTime: DefaultMinWALTime,
MaxWALTime: DefaultMaxWALTime,
NoLockfile: false,
RejectOOOSamples: false,
}
}

Expand Down Expand Up @@ -812,6 +816,11 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
series.Lock()
defer series.Unlock()

if a.opts.RejectOOOSamples && t < series.lastTs {
a.metrics.totalOutOfOrderSamples.Inc()
return 0, storage.ErrOutOfOrderSample
}

// NOTE: always modify pendingSamples and sampleSeries together.
a.pendingSamples = append(a.pendingSamples, record.RefSample{
Ref: series.ref,
Expand Down Expand Up @@ -935,6 +944,11 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int
series.Lock()
defer series.Unlock()

if a.opts.RejectOOOSamples && t < series.lastTs {
a.metrics.totalOutOfOrderSamples.Inc()
return 0, storage.ErrOutOfOrderSample
}

switch {
case h != nil:
// NOTE: always modify pendingHistograms and histogramSeries together
Expand Down
29 changes: 29 additions & 0 deletions tsdb/agent/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,35 @@ func TestDBAllowOOOSamples(t *testing.T) {
require.NoError(t, db.Close())
}

func TestDBRejectOOOSamples(t *testing.T) {
reg := prometheus.NewRegistry()
opts := DefaultOptions()
opts.RejectOOOSamples = true
s := createTestAgentDB(t, reg, opts)
app := s.Appender(context.TODO())

lbls := labelsForTest(t.Name()+"_histogram", 1)
lset := labels.New(lbls[0]...)
_, err := app.AppendHistogram(0, lset, 1, tsdbutil.GenerateTestHistograms(1)[0], nil)

Check failure on line 891 in tsdb/agent/db_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

ineffectual assignment to err (ineffassign)
err = app.Commit()
require.NoError(t, err)
_, err = app.AppendHistogram(0, lset, 0, tsdbutil.GenerateTestHistograms(1)[0], nil)
require.ErrorIs(t, err, storage.ErrOutOfOrderSample, "should reject OOO samples")

lbls = labelsForTest(t.Name()+"_histogram", 1)
lset = labels.New(lbls[0]...)
_, err = app.Append(0, lset, 1, 0)

Check failure on line 899 in tsdb/agent/db_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

ineffectual assignment to err (ineffassign)
err = app.Commit()
require.NoError(t, err)
_, err = app.Append(0, lset, 0, 0)
require.ErrorIs(t, err, storage.ErrOutOfOrderSample, "should reject OOO samples")

m := gatherFamily(t, reg, "prometheus_agent_samples_appended_total")
require.Equal(t, float64(1), m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples")
require.Equal(t, float64(1), m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms")
require.NoError(t, s.Close())
}

func BenchmarkCreateSeries(b *testing.B) {
s := createTestAgentDB(b, nil, DefaultOptions())
defer s.Close()
Expand Down

0 comments on commit 58eb977

Please sign in to comment.