From a0cdc20f1bba89a10d2a67f51e0eed6546e4d6db Mon Sep 17 00:00:00 2001 From: akvlad Date: Mon, 6 Feb 2023 12:17:53 +0200 Subject: [PATCH] elastic & influx compat --- lib/handlers/elastic_bulk.js | 33 +++++++++++++++++++-------------- lib/handlers/elastic_index.js | 21 ++++++++++++--------- lib/handlers/influx_write.js | 7 +++---- 3 files changed, 34 insertions(+), 27 deletions(-) diff --git a/lib/handlers/elastic_bulk.js b/lib/handlers/elastic_bulk.js index 5e9736e8..c3d467e7 100644 --- a/lib/handlers/elastic_bulk.js +++ b/lib/handlers/elastic_bulk.js @@ -9,7 +9,7 @@ const { asyncLogError } = require('../../common') const stringify = require('../utils').stringify -function handler (req, res) { +async function handler (req, res) { const self = this req.log.debug('ELASTIC Bulk Request') if (!req.body) { @@ -22,7 +22,7 @@ function handler (req, res) { return res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }') } - const doc_target = req.params.target || false; + const docTarget = req.params.target || false let streams if ( @@ -35,8 +35,8 @@ function handler (req, res) { // assume ndjson raw body streams = req.body.split(/\n/) } - req.log.debug({ streams}, streams.lenght + ' bulk streams') - var last_tags = false; + let lastTags = false + const promises = [] if (streams) { streams.forEach(function (stream) { try { @@ -44,20 +44,23 @@ function handler (req, res) { } catch (err) { asyncLogError(err, req.log); return }; // Allow Index, Create. Discard Delete, Update. - if (stream.delete||stream.update) { last_tags = false; return; } + if (stream.delete || stream.update) { + lastTags = false + return + } var command = stream.index || stream.create || false; - if (command && !last_tags) { - last_tags=stream.index; - return; + if (command && !lastTags) { + lastTags = stream.index + return } // Data Rows let finger = null - let JSONLabels = last_tags; + let JSONLabels = lastTags try { try { JSONLabels.type = 'elastic' - if (doc_target) JSONLabels._index = doc_target + if (docTarget) JSONLabels._index = docTarget JSONLabels = Object.fromEntries(Object.entries(JSONLabels).sort()) } catch (err) { asyncLogError(err, req.log) @@ -68,12 +71,12 @@ function handler (req, res) { finger = self.fingerPrint(strJson) req.log.debug({ JSONLabels, finger }, 'LABELS FINGERPRINT') // Store Fingerprint - self.bulk_labels.add([[ + promises.push(self.bulk_labels.add([[ new Date().toISOString().split('T')[0], finger, strJson, JSONLabels.target || '' - ]]) + ]])) for (const key in JSONLabels) { req.log.debug({ key, data: JSONLabels[key] }, 'Storing label') self.labels.add('_LABELS_', key) @@ -91,12 +94,14 @@ function handler (req, res) { JSON.stringify(stream) || stream ] req.log.debug({ finger, values }, 'store') - self.bulk.add([values]) + promises.push(self.bulk.add([values])) // Reset State, Expect Command - last_tags = false; + lastTags = false }) } + await Promise.all(promises) + res.header('x-elastic-product', 'Elasticsearch') return res.code(200).send('{"took":0, "errors": false }') } diff --git a/lib/handlers/elastic_index.js b/lib/handlers/elastic_index.js index cf2ffd6f..19528092 100644 --- a/lib/handlers/elastic_index.js +++ b/lib/handlers/elastic_index.js @@ -12,7 +12,7 @@ const { asyncLogError } = require('../../common') const stringify = require('../utils').stringify -function handler (req, res) { +async function handler (req, res) { const self = this req.log.debug('ELASTIC Index Request') if (!req.body || !req.params.target) { @@ -24,8 +24,8 @@ function handler (req, res) { return res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }') } - const doc_target = req.params.target || false; - const doc_id = req.params.id || false; + const docTarget = req.params.target || false + const docId = req.params.id || false let streams if ( @@ -39,6 +39,7 @@ function handler (req, res) { streams = req.body.split(/\n/) } req.log.info({ streams }, 'streams') + const promises = [] if (streams) { streams.forEach(function (stream) { req.log.debug({ stream }, 'ingesting elastic doc') @@ -47,8 +48,8 @@ function handler (req, res) { try { try { JSONLabels.type = 'elastic' - if (doc_target) JSONLabels._index = doc_target - if (doc_id) JSONLabels._id = doc_id + if (docTarget) JSONLabels._index = docTarget + if (docId) JSONLabels._id = docId JSONLabels = Object.fromEntries(Object.entries(JSONLabels).sort()) } catch (err) { asyncLogError(err, req.log) @@ -58,12 +59,12 @@ function handler (req, res) { const strJson = stringify(JSONLabels) finger = self.fingerPrint(strJson) // Store Fingerprint - self.bulk_labels.add([[ + promises.push(self.bulk_labels.add([[ new Date().toISOString().split('T')[0], finger, strJson, JSONLabels.target || '' - ]]) + ]])) for (const key in JSONLabels) { req.log.debug({ key, data: JSONLabels[key] }, 'Storing label') self.labels.add('_LABELS_', key) @@ -75,7 +76,7 @@ function handler (req, res) { // check if stream is JSON format try { stream = JSON.parse(stream) - } catch (err) { + } catch (err) { asyncLogError(err, req.log) }; // Store Elastic Doc Object @@ -86,9 +87,11 @@ function handler (req, res) { JSON.stringify(stream) || stream ] req.log.debug({ finger, values }, 'store') - self.bulk.add([values]) + promises.push(self.bulk.add([values])) }) } + await Promise.all(promises) + res.header('x-elastic-product', 'Elasticsearch') return res.code(200).send('{"took":0, "errors": false }') } diff --git a/lib/handlers/influx_write.js b/lib/handlers/influx_write.js index 3f0756a9..24a8d866 100644 --- a/lib/handlers/influx_write.js +++ b/lib/handlers/influx_write.js @@ -49,7 +49,7 @@ async function handler (req, res) { } if (self.readonly) { asyncLogError('Readonly! No push support.', req.log) - return res.send(500) + return res.code(500).send('') } await influxParser.init() let streams = null @@ -76,7 +76,6 @@ async function handler (req, res) { // Calculate Fingerprint const strLabels = stringify(Object.fromEntries(Object.entries(JSONLabels).sort())) finger = self.fingerPrint(strLabels) - req.log.debug({ JSONLabels, finger }, 'LABELS FINGERPRINT') self.labels.add(finger.toString(), stream.labels) // Store Fingerprint self.bulk_labels.add([[ @@ -104,7 +103,7 @@ async function handler (req, res) { !value ) { asyncLogError('no bulkable data', req.log) - return + return res.code(204).send('') } const values = [ finger, @@ -128,7 +127,7 @@ async function handler (req, res) { } }) } - return res.send(200) + return res.code(204).send('') } function pad (pad, str, padLeft) {