Skip to content

Commit

Permalink
custom formatters (#217)
Browse files Browse the repository at this point in the history
* protobuf ingestion test

* debug

* fix test node14

* remove logs

Co-authored-by: Lorenzo Mangani <[email protected]>
  • Loading branch information
akvlad and lmangani authored Sep 23, 2022
1 parent 3984611 commit 13c13f9
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 12 deletions.
59 changes: 59 additions & 0 deletions lib/handlers/common.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
const eng = require('../../plugins/engine')
const { parseCliQL } = require('../cliql')
const { Transform } = require('stream')
const { scanClickhouse, scanFingerprints } = require('../db/clickhouse')

module.exports.checkCustomPlugins = async (options) => {
options.API = options.API || {
logql: async (query, start, end, limit) => {
const params = {
query,
start,
end,
limit,
direction: 'backward',
step: '60s'
}
const req = {
query: params
}
const res = new Transform({
transform (chunk, encoding, callback) {
callback(null, chunk)
}
})
res.writeHead = () => {}
const cliqlParams = parseCliQL(req.query.query)
if (cliqlParams) {
scanClickhouse(cliqlParams, { res }, params)
} else {
await scanFingerprints(
req.query,
{ res: res }
)
}
let str = ''
res.on('data', (d) => {
str += d
})
await new Promise((resolve, reject) => {
res.once('error', reject)
res.once('close', resolve)
res.once('end', resolve)
})
return JSON.parse(str)
}/* ,
promql: async () => {
} */
}
const plugins = eng.getPlg({ type: 'custom_processor' })
for (const plugin of Object.values(plugins)) {
for (const e of Object.entries(options)) {
plugin[e[0]] = e[1]
}
if (plugin.check()) {
return await plugin.process()
}
}
}
28 changes: 17 additions & 11 deletions lib/handlers/query_range.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/

const { parseCliQL } = require('../cliql')
const { checkCustomPlugins } = require('./common')

async function handler (req, res) {
req.log.debug('GET /loki/api/v1/query_range')
Expand All @@ -28,17 +29,22 @@ async function handler (req, res) {
const cliqlParams = parseCliQL(req.query.query)
if (cliqlParams) {
this.scanClickhouse(cliqlParams, res, params)
} else {
try {
await this.scanFingerprints(
req.query,
{ res: res.raw }
)
res.hijack()
} catch (err) {
req.log.error({ err })
res.send(resp)
}
return
}
const pluginOut = await checkCustomPlugins(req.query)
if (pluginOut) {
res.header('Content-Type', pluginOut.type)
return res.send(pluginOut.out)
}
try {
await this.scanFingerprints(
req.query,
{ res: res.raw }
)
res.hijack()
} catch (err) {
req.log.error({ err })
res.send(resp)
}
}

Expand Down
11 changes: 11 additions & 0 deletions plugins/base/custom_processor/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
const { PluginTypeLoaderBase } = require('plugnplay')
module.exports = class extends PluginTypeLoaderBase {
exportSync (opts) {
return {
props: ['check', 'process'],
validate: (exports) => {
return exports
}
}
}
}
4 changes: 4 additions & 0 deletions plugins/base/custom_processor/plugnplay.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
id: custom_processor
name: Custom Processor Plugin
description: plugin to custom process a logql / promql request
loader: index.js
54 changes: 54 additions & 0 deletions plugins/output_format/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
const { PluginLoaderBase } = require('plugnplay')

/**
* @class Plugin
* @property {string} query
* @property start {number} start in NS
* @property end {string} end in NS
* @property type {string} promql or logql
* @property limit {number}
* @property {{
* logql: (query: string, startNS: number, endNS: number, limit: number) => Promise<Object>
* }} API
* promql: (query: string, startNS: number, endNS: number, limit: number) => Promise<Object> //not implemented
*/
class Plugin {
/**
* @method
* @name check
* @this {Plg}
* @returns {boolean} if this plugin is usable for the query
*/
check () {
return this.query.match(/^toCsv\(.+\)\s*$/)
}

/**
* @method
* @name process
* @this {Plg}
* @returns {Promise<{type: string, out: string}>} The raw output
*/
async process () {
const match = this.query.match(/^toCsv\((.+)\)$/)
const response = await this.API.logql(match[1], this.start, this.end, this.limit)
let res = ''
for (const stream of response.data.result) {
const labels = JSON.stringify(stream.stream)
for (const val of stream.values) {
res += `${labels}\t${val[0]}\t${val[1]}\n`
}
}
return {
type: 'text/csv',
out: res
}
}
}
class Plg extends PluginLoaderBase {
exportSync (api) {
return new Plugin()
}
}

module.exports = Plg
5 changes: 5 additions & 0 deletions plugins/output_format/plugnplay.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
id: output_format
name: Format Output
description: Change output format
loader: index.js
type: custom_processor
2 changes: 1 addition & 1 deletion test/e2e
Submodule e2e updated from 8ecfe0 to 97b370

0 comments on commit 13c13f9

Please sign in to comment.