diff --git a/.eslintrc.json b/.eslintrc.json index c1b66fd2..bab99442 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -14,6 +14,7 @@ }, "plugins": ["jest"], "rules": { - "no-template-curly-in-string": "off" + "no-template-curly-in-string": "off", + "no-useless-escape": "off" } } diff --git a/lib/db/clickhouse.js b/lib/db/clickhouse.js index da381294..12033c40 100644 --- a/lib/db/clickhouse.js +++ b/lib/db/clickhouse.js @@ -39,9 +39,6 @@ let state = 'INITIALIZING' const clickhouse = new ClickHouse(clickhouseOptions) let ch -const samples = [] -const timeSeries = [] - class TimeoutThrottler { constructor (statement) { this.statement = statement @@ -72,7 +69,7 @@ class TimeoutThrottler { } const p = Date.now() - ts if (p < 100) { - await new Promise((resolve, reject) => setTimeout(resolve, 100 - p)) + await new Promise((resolve) => setTimeout(resolve, 100 - p)) } } }) @@ -83,8 +80,8 @@ class TimeoutThrottler { if (len < 1) { return } - const ts = Date.now() - const resp = await axios.post(`${getClickhouseUrl()}/?query=${this.statement}`, + + await axios.post(`${getClickhouseUrl()}/?query=${this.statement}`, this.queue.join('\n') ) this.queue = this.queue.slice(len) @@ -101,7 +98,7 @@ const timeSeriesThrottler = new TimeoutThrottler( `INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series(date, fingerprint, labels, name) FORMAT JSONEachRow`) /* TODO: tsv2 const timeSeriesv2Throttler = new TimeoutThrottler( - `INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series_v2(date, fingerprint, labels, name) FORMAT JSONEachRow`); */ + `INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series_v2(date, fingerprint, labels, name) FORMAT JSONEachRow`); */ samplesThrottler.start() timeSeriesThrottler.start() // timeSeriesv2Throttler.start(); @@ -109,7 +106,8 @@ timeSeriesThrottler.start() /* Cache Helper */ const recordCache = require('record-cache') const onStale = function (data) { - for (const [key, value] of data.records.entries()) { + for 
(const entry of data.records.entries()) { + const value = entry[1] samplesThrottler.queue.push.apply(samplesThrottler.queue, value.list.map(r => JSON.stringify({ fingerprint: r.record[0], timestamp_ms: r.record[1], @@ -118,22 +116,23 @@ const onStale = function (data) { }))) } } -const onStale_labels = function (data) { - for (const [key, value] of data.records.entries()) { - timeSeriesThrottler.queue.push.apply(timeSeriesThrottler.queue, value.list.map(r => JSON.stringify({ +const onStaleLabels = function (data) { + for (const entry of data.records.entries()) { + const value = entry[1] + timeSeriesThrottler.queue.push.apply(timeSeriesThrottler.queue, value.list.map(r => JSON.stringify({ date: r.record[0], fingerprint: r.record[1], labels: r.record[2], name: r.record[3] }))) - /* TODO: tsv2 - timeSeriesv2Throttler.queue.push.apply(timeSeriesv2Throttler.queue, value.list.map(r => JSON.stringify({ - date: r.record[0], - fingerprint: r.record[1], - labels: JSON.parse(r.record[2]), - name: r.record[3] - }))); - */ + /* TODO: tsv2 + timeSeriesv2Throttler.queue.push.apply(timeSeriesv2Throttler.queue, value.list.map(r => JSON.stringify({ + date: r.record[0], + fingerprint: r.record[1], + labels: JSON.parse(r.record[2]), + name: r.record[3] + }))); + */ } } @@ -144,10 +143,10 @@ const bulk = recordCache({ onStale: onStale }) -const bulk_labels = recordCache({ +const bulkLabels = recordCache({ maxSize: 100, maxAge: 500, - onStale: onStale_labels + onStale: onStaleLabels }) // In-Memory LRU for quick lookups @@ -164,76 +163,76 @@ const initialize = function (dbName) { const tmp = { ...clickhouseOptions, queryOptions: { database: '' } } ch = new ClickHouse(tmp) - const hack_ch = (ch) => { + const hackCh = (ch) => { ch._query = ch.query ch.query = (q, opts, cb) => { - return new Promise(f => ch._query(q, opts, (err, data) => { + return new Promise(resolve => ch._query(q, opts, (err, data) => { cb(err, data) - f() + resolve() })) } } - ch.query(dbQuery, undefined, async 
function (err, data) { + ch.query(dbQuery, undefined, async function (err/*, data */) { if (err) { console.error('error', err); return } const ch = new ClickHouse(clickhouseOptions) - hack_ch(ch) + hackCh(ch) console.log('CREATE TABLES', dbName) - let ts_table = 'CREATE TABLE IF NOT EXISTS ' + dbName + '.time_series (date Date,fingerprint UInt64,labels String, name String) ENGINE = ReplacingMergeTree(date) PARTITION BY date ORDER BY fingerprint' - let sm_table = 'CREATE TABLE IF NOT EXISTS ' + dbName + '.samples (fingerprint UInt64,timestamp_ms Int64,value Float64,string String) ENGINE = MergeTree PARTITION BY toRelativeHourNum(toDateTime(timestamp_ms / 1000)) ORDER BY (fingerprint, timestamp_ms)' + let tsTable = 'CREATE TABLE IF NOT EXISTS ' + dbName + '.time_series (date Date,fingerprint UInt64,labels String, name String) ENGINE = ReplacingMergeTree(date) PARTITION BY date ORDER BY fingerprint' + let smTable = 'CREATE TABLE IF NOT EXISTS ' + dbName + '.samples (fingerprint UInt64,timestamp_ms Int64,value Float64,string String) ENGINE = MergeTree PARTITION BY toRelativeHourNum(toDateTime(timestamp_ms / 1000)) ORDER BY (fingerprint, timestamp_ms)' if (storagePolicy) { console.log('ADD SETTINGS storage policy', storagePolicy) - const set_storage = ` SETTINGS storagePolicy='${storagePolicy}'` - ts_table += set_storage - sm_table += set_storage + const setStorage = ` SETTINGS storagePolicy='${storagePolicy}'` + tsTable += setStorage + smTable += setStorage } - await ch.query(ts_table, undefined, function (err, data) { + await ch.query(tsTable, undefined, function (err/*, data */) { if (err) { console.log(err); process.exit(1) } else if (debug) console.log('Timeseries Table ready!') console.log('Timeseries Table ready!') return true }) - await ch.query(sm_table, undefined, function (err, data) { + await ch.query(smTable, undefined, function (err/*, data */) { if (err) { console.log(err); process.exit(1) } else if (debug) console.log('Samples Table ready!') return true 
}) - var alter_table = 'ALTER TABLE ' + dbName + '.samples MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192' - var rotate_table = 'ALTER TABLE ' + dbName + '.samples MODIFY TTL toDateTime(timestamp_ms / 1000) + INTERVAL ' + rotationSamples + ' DAY' + let alterTable = 'ALTER TABLE ' + dbName + '.samples MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192' + let rotateTable = 'ALTER TABLE ' + dbName + '.samples MODIFY TTL toDateTime(timestamp_ms / 1000) + INTERVAL ' + rotationSamples + ' DAY' - await ch.query(alter_table, undefined, function (err, data) { + await ch.query(alterTable, undefined, function (err/*, data */) { if (err) { console.log(err) } else if (debug) console.log('Samples Table altered for rotation!') // return true; }) - await ch.query(rotate_table, undefined, function (err, data) { + await ch.query(rotateTable, undefined, function (err/*, data */) { if (err) { console.log(err) } else if (debug) console.log('Samples Table rotation set to days: ' + rotationSamples) return true }) - var alter_table = 'ALTER TABLE ' + dbName + '.time_series MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192' - var rotate_table = 'ALTER TABLE ' + dbName + '.time_series MODIFY TTL date + INTERVAL ' + rotationLabels + ' DAY' + alterTable = 'ALTER TABLE ' + dbName + '.time_series MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192' + rotateTable = 'ALTER TABLE ' + dbName + '.time_series MODIFY TTL date + INTERVAL ' + rotationLabels + ' DAY' - await ch.query(alter_table, undefined, function (err, data) { + await ch.query(alterTable, undefined, function (err/*, data */) { if (err) { console.log(err) } else if (debug) console.log('Labels Table altered for rotation!') return true }) - await ch.query(rotate_table, undefined, function (err, data) { + await ch.query(rotateTable, undefined, function 
(err/*, data */) { if (err) { console.log(err) } else if (debug) console.log('Labels Table rotation set to days: ' + rotationLabels) return true }) if (storagePolicy) { console.log('ALTER storage policy', storagePolicy) - const alter_ts = `ALTER TABLE ${dbName}.time_series MODIFY SETTING storagePolicy='${storagePolicy}'` - const alter_sm = `ALTER TABLE ${dbName}.samples MODIFY SETTING storagePolicy='${storagePolicy}'` + const alterTs = `ALTER TABLE ${dbName}.time_series MODIFY SETTING storagePolicy='${storagePolicy}'` + const alterSm = `ALTER TABLE ${dbName}.samples MODIFY SETTING storagePolicy='${storagePolicy}'` - await ch.query(alter_ts, undefined, function (err, data) { + await ch.query(alterTs, undefined, function (err/*, data */) { if (err) { console.log(err) } else if (debug) console.log('Storage policy update for fingerprints ' + storagePolicy) return true }) - await ch.query(alter_sm, undefined, function (err, data) { + await ch.query(alterSm, undefined, function (err/*, data */) { if (err) { console.log(err) } else if (debug) console.log('Storage policy update for samples ' + storagePolicy) return true }) @@ -244,26 +243,26 @@ const initialize = function (dbName) { state = 'READY' /* TODO: tsv2 - const tsv2 = await axios.get(`${protocol}://${clickhouseOptions.auth}@${clickhouseOptions.host}:${clickhouseOptions.port}/?query=SHOW TABLES FROM ${dbName} LIKE 'time_series_v2' FORMAT JSON`); - if (!tsv2.data.rows) { - const create_tsv2 = `CREATE TABLE IF NOT EXISTS ${dbName}.time_series_v2 - ( - date Date, - fingerprint UInt64, - labels Array(Tuple(String, String)), - labels_map Map(String, String), - name String - ) ENGINE = ReplacingMergeTree(date) PARTITION BY date ORDER BY fingerprint`; - await ch.query(create_tsv2, undefined, () => {}); - const insert = `INSERT INTO ${dbName}.time_series_v2 (date, fingerprint, labels, labels_map, name) - SELECT date, fingerprint, JSONExtractKeysAndValues(labels, 'String') as labels, - CAST(( - arrayMap(x -> x.1, 
JSONExtractKeysAndValues(labels, 'String')), - arrayMap(x -> x.2, JSONExtractKeysAndValues(labels, 'String'))), 'Map(String, String)') as labels_map, - name FROM ${dbName}.time_series`; - await axios.post(`${protocol}://${clickhouseOptions.auth}@${clickhouseOptions.host}:${clickhouseOptions.port}/`, - insert); - } */ + const tsv2 = await axios.get(`${protocol}://${clickhouseOptions.auth}@${clickhouseOptions.host}:${clickhouseOptions.port}/?query=SHOW TABLES FROM ${dbName} LIKE 'time_series_v2' FORMAT JSON`); + if (!tsv2.data.rows) { + const create_tsv2 = `CREATE TABLE IF NOT EXISTS ${dbName}.time_series_v2 + ( + date Date, + fingerprint UInt64, + labels Array(Tuple(String, String)), + labels_map Map(String, String), + name String + ) ENGINE = ReplacingMergeTree(date) PARTITION BY date ORDER BY fingerprint`; + await ch.query(create_tsv2, undefined, () => {}); + const insert = `INSERT INTO ${dbName}.time_series_v2 (date, fingerprint, labels, labels_map, name) + SELECT date, fingerprint, JSONExtractKeysAndValues(labels, 'String') as labels, + CAST(( + arrayMap(x -> x.1, JSONExtractKeysAndValues(labels, 'String')), + arrayMap(x -> x.2, JSONExtractKeysAndValues(labels, 'String'))), 'Map(String, String)') as labels_map, + name FROM ${dbName}.time_series`; + await axios.post(`${protocol}://${clickhouseOptions.auth}@${clickhouseOptions.host}:${clickhouseOptions.port}/`, + insert); + } */ reloadFingerprints() }) @@ -273,7 +272,7 @@ const checkCapabilities = async () => { console.log('Checking clickhouse capabilities: ') try { await axios.post(getClickhouseUrl() + '/?allow_experimental_live_view=1', - `CREATE LIVE VIEW ${clickhouseOptions.queryOptions.database}.lvcheck WITH TIMEOUT 1 AS SELECT 1`) + `CREATE LIVE VIEW ${clickhouseOptions.queryOptions.database}.lvcheck WITH TIMEOUT 1 AS SELECT 1`) capabilities.liveView = true console.log('LIVE VIEW: supported') } catch (e) { @@ -282,34 +281,34 @@ const checkCapabilities = async () => { } } -var reloadFingerprints = function () 
{ +const reloadFingerprints = function () { console.log('Reloading Fingerprints...') - const select_query = `SELECT DISTINCT fingerprint, labels FROM ${clickhouseOptions.queryOptions.database}.time_series` - const stream = ch.query(select_query) + const selectQuery = `SELECT DISTINCT fingerprint, labels FROM ${clickhouseOptions.queryOptions.database}.time_series` + const stream = ch.query(selectQuery) // or collect records yourself const rows = [] stream.on('metadata', function (columns) { - // do something with column list + // do something with column list }) stream.on('data', function (row) { - rows.push(row) + rows.push(row) }) stream.on('error', function (err) { - // TODO: handler error + console.log(err) }) stream.on('end', function () { - rows.forEach(function (row) { + rows.forEach(function (row) { try { - const JSON_labels = toJSON(row[1].replace(/\!?=/g, ':')) - labels.add(row[0], JSON.stringify(JSON_labels)) - for (const key in JSON_labels) { + const JSONLabels = toJSON(row[1].replace(/\!?=/g, ':')) + labels.add(row[0], JSON.stringify(JSONLabels)) + for (const key in JSONLabels) { // if (debug) console.log('Adding key',row); labels.add('_LABELS_', key) - labels.add(key, JSON_labels[key]) - }; + labels.add(key, JSONLabels[key]) + } } catch (e) { console.error(e) } - }) - if (debug) console.log('Reloaded fingerprints:', rows.length + 1) + }) + if (debug) console.log('Reloaded fingerprints:', rows.length + 1) }) } @@ -333,7 +332,7 @@ function getClickhouseUrl () { * @param res {{res: {write: (function(string)), writeHead: (function(number, {}))}}} * @returns {Promise} */ -var queryFingerprintsScan = async function (query, res) { +const queryFingerprintsScan = async function (query, res) { if (debug) console.log('Scanning Fingerprints...') // console.log(_query.query); @@ -730,7 +729,7 @@ module.exports.watchLiveView = async (name, db, res, options) => { module.exports.databaseOptions = clickhouseOptions module.exports.database = clickhouse 
-module.exports.cache = { bulk: bulk, bulk_labels: bulk_labels, labels: labels } +module.exports.cache = { bulk: bulk, bulk_labels: bulkLabels, labels: labels } module.exports.scanFingerprints = scanFingerprints module.exports.queryFingerprintsScan = queryFingerprintsScan module.exports.scanMetricFingerprints = scanMetricFingerprints diff --git a/lib/handlers/push.js b/lib/handlers/push.js index 094cf004..d1d2e5be 100644 --- a/lib/handlers/push.js +++ b/lib/handlers/push.js @@ -45,9 +45,10 @@ function handler (req, res) { } if (streams) { streams.forEach(function (stream) { + let finger = null try { + let JSONLabels try { - var JSONLabels if (stream.stream) { JSONLabels = stream.stream } else { @@ -60,7 +61,7 @@ function handler (req, res) { return } // Calculate Fingerprint - var finger = self.fingerPrint(JSON.stringify(JSONLabels)) + finger = self.fingerPrint(JSON.stringify(JSONLabels)) if (self.debug) { console.log('LABELS FINGERPRINT', stream.labels, finger) } self.labels.add(finger, stream.labels) // Store Fingerprint @@ -119,6 +120,6 @@ function handler (req, res) { }) } res.send(200) -}; +} module.exports = handler diff --git a/lib/handlers/query.js b/lib/handlers/query.js index 58620d27..8ed75f50 100644 --- a/lib/handlers/query.js +++ b/lib/handlers/query.js @@ -6,16 +6,15 @@ function handler (req, res) { // console.log( req.urlData().query.replace('query=',' ') ); const allValues = this.labels.get(query.name) - if (!allValues || allValues.length == 0) { - var resp = { + if (this.debug) console.log('LABEL', query.name, 'VALUES', allValues) + if (!allValues || allValues.length === 0) { + res.send({ status: 'success', data: { resultType: 'streams', result: [] } - } + }) } else { - var resp = { values: allValues } + res.send({ values: allValues }) } - if (this.debug) console.log('LABEL', query.name, 'VALUES', allValues) - res.send(resp) -}; +} module.exports = handler diff --git a/lib/handlers/query_range.js b/lib/handlers/query_range.js index 
054af657..f70faaee 100644 --- a/lib/handlers/query_range.js +++ b/lib/handlers/query_range.js @@ -25,14 +25,14 @@ async function handler (req, res) { const RATEQUERY = /(.*) by \((.*)\) \(rate\((.*)\[(.*)\]\)\) from (.*)\.(.*)/ const RATEQUERYWHERE = /(.*) by \((.*)\) \(rate\((.*)\[(.*)\]\)\) from (.*)\.(.*) where (.*)/ const RATEQUERYNOWHERE = /(.*) by \((.*)\) \(rate\((.*)\[(.*)\]\)\) from (.*)\.([\S]+)\s?(.*)/ - const RATEQUERYMETRICS = /(.*) by \((.*)\) \(rate\((.*)\[(.*)\]\)\)/ + // const RATEQUERYMETRICS = /(.*) by \((.*)\) \(rate\((.*)\[(.*)\]\)\)/ if (!req.query.query) { res.code(400).send('invalid query') } else if (RATEQUERYNOWHERE.test(req.query.query)) { - var s = RATEQUERYNOWHERE.exec(req.query.query) + const s = RATEQUERYNOWHERE.exec(req.query.query) console.log('tags', s) - var JSONLabels = { + const JSONLabels = { db: s[5], table: s[6], interval: s[4] || 60, @@ -42,9 +42,9 @@ async function handler (req, res) { } this.scanClickhouse(JSONLabels, res, params) } else if (RATEQUERYWHERE.test(req.query.query)) { - var s = RATEQUERYWHERE.exec(req.query.query) + const s = RATEQUERYWHERE.exec(req.query.query) console.log('tags', s) - var JSONLabels = { + const JSONLabels = { db: s[5], table: s[6], interval: s[4] || 60, @@ -54,9 +54,9 @@ async function handler (req, res) { } this.scanClickhouse(JSONLabels, res, params) } else if (RATEQUERY.test(req.query.query)) { - var s = RATEQUERY.exec(req.query.query) + const s = RATEQUERY.exec(req.query.query) console.log('tags', s) - var JSONLabels = { + const JSONLabels = { db: s[5], table: s[6], interval: s[4] || 60, @@ -76,11 +76,13 @@ async function handler (req, res) { }; this.scanMetricFingerprints(JSONLabels, res, params); } else */if (req.query.query.startsWith('clickhouse(')) { + let JSONLabels = null + let queries = null try { const query = /\{(.*?)\}/g.exec(req.query.query)[1] || req.query.query - var queries = query.replace(/\!?="/g, ':"') - var JSONLabels = this.toJSON(queries) + queries = 
query.replace(/\!?="/g, ':"') + JSONLabels = this.toJSON(queries) } catch (e) { console.error(e, queries) res.send(resp) @@ -97,8 +99,7 @@ async function handler (req, res) { console.log(e) res.send(resp) } - if (this.debug) console.log('SCAN LABELS', JSONLabels, params) } -}; +} module.exports = handler diff --git a/lib/handlers/series.js b/lib/handlers/series.js index 198b000a..d92740cd 100644 --- a/lib/handlers/series.js +++ b/lib/handlers/series.js @@ -2,15 +2,11 @@ const { scanSeries } = require('../db/clickhouse') // Example Handler async function handler (req, res) { - try { - if (!req.query.match) { - throw new Error('Match param is required') - } - await scanSeries(Array.isArray(req.query.match) ? req.query.match : [req.query.match], - { res: res.raw }) - } catch (e) { - throw e + if (!req.query.match) { + throw new Error('Match param is required') } + await scanSeries(Array.isArray(req.query.match) ? req.query.match : [req.query.match], + { res: res.raw }) } module.exports = handler diff --git a/lib/handlers/telegraf.js b/lib/handlers/telegraf.js index a58d9ce4..e27b383e 100644 --- a/lib/handlers/telegraf.js +++ b/lib/handlers/telegraf.js @@ -16,7 +16,7 @@ function handler (req, res) { console.error('No Request Body!', req) return } - if (readonly) { // TODO: this.readonly? + if (this.readonly) { console.error('Readonly! 
No push support.') res.send(500) return @@ -27,11 +27,12 @@ function handler (req, res) { if (streams) { if (this.debug) console.log('influx', streams) streams.forEach(function (stream) { + let finger = null try { const JSONLabels = stream.tags JSONLabels.metric = stream.name // Calculate Fingerprint - const finger = this.fingerPrint(JSON.stringify(JSONLabels)) + finger = this.fingerPrint(JSON.stringify(JSONLabels)) if (this.debug) { console.log('LABELS FINGERPRINT', JSONLabels, finger) } this.labels.add(finger, stream.labels) // Store Fingerprint @@ -73,6 +74,6 @@ function handler (req, res) { }) } res.send(200) -}; +} module.exports = handler diff --git a/lib/utils.js b/lib/utils.js index e38f0ca6..ff5e8da2 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -4,14 +4,17 @@ const labelParser = function (labels) { // Label Parser const rx = /\"?\b(\w+)\"?(!?=~?)("[^"\n]*?")/g - let matches; const output = [] - while (matches = rx.exec(labels)) { // TODO: Comparison or assigment check + let matches + const output = [] + matches = rx.exec(labels) + while (matches) { if (matches.length > 3) output.push([matches[1], matches[2], matches[3].replace(/['"]+/g, '')]) + matches = rx.exec(labels) } + let regex = false try { - var regex = /\}\s*(.*)/g.exec(labels)[1] || false + regex = /\}\s*(.*)/g.exec(labels)[1] || false } catch (e) { - var regex = false } return { labels: output, regex: regex } } @@ -24,15 +27,6 @@ const fingerPrint = function (text, hex) { const toJSON = require('jsonic') -/* clickhouse query parser */ -const clickParser = function (query) { - /* Example cQL format */ - /* clickhouse({db="mydb", table="mytable", tag="key", metric="avg(value)", interval=60}) */ - const regx = /clickhouse\((.*)\)/g - const clickQuery = regx.exec(req.query.query)[1] || false // TODO: req. correct? - return labelParser(clickQuery) -} - const parseOrDefault = (str, def) => { try { return str ? 
parseFloat(str) : def @@ -52,7 +46,6 @@ const parseMs = (time, def) => { module.exports.DATABASE_NAME = () => process.env.CLICKHOUSE_DB || 'cloki' module.exports.fingerPrint = fingerPrint module.exports.labelParser = labelParser -module.exports.clickParser = clickParser module.exports.toJSON = toJSON module.exports.parseMs = parseMs module.exports.parseOrDefault = parseOrDefault diff --git a/package.json b/package.json index f7feabf3..0a40de0b 100644 --- a/package.json +++ b/package.json @@ -10,12 +10,12 @@ "test": "jest", "start": "node cloki.js", "postinstall": "patch-package", - "lint": "npx standard --fix *.js lib parser plugins test" + "lint": "npx eslint --fix *.js lib parser plugins test" }, "standard": { "env": [ "jest" ] }, "repository": { "type": "git", diff --git a/parser/bnf.js b/parser/bnf.js index ef6e0635..32e778d6 100644 --- a/parser/bnf.js +++ b/parser/bnf.js @@ -8,8 +8,9 @@ const registryNames = [ 'parser_registry', 'unwrap_registry' ] +const path = require('path') const registries = registryNames.reduce((sum, n) => { - sum[n] = require(`${__dirname}/registry/${n}`) + sum[n] = require(path.join(__dirname, 'registry', n)) return sum }, {}) const fs = require('fs') @@ -29,7 +30,7 @@ Token.prototype.Children = function (tokenType) { return tokens } -let bnf = fs.readFileSync(__dirname + '/logql.bnf').toString() +let bnf = fs.readFileSync(path.join(__dirname, 'logql.bnf')).toString() for (const reg of Object.keys(registries)) { const keys = Object.keys(registries[reg]).map(n => `"${n}"`) keys.sort((a, b) => b.length - a.length) diff --git a/parser/registry/common.js b/parser/registry/common.js index 76dcfa7c..c060d55a 100644 --- a/parser/registry/common.js +++ b/parser/registry/common.js @@ -76,7 +76,7 @@ module.exports.durationToMs = (durationStr) => { * @returns {DataStream} */ module.exports.map = (s, fn) => s.map((e) => { - return new Promise((resolve, reject) => { + return new Promise((resolve) => { setImmediate(() => { resolve(fn(e)) }) @@
-242,7 +242,7 @@ module.exports.applyViaStream = (token, query, */ (s) => s.remap((emit, e) => { if (!e || !e.labels) { - for (const [_, v] of results) { // TODO: Fix unused _ + for (const v of results.values()) { const ts = [...Object.entries(v.values)] ts.sort() for (const _v of ts) { diff --git a/parser/registry/stream_selector_operator_registry/stream_selector_operator_registry.js b/parser/registry/stream_selector_operator_registry/stream_selector_operator_registry.js index 85addf45..fca5ce8f 100644 --- a/parser/registry/stream_selector_operator_registry/stream_selector_operator_registry.js +++ b/parser/registry/stream_selector_operator_registry/stream_selector_operator_registry.js @@ -62,10 +62,10 @@ module.exports.simpleAnd = (query, clauses) => { /** * * @param token {Token} - * @param query {registry_types.Request} + * //@param query {registry_types.Request} * @returns {string[]} */ -module.exports.neqSimple = (token, query) => { +module.exports.neqSimple = (token/*, query */) => { const [label, value] = labelAndVal(token) return selectorClauses(false, false, label, value) } @@ -73,10 +73,10 @@ module.exports.neqSimple = (token, query) => { /** * * @param token {Token} - * @param query {registry_types.Request} + * //@param query {registry_types.Request} * @returns {string[]} */ -module.exports.neqExtraLabels = (token, query) => { +module.exports.neqExtraLabels = (token/*, query */) => { const [label, value] = labelAndVal(token) return [['OR', `arrayExists(x -> x.1 == '${label}' AND x.2 != '${value}', extra_labels) != 0`, [ @@ -87,22 +87,13 @@ module.exports.neqExtraLabels = (token, query) => { ]] } -/** - * - * @param s {DataStream} - * @param fn {function(Object): boolean} - */ -function filter (s, fn) { // TODO: Are we using this function? 
- return s.filter(e => (e && e.labels && fn(e)) || isEOF(e)) -} - /** * * @param token {Token} - * @param query {registry_types.Request} + * //@param query {registry_types.Request} * @returns {function({labels: Object}): boolean} */ -module.exports.neqStream = (token, query) => { +module.exports.neqStream = (token/*, query */) => { const [label, value] = labelAndVal(token) return (e) => e.labels[label] && e.labels[label] !== value } @@ -110,10 +101,10 @@ module.exports.neqStream = (token, query) => { /** * * @param token {Token} - * @param query {registry_types.Request} + * //@param query {registry_types.Request} * @returns {string[]} */ -module.exports.nregSimple = (token, query) => { +module.exports.nregSimple = (token/*, query */) => { const [label, value] = labelAndVal(token) return selectorClauses(true, false, label, value) } @@ -121,10 +112,10 @@ module.exports.nregSimple = (token, query) => { /** * * @param token {Token} - * @param query {registry_types.Request} + * //@param query {registry_types.Request} * @returns {string[]} */ -module.exports.nregExtraLabels = (token, query) => { +module.exports.nregExtraLabels = (token/*, query */) => { const [label, value] = labelAndVal(token) return [['OR', `arrayExists(x -> x.1 == '${label}' AND extractAllGroups(x.2, '(${value})') == [], extra_labels) != 0`, @@ -139,10 +130,10 @@ module.exports.nregExtraLabels = (token, query) => { /** * * @param token {Token} - * @param query {registry_types.Request} + * //@param query {registry_types.Request} * @returns {function({labels: Object}): boolean} */ -module.exports.nregStream = (token, query) => { +module.exports.nregStream = (token/*, query */) => { const [label, value] = labelAndVal(token) const re = new RegExp(value) return (e) => e.labels[label] && !e.labels[label].match(re) @@ -151,10 +142,10 @@ module.exports.nregStream = (token, query) => { /** * * @param token {Token} - * @param query {registry_types.Request} + * //@param query {registry_types.Request} * @returns 
{string[]} */ -module.exports.regSimple = (token, query) => { +module.exports.regSimple = (token/*, query */) => { const [label, value] = labelAndVal(token) return selectorClauses(true, true, label, value) } @@ -162,10 +153,10 @@ module.exports.regSimple = (token, query) => { /** * * @param token {Token} - * @param query {registry_types.Request} + * //@param query {registry_types.Request} * @returns {string[]} */ -module.exports.regExtraLabels = (token, query) => { +module.exports.regExtraLabels = (token/*, query */) => { const [label, value] = labelAndVal(token) return [['OR', `arrayExists(x -> x.1 == '${label}' AND extractAllGroups(x.2, '(${value})') != [], extra_labels) != 0`, @@ -180,32 +171,32 @@ module.exports.regExtraLabels = (token, query) => { /** * * @param token {Token} - * @param query {registry_types.Request} + * //@param query {registry_types.Request} * @returns {function({labels: Object}): boolean} */ -module.exports.regStream = (token, query) => { +module.exports.regStream = (token/*, query */) => { const [label, value] = labelAndVal(token) const re = new RegExp(value) - return (e) => e.EOF || (e && e.labels && e.labels[label] && e.labels[label].match(re)) + return (e) => isEOF(e) || (e && e.labels && e.labels[label] && e.labels[label].match(re)) } /** * * @param token {Token} - * @param query {registry_types.Request} + * //@param query {registry_types.Request} * @returns {string[]} */ -module.exports.eqSimple = (token, query) => { +module.exports.eqSimple = (token/*, query */) => { const [label, value] = labelAndVal(token) return selectorClauses(false, true, label, value) } /** * * @param token {Token} - * @param query {registry_types.Request} + * //@param query {registry_types.Request} * @returns {string[]} */ -module.exports.eqExtraLabels = (token, query) => { +module.exports.eqExtraLabels = (token/*, query */) => { const [label, value] = labelAndVal(token) return [['OR', `indexOf(extra_labels, ('${label}', '${value}')) > 0`, @@ -220,10 +211,10 
@@ module.exports.eqExtraLabels = (token, query) => { /** * * @param token {Token} - * @param query {registry_types.Request} + * //@param query {registry_types.Request} * @returns {function({labels: Object}): boolean} */ -module.exports.eqStream = (token, query) => { +module.exports.eqStream = (token/*, query */) => { const [label, value] = labelAndVal(token) - return (e) => e.EOF || (e && e.labels && e.labels[label] && e.labels[label] === value) + return (e) => isEOF(e) || (e && e.labels && e.labels[label] && e.labels[label] === value) } diff --git a/plugins/base/base.js b/plugins/base/base.js index 95b57696..3fc09ee2 100644 --- a/plugins/base/base.js +++ b/plugins/base/base.js @@ -1,7 +1,12 @@ const { PluginLoaderBase } = require('plugnplay') module.exports = class extends PluginLoaderBase { exportSync () { - const res = { validate: (plg) => res.props = Object.keys(plg) } // TODO: FIX this error + const res = { + validate: (plg) => { + res.props = Object.keys(plg) + return res.props + } + } return res } } diff --git a/plugins/base/unwrap_registry/index.js b/plugins/base/unwrap_registry/index.js index 48334cdc..6e684ac0 100644 --- a/plugins/base/unwrap_registry/index.js +++ b/plugins/base/unwrap_registry/index.js @@ -2,8 +2,10 @@ const Base = require('../base') module.exports = class extends Base { exportSync () { const res = { - validate: (plg) => - res.props = Object.entries(plg).filter(e => e[1].run && e[1].approx).map(e => e[0]) // TODO: fix me + validate: (plg) => { + res.props = Object.entries(plg).filter(e => e[1].run && e[1].approx).map(e => e[0]) + return res.props + } } return res } diff --git a/test/e2e.test.js b/test/e2e.test.js index 399723e1..f1d1d202 100644 --- a/test/e2e.test.js +++ b/test/e2e.test.js @@ -3,7 +3,7 @@ const axios = require('axios') const { WebSocket } = require('ws') // const pb = require("protobufjs"); const e2e = () => process.env.INTEGRATION_E2E || process.env.INTEGRATION -const cloki_local = () => process.env.CLOKI_LOCAL || 
false +const clokiLocal = () => process.env.CLOKI_LOCAL || false let l = null // const root = pb.loadSync(__dirname + "/../lib/loki.proto"); @@ -19,14 +19,14 @@ function setup () { if (!e2e()) { return } - if (!cloki_local()) l = require('../cloki') - return new Promise(f => setTimeout(f, 1000)) + if (!clokiLocal()) l = require('../cloki') + return new Promise(resolve => setTimeout(resolve, 1000)) } afterAll(() => { if (!e2e()) { return } - if (!cloki_local()) l.stop() + if (!clokiLocal()) l.stop() }) /* async function pushPBPoints(endpoint, points) { @@ -49,7 +49,7 @@ it('e2e', async () => { return } console.log('Waiting 2s before all inits') - await new Promise(f => setTimeout(f, 2000)) + await new Promise(resolve => setTimeout(resolve, 2000)) const testID = Math.random() + '' console.log(testID) const start = Math.floor((Date.now() - 60 * 1000 * 10) / 60 / 1000) * 60 * 1000 @@ -64,7 +64,7 @@ it('e2e', async () => { (i) => JSON.stringify({ lbl_repl: 'REPL', int_val: '1', new_lbl: 'new_val', str_id: i, arr: [1, 2, 3], obj: { o_1: 'v_1' } }) ) await sendPoints('http://localhost:3100', points) - await new Promise(f => setTimeout(f, 4000)) + await new Promise(resolve => setTimeout(resolve, 4000)) const adjustResult = (resp, id, _start) => { _start = _start || start id = id || testID @@ -293,9 +293,9 @@ it('e2e', async () => { const points = createPoints(testID + '_ws', 1, wsStart + i * 1000, wsStart + i * 1000 + 1000, {}, {}, () => `MSG_${i}`) sendPoints('http://localhost:3100', points) - await new Promise(f => setTimeout(f, 1000)) + await new Promise(resolve => setTimeout(resolve, 1000)) } - await new Promise(f => setTimeout(f, 6000)) + await new Promise(resolve => setTimeout(resolve, 6000)) ws.close() for (const res of resp.data.data.result) { res.values.sort() @@ -316,7 +316,7 @@ it('e2e', async () => { }) resp.data.data.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b))) expect(resp.data).toMatchSnapshot() - await new Promise(f => setTimeout(f, 
1000)) + await new Promise(resolve => setTimeout(resolve, 1000)) resp = await runRequest(`{test_id="${testID}"} | freq > 1 and (freq="4" or freq==2 or freq > 0.5)`) adjustResult(resp, testID) expect(resp.data.data.result.map(s => [s.stream, s.values.length])).toMatchSnapshot() diff --git a/test/insert.same.data.test.js b/test/insert.same.data.test.js index cd047339..abb9cc7c 100644 --- a/test/insert.same.data.test.js +++ b/test/insert.same.data.test.js @@ -1,4 +1,3 @@ -const axios = require('axios') const fs = require('fs') const { createPoints, sendPoints } = require('./common') @@ -21,7 +20,7 @@ beforeAll(async () => { return } l = require('../cloki') - await new Promise((resolve, reject) => setTimeout(resolve, 500)) + await new Promise((resolve) => setTimeout(resolve, 500)) }) afterAll(() => { @@ -42,5 +41,5 @@ it('should stream the same data to loki / cloki', async () => { fs.writeFileSync('points.json', JSON.stringify({ streams: Object.values(points) })) await sendPoints('http://localhost:3100', points) await sendPoints(process.env.LOKI_ENDPOINT, points) - await new Promise((resolve, reject) => setTimeout(resolve, 1000)) + await new Promise((resolve) => setTimeout(resolve, 1000)) }) diff --git a/test/transpiler.test.js b/test/transpiler.test.js index f83a849a..5db64e8c 100644 --- a/test/transpiler.test.js +++ b/test/transpiler.test.js @@ -7,7 +7,7 @@ beforeAll(() => { }) it('should transpile log_stream_selector', () => { - let scr = '{et_dolorem=`nemo doloremque`, quia=\"eum voluptatem non eligendi\"}' + let scr = '{et_dolorem=`nemo doloremque`, quia="eum voluptatem non eligendi"}' let script = bnf.ParseScript(scr) let query = transpiler.transpileLogStreamSelector(script.rootToken, transpiler.initQuery()) expect(query).toMatchSnapshot() @@ -33,7 +33,7 @@ it('should transpile log_stream_selector', () => { }) it('should transpile log_stream_selector with stream filter', () => { - let scr = '{et_dolorem=`nemo doloremque`, quia=\"eum voluptatem non eligendi\"} |= 
"at et"' + let scr = '{et_dolorem=`nemo doloremque`, quia="eum voluptatem non eligendi"} |= "at et"' let script = bnf.ParseScript(scr) let query = transpiler.transpileLogStreamSelector(script.rootToken, transpiler.initQuery()) expect(query).toMatchSnapshot()