Skip to content

Commit

Permalink
lint fix
Browse files Browse the repository at this point in the history
  • Loading branch information
akvlad committed Nov 10, 2021
1 parent e59f786 commit 1e03307
Show file tree
Hide file tree
Showing 17 changed files with 176 additions and 186 deletions.
3 changes: 2 additions & 1 deletion .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
},
"plugins": ["jest"],
"rules": {
"no-template-curly-in-string": "off"
"no-template-curly-in-string": "off",
"no-useless-escape": "off"
}
}
163 changes: 81 additions & 82 deletions lib/db/clickhouse.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ let state = 'INITIALIZING'
const clickhouse = new ClickHouse(clickhouseOptions)
let ch

const samples = []
const timeSeries = []

class TimeoutThrottler {
constructor (statement) {
this.statement = statement
Expand Down Expand Up @@ -72,7 +69,7 @@ class TimeoutThrottler {
}
const p = Date.now() - ts
if (p < 100) {
await new Promise((resolve, reject) => setTimeout(resolve, 100 - p))
await new Promise((resolve) => setTimeout(resolve, 100 - p))
}
}
})
Expand All @@ -83,8 +80,8 @@ class TimeoutThrottler {
if (len < 1) {
return
}
const ts = Date.now()
const resp = await axios.post(`${getClickhouseUrl()}/?query=${this.statement}`,

await axios.post(`${getClickhouseUrl()}/?query=${this.statement}`,
this.queue.join('\n')
)
this.queue = this.queue.slice(len)
Expand All @@ -101,15 +98,16 @@ const timeSeriesThrottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series(date, fingerprint, labels, name) FORMAT JSONEachRow`)
/* TODO: tsv2
const timeSeriesv2Throttler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series_v2(date, fingerprint, labels, name) FORMAT JSONEachRow`); */
`INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series_v2(date, fingerprint, labels, name) FORMAT JSONEachRow`); */
samplesThrottler.start()
timeSeriesThrottler.start()
// timeSeriesv2Throttler.start();

