Skip to content

Commit

Permalink
tests pass
Browse files Browse the repository at this point in the history
  • Loading branch information
akvlad committed Nov 10, 2021
1 parent b5dc7bd commit e59f786
Show file tree
Hide file tree
Showing 12 changed files with 163,472 additions and 66 deletions.
2 changes: 2 additions & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
"extends": [
"standard"
],

"parserOptions": {
"ecmaVersion": 2021
},
"plugins": ["jest"],
"rules": {
"no-template-curly-in-string": "off"
}
}
14 changes: 7 additions & 7 deletions lib/db/clickhouse.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class TimeoutThrottler {
}

const samplesThrottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.samples(fingerprint, timestampMs, value, string) FORMAT JSONEachRow`)
`INSERT INTO ${clickhouseOptions.queryOptions.database}.samples(fingerprint, timestamp_ms, value, string) FORMAT JSONEachRow`)
const timeSeriesThrottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series(date, fingerprint, labels, name) FORMAT JSONEachRow`)
/* TODO: tsv2
Expand All @@ -112,7 +112,7 @@ const onStale = function (data) {
for (const [key, value] of data.records.entries()) {
samplesThrottler.queue.push.apply(samplesThrottler.queue, value.list.map(r => JSON.stringify({
fingerprint: r.record[0],
timestampMs: r.record[1],
timestamp_ms: r.record[1],
value: r.record[2],
string: r.record[3]
})))
Expand Down Expand Up @@ -181,7 +181,7 @@ const initialize = function (dbName) {
console.log('CREATE TABLES', dbName)

let ts_table = 'CREATE TABLE IF NOT EXISTS ' + dbName + '.time_series (date Date,fingerprint UInt64,labels String, name String) ENGINE = ReplacingMergeTree(date) PARTITION BY date ORDER BY fingerprint'
let sm_table = 'CREATE TABLE IF NOT EXISTS ' + dbName + '.samples (fingerprint UInt64,timestampMs Int64,value Float64,string String) ENGINE = MergeTree PARTITION BY toRelativeHourNum(toDateTime(timestampMs / 1000)) ORDER BY (fingerprint, timestampMs)'
let sm_table = 'CREATE TABLE IF NOT EXISTS ' + dbName + '.samples (fingerprint UInt64,timestamp_ms Int64,value Float64,string String) ENGINE = MergeTree PARTITION BY toRelativeHourNum(toDateTime(timestamp_ms / 1000)) ORDER BY (fingerprint, timestamp_ms)'

if (storagePolicy) {
console.log('ADD SETTINGS storage policy', storagePolicy)
Expand All @@ -201,7 +201,7 @@ const initialize = function (dbName) {
})

var alter_table = 'ALTER TABLE ' + dbName + '.samples MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
var rotate_table = 'ALTER TABLE ' + dbName + '.samples MODIFY TTL toDateTime(timestampMs / 1000) + INTERVAL ' + rotationSamples + ' DAY'
var rotate_table = 'ALTER TABLE ' + dbName + '.samples MODIFY TTL toDateTime(timestamp_ms / 1000) + INTERVAL ' + rotationSamples + ' DAY'

await ch.query(alter_table, undefined, function (err, data) {
if (err) { console.log(err) } else if (debug) console.log('Samples Table altered for rotation!')
Expand Down Expand Up @@ -379,7 +379,7 @@ const processResponseStream = async (query, _stream, res) => {
stream = []
}
lastLabel = row.labels
row.timestampMs && stream.push([(parseInt(row.timestampMs) * 1000000).toString(), row.string])
row.timestamp_ms && stream.push([(parseInt(row.timestamp_ms) * 1000000).toString(), row.string])
})
} else {
const step = query.step || 5000
Expand All @@ -403,7 +403,7 @@ const processResponseStream = async (query, _stream, res) => {
}

lastLabel = row.labels
const timestampMs = parseInt(row.timestampMs)
const timestampMs = parseInt(row.timestamp_ms)
if (timestampMs < nextTime) {
return
}
Expand Down Expand Up @@ -638,7 +638,7 @@ const scanClickhouse = function (settings, client, params) {
* @param res {{res: {write: (function(string)), writeHead: (function(number, {}))}}}
*/
const getSeries = async (matches, res) => {
const query = transpiler.transpile_series(matches)
const query = transpiler.transpileSeries(matches)
const stream = await axios.post(`${getClickhouseUrl()}`, query + ' FORMAT JSONEachRow', {
responseType: 'stream'
})
Expand Down
2 changes: 1 addition & 1 deletion lib/db/watcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ module.exports = class extends EventEmitter {

async initQueryWatchPoll () {
this.watch = true
const request = transpiler.transpile_tail(this.request)
const request = transpiler.transpileTail(this.request)
const name = `watcher_${this.uid.toString().substr(2)}`
await createLiveView(name, request.query, { timeout_sec: 10 })
this.flushInterval = setInterval(this.flush.bind(this), 1000)
Expand Down
2 changes: 1 addition & 1 deletion lib/handlers/series.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ async function handler (req, res) {
} catch (e) {
throw e
}
};
}

module.exports = handler
36 changes: 18 additions & 18 deletions parser/registry/complex_label_filter_expression.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ module.exports = (token, query) => {
ex = ex[0] === 'and' ? ex.slice(1) : [ex]
return hasExtraLabels(query)
? _and(query, ex)
: reg.simple_and(query, ex)
: reg.simpleAnd(query, ex)
}

/**
Expand Down Expand Up @@ -105,7 +105,7 @@ const processWhereExpression = (token, query) => {
where = whereConcat(where, andOr, processWhereExpression(t, query))
}
}
if (t.name === 'andOr') {
if (t.name === 'and_or') {
andOr = t.value
}
}
Expand All @@ -124,16 +124,16 @@ const getLabelFilterWhereExpression = (token, query) => {
if (token.Child('string_label_filter_expression')) {
switch (token.Child('operator').value) {
case '=':
clauses = hasExtraLabels(query) ? reg.eq_extra_labels(token) : reg.eq_simple(token)
clauses = hasExtraLabels(query) ? reg.eqExtraLabels(token) : reg.eqSimple(token)
break
case '!=':
clauses = hasExtraLabels(query) ? reg.neq_extra_labels(token) : reg.neq_simple(token)
clauses = hasExtraLabels(query) ? reg.neqExtraLabels(token) : reg.neqSimple(token)
break
case '=~':
clauses = hasExtraLabels(query) ? reg.reg_extra_labels(token) : reg.nreg_extra_labels(token)
clauses = hasExtraLabels(query) ? reg.regExtraLabels(token) : reg.regSimple(token)
break
case '!~':
clauses = hasExtraLabels(query) ? reg.nreg_extra_labels(token) : reg.nreg_simple(token)
clauses = hasExtraLabels(query) ? reg.nregExtraLabels(token) : reg.nregSimple(token)
break
default:
throw new Error('Unsupported operator')
Expand All @@ -146,7 +146,7 @@ const getLabelFilterWhereExpression = (token, query) => {
throw new Error('Not supported')
}
const val = token.Child('number_value').value
const idx = hasExtraLabels(query) ? 'extra_labels_where' : 'simple_where'
const idx = hasExtraLabels(query) ? 'extraLabelsWhere' : 'simpleWhere'
switch (token.Child('number_operator').value) {
case '==':
return numreg[idx].eq(label, val)
Expand Down Expand Up @@ -210,7 +210,7 @@ const processStreamExpression = (token, query) => {
? genericAnd(res, processStreamExpression(t, query))
: genericOr(res, processStreamExpression(t, query))
}
if (t.name === 'andOr') {
if (t.name === 'and_or') {
andOr = t.value
}
}
Expand All @@ -227,13 +227,13 @@ const getLabelFilterStreamExpression = (token, query) => {
if (token.Child('string_label_filter_expression')) {
switch (token.Child('operator').value) {
case '=':
return reg.eq_stream(token, query)
return reg.eqStream(token, query)
case '!=':
return reg.neq_stream(token, query)
return reg.neqStream(token, query)
case '=~':
return reg.reg_stream(token, query)
return reg.regStream(token, query)
case '!~':
return reg.nreg_stream(token, query)
return reg.nregStream(token, query)
default:
throw new Error('Unsupported operator')
}
Expand All @@ -246,17 +246,17 @@ const getLabelFilterStreamExpression = (token, query) => {
const val = token.Child('number_value').value
switch (token.Child('number_operator').value) {
case '==':
return numreg.stream_where.eq(label, val)
return numreg.streamWhere.eq(label, val)
case '!=':
return numreg.stream_where.neq(label, val)
return numreg.streamWhere.neq(label, val)
case '>':
return numreg.stream_where.gt(label, val)
return numreg.streamWhere.gt(label, val)
case '>=':
return numreg.stream_where.ge(label, val)
return numreg.streamWhere.ge(label, val)
case '<':
return numreg.stream_where.lt(label, val)
return numreg.streamWhere.lt(label, val)
case '<=':
return numreg.stream_where.le(label, val)
return numreg.streamWhere.le(label, val)
}
}
}
10 changes: 5 additions & 5 deletions parser/registry/log_range_aggregation_registry/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ module.exports = {
*/
rate: (token, query) => {
if (query.stream && query.stream.length) {
return reg.rate_stream(token, query)
return reg.rateStream(token, query)
}
const duration = getDuration(token, query)
return genericRate(`toFloat64(count(1)) * 1000 / ${duration}`, token, query)
Expand All @@ -23,7 +23,7 @@ module.exports = {
* @param query {registry_types.Request}
* @returns {registry_types.Request}
*/
countOverTime: (token, query) => {
count_over_time: (token, query) => {
if (query.stream && query.stream.length) {
return reg.countOverTimeStream(token, query)
}
Expand All @@ -36,7 +36,7 @@ module.exports = {
* @param query {registry_types.Request}
* @returns {registry_types.Request}
*/
bytesRate: (token, query) => {
bytes_rate: (token, query) => {
if (query.stream && query.stream.length) {
return reg.bytesRateStream(token, query)
}
Expand All @@ -49,7 +49,7 @@ module.exports = {
* @param query {registry_types.Request}
* @returns {registry_types.Request}
*/
bytesOverTime: (token, query) => {
bytes_over_time: (token, query) => {
if (query.stream && query.stream.length) {
return reg.bytesOverTimeStream(token, query)
}
Expand All @@ -61,7 +61,7 @@ module.exports = {
* @param query {registry_types.Request}
* @returns {registry_types.Request}
*/
absentOverTime: (token, query) => {
absent_over_time: (token, query) => {
if (query.stream && query.stream.length) {
return reg.bytesOverTimeStream(token, query)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ module.exports.eqExtraLabels = (token, query) => {
* @param query {registry_types.Request}
* @returns {function({labels: Object}): boolean}
*/
module.exports.eq_stream = (token, query) => {
module.exports.eqStream = (token, query) => {
const [label, value] = labelAndVal(token)
return (e) => e.EOF || (e && e.labels && e.labels[label] && e.labels[label] === value)
}
18 changes: 9 additions & 9 deletions parser/registry/unwrap_registry/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ module.exports = {
* @param query {registry_types.Request}
* @returns {registry_types.Request}
*/
sumOverTime: (token, query) => {
sum_over_time: (token, query) => {
if (query.stream) {
return reg.sumOverTime.viaStream(token, query)
}
Expand All @@ -32,7 +32,7 @@ module.exports = {
* @param query {registry_types.Request}
* @returns {registry_types.Request}
*/
avgOverTime: (token, query) => {
avg_over_time: (token, query) => {
if (query.stream) {
return reg.avgOverTime.viaStream(token, query)
}
Expand All @@ -44,7 +44,7 @@ module.exports = {
* @param query {registry_types.Request}
* @returns {registry_types.Request}
*/
maxOverTime: (token, query) => {
max_over_time: (token, query) => {
if (query.stream) {
return reg.maxOverTime.viaStream(token, query)
}
Expand All @@ -56,7 +56,7 @@ module.exports = {
* @param query {registry_types.Request}
* @returns {registry_types.Request}
*/
minOverTime: (token, query) => {
min_over_time: (token, query) => {
if (query.stream) {
return reg.minOverTime.viaStream(token, query)
}
Expand All @@ -80,7 +80,7 @@ module.exports = {
* @param query {registry_types.Request}
* @returns {registry_types.Request}
*/
lastOverTime: (token, query) => {
last_over_time: (token, query) => {
if (query.stream) {
return reg.lastOverTime.viaStream(token, query)
}
Expand All @@ -92,7 +92,7 @@ module.exports = {
* @param query {registry_types.Request}
* @returns {registry_types.Request}
*/
stdvarOverTime: (token, query) => {
stdvar_over_time: (token, query) => {
if (query.stream) {
return reg.stdvarOverTime.viaStream(token, query)
}
Expand All @@ -104,7 +104,7 @@ module.exports = {
* @param query {registry_types.Request}
* @returns {registry_types.Request}
*/
stddevOverTime: (token, query) => {
stddev_over_time: (token, query) => {
if (query.stream) {
return reg.stddevOverTime.viaStream(token, query)
}
Expand All @@ -116,7 +116,7 @@ module.exports = {
* @param query {registry_types.Request}
* @returns {registry_types.Request}
*/
quantileOverTime: (token, query) => {
quantile_over_time: (token, query) => {
if (query.stream) {
return reg.quantileOverTime.viaStream(token, query)
}
Expand All @@ -128,7 +128,7 @@ module.exports = {
* @param query {registry_types.Request}
* @returns {registry_types.Request}
*/
absentOverTime: (token, query) => {
absent_over_time: (token, query) => {
if (query.stream) {
return reg.absentOverTime.viaStream(token, query)
}
Expand Down
Loading

0 comments on commit e59f786

Please sign in to comment.