Skip to content

Commit

Permalink
elastic & influx compat
Browse files Browse the repository at this point in the history
  • Loading branch information
akvlad committed Feb 6, 2023
1 parent 0040da5 commit a0cdc20
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 27 deletions.
33 changes: 19 additions & 14 deletions lib/handlers/elastic_bulk.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
const { asyncLogError } = require('../../common')
const stringify = require('../utils').stringify

function handler (req, res) {
async function handler (req, res) {
const self = this
req.log.debug('ELASTIC Bulk Request')
if (!req.body) {
Expand All @@ -22,7 +22,7 @@ function handler (req, res) {
return res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }')
}

const doc_target = req.params.target || false;
const docTarget = req.params.target || false

let streams
if (
Expand All @@ -35,29 +35,32 @@ function handler (req, res) {
// assume ndjson raw body
streams = req.body.split(/\n/)
}
req.log.debug({ streams}, streams.lenght + ' bulk streams')
var last_tags = false;
let lastTags = false
const promises = []
if (streams) {
streams.forEach(function (stream) {
try {
stream = JSON.parse(stream)
} catch (err) { asyncLogError(err, req.log); return };

// Allow Index, Create. Discard Delete, Update.
if (stream.delete||stream.update) { last_tags = false; return; }
if (stream.delete || stream.update) {
lastTags = false
return
}
var command = stream.index || stream.create || false;
if (command && !last_tags) {
last_tags=stream.index;
return;
if (command && !lastTags) {
lastTags = stream.index
return
}

// Data Rows
let finger = null
let JSONLabels = last_tags;
let JSONLabels = lastTags
try {
try {
JSONLabels.type = 'elastic'
if (doc_target) JSONLabels._index = doc_target
if (docTarget) JSONLabels._index = docTarget
JSONLabels = Object.fromEntries(Object.entries(JSONLabels).sort())
} catch (err) {
asyncLogError(err, req.log)
Expand All @@ -68,12 +71,12 @@ function handler (req, res) {
finger = self.fingerPrint(strJson)
req.log.debug({ JSONLabels, finger }, 'LABELS FINGERPRINT')
// Store Fingerprint
self.bulk_labels.add([[
promises.push(self.bulk_labels.add([[
new Date().toISOString().split('T')[0],
finger,
strJson,
JSONLabels.target || ''
]])
]]))
for (const key in JSONLabels) {
req.log.debug({ key, data: JSONLabels[key] }, 'Storing label')
self.labels.add('_LABELS_', key)
Expand All @@ -91,12 +94,14 @@ function handler (req, res) {
JSON.stringify(stream) || stream
]
req.log.debug({ finger, values }, 'store')
self.bulk.add([values])
promises.push(self.bulk.add([values]))

// Reset State, Expect Command
last_tags = false;
lastTags = false
})
}
await Promise.all(promises)
res.header('x-elastic-product', 'Elasticsearch')
return res.code(200).send('{"took":0, "errors": false }')
}

Expand Down
21 changes: 12 additions & 9 deletions lib/handlers/elastic_index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
const { asyncLogError } = require('../../common')
const stringify = require('../utils').stringify

function handler (req, res) {
async function handler (req, res) {
const self = this
req.log.debug('ELASTIC Index Request')
if (!req.body || !req.params.target) {
Expand All @@ -24,8 +24,8 @@ function handler (req, res) {
return res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }')
}

const doc_target = req.params.target || false;
const doc_id = req.params.id || false;
const docTarget = req.params.target || false
const docId = req.params.id || false

let streams
if (
Expand All @@ -39,6 +39,7 @@ function handler (req, res) {
streams = req.body.split(/\n/)
}
req.log.info({ streams }, 'streams')
const promises = []
if (streams) {
streams.forEach(function (stream) {
req.log.debug({ stream }, 'ingesting elastic doc')
Expand All @@ -47,8 +48,8 @@ function handler (req, res) {
try {
try {
JSONLabels.type = 'elastic'
if (doc_target) JSONLabels._index = doc_target
if (doc_id) JSONLabels._id = doc_id
if (docTarget) JSONLabels._index = docTarget
if (docId) JSONLabels._id = docId
JSONLabels = Object.fromEntries(Object.entries(JSONLabels).sort())
} catch (err) {
asyncLogError(err, req.log)
Expand All @@ -58,12 +59,12 @@ function handler (req, res) {
const strJson = stringify(JSONLabels)
finger = self.fingerPrint(strJson)
// Store Fingerprint
self.bulk_labels.add([[
promises.push(self.bulk_labels.add([[
new Date().toISOString().split('T')[0],
finger,
strJson,
JSONLabels.target || ''
]])
]]))
for (const key in JSONLabels) {
req.log.debug({ key, data: JSONLabels[key] }, 'Storing label')
self.labels.add('_LABELS_', key)
Expand All @@ -75,7 +76,7 @@ function handler (req, res) {
// check if stream is JSON format
try {
stream = JSON.parse(stream)
} catch (err) {
} catch (err) {
asyncLogError(err, req.log)
};
// Store Elastic Doc Object
Expand All @@ -86,9 +87,11 @@ function handler (req, res) {
JSON.stringify(stream) || stream
]
req.log.debug({ finger, values }, 'store')
self.bulk.add([values])
promises.push(self.bulk.add([values]))
})
}
await Promise.all(promises)
res.header('x-elastic-product', 'Elasticsearch')
return res.code(200).send('{"took":0, "errors": false }')
}

Expand Down
7 changes: 3 additions & 4 deletions lib/handlers/influx_write.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async function handler (req, res) {
}
if (self.readonly) {
asyncLogError('Readonly! No push support.', req.log)
return res.send(500)
return res.code(500).send('')
}
await influxParser.init()
let streams = null
Expand All @@ -76,7 +76,6 @@ async function handler (req, res) {
// Calculate Fingerprint
const strLabels = stringify(Object.fromEntries(Object.entries(JSONLabels).sort()))
finger = self.fingerPrint(strLabels)
req.log.debug({ JSONLabels, finger }, 'LABELS FINGERPRINT')
self.labels.add(finger.toString(), stream.labels)
// Store Fingerprint
self.bulk_labels.add([[
Expand Down Expand Up @@ -104,7 +103,7 @@ async function handler (req, res) {
!value
) {
asyncLogError('no bulkable data', req.log)
return
return res.code(204).send('')
}
const values = [
finger,
Expand All @@ -128,7 +127,7 @@ async function handler (req, res) {
}
})
}
return res.send(200)
return res.code(204).send('')
}

function pad (pad, str, padLeft) {
Expand Down

0 comments on commit a0cdc20

Please sign in to comment.