fix: restrict more than 2M nodes; improve CH request; add dist support for pyroscope v0.2

akvlad committed Feb 19, 2024
1 parent b5462f8 commit b53e279
Showing 4 changed files with 111 additions and 19 deletions.
17 changes: 15 additions & 2 deletions lib/db/maintain/scripts.js
@@ -400,7 +400,7 @@ module.exports.profiles = [
ADD COLUMN IF NOT EXISTS \`tree\` Array(Tuple(UInt64, UInt64, UInt64, Array(Tuple(String, Int64, Int64)))),
ADD COLUMN IF NOT EXISTS \`functions\` Array(Tuple(UInt64, String))`,

'RENAME TABLE IF EXISTS profiles_mv {{{OnCluster}}} TO profiles_mv_bak',
'RENAME TABLE IF EXISTS profiles_mv TO profiles_mv_bak {{{OnCluster}}}',

`CREATE MATERIALIZED VIEW IF NOT EXISTS profiles_mv {{{OnCluster}}} TO profiles AS
SELECT
@@ -462,5 +462,18 @@ module.exports.profiles_dist = [
key String,
val String,
val_id UInt64
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_keys', rand());`
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_keys', rand());`,

`ALTER TABLE profiles_dist {{{OnCluster}}}
ADD COLUMN IF NOT EXISTS \`tree\` Array(Tuple(UInt64, UInt64, UInt64, Array(Tuple(String, Int64, Int64)))),
ADD COLUMN IF NOT EXISTS \`functions\` Array(Tuple(UInt64, String))`,

`ALTER TABLE profiles_dist {{{OnCluster}}}
ADD COLUMN IF NOT EXISTS \`sample_types_units\` Array(Tuple(String, String))`,

`ALTER TABLE profiles_series_dist {{{OnCluster}}}
ADD COLUMN IF NOT EXISTS \`sample_types_units\` Array(Tuple(String, String))`,

`ALTER TABLE profiles_series_gin_dist {{{OnCluster}}}
ADD COLUMN IF NOT EXISTS \`sample_types_units\` Array(Tuple(String, String))`
]
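
The profiles_dist ALTERs above bring the Distributed tables up to date with the new tree, functions and sample_types_units columns on the local tables, and the RENAME fix moves {{{OnCluster}}} after the full RENAME ... TO ... clause, which is where ClickHouse expects ON CLUSTER. As a rough sketch of how such templated DDL could be rendered before execution (the placeholder names come from the scripts above; the renderer and the sample values are illustrative assumptions, not qryn's actual templating code):

// Hypothetical renderer for the {{{OnCluster}}}/{{CLUSTER}}/{{DB}} placeholders.
const renderScript = (script, { cluster, db }) => script
  .replace(/\{\{\{OnCluster\}\}\}/g, cluster ? `ON CLUSTER \`${cluster}\`` : '')
  .replace(/\{\{CLUSTER\}\}/g, cluster)
  .replace(/\{\{DB\}\}/g, db)

console.log(renderScript(
  'ALTER TABLE profiles_dist {{{OnCluster}}} ADD COLUMN IF NOT EXISTS `sample_types_units` Array(Tuple(String, String))',
  { cluster: 'test_cluster', db: 'qryn' }
))
// ALTER TABLE profiles_dist ON CLUSTER `test_cluster` ADD COLUMN IF NOT EXISTS `sample_types_units` Array(Tuple(String, String))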
Binary file modified pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm
25 changes: 20 additions & 5 deletions pyroscope/pprof-bin/src/lib.rs
@@ -6,20 +6,18 @@ use lazy_static::lazy_static;
use pprof_pb::google::v1::Function;
use pprof_pb::google::v1::Location;
use pprof_pb::google::v1::Profile;
use pprof_pb::querier::v1::FlameGraph;
use pprof_pb::querier::v1::Level;
use pprof_pb::querier::v1::FlameGraph;
use pprof_pb::querier::v1::SelectMergeStacktracesResponse;
use std::panic;
use prost::Message;
use std::collections::{HashMap, HashSet};
use std::io::stderr;
use std::slice::SliceIndex;
use std::sync::Mutex;
use std::vec::Vec;
use wasm_bindgen::prelude::*;
use ch64::city_hash_64;
use ch64::read_uint64_le;
use ch64::hash_128_to_64;
use std::panic;

pub mod pprof_pb {

@@ -69,6 +67,21 @@ fn find_node(id: u64, nodes: &Vec<TreeNodeV2>) -> i32 {
n
}

fn get_node_id(parent_id: u64, name_hash: u64, level: u16) -> u64 {
let mut node_bytes: [u8; 16] = [0; 16];
for i in 0..8 {
node_bytes[i] = ((parent_id >> (i * 8)) & 0xFF) as u8;
}
for i in 0..8 {
node_bytes[i+8] = ((name_hash >> (i * 8)) & 0xFF) as u8;
}
let mut _level = level;
if _level > 511 {
_level = 511;
}
(city_hash_64(&node_bytes[0..]) >> 9) | ((_level as u64) << 55)
}

fn merge(tree: &mut Tree, p: &Profile) {
let mut functions: HashMap<u64, &Function> = HashMap::new();
for f in p.function.iter() {
@@ -112,7 +125,9 @@ fn merge(tree: &mut Tree, p: &Profile) {
let location = locations[&s.location_id[i]];
let name = &p.string_table[functions[&location.line[0].function_id].name as usize];
let name_hash = city_hash_64(name.as_bytes());
let node_id = hash_128_to_64(parent_id, name_hash);
let node_id = get_node_id(
parent_id, name_hash,(s.location_id.len() - i) as u16
);
if !tree.nodes.contains_key(&parent_id) && tree.nodes_num < 2000000{
tree.nodes.insert(parent_id, Vec::new());
}
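
Here get_node_id() replaces the plain hash_128_to_64(parent_id, name_hash) id: the new id keeps 55 bits of a city_hash_64 over the concatenated (parent_id, name_hash) bytes and packs the node's stack depth, clamped to 511, into the top 9 bits; together with the tree.nodes_num < 2000000 guard this is the "restrict more than 2M nodes" part of the commit. A rough JavaScript mirror of the bit layout (the 64-bit hash below is a placeholder value, not a real city_hash_64):

// Level is clamped to 9 bits (0..511) and stored in bits 55..63;
// the hash is truncated to its low 55 bits.
const getNodeId = (hash64, level) => {
  const lvl = BigInt(Math.min(level, 511))
  return (hash64 >> 9n) | (lvl << 55n)
}

const id = getNodeId(0xdeadbeefcafebaben, 3)
console.log(id >> 55n)                // 3n – the level lives in bits 55..63
console.log(id & ((1n << 55n) - 1n))  // the truncated hash in the low 55 bits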
88 changes: 76 additions & 12 deletions pyroscope/pyroscope.js
@@ -167,10 +167,53 @@ const labelSelectorQuery = (query, labelSelector) => {
))
}

const serviceNameSelectorQuery = (labelSelector) => {
const empty = Sql.Eq(new Sql.Raw('1'), new Sql.Raw('1'))
if (!labelSelector || !labelSelector.length || labelSelector === '{}') {
return empty
}
const labelSelectorScript = compiler.ParseScript(labelSelector).rootToken
let conds = null
for (const rule of labelSelectorScript.Children('log_stream_selector_rule')) {
const label = rule.Child('label').value
if (label !== 'service_name') {
continue
}
const val = JSON.parse(rule.Child('quoted_str').value)
let valRul = null
switch (rule.Child('operator').value) {
case '=':
valRul = Sql.Eq(new Sql.Raw('service_name'), Sql.val(val))
break
case '!=':
valRul = Sql.Ne(new Sql.Raw('service_name'), Sql.val(val))
break
case '=~':
valRul = Sql.Eq(new Sql.Raw(`match(service_name, ${Sql.quoteVal(val)})`), 1)
break
case '!~':
valRul = Sql.Ne(new Sql.Raw(`match(service_name, ${Sql.quoteVal(val)})`), 1)
}
conds = valRul
}
return conds || empty
}

const selectMergeStacktraces = async (req, res) => {
return await selectMergeStacktracesV2(req, res)
}

const sqlWithReference = (ref) => {
const res = new Sql.WithReference(ref)
res.toString = function () {
if (this.ref.inline) {
return `(${this.ref.query.toString()}) as ${this.ref.alias}`
}
return this.ref.alias
}
return res
}

const selectMergeStacktracesV2 = async (req, res) => {
const dist = clusterName ? '_dist' : ''
const typeRegex = parseTypeId(req.body.getProfileTypeid())
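
The new serviceNameSelectorQuery() extracts any service_name rule from the label selector and turns it into a plain column condition, so both the fingerprint index lookup and the raw-data scan below can prune by service_name early; with no such rule it degrades to the no-op 1 = 1. A dependency-free sketch of the operator mapping (the real code above emits Sql.Eq/Sql.Ne AST nodes through qryn's SQL builder; the strings here are just the conditions those nodes render to):

// Hypothetical standalone version of the operator -> condition mapping.
const serviceNameCond = (op, val) => {
  switch (op) {
    case '=': return `service_name = '${val}'`
    case '!=': return `service_name != '${val}'`
    case '=~': return `match(service_name, '${val}') = 1`
    case '!~': return `match(service_name, '${val}') != 1`
    default: return '1 = 1'
  }
}

console.log(serviceNameCond('=~', 'shop-.*'))
// match(service_name, 'shop-.*') = 1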
@@ -182,15 +225,21 @@ const selectMergeStacktracesV2 = async (req, res) => {
? Math.floor(parseInt(req.body.getEnd()) / 1000)
: Math.floor(Date.now() / 1000)
const v2 = checkVersion('profiles_v2', (fromTimeSec - 3600) * 1000)
const serviceNameSelector = serviceNameSelectorQuery(sel)
const typeIdSelector = Sql.Eq(
'type_id',
Sql.val(`${typeRegex.type}:${typeRegex.periodType}:${typeRegex.periodUnit}`)
)
const idxSelect = (new Sql.Select())
.select('fingerprint')
.from(`${DATABASE_NAME()}.profiles_series_gin`)
.where(
Sql.And(
Sql.Eq(new Sql.Raw(`has(sample_types_units, (${Sql.quoteVal(typeRegex.sampleType)},${Sql.quoteVal(typeRegex.sampleUnit)}))`), 1),
Sql.Eq('type_id', Sql.val(`${typeRegex.type}:${typeRegex.periodType}:${typeRegex.periodUnit}`)),
typeIdSelector,
Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`))
Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
serviceNameSelector
)
).groupBy('fingerprint')
labelSelectorQuery(idxSelect, sel)
@@ -205,7 +254,9 @@ const selectMergeStacktracesV2 = async (req, res) => {
Sql.And(
Sql.Gte('timestamp_ns', new Sql.Raw(Math.floor(fromTimeSec) + '000000000')),
Sql.Lte('timestamp_ns', new Sql.Raw(Math.floor(toTimeSec) + '000000000')),
new Sql.In('fingerprint', 'IN', new Sql.WithReference(withIdxSelect))
new Sql.In('fingerprint', 'IN', sqlWithReference(withIdxSelect)),
typeIdSelector,
serviceNameSelector
))
if (process.env.ADVANCED_PROFILES_MERGE_LIMIT) {
rawReq.orderBy(['timestamp_ns', 'desc']).limit(parseInt(process.env.ADVANCED_PROFILES_MERGE_LIMIT))
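
sqlWithReference() exists because of the third argument passed to Sql.With (!!clusterName): on a clustered setup the WITH subqueries are marked inline, and a reference to them then renders as a bracketed subquery rather than a CTE alias, presumably because the alias would not be visible on remote shards of the Distributed tables. A minimal sketch of the two rendering modes (the ref object shape mimics what the builder's WithReference wraps and is an assumption for illustration):

// Render a WITH reference either as its alias or as an inlined subquery.
const render = (ref) =>
  ref.inline ? `(${ref.query}) as ${ref.alias}` : ref.alias

const idx = { alias: 'idx', query: 'SELECT fingerprint FROM profiles_series_gin', inline: false }
console.log(render(idx))                      // idx
console.log(render({ ...idx, inline: true })) // (SELECT fingerprint FROM profiles_series_gin) as idx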
@@ -214,15 +265,16 @@ const selectMergeStacktracesV2 = async (req, res) => {
const joinedReq = (new Sql.Select()).with(withRawReq).select([
new Sql.Raw('(raw.tree.1, raw.tree.2, raw.tree.3, sum(raw.tree.4), sum(raw.tree.5))'),
'tree2'
]).from(new Sql.WithReference(withRawReq))
]).from(sqlWithReference(withRawReq))
.join('raw.tree', 'array')
.groupBy(new Sql.Raw('raw.tree.1'), new Sql.Raw('raw.tree.2'), new Sql.Raw('raw.tree.3'))
.orderBy(new Sql.Raw('raw.tree.1')).limit(2000000)
const withJoinedReq = new Sql.With('joined', joinedReq, !!clusterName)
const joinedAggregatedReq = (new Sql.Select()).select(
[new Sql.Raw('groupArray(tree2)'), 'tree']).from(new Sql.WithReference(withJoinedReq))
[new Sql.Raw('groupArray(tree2)'), 'tree']).from(sqlWithReference(withJoinedReq))
const functionsReq = (new Sql.Select()).select(
[new Sql.Raw('groupUniqArray(raw.functions)'), 'functions2']
).from(new Sql.WithReference(withRawReq)).join('raw.functions', 'array')
).from(sqlWithReference(withRawReq)).join('raw.functions', 'array')

let brackLegacy = (new Sql.Select()).select(
[new Sql.Raw('[]::Array(String)'), 'legacy']
@@ -236,16 +288,18 @@ const selectMergeStacktracesV2 = async (req, res) => {
Sql.And(
Sql.Gte('timestamp_ns', new Sql.Raw(Math.floor(fromTimeSec) + '000000000')),
Sql.Lte('timestamp_ns', new Sql.Raw(Math.floor(toTimeSec) + '000000000')),
new Sql.In('fingerprint', 'IN', new Sql.WithReference(withIdxSelect)),
Sql.Eq(new Sql.Raw('empty(tree)'), 1)
new Sql.In('fingerprint', 'IN', sqlWithReference(withIdxSelect)),
Sql.Eq(new Sql.Raw('empty(tree)'), 1),
typeIdSelector,
serviceNameSelector
))
if (process.env.ADVANCED_PROFILES_MERGE_LIMIT) {
legacy.orderBy(['timestamp_ns', 'desc']).limit(parseInt(process.env.ADVANCED_PROFILES_MERGE_LIMIT))
}
withLegacy = new Sql.With('legacy', legacy, !!clusterName)
brackLegacy = (new Sql.Select())
.select([new Sql.Raw('groupArray(payload)'), 'payloads'])
.from(new Sql.WithReference(withLegacy))
.from(sqlWithReference(withLegacy))
}
brackLegacy = new Sql.Raw(`(${brackLegacy.toString()})`)
const brack1 = new Sql.Raw(`(${joinedAggregatedReq.toString()})`)
@@ -331,12 +385,18 @@ const selectSeries = async (req, res) => {
}
const aggregation = _req.getAggregation && _req.getAggregation()

const typeIdSelector = Sql.Eq(
'type_id',
Sql.val(`${typeID.type}:${typeID.periodType}:${typeID.periodUnit}`))
const serviceNameSelector = serviceNameSelectorQuery(labelSelector)

const idxReq = (new Sql.Select())
.select(new Sql.Raw('fingerprint'))
.from(`${DATABASE_NAME()}.profiles_series_gin`)
.where(
Sql.And(
Sql.Eq('type_id', Sql.val(`${typeID.type}:${typeID.periodType}:${typeID.periodUnit}`)),
typeIdSelector,
serviceNameSelector,
Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
Sql.Eq(new Sql.Raw(
@@ -361,7 +421,9 @@ const selectSeries = async (req, res) => {
.where(Sql.And(
new Sql.In('fingerprint', 'IN', new Sql.WithReference(withIdxReq)),
Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`))
Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
typeIdSelector,
serviceNameSelector
))

const withLabelsReq = new Sql.With('labels', labelsReq, !!clusterName)
@@ -388,7 +450,9 @@ const selectSeries = async (req, res) => {
Sql.And(
new Sql.In('p.fingerprint', 'IN', new Sql.WithReference(withIdxReq)),
Sql.Gte('p.timestamp_ns', new Sql.Raw(`${fromTimeSec}000000000`)),
Sql.Lt('p.timestamp_ns', new Sql.Raw(`${toTimeSec}000000000`))
Sql.Lt('p.timestamp_ns', new Sql.Raw(`${toTimeSec}000000000`)),
typeIdSelector,
serviceNameSelector
)
).groupBy('timestamp_ns', 'fingerprint')
.orderBy(['fingerprint', 'ASC'], ['timestamp_ns', 'ASC'])
