diff --git a/.github/workflows/node-clickhouse-cluster.js.yml b/.github/workflows/node-clickhouse-cluster.js.yml index c2b323fd..fbfddb72 100644 --- a/.github/workflows/node-clickhouse-cluster.js.yml +++ b/.github/workflows/node-clickhouse-cluster.js.yml @@ -49,4 +49,4 @@ jobs: CLICKHOUSE_TSDB: qryn INTEGRATION_E2E: 1 CLOKI_EXT_URL: 127.0.0.1:3100 - run: CLUSTER_NAME=test_cluster_two_shards node qryn.mjs >/dev/stdout & npm run test --forceExit + run: CLUSTER_NAME=test_cluster_two_shards node qryn.mjs >/dev/stdout & sleep 10 && npm run test --forceExit diff --git a/lib/db/maintain/index.js b/lib/db/maintain/index.js index c2076e79..c1c88faa 100644 --- a/lib/db/maintain/index.js +++ b/lib/db/maintain/index.js @@ -103,7 +103,8 @@ module.exports.rotate = async (opts) => { { type: 'rotate', name: 'v3_time_series_days' }, { type: 'rotate', name: 'v3_storage_policy' }, { type: 'rotate', name: 'v1_traces_days' }, - { type: 'rotate', name: 'v1_traces_storage_policy' } + { type: 'rotate', name: 'v1_traces_storage_policy' }, + { type: 'rotate', name: 'v1_profiles_days' } ], db.db) const _update = (req) => { return upgradeRequest({ db: db.db, useDefaultDB: true }, req) @@ -161,5 +162,24 @@ module.exports.rotate = async (opts) => { await _update(alterSm, null, db.db) await client.addSetting('rotate', 'v1_traces_storage_policy', db.storage_policy, db.db) } + if (db.samples_days + '' !== settings.v1_profiles_days) { + let alterTable = 'ALTER TABLE profiles {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192' + let rotateTable = `ALTER TABLE profiles {{{OnCluster}}} MODIFY TTL toDateTime(timestamp_ns / 1000000000) + INTERVAL ${db.samples_days} DAY` + await _update(alterTable, null, db.db) + await _update(rotateTable, null, db.db) + alterTable = 'ALTER TABLE profiles_series {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192' + rotateTable = `ALTER TABLE profiles_series {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY` + await _update(alterTable, null, db.db) + await _update(rotateTable, null, db.db) + alterTable = 'ALTER TABLE profiles_series_gin {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192' + rotateTable = `ALTER TABLE profiles_series_gin {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY` + await _update(alterTable, null, db.db) + await _update(rotateTable, null, db.db) + alterTable = 'ALTER TABLE profiles_series_keys {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192' + rotateTable = `ALTER TABLE profiles_series_keys {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY` + await _update(alterTable, null, db.db) + await _update(rotateTable, null, db.db) + await client.addSetting('rotate', 'v1_profiles_days', db.samples_days + '', db.db) + } } } diff --git a/lib/db/maintain/scripts.js b/lib/db/maintain/scripts.js index b11f69ed..ad0b4dd3 100644 --- a/lib/db/maintain/scripts.js +++ b/lib/db/maintain/scripts.js @@ -306,7 +306,7 @@ module.exports.profiles = [ values_agg Array(Tuple(String, Int64, Int32)) CODEC(ZSTD(1)) ) Engine {{MergeTree}}() ORDER BY (type_id, service_name, timestamp_ns) - PARTITION BY toDate(FROM_UNIXTIME(intDiv(timestamp_ns, 1000000000)))`, + PARTITION BY toDate(FROM_UNIXTIME(intDiv(timestamp_ns, 1000000000))) {{{CREATE_SETTINGS}}}`, `CREATE MATERIALIZED VIEW IF NOT EXISTS profiles_mv {{{OnCluster}}} TO profiles AS SELECT @@ -335,7 +335,7 @@ module.exports.profiles = [ tags Array(Tuple(String, String)) CODEC(ZSTD(1)), ) Engine {{ReplacingMergeTree}}() ORDER BY (date, type_id, fingerprint) - PARTITION BY date`, + PARTITION BY date {{{CREATE_SETTINGS}}}`, `CREATE MATERIALIZED VIEW IF NOT EXISTS profiles_series_mv {{{OnCluster}}} TO profiles_series AS SELECT @@ -362,7 +362,7 @@ module.exports.profiles = [ fingerprint UInt64 CODEC(DoubleDelta, ZSTD(1)), ) Engine {{ReplacingMergeTree}}() ORDER BY (date, key, val, type_id, fingerprint) - PARTITION BY date`, + PARTITION BY date {{{CREATE_SETTINGS}}}`, `CREATE MATERIALIZED VIEW IF NOT EXISTS profiles_series_gin_mv {{{OnCluster}}} TO profiles_series_gin AS SELECT @@ -382,7 +382,7 @@ module.exports.profiles = [ val_id UInt64 ) Engine {{ReplacingMergeTree}}() ORDER BY (date, key, val_id) - PARTITION BY date`, + PARTITION BY date {{{CREATE_SETTINGS}}}`, `CREATE MATERIALIZED VIEW IF NOT EXISTS profiles_series_keys_mv {{{OnCluster}}} TO profiles_series_keys AS SELECT diff --git a/pyroscope/pyroscope.js b/pyroscope/pyroscope.js index 518cea6d..360b3b72 100644 --- a/pyroscope/pyroscope.js +++ b/pyroscope/pyroscope.js @@ -12,6 +12,24 @@ const { clusterName} = require('../common') const HISTORY_TIMESPAN = 1000 * 60 * 60 * 24 * 7 +/** + * + * @param typeId {string} + */ +const parseTypeId = (typeId) => { + const typeParts = typeId.match(/^([^:]+):([^:]+):([^:]+):([^:]+):([^:]+)$/) + if (!typeParts) { + throw new QrynBadRequest('invalid type id') + } + return { + type: typeParts[1], + sampleType: typeParts[2], + sampleUnit: typeParts[3], + periodType: typeParts[4], + periodUnit: typeParts[5] + } +} + const profileTypesHandler = async (req, res) => { const dist = clusterName ? '_dist' : '' const _res = new messages.ProfileTypesResponse() @@ -28,7 +46,9 @@ WHERE date >= toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)})) AND date <= toDa _res.setProfileTypesList(profileTypes.data.data.map(profileType => { const pt = new types.ProfileType() const [name, periodType, periodUnit] = profileType.type_id.split(':') - pt.setId(profileType.type_id + ':' + profileType.sample_type_unit[0] + ':' + profileType.sample_type_unit[1]) + const typeIdParts = profileType.type_id.match(/^([^:]+):(.*)$/) + pt.setId(typeIdParts[1] + ':' + profileType.sample_type_unit[0] + ':' + profileType.sample_type_unit[1] + + ':' + typeIdParts[2]) pt.setName(name) pt.setSampleType(profileType.sample_type_unit[0]) pt.setSampleUnit(profileType.sample_type_unit[1]) @@ -149,7 +169,7 @@ const labelSelectorQuery = (query, labelSelector) => { const selectMergeStacktraces = async (req, res) => { const dist = clusterName ? '_dist' : '' - const typeRe = req.body.getProfileTypeid().match(/^(.+):([^:]+):([^:]+)$/) + const typeRegex = parseTypeId(req.body.getProfileTypeid()) const sel = req.body.getLabelSelector() const fromTimeSec = req.body && req.body.getStart() ? Math.floor(parseInt(req.body.getStart()) / 1000) @@ -162,8 +182,8 @@ const selectMergeStacktraces = async (req, res) => { .from(`${DATABASE_NAME()}.profiles_series_gin`) .where( Sql.And( - Sql.Eq(new Sql.Raw(`has(sample_types_units, (${Sql.quoteVal(typeRe[2])},${Sql.quoteVal(typeRe[3])}))`), 1), - Sql.Eq('type_id', Sql.val(typeRe[1])), + Sql.Eq(new Sql.Raw(`has(sample_types_units, (${Sql.quoteVal(typeRegex.sampleType)},${Sql.quoteVal(typeRegex.sampleUnit)}))`), 1), + Sql.Eq('type_id', Sql.val(`${typeRegex.type}:${typeRegex.periodType}:${typeRegex.periodUnit}`)), Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)), Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)) ) @@ -198,7 +218,7 @@ const selectMergeStacktraces = async (req, res) => { i += size + shift promises.push(new Promise((resolve, reject) => setTimeout(() => { try { - pprofBin.merge_tree(_ctxIdx, uarray, `${typeRe[2]}:${typeRe[3]}`) + pprofBin.merge_tree(_ctxIdx, uarray, `${typeRegex.sampleType}:${typeRegex.sampleUnit}`) resolve() } catch (e) { reject(e) @@ -208,7 +228,7 @@ const selectMergeStacktraces = async (req, res) => { let sResp = null try { await Promise.all(promises) - sResp = pprofBin.export_tree(_ctxIdx, `${typeRe[2]}:${typeRe[3]}`) + sResp = pprofBin.export_tree(_ctxIdx, `${typeRegex.sampleType}:${typeRegex.sampleUnit}`) } finally { try { pprofBin.drop_tree(_ctxIdx) } catch (e) { req.log.error(e) } } @@ -223,16 +243,16 @@ const selectSeries = async (req, res) => { const toTimeSec = Math.floor(req.getEnd && req.getEnd() ? parseInt(req.getEnd()) / 1000 : Date.now() / 1000) - let typeId = _req.getProfileTypeid && _req.getProfileTypeid() - if (!typeId) { + let typeID = _req.getProfileTypeid && _req.getProfileTypeid() + if (!typeID) { throw new QrynBadRequest('No type provided') } - typeId = typeId.match(/^(.+):([^:]+):([^:]+)$/) - if (!typeId) { + typeID = parseTypeId(typeID) + if (!typeID) { throw new QrynBadRequest('Invalid type provided') } const dist = clusterName ? '_dist' : '' - const sampleTypeId = typeId[2] + ':' + typeId[3] + const sampleTypeId = typeID.sampleType + ':' + typeID.sampleUnit const labelSelector = _req.getLabelSelector && _req.getLabelSelector() let groupBy = _req.getGroupByList && _req.getGroupByList() groupBy = groupBy && groupBy.length ? groupBy : null @@ -247,11 +267,11 @@ const selectSeries = async (req, res) => { .from(`${DATABASE_NAME()}.profiles_series_gin`) .where( Sql.And( - Sql.Eq('type_id', Sql.val(typeId[1])), + Sql.Eq('type_id', Sql.val(`${typeID.type}:${typeID.periodType}:${typeID.periodUnit}`)), Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)), Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)), Sql.Eq(new Sql.Raw( - `has(sample_types_units, (${Sql.quoteVal(typeId[2])}, ${Sql.quoteVal(typeId[3])}))`), + `has(sample_types_units, (${Sql.quoteVal(typeID.sampleType)}, ${Sql.quoteVal(typeID.sampleUnit)}))`), 1) ) ) diff --git a/test/e2e b/test/e2e index 02f8c2ae..d45fb2db 160000 --- a/test/e2e +++ b/test/e2e @@ -1 +1 @@ -Subproject commit 02f8c2ae70b13affbd2ac14bf5a8eb11d1d84880 +Subproject commit d45fb2db4bd2360fcafef54cdf1f42fef9b8b9bc