Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make live queries aware of reindexed (decrypted) values #236

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 88 additions & 60 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const pull = require('pull-stream')
const mutexify = require('mutexify')
const toPull = require('push-stream-to-pull-stream')
const pullAsync = require('pull-async')
const pullMany = require('pull-many')
const TypedFastBitSet = require('typedfastbitset')
const bsb = require('binary-search-bounds')
const multicb = require('multicb')
Expand Down Expand Up @@ -1497,73 +1498,94 @@ module.exports = function (log, indexesPath) {
})
}

// A pull-stream source that external code can push values into.
//
// Fixes over the naive single-callback version:
// - Values pushed before the sink requests one are buffered, not dropped.
// - Each pending read callback is answered exactly once (pull-stream
//   contract: one cb invocation per request), then cleared.
// - After abort the stream stops accepting values and releases any
//   pending read callback with the abort reason.
//
// Returns { add(val), source(abort, cb) } where `source` is a valid
// pull-stream source function.
function createLazyPull() {
  const buffer = []
  let onValue = null
  let ended = false

  return {
    add(val) {
      if (ended) return
      if (onValue) {
        // Answer the pending request exactly once.
        const cb = onValue
        onValue = null
        cb(null, val)
      } else {
        // No outstanding request yet; keep the value for later.
        buffer.push(val)
      }
    },
    source(abort, cb) {
      if (abort) {
        ended = true
        // Release any parked reader so the pipeline can unwind.
        const pending = onValue
        onValue = null
        if (pending) pending(abort)
        cb(abort)
      } else if (buffer.length > 0) {
        cb(null, buffer.shift())
      } else {
        // Park this read until the next add().
        onValue = cb
      }
    },
  }
}

const reindexValues = createLazyPull()

// live will return new messages as they enter the log
// can be combined with a normal all or paginate first
function live(op) {
return pull(
pullAsync((cb) =>
onReady(() => {
executeOperation(op, (err) => cb(err))
})
),
pull.map(() => {
let offset = -1
let seqStream

function detectOffsetAndSeqStream(ops) {
ops.forEach((op) => {
if (
op.type === 'EQUAL' ||
op.type === 'INCLUDES' ||
op.type === 'PREDICATE' ||
op.type === 'ABSENT'
) {
if (!indexes.has(op.data.indexName)) offset = -1
else offset = indexes.get(op.data.indexName).offset
} else if (
op.type === 'AND' ||
op.type === 'OR' ||
op.type === 'NOT'
) {
detectOffsetAndSeqStream(op.data)
} else if (op.type === 'LIVESEQS') {
if (seqStream)
throw new Error('Only one seq stream in live supported')
seqStream = op.stream
pullMany([
pull(
pullAsync((cb) =>
onReady(() => {
executeOperation(op, (err) => cb(err))
})
),
pull.map(() => {
let offset = -1
let seqStream

function detectOffsetAndSeqStream(ops) {
ops.forEach((op) => {
if (
op.type === 'EQUAL' ||
op.type === 'INCLUDES' ||
op.type === 'PREDICATE' ||
op.type === 'ABSENT'
) {
if (!indexes.has(op.data.indexName)) offset = -1
else offset = indexes.get(op.data.indexName).offset
} else if (
op.type === 'AND' ||
op.type === 'OR' ||
op.type === 'NOT'
) {
detectOffsetAndSeqStream(op.data)
} else if (op.type === 'LIVESEQS') {
if (seqStream)
throw new Error('Only one seq stream in live supported')
seqStream = op.stream
}
})
}
})
}

detectOffsetAndSeqStream([op])

// There are two cases here:
// - op contains a live seq stream, in which case we let the
// seq stream drive new values
// - op doesn't, in which we let the log stream drive new values

let recordStream
if (seqStream) {
recordStream = pull(
seqStream,
pull.asyncMap((seq, cb) => {
ensureIndexSync({ data: { indexName: 'seq' } }, () => {
getRecord(seq, cb)
})
})
)
} else {
const opts =
offset === -1
? { live: true, gt: seqIndex.offset }
: { live: true, gt: offset }
const logstreamId = Math.ceil(Math.random() * 1000)
debug(`log.stream #${logstreamId} started, for a live query`)
recordStream = toPull(log.stream(opts))
}
detectOffsetAndSeqStream([op])

// There are two cases here:
// - op contains a live seq stream, in which case we let the
// seq stream drive new values
// - op doesn't, in which we let the log stream drive new values

let recordStream
if (seqStream) {
recordStream = pull(
seqStream,
pull.asyncMap((seq, cb) => {
ensureIndexSync({ data: { indexName: 'seq' } }, () => {
getRecord(seq, cb)
})
})
)
} else {
const opts =
offset === -1
? { live: true, gt: seqIndex.offset }
: { live: true, gt: offset }
const logstreamId = Math.ceil(Math.random() * 1000)
debug(`log.stream #${logstreamId} started, for a live query`)
recordStream = toPull(log.stream(opts))
}

return recordStream
}),
pull.flatten(),
return recordStream
}),
pull.flatten()
),
reindexValues.source,
]),
pull.filter((record) => isValueOk([op], record.value)),
pull.through(() => {
if (debugQuery.enabled)
Expand Down Expand Up @@ -1592,6 +1614,12 @@ module.exports = function (log, indexesPath) {
seq = 1
}

// add value to live streams
log.get(offset, (err, recBuffer) => {
if (err) return
else reindexValues.add({ value: recBuffer })
})

function resetIndex(index) {
if (index.offset >= prevOffset) {
if (index.count) index.count = seq
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"pull-async": "~1.0.0",
"pull-awaitable": "^1.0.0",
"pull-cat": "~1.1.11",
"pull-many": "^1.0.9",
"pull-stream": "^3.6.14",
"push-stream": "^11.0.0",
"push-stream-to-pull-stream": "^1.0.3",
Expand Down