Skip to content

Commit

Permalink
quantiles init
Browse files Browse the repository at this point in the history
  • Loading branch information
akvlad committed Mar 2, 2023
1 parent 0f4bf8c commit 3d5b51e
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 27 deletions.
3 changes: 2 additions & 1 deletion parser/bnf.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ const registryNames = [
'line_filter_operator_registry',
'parser_registry',
'unwrap_registry',
'parameterized_aggregation_registry'
'parameterized_aggregation_registry',
'parameterized_unwrapped_registry'
]
const path = require('path')
const registries = registryNames.reduce((sum, n) => {
Expand Down
9 changes: 6 additions & 3 deletions parser/logql.bnf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<SYNTAX> ::= <log_stream_selector> | <agg_statement> | <parameterized_expression> | <user_macro>
<SYNTAX> ::= <log_stream_selector> | <agg_statement> | <user_macro>

log_stream_fp_selector ::= "{" <OWSP> <log_stream_selector_rule> *(<OWSP> "," <OWSP> <log_stream_selector_rule>) <OWSP> "}"
log_stream_selector ::= <log_stream_fp_selector> <OWSP> *(<OWSP><log_pipeline>)
Expand Down Expand Up @@ -64,11 +64,14 @@ unwrap_fn ::= <unwrap_registry>
req_by_without_unwrap ::= <by_without_unwrap> <OWSP> "(" <OWSP> <label_list> <OWSP> ")"
by_without_unwrap ::= "by" | "without"

agg_statement ::= (<aggregation_operator> | <log_range_aggregation> | <unwrap_function>) [<OWSP> <compared_agg_statement_cmp>]
agg_statement ::= (<aggregation_operator> | <log_range_aggregation> | <unwrap_function> | <parameterized_expression> | <parameterized_unwrapped_expression>) [<OWSP> <compared_agg_statement_cmp>]
compared_agg_statement_cmp ::= <number_operator> <OWSP> <number_value>

unwrap_value_statement ::= "unwrap_value"

parameterized_expression ::= <parameterized_expression_fn><OWSP>"("<OWSP><parameter_value><OWSP>","<OWSP><agg_statement>")"
parameterized_unwrapped_expression ::= <parameterized_unwrapped_expression_fn><OWSP>"("<OWSP><parameter_value><OWSP>","<OWSP><unwrap_expression><OWSP>"["<duration_value>"]"<OWSP>")" [ <OWSP> <req_by_without_unwrap> ] [<OWSP> <compared_agg_statement_cmp>]
parameterized_unwrapped_expression_fn ::= <parameterized_unwrapped_registry>

parameterized_expression ::= <parameterized_expression_fn><OWSP>"("<OWSP><parameter_value><OWSP>","<OWSP>(<agg_statement>|<parameterized_unwrapped_expression>)<OWSP>")" [<OWSP> <compared_agg_statement_cmp>]
parameter_value ::= <NUMBER>
parameterized_expression_fn ::= <parameterized_aggregation_registry>
47 changes: 47 additions & 0 deletions parser/registry/parameterized_unwrapped_registry/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
const { QrynBadRequest } = require('../../../lib/handlers/errors')
const { hasStream, getDuration } = require('../common')
const Sql = require('@cloki/clickhouse-sql')
const { applyByWithoutLabels } = require('../unwrap_registry/unwrap_registry')

module.exports = {
/**
* quantileOverTime(scalar,unwrapped-range): the φ-quantile (0 ≤ φ ≤ 1) of the values in the specified interval.
* @param token {Token}
* @param query {Select}
* @returns {Select}
*/
quantile_over_time: (token, query) => {
if (hasStream(query)) {
throw new QrynBadRequest('Not supported')
}
query.ctx.matrix = true
const durationMS = getDuration(token)
query.ctx.duration = durationMS
const stepMS = query.ctx.step
const quantVal = parseFloat(token.Child('parameter_value').value)
const quantA = new Sql.With('quant_a', query)
const labels = applyByWithoutLabels(token.Child('req_by_without_unwrap'), query)
const quantB = (new Sql.Select())
.with(quantA)
.select(
[labels, 'labels'],
[new Sql.Raw(`intDiv(quant_a.timestamp_ns, ${durationMS}) * ${durationMS}`), 'timestamp_ns'],
[new Sql.Raw(`quantile(${quantVal})(unwrapped)`), 'value']
).from(new Sql.WithReference(quantA))
.groupBy('timestamp_ns', 'labels')
.orderBy('labels', 'timestamp_ns')
if (stepMS <= durationMS) {
return quantB
}
const withQuantB = new Sql.With('quant_b', quantB)
return (new Sql.Select())
.with(withQuantB)
.select(
['quant_b.labels', 'labels'],
[new Sql.Raw(`intDiv(quant_b.timestamp_ns, ${stepMS}) * ${stepMS}`), 'timestamp_ns'],
[new Sql.Raw('argMin(quant_b.value, quant_b.timestamp_ns)'), 'value'])
.from(new Sql.WithReference(withQuantB))
.groupBy('labels', 'timestamp_ns')
.orderBy('labels', 'timestamp_ns')
}
}
12 changes: 0 additions & 12 deletions parser/registry/unwrap_registry/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,18 +110,6 @@ module.exports = {
}
return reg.stddevOverTime.viaRequest(token, query)
},
/**
* quantileOverTime(scalar,unwrapped-range): the φ-quantile (0 ≤ φ ≤ 1) of the values in the specified interval.
* @param token {Token}
* @param query {Select}
* @returns {Select}
*/
quantile_over_time: (token, query) => {
if (hasStream(query)) {
return reg.quantileOverTime.viaStream(token, query)
}
return reg.quantileOverTime.viaRequest(token, query)
},
/**
* absentOverTime(unwrapped-range): returns an empty vector if the range vector passed to it has any elements and a 1-element vector with the value 1 if the range vector passed to it has no elements. (absentOverTime is useful for alerting on when no time series and logs stream exist for label combination for a certain amount of time.)
* @param token {Token}
Expand Down
12 changes: 2 additions & 10 deletions parser/registry/unwrap_registry/unwrap_registry.js
Original file line number Diff line number Diff line change
Expand Up @@ -218,15 +218,6 @@ module.exports = {
throw new Error('not implemented')
}, (sum) => sum, false, 'by_without_unwrap')
}),
/**
* quantileOverTime(scalar,unwrapped-range): the φ-quantile (0 ≤ φ ≤ 1) of the values in the specified interval.
* //@param token {Token}
* //@param query {Select}
* @returns {Select}
*/
quantileOverTime: (/* token, query */) => {
throw new Error('Not implemented')
},
/**
* absentOverTime(unwrapped-range): returns an empty vector if the range vector passed to it has any elements and a 1-element vector with the value 1 if the range vector passed to it has no elements. (absentOverTime is useful for alerting on when no time series and logs stream exist for label combination for a certain amount of time.)
* //@param token {Token}
Expand All @@ -235,5 +226,6 @@ module.exports = {
*/
absentOverTime: (/* token, query */) => {
throw new Error('Not implemented')
}
},
applyByWithoutLabels
}
30 changes: 29 additions & 1 deletion parser/transpiler.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const logRangeAggregationRegistry = require('./registry/log_range_aggregation_re
const highLevelAggregationRegistry = require('./registry/high_level_aggregation_registry')
const numberOperatorRegistry = require('./registry/number_operator_registry')
const parameterizedAggregationRegistry = require('./registry/parameterized_aggregation_registry')
const parameterizedUnwrappedRegistry = require('./registry/parameterized_unwrapped_registry')
const complexLabelFilterRegistry = require('./registry/complex_label_filter_expression')
const lineFormat = require('./registry/line_format')
const parserRegistry = require('./registry/parser_registry')
Expand Down Expand Up @@ -122,7 +123,11 @@ module.exports.transpile = (request) => {
joinLabels: joinLabels
}
let duration = null
const matrixOp = ['aggregation_operator', 'unwrap_function', 'log_range_aggregation'].find(t => token.Child(t))
const matrixOp = [
'aggregation_operator',
'unwrap_function',
'log_range_aggregation',
'parameterized_unwrapped_expression'].find(t => token.Child(t))
if (matrixOp) {
duration = durationToMs(token.Child(matrixOp).Child('duration_value').value)
start = Math.floor(start / duration) * duration
Expand All @@ -143,6 +148,9 @@ module.exports.transpile = (request) => {
case 'log_range_aggregation':
query = module.exports.transpileLogRangeAggregation(token, query)
break
case 'parameterized_unwrapped_expression':
query = module.exports.transpileParameterizedUnwrappedExpression(token, query)
break
default:
// eslint-disable-next-line no-case-declarations
const _query = module.exports.transpileLogStreamSelector(token, query)
Expand Down Expand Up @@ -195,6 +203,26 @@ const setQueryParam = (query, name, val) => {
}
}

/**
*
* @param token {Token}
* @param query {Select}
* @returns {Select}
*/
module.exports.transpileParameterizedUnwrappedExpression = (token, query) => {
query = module.exports.transpileLogStreamSelector(token, query)
if (token.Child('unwrap_value_statement')) {
if (token.Child('log_pipeline')) {
throw new Error('log pipeline not supported')
}
query = transpileUnwrapMetrics(token, query)
} else {
query = module.exports.transpileUnwrapExpression(token.Child('unwrap_expression'), query)
}
return parameterizedUnwrappedRegistry[token.Child('parameterized_unwrapped_expression_fn').value](
token, query)
}

/**
*
* @param request {{
Expand Down

0 comments on commit 3d5b51e

Please sign in to comment.