Skip to content

Commit

Permalink
Merge branch 'go_test_compat' into topk_support
Browse files Browse the repository at this point in the history
# Conflicts:
#	lib/handlers/elastic_bulk.js
#	lib/handlers/influx_write.js
  • Loading branch information
akvlad committed Mar 3, 2023
2 parents 3d5b51e + a0cdc20 commit c239a14
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 68 deletions.
38 changes: 21 additions & 17 deletions lib/handlers/elastic_bulk.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
const { asyncLogError } = require('../../common')
const stringify = require('../utils').stringify

function handler (req, res) {
async function handler (req, res) {
const self = this
req.log.debug('ELASTIC Bulk Request')
if (!req.body) {
Expand All @@ -22,7 +22,7 @@ function handler (req, res) {
return res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }')
}

const doc_target = req.params.target || false;
const docTarget = req.params.target || false

let streams
if (
Expand All @@ -35,30 +35,32 @@ function handler (req, res) {
// assume ndjson raw body
streams = req.body.split(/\n/)
}
req.log.debug({ streams}, streams.lenght + ' bulk streams')
var last_tags = false;
let lastTags = false
const promises = []
if (streams) {
streams.forEach(function (stream) {
try {
stream = JSON.parse(stream)
} catch (err) { asyncLogError(err, req.log); return };

// Allow Index, Create. Discard Delete, Update.
if (stream.delete||stream.update) { last_tags = false; return; }
if (stream.delete || stream.update) {
lastTags = false
return
}
var command = stream.index || stream.create || false;
if (command && !last_tags) {
last_tags=stream.index;
return;
if (command && !lastTags) {
lastTags = stream.index
return
}

// Data Rows
let finger = null
let JSONLabels = last_tags;
const ts = BigInt((new Date().getTime() * 1000) + '000')
let JSONLabels = lastTags
try {
try {
JSONLabels.type = 'elastic'
if (doc_target) JSONLabels._index = doc_target
if (docTarget) JSONLabels._index = docTarget
JSONLabels = Object.fromEntries(Object.entries(JSONLabels).sort())
} catch (err) {
asyncLogError(err, req.log)
Expand All @@ -69,12 +71,12 @@ function handler (req, res) {
finger = self.fingerPrint(strJson)
req.log.debug({ JSONLabels, finger }, 'LABELS FINGERPRINT')
// Store Fingerprint
self.bulk_labels.add([[
new Date(parseInt((ts / BigInt('1000000')).toString())).toISOString().split('T')[0],
promises.push(self.bulk_labels.add([[
new Date().toISOString().split('T')[0],
finger,
strJson,
JSONLabels.target || ''
]])
]]))
for (const key in JSONLabels) {
req.log.debug({ key, data: JSONLabels[key] }, 'Storing label')
self.labels.add('_LABELS_', key)
Expand All @@ -87,17 +89,19 @@ function handler (req, res) {
// Store Elastic Doc Object
const values = [
finger,
ts,
BigInt((new Date().getTime() * 1000) + '000'),
null,
JSON.stringify(stream) || stream
]
req.log.debug({ finger, values }, 'store')
self.bulk.add([values])
promises.push(self.bulk.add([values]))

// Reset State, Expect Command
last_tags = false
lastTags = false
})
}
await Promise.all(promises)
res.header('x-elastic-product', 'Elasticsearch')
return res.code(200).send('{"took":0, "errors": false }')
}

Expand Down
21 changes: 12 additions & 9 deletions lib/handlers/elastic_index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
const { asyncLogError } = require('../../common')
const stringify = require('../utils').stringify

function handler (req, res) {
async function handler (req, res) {
const self = this
req.log.debug('ELASTIC Index Request')
if (!req.body || !req.params.target) {
Expand All @@ -24,8 +24,8 @@ function handler (req, res) {
return res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }')
}

const doc_target = req.params.target || false;
const doc_id = req.params.id || false;
const docTarget = req.params.target || false
const docId = req.params.id || false

let streams
if (
Expand All @@ -39,6 +39,7 @@ function handler (req, res) {
streams = req.body.split(/\n/)
}
req.log.info({ streams }, 'streams')
const promises = []
if (streams) {
streams.forEach(function (stream) {
req.log.debug({ stream }, 'ingesting elastic doc')
Expand All @@ -47,8 +48,8 @@ function handler (req, res) {
try {
try {
JSONLabels.type = 'elastic'
if (doc_target) JSONLabels._index = doc_target
if (doc_id) JSONLabels._id = doc_id
if (docTarget) JSONLabels._index = docTarget
if (docId) JSONLabels._id = docId
JSONLabels = Object.fromEntries(Object.entries(JSONLabels).sort())
} catch (err) {
asyncLogError(err, req.log)
Expand All @@ -58,12 +59,12 @@ function handler (req, res) {
const strJson = stringify(JSONLabels)
finger = self.fingerPrint(strJson)
// Store Fingerprint
self.bulk_labels.add([[
promises.push(self.bulk_labels.add([[
new Date().toISOString().split('T')[0],
finger,
strJson,
JSONLabels.target || ''
]])
]]))
for (const key in JSONLabels) {
req.log.debug({ key, data: JSONLabels[key] }, 'Storing label')
self.labels.add('_LABELS_', key)
Expand All @@ -75,7 +76,7 @@ function handler (req, res) {
// check if stream is JSON format
try {
stream = JSON.parse(stream)
} catch (err) {
} catch (err) {
asyncLogError(err, req.log)
};
// Store Elastic Doc Object
Expand All @@ -86,9 +87,11 @@ function handler (req, res) {
JSON.stringify(stream) || stream
]
req.log.debug({ finger, values }, 'store')
self.bulk.add([values])
promises.push(self.bulk.add([values]))
})
}
await Promise.all(promises)
res.header('x-elastic-product', 'Elasticsearch')
return res.code(200).send('{"took":0, "errors": false }')
}

Expand Down
80 changes: 38 additions & 42 deletions lib/handlers/influx_write.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async function handler (req, res) {
}
if (self.readonly) {
asyncLogError('Readonly! No push support.', req.log)
return res.send(500)
return res.code(500).send('')
}
await influxParser.init()
let streams = null
Expand Down Expand Up @@ -76,63 +76,59 @@ async function handler (req, res) {
// Calculate Fingerprint
const strLabels = stringify(Object.fromEntries(Object.entries(JSONLabels).sort()))
finger = self.fingerPrint(strLabels)
req.log.debug({ JSONLabels, finger }, 'LABELS FINGERPRINT')
self.labels.add(finger.toString(), stream.labels)
// Store Fingerprint
self.bulk_labels.add([[
new Date().toISOString().split('T')[0],
finger,
strLabels,
stream.measurement || ''
]])
for (const key in JSONLabels) {
// req.log.debug({ key, data: JSONLabels[key] }, 'Storing label');
self.labels.add('_LABELS_', key)
self.labels.add(key, JSONLabels[key])
}

const timestamp = stream.timestamp || JSONFields.timestamp

promises.push(self.bulk_labels.add([[
new Date(parseInt((BigInt(timestamp) / BigInt(1000000)).toString()))
.toISOString().split('T')[0],
finger,
strLabels,
stream.measurement || ''
]]))
/* metrics */
if (stream.fields && stream.measurement !== 'syslog' && !JSONFields.message) {
for (const [key, value] of Object.entries(JSONFields)) {
// req.log.debug({ key, value, finger }, 'BULK ROW');
if (
!key &&
!timestamp &&
!value
) {
asyncLogError('no bulkable data', req.log)
return
}
const values = [
finger,
BigInt(pad('0000000000000000000', timestamp, true)),
value || 0,
key || ''
]
promises.push(self.bulk.add([values]))
} catch (err) {
asyncLogError(err, req.log)
}
const timestamp = stream.timestamp || JSONFields.timestamp
/* metrics */
if (stream.fields && stream.measurement !== 'syslog' && !JSONFields.message) {
for (const [key, value] of Object.entries(JSONFields)) {
// req.log.debug({ key, value, finger }, 'BULK ROW');
if (
!key &&
!timestamp &&
!value
) {
asyncLogError('no bulkable data', req.log)
return res.code(204).send('')
}
/* logs or syslog */
} else if (stream.measurement === 'syslog' || JSONFields.message) {
// Send fields as a JSON object for qryn to parse
// const message = JSON.stringify(JSONFields)
const values = [
finger,
BigInt(pad('0000000000000000000', timestamp)),
null,
JSONFields.message
BigInt(pad('0000000000000000000', timestamp, true)),
value || 0,
key || ''
]
promises.push(self.bulk.add([values]))
self.bulk.add([values])
}
} catch (err) {
asyncLogError(err, req.log)
/* logs or syslog */
} else if (stream.measurement === 'syslog' || JSONFields.message) {
// Send fields as a JSON object for qryn to parse
// const message = JSON.stringify(JSONFields)
const values = [
finger,
BigInt(pad('0000000000000000000', timestamp)),
null,
JSONFields.message
]
self.bulk.add([values])
}
})
}
await Promise.all(promises)
return res.send(200)
return res.code(204).send('')
}

function pad (pad, str, padLeft) {
Expand Down

0 comments on commit c239a14

Please sign in to comment.