/* Cache Helper */
const recordCache = require('record-cache')
const onStale = function (data) {
for (const [key, value] of data.records.entries()) {
for (const entry of data.records.entries()) {
const value = entry[1]
samplesThrottler.queue.push.apply(samplesThrottler.queue, value.list.map(r => JSON.stringify({
fingerprint: r.record[0],
timestamp_ms: r.record[1],
Expand All @@ -118,22 +116,23 @@ const onStale = function (data) {
})))
}
}
const onStale_labels = function (data) {
for (const [key, value] of data.records.entries()) {
timeSeriesThrottler.queue.push.apply(timeSeriesThrottler.queue, value.list.map(r => JSON.stringify({
const onStaleLabels = function (data) {
for (const entry of data.records.entries()) {
const value = entry[1]
timeSeriesThrottler.queue.push.apply(timeSeriesThrottler.queue, value.list.map(r => JSON.stringify({
date: r.record[0],
fingerprint: r.record[1],
labels: r.record[2],
name: r.record[3]
})))
/* TODO: tsv2
timeSeriesv2Throttler.queue.push.apply(timeSeriesv2Throttler.queue, value.list.map(r => JSON.stringify({
date: r.record[0],
fingerprint: r.record[1],
labels: JSON.parse(r.record[2]),
name: r.record[3]
})));
*/
/* TODO: tsv2
timeSeriesv2Throttler.queue.push.apply(timeSeriesv2Throttler.queue, value.list.map(r => JSON.stringify({
date: r.record[0],
fingerprint: r.record[1],
labels: JSON.parse(r.record[2]),
name: r.record[3]
})));
*/
}
}

Expand All @@ -144,10 +143,10 @@ const bulk = recordCache({
onStale: onStale
})

const bulk_labels = recordCache({
const bulkLabels = recordCache({
maxSize: 100,
maxAge: 500,
onStale: onStale_labels
onStale: onStaleLabels
})

// In-Memory LRU for quick lookups
Expand All @@ -164,76 +163,76 @@ const initialize = function (dbName) {
const tmp = { ...clickhouseOptions, queryOptions: { database: '' } }
ch = new ClickHouse(tmp)

const hack_ch = (ch) => {
const hackCh = (ch) => {
ch._query = ch.query
ch.query = (q, opts, cb) => {
return new Promise(f => ch._query(q, opts, (err, data) => {
return new Promise(resolve => ch._query(q, opts, (err, data) => {
cb(err, data)
f()
resolve()
}))
}
}

ch.query(dbQuery, undefined, async function (err, data) {
ch.query(dbQuery, undefined, async function (err/*, data */) {
if (err) { console.error('error', err); return }
const ch = new ClickHouse(clickhouseOptions)
hack_ch(ch)
hackCh(ch)
console.log('CREATE TABLES', dbName)

let ts_table = 'CREATE TABLE IF NOT EXISTS ' + dbName + '.time_series (date Date,fingerprint UInt64,labels String, name String) ENGINE = ReplacingMergeTree(date) PARTITION BY date ORDER BY fingerprint'
let sm_table = 'CREATE TABLE IF NOT EXISTS ' + dbName + '.samples (fingerprint UInt64,timestamp_ms Int64,value Float64,string String) ENGINE = MergeTree PARTITION BY toRelativeHourNum(toDateTime(timestamp_ms / 1000)) ORDER BY (fingerprint, timestamp_ms)'
let tsTable = 'CREATE TABLE IF NOT EXISTS ' + dbName + '.time_series (date Date,fingerprint UInt64,labels String, name String) ENGINE = ReplacingMergeTree(date) PARTITION BY date ORDER BY fingerprint'
let smTable = 'CREATE TABLE IF NOT EXISTS ' + dbName + '.samples (fingerprint UInt64,timestamp_ms Int64,value Float64,string String) ENGINE = MergeTree PARTITION BY toRelativeHourNum(toDateTime(timestamp_ms / 1000)) ORDER BY (fingerprint, timestamp_ms)'

if (storagePolicy) {
console.log('ADD SETTINGS storage policy', storagePolicy)
const set_storage = ` SETTINGS storagePolicy='${storagePolicy}'`
ts_table += set_storage
sm_table += set_storage
const setStorage = ` SETTINGS storagePolicy='${storagePolicy}'`
tsTable += setStorage
smTable += setStorage
}

await ch.query(ts_table, undefined, function (err, data) {
await ch.query(tsTable, undefined, function (err/*, data */) {
if (err) { console.log(err); process.exit(1) } else if (debug) console.log('Timeseries Table ready!')
console.log('Timeseries Table ready!')
return true
})
await ch.query(sm_table, undefined, function (err, data) {
await ch.query(smTable, undefined, function (err/*, data */) {
if (err) { console.log(err); process.exit(1) } else if (debug) console.log('Samples Table ready!')
return true
})

var alter_table = 'ALTER TABLE ' + dbName + '.samples MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
var rotate_table = 'ALTER TABLE ' + dbName + '.samples MODIFY TTL toDateTime(timestamp_ms / 1000) + INTERVAL ' + rotationSamples + ' DAY'
let alterTable = 'ALTER TABLE ' + dbName + '.samples MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
let rotateTable = 'ALTER TABLE ' + dbName + '.samples MODIFY TTL toDateTime(timestamp_ms / 1000) + INTERVAL ' + rotationSamples + ' DAY'

await ch.query(alter_table, undefined, function (err, data) {
await ch.query(alterTable, undefined, function (err/*, data */) {
if (err) { console.log(err) } else if (debug) console.log('Samples Table altered for rotation!')
// return true;
})
await ch.query(rotate_table, undefined, function (err, data) {
await ch.query(rotateTable, undefined, function (err/*, data */) {
if (err) { console.log(err) } else if (debug) console.log('Samples Table rotation set to days: ' + rotationSamples)
return true
})

var alter_table = 'ALTER TABLE ' + dbName + '.time_series MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
var rotate_table = 'ALTER TABLE ' + dbName + '.time_series MODIFY TTL date + INTERVAL ' + rotationLabels + ' DAY'
alterTable = 'ALTER TABLE ' + dbName + '.time_series MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
rotateTable = 'ALTER TABLE ' + dbName + '.time_series MODIFY TTL date + INTERVAL ' + rotationLabels + ' DAY'

await ch.query(alter_table, undefined, function (err, data) {
await ch.query(alterTable, undefined, function (err/*, data */) {
if (err) { console.log(err) } else if (debug) console.log('Labels Table altered for rotation!')
return true
})
await ch.query(rotate_table, undefined, function (err, data) {
await ch.query(rotateTable, undefined, function (err/*, data */) {
if (err) { console.log(err) } else if (debug) console.log('Labels Table rotation set to days: ' + rotationLabels)
return true
})

if (storagePolicy) {
console.log('ALTER storage policy', storagePolicy)
const alter_ts = `ALTER TABLE ${dbName}.time_series MODIFY SETTING storagePolicy='${storagePolicy}'`
const alter_sm = `ALTER TABLE ${dbName}.samples MODIFY SETTING storagePolicy='${storagePolicy}'`
const alterTs = `ALTER TABLE ${dbName}.time_series MODIFY SETTING storagePolicy='${storagePolicy}'`
const alterSm = `ALTER TABLE ${dbName}.samples MODIFY SETTING storagePolicy='${storagePolicy}'`

await ch.query(alter_ts, undefined, function (err, data) {
await ch.query(alterTs, undefined, function (err/*, data */) {
if (err) { console.log(err) } else if (debug) console.log('Storage policy update for fingerprints ' + storagePolicy)
return true
})
await ch.query(alter_sm, undefined, function (err, data) {
await ch.query(alterSm, undefined, function (err/*, data */) {
if (err) { console.log(err) } else if (debug) console.log('Storage policy update for samples ' + storagePolicy)
return true
})
Expand All @@ -244,26 +243,26 @@ const initialize = function (dbName) {
state = 'READY'

/* TODO: tsv2
const tsv2 = await axios.get(`${protocol}://${clickhouseOptions.auth}@${clickhouseOptions.host}:${clickhouseOptions.port}/?query=SHOW TABLES FROM ${dbName} LIKE 'time_series_v2' FORMAT JSON`);
if (!tsv2.data.rows) {
const create_tsv2 = `CREATE TABLE IF NOT EXISTS ${dbName}.time_series_v2
(
date Date,
fingerprint UInt64,
labels Array(Tuple(String, String)),
labels_map Map(String, String),
name String
) ENGINE = ReplacingMergeTree(date) PARTITION BY date ORDER BY fingerprint`;
await ch.query(create_tsv2, undefined, () => {});
const insert = `INSERT INTO ${dbName}.time_series_v2 (date, fingerprint, labels, labels_map, name)
SELECT date, fingerprint, JSONExtractKeysAndValues(labels, 'String') as labels,
CAST((
arrayMap(x -> x.1, JSONExtractKeysAndValues(labels, 'String')),
arrayMap(x -> x.2, JSONExtractKeysAndValues(labels, 'String'))), 'Map(String, String)') as labels_map,
name FROM ${dbName}.time_series`;
await axios.post(`${protocol}://${clickhouseOptions.auth}@${clickhouseOptions.host}:${clickhouseOptions.port}/`,
insert);
} */
const tsv2 = await axios.get(`${protocol}://${clickhouseOptions.auth}@${clickhouseOptions.host}:${clickhouseOptions.port}/?query=SHOW TABLES FROM ${dbName} LIKE 'time_series_v2' FORMAT JSON`);
if (!tsv2.data.rows) {
const create_tsv2 = `CREATE TABLE IF NOT EXISTS ${dbName}.time_series_v2
(
date Date,
fingerprint UInt64,
labels Array(Tuple(String, String)),
labels_map Map(String, String),
name String
) ENGINE = ReplacingMergeTree(date) PARTITION BY date ORDER BY fingerprint`;
await ch.query(create_tsv2, undefined, () => {});
const insert = `INSERT INTO ${dbName}.time_series_v2 (date, fingerprint, labels, labels_map, name)
SELECT date, fingerprint, JSONExtractKeysAndValues(labels, 'String') as labels,
CAST((
arrayMap(x -> x.1, JSONExtractKeysAndValues(labels, 'String')),
arrayMap(x -> x.2, JSONExtractKeysAndValues(labels, 'String'))), 'Map(String, String)') as labels_map,
name FROM ${dbName}.time_series`;
await axios.post(`${protocol}://${clickhouseOptions.auth}@${clickhouseOptions.host}:${clickhouseOptions.port}/`,
insert);
} */

reloadFingerprints()
})
Expand All @@ -273,7 +272,7 @@ const checkCapabilities = async () => {
console.log('Checking clickhouse capabilities: ')
try {
await axios.post(getClickhouseUrl() + '/?allow_experimental_live_view=1',
`CREATE LIVE VIEW ${clickhouseOptions.queryOptions.database}.lvcheck WITH TIMEOUT 1 AS SELECT 1`)
`CREATE LIVE VIEW ${clickhouseOptions.queryOptions.database}.lvcheck WITH TIMEOUT 1 AS SELECT 1`)
capabilities.liveView = true
console.log('LIVE VIEW: supported')
} catch (e) {
Expand All @@ -282,34 +281,34 @@ const checkCapabilities = async () => {
}
}

var reloadFingerprints = function () {
const reloadFingerprints = function () {
console.log('Reloading Fingerprints...')
const select_query = `SELECT DISTINCT fingerprint, labels FROM ${clickhouseOptions.queryOptions.database}.time_series`
const stream = ch.query(select_query)
const selectQuery = `SELECT DISTINCT fingerprint, labels FROM ${clickhouseOptions.queryOptions.database}.time_series`
const stream = ch.query(selectQuery)
// or collect records yourself
const rows = []
stream.on('metadata', function (columns) {
// do something with column list
// do something with column list
})
stream.on('data', function (row) {
rows.push(row)
rows.push(row)
})
stream.on('error', function (err) {
// TODO: handler error
console.log(err)
})
stream.on('end', function () {
rows.forEach(function (row) {
rows.forEach(function (row) {
try {
const JSON_labels = toJSON(row[1].replace(/\!?=/g, ':'))
labels.add(row[0], JSON.stringify(JSON_labels))
for (const key in JSON_labels) {
const JSONLabels = toJSON(row[1].replace(/\!?=/g, ':'))
labels.add(row[0], JSON.stringify(JSONLabels))
for (const key in JSONLabels) {
// if (debug) console.log('Adding key',row);
labels.add('_LABELS_', key)
labels.add(key, JSON_labels[key])
};
labels.add(key, JSONLabels[key])
}
} catch (e) { console.error(e) }
})
if (debug) console.log('Reloaded fingerprints:', rows.length + 1)
})
if (debug) console.log('Reloaded fingerprints:', rows.length + 1)
})
}

Expand All @@ -333,7 +332,7 @@ function getClickhouseUrl () {
* @param res {{res: {write: (function(string)), writeHead: (function(number, {}))}}}
* @returns {Promise<void>}
*/
var queryFingerprintsScan = async function (query, res) {
const queryFingerprintsScan = async function (query, res) {
if (debug) console.log('Scanning Fingerprints...')

// console.log(_query.query);
Expand Down Expand Up @@ -730,7 +729,7 @@ module.exports.watchLiveView = async (name, db, res, options) => {

module.exports.databaseOptions = clickhouseOptions
module.exports.database = clickhouse
module.exports.cache = { bulk: bulk, bulk_labels: bulk_labels, labels: labels }
module.exports.cache = { bulk: bulk, bulk_labels: bulkLabels, labels: labels }
module.exports.scanFingerprints = scanFingerprints
module.exports.queryFingerprintsScan = queryFingerprintsScan
module.exports.scanMetricFingerprints = scanMetricFingerprints
Expand Down
7 changes: 4 additions & 3 deletions lib/handlers/push.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ function handler (req, res) {
}
if (streams) {
streams.forEach(function (stream) {
let finger = null
try {
let JSONLabels
try {
var JSONLabels
if (stream.stream) {
JSONLabels = stream.stream
} else {
Expand All @@ -60,7 +61,7 @@ function handler (req, res) {
return
}
// Calculate Fingerprint
var finger = self.fingerPrint(JSON.stringify(JSONLabels))
finger = self.fingerPrint(JSON.stringify(JSONLabels))
if (self.debug) { console.log('LABELS FINGERPRINT', stream.labels, finger) }
self.labels.add(finger, stream.labels)
// Store Fingerprint
Expand Down Expand Up @@ -119,6 +120,6 @@ function handler (req, res) {
})
}
res.send(200)
};
}

module.exports = handler
13 changes: 6 additions & 7 deletions lib/handlers/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@ function handler (req, res) {

// console.log( req.urlData().query.replace('query=',' ') );
const allValues = this.labels.get(query.name)
if (!allValues || allValues.length == 0) {
var resp = {
if (this.debug) console.log('LABEL', query.name, 'VALUES', allValues)
if (!allValues || allValues.length === 0) {
res.send({
status: 'success',
data: { resultType: 'streams', result: [] }
}
})
} else {
var resp = { values: allValues }
res.send({ values: allValues })
}
if (this.debug) console.log('LABEL', query.name, 'VALUES', allValues)
res.send(resp)
};
}

module.exports = handler
Loading

0 comments on commit 1e03307

Please sign in to comment.