# Patch summary (text before the first "diff --git" header is patch commentary).
#
# lib/db/maintain/scripts.js
#   - Moves the {{{OnCluster}}} placeholder to the end of the
#     "RENAME TABLE IF EXISTS profiles_mv TO profiles_mv_bak" statement.
#   - Appends ALTER TABLE statements to module.exports.profiles_dist that add
#     the `tree`, `functions` and `sample_types_units` columns to profiles_dist
#     and `sample_types_units` to profiles_series_dist / profiles_series_gin_dist.
#
# pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm
#   - Binary changed (contents not visible in this patch).
#
# pyroscope/pprof-bin/src/lib.rs
#   - Reorders `use` lines; drops `std::io::stderr` and `ch64::hash_128_to_64`.
#   - Adds get_node_id(parent_id, name_hash, level): writes both u64 values
#     least-significant byte first into a 16-byte buffer, hashes it with
#     city_hash_64, shifts the hash right by 9 bits and ORs the level
#     (clamped to 511) into the top 9 bits.
#   - merge() now calls get_node_id with level = location_id.len() - i
#     instead of hash_128_to_64(parent_id, name_hash).
#
# pyroscope/pyroscope.js
#   - Adds serviceNameSelectorQuery(labelSelector): returns a `1 = 1` condition
#     for an empty / '{}' selector, otherwise a condition built from the
#     selector's service_name rules (=, !=, =~, !~).
#   - Adds sqlWithReference(ref): a Sql.WithReference whose toString() emits
#     "(<query>) as <alias>" when ref.inline is set, else just the alias.
#   - Adds typeIdSelector / serviceNameSelector to the WHERE clauses of
#     selectMergeStacktracesV2 and selectSeries, and an ORDER BY + LIMIT 2000000
#     on the joined tree query.
#
# Change made in this revision of the patch:
#   - serviceNameSelectorQuery previously assigned `conds = valRul` on every
#     matching rule, so with several service_name matchers only the last one
#     was kept (and a rule with an unhandled operator would reset it to null).
#     The matchers are now combined with Sql.And and null rules are skipped.
#     The replacement is one line for one line, so all hunk counts are unchanged.
#
# NOTE(review): leading indentation and the position of blank context lines
#   were reconstructed from a whitespace-collapsed copy of this patch; apply
#   with whitespace tolerance or regenerate from the repository.
# NOTE(review): the lib.rs context lines `nodes: &Vec` and
#   `HashMap = HashMap::new()` look like they lost their generic parameters
#   during extraction - confirm against the repository before applying.
# NOTE(review): the pyroscope.js "index" line still names the original
#   post-image blob; it no longer matches after the one-line change above.
diff --git a/lib/db/maintain/scripts.js b/lib/db/maintain/scripts.js
index fb506879..7c2aa097 100644
--- a/lib/db/maintain/scripts.js
+++ b/lib/db/maintain/scripts.js
@@ -400,7 +400,7 @@ module.exports.profiles = [
     ADD COLUMN IF NOT EXISTS \`tree\` Array(Tuple(UInt64, UInt64, UInt64, Array(Tuple(String, Int64, Int64)))),
     ADD COLUMN IF NOT EXISTS \`functions\` Array(Tuple(UInt64, String))`,
 
-  'RENAME TABLE IF EXISTS profiles_mv {{{OnCluster}}} TO profiles_mv_bak',
+  'RENAME TABLE IF EXISTS profiles_mv TO profiles_mv_bak {{{OnCluster}}}',
 
   `CREATE MATERIALIZED VIEW IF NOT EXISTS profiles_mv {{{OnCluster}}} TO profiles AS
     SELECT
@@ -462,5 +462,18 @@ module.exports.profiles_dist = [
       key String,
       val String,
       val_id UInt64
-    ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_keys', rand());`
+    ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_keys', rand());`,
+
+  `ALTER TABLE profiles_dist {{{OnCluster}}}
+    ADD COLUMN IF NOT EXISTS \`tree\` Array(Tuple(UInt64, UInt64, UInt64, Array(Tuple(String, Int64, Int64)))),
+    ADD COLUMN IF NOT EXISTS \`functions\` Array(Tuple(UInt64, String))`,
+
+  `ALTER TABLE profiles_dist {{{OnCluster}}}
+    ADD COLUMN IF NOT EXISTS \`sample_types_units\` Array(Tuple(String, String))`,
+
+  `ALTER TABLE profiles_series_dist {{{OnCluster}}}
+    ADD COLUMN IF NOT EXISTS \`sample_types_units\` Array(Tuple(String, String))`,
+
+  `ALTER TABLE profiles_series_gin_dist {{{OnCluster}}}
+    ADD COLUMN IF NOT EXISTS \`sample_types_units\` Array(Tuple(String, String))`
 ]
