diff --git a/lib/handlers/elastic_bulk.js b/lib/handlers/elastic_bulk.js index 09ed5a77..c3d467e7 100644 --- a/lib/handlers/elastic_bulk.js +++ b/lib/handlers/elastic_bulk.js @@ -9,7 +9,7 @@ const { asyncLogError } = require('../../common') const stringify = require('../utils').stringify -function handler (req, res) { +async function handler (req, res) { const self = this req.log.debug('ELASTIC Bulk Request') if (!req.body) { @@ -22,7 +22,7 @@ function handler (req, res) { return res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }') } - const doc_target = req.params.target || false; + const docTarget = req.params.target || false let streams if ( @@ -35,8 +35,8 @@ function handler (req, res) { // assume ndjson raw body streams = req.body.split(/\n/) } - req.log.debug({ streams}, streams.lenght + ' bulk streams') - var last_tags = false; + let lastTags = false + const promises = [] if (streams) { streams.forEach(function (stream) { try { @@ -44,21 +44,23 @@ function handler (req, res) { } catch (err) { asyncLogError(err, req.log); return }; // Allow Index, Create. Discard Delete, Update. - if (stream.delete||stream.update) { last_tags = false; return; } + if (stream.delete || stream.update) { + lastTags = false + return + } var command = stream.index || stream.create || false; - if (command && !last_tags) { - last_tags=stream.index; - return; + if (command && !lastTags) { + lastTags = stream.index + return } // Data Rows let finger = null - let JSONLabels = last_tags; - const ts = BigInt((new Date().getTime() * 1000) + '000') + let JSONLabels = lastTags try { try { JSONLabels.type = 'elastic' - if (doc_target) JSONLabels._index = doc_target + if (docTarget) JSONLabels._index = docTarget JSONLabels = Object.fromEntries(Object.entries(JSONLabels).sort()) } catch (err) { asyncLogError(err, req.log) @@ -69,12 +71,12 @@ function handler (req, res) { finger = self.fingerPrint(strJson) req.log.debug({ JSONLabels, finger }, 'LABELS FINGERPRINT') // Store Fingerprint - self.bulk_labels.add([[ - new Date(parseInt((ts / BigInt('1000000')).toString())).toISOString().split('T')[0], + promises.push(self.bulk_labels.add([[ + new Date().toISOString().split('T')[0], finger, strJson, JSONLabels.target || '' - ]]) + ]])) for (const key in JSONLabels) { req.log.debug({ key, data: JSONLabels[key] }, 'Storing label') self.labels.add('_LABELS_', key) @@ -87,17 +89,19 @@ function handler (req, res) { // Store Elastic Doc Object const values = [ finger, - ts, + BigInt((new Date().getTime() * 1000) + '000'), null, JSON.stringify(stream) || stream ] req.log.debug({ finger, values }, 'store') - self.bulk.add([values]) + promises.push(self.bulk.add([values])) // Reset State, Expect Command - last_tags = false + lastTags = false }) } + await Promise.all(promises) + res.header('x-elastic-product', 'Elasticsearch') return res.code(200).send('{"took":0, "errors": false }') } diff --git a/lib/handlers/elastic_index.js b/lib/handlers/elastic_index.js index cf2ffd6f..19528092 100644 --- a/lib/handlers/elastic_index.js +++ b/lib/handlers/elastic_index.js @@ -12,7 +12,7 @@ const { asyncLogError } = require('../../common') const stringify = require('../utils').stringify -function handler (req, res) { +async function handler (req, res) { const self = this req.log.debug('ELASTIC Index Request') if (!req.body || !req.params.target) { @@ -24,8 +24,8 @@ function handler (req, res) { return res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }') } - const doc_target = req.params.target || false; - const doc_id = req.params.id || false; + const docTarget = req.params.target || false + const docId = req.params.id || false let streams if ( @@ -39,6 +39,7 @@ function handler (req, res) { streams = req.body.split(/\n/) } req.log.info({ streams }, 'streams') + const promises = [] if (streams) { streams.forEach(function (stream) { req.log.debug({ stream }, 'ingesting elastic doc') @@ -47,8 +48,8 @@ function handler (req, res) { try { try { JSONLabels.type = 'elastic' - if (doc_target) JSONLabels._index = doc_target - if (doc_id) JSONLabels._id = doc_id + if (docTarget) JSONLabels._index = docTarget + if (docId) JSONLabels._id = docId JSONLabels = Object.fromEntries(Object.entries(JSONLabels).sort()) } catch (err) { asyncLogError(err, req.log) @@ -58,12 +59,12 @@ function handler (req, res) { const strJson = stringify(JSONLabels) finger = self.fingerPrint(strJson) // Store Fingerprint - self.bulk_labels.add([[ + promises.push(self.bulk_labels.add([[ new Date().toISOString().split('T')[0], finger, strJson, JSONLabels.target || '' - ]]) + ]])) for (const key in JSONLabels) { req.log.debug({ key, data: JSONLabels[key] }, 'Storing label') self.labels.add('_LABELS_', key) @@ -75,7 +76,7 @@ function handler (req, res) { // check if stream is JSON format try { stream = JSON.parse(stream) - } catch (err) { + } catch (err) { asyncLogError(err, req.log) }; // Store Elastic Doc Object @@ -86,9 +87,11 @@ function handler (req, res) { JSON.stringify(stream) || stream ] req.log.debug({ finger, values }, 'store') - self.bulk.add([values]) + promises.push(self.bulk.add([values])) }) } + await Promise.all(promises) + res.header('x-elastic-product', 'Elasticsearch') return res.code(200).send('{"took":0, "errors": false }') } diff --git a/lib/handlers/influx_write.js b/lib/handlers/influx_write.js index bc5fcd2e..5563f8a9 100644 --- a/lib/handlers/influx_write.js +++ b/lib/handlers/influx_write.js @@ -48,7 +48,7 @@ async function handler (req, res) { } if (self.readonly) { asyncLogError('Readonly! No push support.', req.log) - return res.send(500) + return res.code(500).send('') } await influxParser.init() let streams = null @@ -76,63 +76,59 @@ async function handler (req, res) { // Calculate Fingerprint const strLabels = stringify(Object.fromEntries(Object.entries(JSONLabels).sort())) finger = self.fingerPrint(strLabels) - req.log.debug({ JSONLabels, finger }, 'LABELS FINGERPRINT') self.labels.add(finger.toString(), stream.labels) // Store Fingerprint + self.bulk_labels.add([[ + new Date().toISOString().split('T')[0], + finger, + strLabels, + stream.measurement || '' + ]]) for (const key in JSONLabels) { // req.log.debug({ key, data: JSONLabels[key] }, 'Storing label'); self.labels.add('_LABELS_', key) self.labels.add(key, JSONLabels[key]) } - - const timestamp = stream.timestamp || JSONFields.timestamp - - promises.push(self.bulk_labels.add([[ - new Date(parseInt((BigInt(timestamp) / BigInt(1000000)).toString())) - .toISOString().split('T')[0], - finger, - strLabels, - stream.measurement || '' - ]])) - /* metrics */ - if (stream.fields && stream.measurement !== 'syslog' && !JSONFields.message) { - for (const [key, value] of Object.entries(JSONFields)) { - // req.log.debug({ key, value, finger }, 'BULK ROW'); - if ( - !key && - !timestamp && - !value - ) { - asyncLogError('no bulkable data', req.log) - return - } - const values = [ - finger, - BigInt(pad('0000000000000000000', timestamp, true)), - value || 0, - key || '' - ] - promises.push(self.bulk.add([values])) + } catch (err) { + asyncLogError(err, req.log) + } + const timestamp = stream.timestamp || JSONFields.timestamp + /* metrics */ + if (stream.fields && stream.measurement !== 'syslog' && !JSONFields.message) { + for (const [key, value] of Object.entries(JSONFields)) { + // req.log.debug({ key, value, finger }, 'BULK ROW'); + if ( + !key && + !timestamp && + !value + ) { + asyncLogError('no bulkable data', req.log) + return res.code(204).send('') } - /* logs or syslog */ - } else if (stream.measurement === 'syslog' || JSONFields.message) { - // Send fields as a JSON object for qryn to parse - // const message = JSON.stringify(JSONFields) const values = [ finger, - BigInt(pad('0000000000000000000', timestamp)), - null, - JSONFields.message + BigInt(pad('0000000000000000000', timestamp, true)), + value || 0, + key || '' ] - promises.push(self.bulk.add([values])) + self.bulk.add([values]) } - } catch (err) { - asyncLogError(err, req.log) + /* logs or syslog */ + } else if (stream.measurement === 'syslog' || JSONFields.message) { + // Send fields as a JSON object for qryn to parse + // const message = JSON.stringify(JSONFields) + const values = [ + finger, + BigInt(pad('0000000000000000000', timestamp)), + null, + JSONFields.message + ] + self.bulk.add([values]) } }) } await Promise.all(promises) - return res.send(200) + return res.code(204).send('') } function pad (pad, str, padLeft) {