Skip to content

Commit

Permalink
command persistence now will purge NextAwaiter state store
Browse files Browse the repository at this point in the history
  • Loading branch information
Kelerchian committed Jan 25, 2024
1 parent 2899508 commit 785f4db
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 31 deletions.
11 changes: 10 additions & 1 deletion machine-runner/src/runner/runner-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export type MachineEmitterEventMap<
factory: StateFactory.Any
nextState: unknown
}) => unknown
commandPersisted: (_: void) => unknown
change: (_: StateOpaque<SwarmProtocolName, MachineName, string, StateUnion>) => unknown
next: (_: StateOpaque<SwarmProtocolName, MachineName, string, StateUnion>) => unknown
destroyed: (_: void) => unknown
Expand All @@ -100,7 +101,15 @@ export const makeEmitter = <
SwarmProtocolName extends string,
MachineName extends string,
StateUnion extends unknown = unknown,
>() => new ThrowIgnoringEmitter() as MachineEmitter<SwarmProtocolName, MachineName, StateUnion>
>() => {
const emitter = new ThrowIgnoringEmitter() as MachineEmitter<
SwarmProtocolName,
MachineName,
StateUnion
>
emitter.setMaxListeners(20)
return emitter
}

export const ActiveRunnerRegistryRegisterSymbol: unique symbol = Symbol(
'ActiveRunnerRegistryRegisterSymbol',
Expand Down
27 changes: 22 additions & 5 deletions machine-runner/src/runner/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,17 +351,24 @@ export const createMachineRunnerInternal = <
emitter.emit('change', ImplStateOpaque.make(internals, internals.current))
}

const persistResult = persist(events).catch((err) => {
unlockAndLogOnPersistFailure(err)
return Promise.reject(err)
})
// Aftermath
const persistResult = persist(events)
.then((res) => {
emitter.emit('commandPersisted', undefined)
return res
})
.catch((err) => {
unlockAndLogOnPersistFailure(err)
return Promise.reject(err)
})

// Change is triggered because commandLock status changed
emitter.emit('change', ImplStateOpaque.make(internals, internals.current))
return persistResult
})

// Actyx Subscription management
internals.destruction.addDestroyHook(() => emitter.emit('destroyed'))
internals.destruction.addDestroyHook(() => emitter.emit('destroyed', undefined))

