Skip to content

Commit

Permalink
Flamechart added time_nanos to handle timestamps at nanos precision (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
manojVivek authored Jan 22, 2025
1 parent 493e117 commit 6b8807d
Show file tree
Hide file tree
Showing 12 changed files with 18,791 additions and 18,761 deletions.
18,736 changes: 9,368 additions & 9,368 deletions pkg/ingester/testdata/ingest_arrow.json

Large diffs are not rendered by default.

18,736 changes: 9,368 additions & 9,368 deletions pkg/ingester/testdata/ingest_uncompressed_arrow.json

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions pkg/normalizer/normalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func MetaFromPprof(p *pprofpb.Profile, name string, sampleIndex int) profile.Met
return profile.Meta{
Name: name,
Timestamp: p.TimeNanos / time.Millisecond.Nanoseconds(),
TimeNanos: p.TimeNanos,
Duration: p.DurationNanos,
Period: p.Period,
PeriodType: periodType,
Expand All @@ -106,6 +107,7 @@ func MetaFromOtelProfile(p *pprofextended.Profile, name string, sampleIndex int,
return profile.Meta{
Name: name,
Timestamp: p.TimeNanos / time.Millisecond.Nanoseconds(),
TimeNanos: p.TimeNanos,
Duration: duration,
Period: p.Period,
PeriodType: periodType,
Expand Down Expand Up @@ -273,6 +275,17 @@ func WriteRawRequestToArrowRecord(
}
}
}
case profile.ColumnTimeNanos:
cBuilder := b.Field(b.Schema().FieldIndices(col.Name)[0]).(*array.Int64Builder)
for _, series := range normalizedRequest.Series {
for _, sample := range series.Samples {
for _, p := range sample {
for range p.Samples {
cBuilder.Append(p.Meta.TimeNanos)
}
}
}
}
case profile.ColumnValue:
cBuilder := b.Field(b.Schema().FieldIndices(col.Name)[0]).(*array.Int64Builder)
for _, series := range normalizedRequest.Series {
Expand Down Expand Up @@ -662,6 +675,9 @@ func SampleToParquetRow(
case profile.ColumnTimestamp:
row = append(row, parquet.ValueOf(meta.Timestamp).Level(0, 0, columnIndex))
columnIndex++
case profile.ColumnTimeNanos:
row = append(row, parquet.ValueOf(meta.TimeNanos).Level(0, 0, columnIndex))
columnIndex++
case profile.ColumnValue:
row = append(row, parquet.ValueOf(s.Value).Level(0, 0, columnIndex))
columnIndex++
Expand Down
2 changes: 2 additions & 0 deletions pkg/profile/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ type Meta struct {
PeriodType ValueType
SampleType ValueType
Timestamp int64
TimeNanos int64
Duration int64
Period int64
}
Expand All @@ -182,6 +183,7 @@ func MetaFromPprof(p *pprofproto.Profile, name string, sampleIndex int) Meta {
return Meta{
Name: name,
Timestamp: p.TimeNanos / time.Millisecond.Nanoseconds(),
TimeNanos: p.TimeNanos,
Duration: p.DurationNanos,
Period: p.Period,
PeriodType: periodType,
Expand Down
6 changes: 3 additions & 3 deletions pkg/profile/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type RecordReader struct {
Locations *array.List
Location *array.Struct
Address *array.Uint64
Timestamp *array.Int64
TimeNanos *array.Int64
Duration *array.Int64
MappingStart *array.Uint64
MappingLimit *array.Uint64
Expand Down Expand Up @@ -122,7 +122,7 @@ func NewRecordReader(ar arrow.Record) *RecordReader {
lineFunctionStartLine := line.Field(4).(*array.Int64)
valueColumn := ar.Column(labelNum + 1).(*array.Int64)
diffColumn := ar.Column(labelNum + 2).(*array.Int64)
timestamp := ar.Column(labelNum + 3).(*array.Int64)
timeNanos := ar.Column(labelNum + 3).(*array.Int64)
duration := ar.Column(labelNum + 4).(*array.Int64)

return &RecordReader{
Expand Down Expand Up @@ -151,7 +151,7 @@ func NewRecordReader(ar arrow.Record) *RecordReader {
LineFunctionStartLine: lineFunctionStartLine,
Value: valueColumn,
Diff: diffColumn,
Timestamp: timestamp,
TimeNanos: timeNanos,
Duration: duration,
}
}
12 changes: 12 additions & 0 deletions pkg/profile/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
ColumnSampleUnit = "sample_unit"
ColumnStacktrace = "stacktrace"
ColumnTimestamp = "timestamp"
ColumnTimeNanos = "time_nanos"
ColumnValue = "value"
)

Expand Down Expand Up @@ -113,6 +114,14 @@ func SchemaDefinition() *schemapb.Schema {
Compression: schemapb.StorageLayout_COMPRESSION_LZ4_RAW,
},
Dynamic: false,
}, {
Name: ColumnTimeNanos,
StorageLayout: &schemapb.StorageLayout{
Type: schemapb.StorageLayout_TYPE_INT64,
Encoding: schemapb.StorageLayout_ENCODING_DELTA_BINARY_PACKED,
Compression: schemapb.StorageLayout_COMPRESSION_LZ4_RAW,
},
Dynamic: false,
}, {
Name: ColumnValue,
StorageLayout: &schemapb.StorageLayout{
Expand Down Expand Up @@ -142,6 +151,9 @@ func SchemaDefinition() *schemapb.Schema {
}, {
Name: ColumnTimestamp,
Direction: schemapb.SortingColumn_DIRECTION_ASCENDING,
}, {
Name: ColumnTimeNanos,
Direction: schemapb.SortingColumn_DIRECTION_ASCENDING,
},
},
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/profile/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Writer struct {
FunctionStartLine *array.Int64Builder
Value *array.Int64Builder
Diff *array.Int64Builder
Timestamp *array.Int64Builder
TimeNanos *array.Int64Builder
Duration *array.Int64Builder
}

Expand Down Expand Up @@ -89,7 +89,7 @@ func NewWriter(pool memory.Allocator, labelNames []string) Writer {

value := b.Field(labelNum + 1).(*array.Int64Builder)
diff := b.Field(labelNum + 2).(*array.Int64Builder)
timestamp := b.Field(labelNum + 3).(*array.Int64Builder)
timeNanos := b.Field(labelNum + 3).(*array.Int64Builder)
duration := b.Field(labelNum + 4).(*array.Int64Builder)

return Writer{
Expand All @@ -113,7 +113,7 @@ func NewWriter(pool memory.Allocator, labelNames []string) Writer {
FunctionStartLine: functionStartLine,
Value: value,
Diff: diff,
Timestamp: timestamp,
TimeNanos: timeNanos,
Duration: duration,
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/query/columnquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (q *ColumnQueryAPI) Query(ctx context.Context, req *pb.QueryRequest) (*pb.Q

groupBy := req.GetGroupBy().GetFields()
allowedGroupBy := map[string]struct{}{
profile.ColumnTimestamp: {},
profile.ColumnTimeNanos: {},
profile.ColumnDuration: {},
FlamegraphFieldFunctionName: {},
FlamegraphFieldLocationAddress: {},
Expand All @@ -245,7 +245,7 @@ func (q *ColumnQueryAPI) Query(ctx context.Context, req *pb.QueryRequest) (*pb.Q
}

if req.GetReportType() == pb.QueryRequest_REPORT_TYPE_FLAMECHART {
groupBy = append(groupBy, profile.ColumnTimestamp, profile.ColumnDuration)
groupBy = append(groupBy, profile.ColumnTimeNanos, profile.ColumnDuration)
}

groupByLabels := make([]string, 0, len(groupBy))
Expand Down
12 changes: 6 additions & 6 deletions pkg/query/columnquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1244,7 +1244,7 @@ func PprofToSymbolizedProfile(meta profile.Meta, prof *pprofprofile.Profile, ind

w.Value.Append(prof.Sample[i].Value[index])
w.Diff.Append(0)
w.Timestamp.Append(prof.TimeNanos)
w.TimeNanos.Append(prof.TimeNanos)
w.Duration.Append(prof.DurationNanos)

for labelName, labelBuilder := range w.LabelBuildersMap {
Expand Down Expand Up @@ -1361,7 +1361,7 @@ func TestFilterData(t *testing.T) {
w.FunctionStartLine.Append(1)
w.Value.Append(1)
w.Diff.Append(0)
w.Timestamp.Append(1)
w.TimeNanos.Append(1)
w.Duration.Append(1)

frameFilter := map[string]struct{}{"test": {}}
Expand Down Expand Up @@ -1409,7 +1409,7 @@ func TestFilterUnsymbolized(t *testing.T) {
w.Lines.Append(false)
w.Value.Append(1)
w.Diff.Append(0)
w.Timestamp.Append(1)
w.TimeNanos.Append(1)
w.Duration.Append(1)

originalRecord := w.RecordBuilder.NewRecord()
Expand Down Expand Up @@ -1491,7 +1491,7 @@ func TestFilterDataWithPath(t *testing.T) {
w.FunctionStartLine.Append(0)
w.Value.Append(1)
w.Diff.Append(0)
w.Timestamp.Append(1)
w.TimeNanos.Append(1)
w.Duration.Append(1)

frameFilter := map[string]struct{}{"libpython3.11.so.1.0": {}, "interpreter": {}}
Expand Down Expand Up @@ -1575,7 +1575,7 @@ func TestFilterDataFrameFilter(t *testing.T) {
w.FunctionStartLine.Append(0)
w.Value.Append(1)
w.Diff.Append(0)
w.Timestamp.Append(1)
w.TimeNanos.Append(1)
w.Duration.Append(1)

frameFilter := map[string]struct{}{"interpreter": {}}
Expand Down Expand Up @@ -1659,7 +1659,7 @@ func BenchmarkFilterData(t *testing.B) {
w.FunctionStartLine.Append(1)
w.Value.Append(1)
w.Diff.Append(0)
w.Timestamp.Append(1)
w.TimeNanos.Append(1)
w.Duration.Append(1)
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/query/flamegraph_arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,12 @@ func generateFlamegraphArrowRecord(ctx context.Context, mem memory.Allocator, tr
row = fb.builderCumulative.Len()
}
if fb.aggregationConfig.aggregateByTimestamp {
tsHash = uint64(r.Timestamp.Value(i))
tsHash = uint64(r.TimeNanos.Value(i))

sampleTsRow := row
if _, ok := fb.rootsRow[tsHash]; ok {
// If we have multiple samples for the same timestamp, we return an error.
return nil, 0, 0, 0, fmt.Errorf("multiple samples for the same timestamp is not allowed: %d", r.Timestamp.Value(i))
return nil, 0, 0, 0, fmt.Errorf("multiple samples for the same timestamp is not allowed: %d", r.TimeNanos.Value(i))
} else {
rootRowChildren = map[uint64]int{}
err := fb.AppendTimestampRow(
Expand Down Expand Up @@ -317,7 +317,7 @@ func generateFlamegraphArrowRecord(ctx context.Context, mem memory.Allocator, tr
key = hashCombine(key, r.Address.Value(j))
}
if fb.aggregationConfig.aggregateByTimestamp {
key = hashCombine(key, uint64(r.Timestamp.Value(i)))
key = hashCombine(key, uint64(r.TimeNanos.Value(i)))
key = hashCombine(key, uint64(r.Duration.Value(i)))
}
if fb.aggregationConfig.aggregateByFunctionFilename {
Expand Down Expand Up @@ -962,7 +962,7 @@ func newFlamegraphBuilder(
if f == FlamegraphFieldFunctionFileName {
fb.aggregationConfig.aggregateByFunctionFilename = true
}
if f == profile.ColumnTimestamp {
if f == profile.ColumnTimeNanos {
fb.aggregationConfig.aggregateByTimestamp = true
}
}
Expand Down Expand Up @@ -1400,8 +1400,8 @@ func appendGroupByMetadata(fb *flamegraphBuilder, r *profile.RecordReader, sampl
n := fb.groupByMetadataFields[i].Name
b := fb.builderGroupByMetadata.FieldBuilder(i).(*array.BinaryBuilder)
switch n {
case profile.ColumnTimestamp:
ts := r.Timestamp.Value(sampleRow)
case profile.ColumnTimeNanos:
ts := r.TimeNanos.Value(sampleRow)
b.Append([]byte(fmt.Sprint(ts)))
case profile.ColumnDuration:
duration := r.Duration.Value(sampleRow)
Expand Down
6 changes: 3 additions & 3 deletions pkg/query/flamegraph_arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1201,7 +1201,7 @@ main;func_add 10 3000 20
mem,
tracer,
np,
[]string{FlamegraphFieldFunctionName, profile.ColumnTimestamp, profile.ColumnDuration},
[]string{FlamegraphFieldFunctionName, profile.ColumnTimeNanos, profile.ColumnDuration},
0,
)

Expand Down Expand Up @@ -1234,7 +1234,7 @@ main;func_fib 10 3000 20
mem,
tracer,
np,
[]string{FlamegraphFieldFunctionName, profile.ColumnTimestamp, profile.ColumnDuration},
[]string{FlamegraphFieldFunctionName, profile.ColumnTimeNanos, profile.ColumnDuration},
0,
)
require.NoError(t, err)
Expand Down Expand Up @@ -1328,7 +1328,7 @@ func foldedStacksWithTsToProfile(pool memory.Allocator, input []byte) (profile.P

w.Value.Append(val)
w.Diff.Append(0)
w.Timestamp.Append(ts)
w.TimeNanos.Append(ts)
w.Duration.Append(duration)
w.LocationsList.Append(true)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ export const IcicleChartRootNode = React.memo(function IcicleChartRootNodeNonMem
groupByMetadata?.get(row) as StructRow<Record<string, Binary>>
).toJSON();

const tsStr = arrowToString(groupByFields.timestamp) as string;
const tsStr = arrowToString(groupByFields.time_nanos) as string;

const tsNanos = BigInt(parseInt(tsStr, 10)) * 1000000n;
const tsNanos = BigInt(parseInt(tsStr, 10));
const durationStr = arrowToString(groupByFields.duration) as string;
const duration = parseInt(durationStr, 10);

Expand Down

0 comments on commit 6b8807d

Please sign in to comment.