diff --git a/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm b/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm
index 2d96df68..4162f86e 100644
Binary files a/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm and b/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm differ
diff --git a/pyroscope/pprof-bin/src/lib.rs b/pyroscope/pprof-bin/src/lib.rs
index a87cbee7..da5af679 100644
--- a/pyroscope/pprof-bin/src/lib.rs
+++ b/pyroscope/pprof-bin/src/lib.rs
@@ -6,20 +6,18 @@ use lazy_static::lazy_static;
 use pprof_pb::google::v1::Function;
 use pprof_pb::google::v1::Location;
 use pprof_pb::google::v1::Profile;
-use pprof_pb::querier::v1::FlameGraph;
 use pprof_pb::querier::v1::Level;
+use pprof_pb::querier::v1::FlameGraph;
 use pprof_pb::querier::v1::SelectMergeStacktracesResponse;
+use std::panic;
 use prost::Message;
 use std::collections::{HashMap, HashSet};
-use std::io::stderr;
 use std::slice::SliceIndex;
 use std::sync::Mutex;
 use std::vec::Vec;
 use wasm_bindgen::prelude::*;
 use ch64::city_hash_64;
 use ch64::read_uint64_le;
-use ch64::hash_128_to_64;
-use std::panic;
 
 pub mod pprof_pb {
 
@@ -69,6 +67,21 @@ fn find_node(id: u64, nodes: &Vec) -> i32 {
     n
 }
 
+fn get_node_id(parent_id: u64, name_hash: u64, level: u16) -> u64 {
+    let mut node_bytes: [u8; 16] = [0; 16];
+    for i in 0..8 {
+        node_bytes[i] = ((parent_id >> (i * 8)) & 0xFF) as u8;
+    }
+    for i in 0..8 {
+        node_bytes[i+8] = ((name_hash >> (i * 8)) & 0xFF) as u8;
+    }
+    let mut _level = level;
+    if _level > 511 {
+        _level = 511;
+    }
+    (city_hash_64(&node_bytes[0..]) >> 9) | ((_level as u64) << 55)
+}
+
 fn merge(tree: &mut Tree, p: &Profile) {
     let mut functions: HashMap = HashMap::new();
     for f in p.function.iter() {
@@ -112,7 +125,9 @@ fn merge(tree: &mut Tree, p: &Profile) {
             let location = locations[&s.location_id[i]];
             let name = &p.string_table[functions[&location.line[0].function_id].name as usize];
             let name_hash = city_hash_64(name.as_bytes());
-            let node_id = hash_128_to_64(parent_id, name_hash);
+            let node_id = get_node_id(
+                parent_id, name_hash,(s.location_id.len() - i) as u16
+            );
             if !tree.nodes.contains_key(&parent_id) && tree.nodes_num < 2000000{
                 tree.nodes.insert(parent_id, Vec::new());
             }
diff --git a/pyroscope/pyroscope.js b/pyroscope/pyroscope.js
index a16fc4d9..1d4b8c1a 100644
--- a/pyroscope/pyroscope.js
+++ b/pyroscope/pyroscope.js
@@ -167,10 +167,53 @@ const labelSelectorQuery = (query, labelSelector) => {
   ))
 }
 
+const serviceNameSelectorQuery = (labelSelector) => {
+  const empty = Sql.Eq(new Sql.Raw('1'), new Sql.Raw('1'))
+  if (!labelSelector || !labelSelector.length || labelSelector === '{}') {
+    return empty
+  }
+  const labelSelectorScript = compiler.ParseScript(labelSelector).rootToken
+  let conds = null
+  for (const rule of labelSelectorScript.Children('log_stream_selector_rule')) {
+    const label = rule.Child('label').value
+    if (label !== 'service_name') {
+      continue
+    }
+    const val = JSON.parse(rule.Child('quoted_str').value)
+    let valRul = null
+    switch (rule.Child('operator').value) {
+      case '=':
+        valRul = Sql.Eq(new Sql.Raw('service_name'), Sql.val(val))
+        break
+      case '!=':
+        valRul = Sql.Ne(new Sql.Raw('service_name'), Sql.val(val))
+        break
+      case '=~':
+        valRul = Sql.Eq(new Sql.Raw(`match(service_name, ${Sql.quoteVal(val)})`), 1)
+        break
+      case '!~':
+        valRul = Sql.Ne(new Sql.Raw(`match(service_name, ${Sql.quoteVal(val)})`), 1)
+    }
+    if (valRul) conds = conds ? Sql.And(conds, valRul) : valRul // AND every service_name matcher; skip unhandled operators
+  }
+  return conds || empty
+}
+
 const selectMergeStacktraces = async (req, res) => {
   return await selectMergeStacktracesV2(req, res)
 }
 
+const sqlWithReference = (ref) => {
+  const res = new Sql.WithReference(ref)
+  res.toString = function () {
+    if (this.ref.inline) {
+      return `(${this.ref.query.toString()}) as ${this.ref.alias}`
+    }
+    return this.ref.alias
+  }
+  return res
+}
+
 const selectMergeStacktracesV2 = async (req, res) => {
   const dist = clusterName ? '_dist' : ''
   const typeRegex = parseTypeId(req.body.getProfileTypeid())
@@ -182,15 +225,21 @@ const selectMergeStacktracesV2 = async (req, res) => {
     ? Math.floor(parseInt(req.body.getEnd()) / 1000)
     : Math.floor(Date.now() / 1000)
   const v2 = checkVersion('profiles_v2', (fromTimeSec - 3600) * 1000)
+  const serviceNameSelector = serviceNameSelectorQuery(sel)
+  const typeIdSelector = Sql.Eq(
+    'type_id',
+    Sql.val(`${typeRegex.type}:${typeRegex.periodType}:${typeRegex.periodUnit}`)
+  )
   const idxSelect = (new Sql.Select())
     .select('fingerprint')
     .from(`${DATABASE_NAME()}.profiles_series_gin`)
     .where(
       Sql.And(
         Sql.Eq(new Sql.Raw(`has(sample_types_units, (${Sql.quoteVal(typeRegex.sampleType)},${Sql.quoteVal(typeRegex.sampleUnit)}))`), 1),
-        Sql.Eq('type_id', Sql.val(`${typeRegex.type}:${typeRegex.periodType}:${typeRegex.periodUnit}`)),
+        typeIdSelector,
         Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
-        Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`))
+        Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
+        serviceNameSelector
       )
     ).groupBy('fingerprint')
   labelSelectorQuery(idxSelect, sel)
@@ -205,7 +254,9 @@ const selectMergeStacktracesV2 = async (req, res) => {
       Sql.And(
         Sql.Gte('timestamp_ns', new Sql.Raw(Math.floor(fromTimeSec) + '000000000')),
         Sql.Lte('timestamp_ns', new Sql.Raw(Math.floor(toTimeSec) + '000000000')),
-        new Sql.In('fingerprint', 'IN', new Sql.WithReference(withIdxSelect))
+        new Sql.In('fingerprint', 'IN', sqlWithReference(withIdxSelect)),
+        typeIdSelector,
+        serviceNameSelector
       ))
   if (process.env.ADVANCED_PROFILES_MERGE_LIMIT) {
     rawReq.orderBy(['timestamp_ns', 'desc']).limit(parseInt(process.env.ADVANCED_PROFILES_MERGE_LIMIT))
@@ -214,15 +265,16 @@ const selectMergeStacktracesV2 = async (req, res) => {
   const joinedReq = (new Sql.Select()).with(withRawReq).select([
     new Sql.Raw('(raw.tree.1, raw.tree.2, raw.tree.3, sum(raw.tree.4), sum(raw.tree.5))'),
     'tree2'
-  ]).from(new Sql.WithReference(withRawReq))
+  ]).from(sqlWithReference(withRawReq))
     .join('raw.tree', 'array')
     .groupBy(new Sql.Raw('raw.tree.1'), new Sql.Raw('raw.tree.2'), new Sql.Raw('raw.tree.3'))
+    .orderBy(new Sql.Raw('raw.tree.1')).limit(2000000)
   const withJoinedReq = new Sql.With('joined', joinedReq, !!clusterName)
   const joinedAggregatedReq = (new Sql.Select()).select(
-    [new Sql.Raw('groupArray(tree2)'), 'tree']).from(new Sql.WithReference(withJoinedReq))
+    [new Sql.Raw('groupArray(tree2)'), 'tree']).from(sqlWithReference(withJoinedReq))
   const functionsReq = (new Sql.Select()).select(
     [new Sql.Raw('groupUniqArray(raw.functions)'), 'functions2']
-  ).from(new Sql.WithReference(withRawReq)).join('raw.functions', 'array')
+  ).from(sqlWithReference(withRawReq)).join('raw.functions', 'array')
 
   let brackLegacy = (new Sql.Select()).select(
     [new Sql.Raw('[]::Array(String)'), 'legacy']
@@ -236,8 +288,10 @@ const selectMergeStacktracesV2 = async (req, res) => {
         Sql.And(
           Sql.Gte('timestamp_ns', new Sql.Raw(Math.floor(fromTimeSec) + '000000000')),
           Sql.Lte('timestamp_ns', new Sql.Raw(Math.floor(toTimeSec) + '000000000')),
-          new Sql.In('fingerprint', 'IN', new Sql.WithReference(withIdxSelect)),
-          Sql.Eq(new Sql.Raw('empty(tree)'), 1)
+          new Sql.In('fingerprint', 'IN', sqlWithReference(withIdxSelect)),
+          Sql.Eq(new Sql.Raw('empty(tree)'), 1),
+          typeIdSelector,
+          serviceNameSelector
         ))
     if (process.env.ADVANCED_PROFILES_MERGE_LIMIT) {
       legacy.orderBy(['timestamp_ns', 'desc']).limit(parseInt(process.env.ADVANCED_PROFILES_MERGE_LIMIT))
@@ -245,7 +299,7 @@ const selectMergeStacktracesV2 = async (req, res) => {
     withLegacy = new Sql.With('legacy', legacy, !!clusterName)
     brackLegacy = (new Sql.Select())
       .select([new Sql.Raw('groupArray(payload)'), 'payloads'])
-      .from(new Sql.WithReference(withLegacy))
+      .from(sqlWithReference(withLegacy))
   }
   brackLegacy = new Sql.Raw(`(${brackLegacy.toString()})`)
   const brack1 = new Sql.Raw(`(${joinedAggregatedReq.toString()})`)
@@ -331,12 +385,18 @@ const selectSeries = async (req, res) => {
   }
   const aggregation = _req.getAggregation && _req.getAggregation()
 
+  const typeIdSelector = Sql.Eq(
+    'type_id',
+    Sql.val(`${typeID.type}:${typeID.periodType}:${typeID.periodUnit}`))
+  const serviceNameSelector = serviceNameSelectorQuery(labelSelector)
+
   const idxReq = (new Sql.Select())
     .select(new Sql.Raw('fingerprint'))
     .from(`${DATABASE_NAME()}.profiles_series_gin`)
     .where(
       Sql.And(
-        Sql.Eq('type_id', Sql.val(`${typeID.type}:${typeID.periodType}:${typeID.periodUnit}`)),
+        typeIdSelector,
+        serviceNameSelector,
         Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
         Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
         Sql.Eq(new Sql.Raw(
@@ -361,7 +421,9 @@ const selectSeries = async (req, res) => {
     .where(Sql.And(
       new Sql.In('fingerprint', 'IN', new Sql.WithReference(withIdxReq)),
       Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
-      Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`))
+      Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
+      typeIdSelector,
+      serviceNameSelector
     ))
 
   const withLabelsReq = new Sql.With('labels', labelsReq, !!clusterName)
@@ -388,7 +450,9 @@ const selectSeries = async (req, res) => {
       Sql.And(
         new Sql.In('p.fingerprint', 'IN', new Sql.WithReference(withIdxReq)),
         Sql.Gte('p.timestamp_ns', new Sql.Raw(`${fromTimeSec}000000000`)),
-        Sql.Lt('p.timestamp_ns', new Sql.Raw(`${toTimeSec}000000000`))
+        Sql.Lt('p.timestamp_ns', new Sql.Raw(`${toTimeSec}000000000`)),
+        typeIdSelector,
+        serviceNameSelector
       )
     ).groupBy('timestamp_ns', 'fingerprint')
     .orderBy(['fingerprint', 'ASC'], ['timestamp_ns', 'ASC'])