const fail = (cause: MachineRunnerFailure) => {
// order of execution is very important here
Expand Down Expand Up @@ -520,10 +527,12 @@ export const createMachineRunnerInternal = <

emitter.on('next', nva.push)
emitter.on('failure', nva.fail)
emitter.on('commandPersisted', nva.purge)
internals.destruction.addDestroyHook(() => {
nva.kill()
emitter.off('next', nva.push)
emitter.off('failure', nva.fail)
emitter.off('commandPersisted', nva.purge)
})

return nva
Expand Down Expand Up @@ -809,6 +818,13 @@ namespace NextValueAwaiter {
return retval
}

const purge = () => {
const shouldNullify = !!store && 'state' in store
if (shouldNullify) {
store = null
}
}

return {
kill: () => {
if (store && 'control' in store) {
Expand Down Expand Up @@ -836,6 +852,7 @@ namespace NextValueAwaiter {
store && 'state' in store ? { state: store.state } : undefined,
consume,
peek,
purge,
}
}

Expand Down
117 changes: 103 additions & 14 deletions machine-runner/tests/esm/helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { StateOpaque, MachineEvent, StateFactory, State, globals } from '../../l
import { createMachineRunnerInternal } from '../../lib/esm/runner/runner.js'
import { PromiseDelay, Subscription, mockMeta } from '../../lib/esm/test-utils/mock-runner.js'
import { CommonEmitterEventMap, TypedEventEmitter } from '../../lib/esm/runner/runner-utils.js'
import { EventEmitter } from 'events'

export const sleep = (dur: number) => new Promise((res) => setTimeout(res, dur))

Expand Down Expand Up @@ -55,17 +56,25 @@ export class Runner<
Payload,
MachineEvent extends MachineEvent.Any = MachineEvent.Of<MachineEventFactories>,
> {
static EVENT_ROUNDTRIP_DELAY = 1

private persisted: ActyxEvent<MachineEvent.Any>[] = []
private unhandled: MachineEvent.Any[] = []
private caughtUpHistory: StateOpaque<SwarmProtocolName, MachineName, string, unknown>[] = []
private stateChangeHistory: {
state: StateOpaque<SwarmProtocolName, MachineName, string, unknown>
unhandled: MachineEvent.Any[]
}[] = []
private delayer = PromiseDelay.make()
private sub = Subscription.make<MachineEvent>()
public machine

private commandDelay = PromiseDelay.make()
public caughtUpDelay = CaughtUpDelay.make(async () => {
await this.eventRoundtrip.waitAllDone()
await this.feed([], true)
})
public eventRoundtrip = ActivePromiseSet.make()

public tag
/* eslint-disable @typescript-eslint/no-explicit-any */
constructor(
Expand All @@ -84,16 +93,27 @@ export class Runner<
payload: payload as MachineEvent.Of<MachineEventFactories>,
}),
)
const pair = this.delayer.make()
const retval = pair[0].then(() => {
this.persisted.push(...actyxEvents)
this.feed(
actyxEvents.map((e) => e.payload),
true,

const pair = this.commandDelay.make()
const caughtUpDelay = this.caughtUpDelay.shouldDelay()
const commandPromise = pair[0].then(() => actyxEvents.map((e) => e.meta))

// event roundtrip
commandPromise
.then(() =>
this.eventRoundtrip.queue(async () => {
await sleep(Runner.EVENT_ROUNDTRIP_DELAY)
this.persisted.push(...actyxEvents)
this.feed(
actyxEvents.map((e) => e.payload),
!caughtUpDelay,
)
}),
)
return actyxEvents.map((e) => e.meta)
})
return retval
// eslint-disable-next-line @typescript-eslint/no-empty-function
.catch(() => {})

return commandPromise
},
tag,
factory,
Expand Down Expand Up @@ -126,10 +146,13 @@ export class Runner<
async toggleCommandDelay(
delayControl: { delaying: true } | { delaying: false; reject?: boolean },
): Promise<void> {
await this.delayer.toggle(delayControl)
await this.commandDelay.toggle(delayControl)
if (!delayControl.delaying) {
await this.eventRoundtrip.waitAllDone()
}
}

feed(ev: MachineEvent[], caughtUp: boolean) {
feed = (ev: MachineEvent[], caughtUp: boolean) => {
const cb = this.sub.cb
if (!cb) {
console.warn('not subscribed')
Expand Down Expand Up @@ -228,12 +251,14 @@ export class Runner<
}
}

assertPersistedAsMachineEvent(...e: MachineEvent[]) {
assertPersistedAsMachineEvent = async (...e: MachineEvent[]) => {
await this.eventRoundtrip.waitAllDone()
expect(this.persisted.map((e) => e.payload)).toEqual(e)
this.persisted.length = 0
}

assertPersistedWithFn(fn: (events: ActyxEvent<MachineEvent.Any>[]) => void) {
assertPersistedWithFn = async (fn: (events: ActyxEvent<MachineEvent.Any>[]) => void) => {
await this.eventRoundtrip.waitAllDone()
fn([...this.persisted])
this.persisted.length = 0
}
Expand Down Expand Up @@ -261,3 +286,67 @@ export const createBufferLog = () => {
get: () => buffer,
}
}

export type ActivePromiseSet = ReturnType<typeof ActivePromiseSet['make']>
export namespace ActivePromiseSet {
const EMPTY = 'empty'

export const make = () => {
const internals = {
working: new Set<Promise<unknown>>(),
emitter: new EventEmitter(),
}
internals.emitter.setMaxListeners(Infinity)

const whenEmptyNotify = () => {
if (internals.working.size === 0) {
internals.emitter.emit(EMPTY, undefined)
}
}

const queue = (fn: () => Promise<unknown>) => {
const task = fn().finally(() => {
internals.working.delete(task)
whenEmptyNotify()
})
internals.working.add(task)
}

const waitAllDone = () =>
new Promise<void>((res) => {
if (internals.working.size === 0) {
return res()
}
internals.emitter.once(EMPTY, res)
})

return { waitAllDone, queue }
}
}

export type CaughtUpDelay = ReturnType<typeof CaughtUpDelay['make']>
export namespace CaughtUpDelay {
export const make = (onRelease: () => Promise<unknown>) => {
const internals = { delaying: false, buffered: false }

const releaseBuffer = async () => {
if (!internals.buffered) return
internals.buffered = false
await onRelease()
}

const toggle = async (delaying: boolean) => {
internals.delaying = delaying
if (!delaying) {
await releaseBuffer()
}
}

const shouldDelay = () => {
internals.buffered = internals.delaying
return internals.delaying
}

return { toggle, shouldDelay }
}
}
Loading

0 comments on commit 785f4db

Please sign in to comment.