Skip to content

Commit

Permalink
fix bug caused by late nva state store nullification
Browse files Browse the repository at this point in the history
  • Loading branch information
Kelerchian committed Feb 15, 2024
1 parent 8595842 commit 0256a83
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 17 deletions.
4 changes: 2 additions & 2 deletions machine-runner/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion machine-runner/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@actyx/machine-runner",
"version": "0.5.6",
"version": "0.5.7",
"description": "asymmetric replicated state machines: runtime support",
"type": "module",
"types": "./lib/esm/index.d.ts",
Expand Down
4 changes: 3 additions & 1 deletion machine-runner/src/runner/runner-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ export type MachineEmitterEventMap<
factory: StateFactory.Any
nextState: unknown
}) => unknown
commandPersisted: (_: void) => unknown
commandPersisted: (_: {
sourceState: StateOpaque<SwarmProtocolName, MachineName, string, StateUnion>
}) => unknown
change: (_: StateOpaque<SwarmProtocolName, MachineName, string, StateUnion>) => unknown
next: (_: StateOpaque<SwarmProtocolName, MachineName, string, StateUnion>) => unknown
destroyed: (_: void) => unknown
Expand Down
54 changes: 45 additions & 9 deletions machine-runner/src/runner/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ export const createMachineRunner = <
MachineEventFactories extends MachineEvent.Factory.Any,
Payload,
MachineEvents extends MachineEvent.Any = MachineEvent.Of<MachineEventFactories>,
StateUnion extends unknown = unknown,
>(
sdk: Actyx,
tags: Tags<MachineEvents>,
Expand All @@ -249,7 +250,7 @@ export const createMachineRunner = <
any
>,
initialPayload: Payload,
) => {
): MachineRunner<SwarmProtocolName, MachineName, StateUnion> => {
const subscribeMonotonicQuery = {
query: tags,
sessionId: 'dummy',
Expand Down Expand Up @@ -332,6 +333,10 @@ export const createMachineRunnerInternal = <
}

const currentCommandLock = Symbol(Math.random())
const sourceState = ImplStateOpaque.make<SwarmProtocolName, MachineName, StateUnion>(
internals,
internals.current,
)

internals.commandLock = currentCommandLock

Expand All @@ -354,7 +359,7 @@ export const createMachineRunnerInternal = <
// Aftermath
const persistResult = persist(events)
.then((res) => {
emitter.emit('commandPersisted', undefined)
emitter.emit('commandPersisted', { sourceState })
return res
})
.catch((err) => {
Expand Down Expand Up @@ -480,8 +485,8 @@ export const createMachineRunnerInternal = <
emitter.emit('change', stateOpaqueToBeEmitted)

if (
internals.current.factory !== internals.previouslyEmittedToNext?.factory ||
!deepEqual(internals.previouslyEmittedToNext.data, internals.current.data)
!internals.previouslyEmittedToNext ||
!ImplStateOpaque.eqInternal(internals.current, internals.previouslyEmittedToNext)
) {
internals.previouslyEmittedToNext = {
factory: internals.current.factory,
Expand Down Expand Up @@ -525,14 +530,17 @@ export const createMachineRunnerInternal = <
cloneFrom: state,
})

const purgeWhenMatching = ({ sourceState }: { sourceState: S }) =>
nva.purgeWhenMatching(sourceState)

emitter.on('next', nva.push)
emitter.on('failure', nva.fail)
emitter.on('commandPersisted', nva.purge)
emitter.on('commandPersisted', purgeWhenMatching)
internals.destruction.addDestroyHook(() => {
nva.kill()
emitter.off('next', nva.push)
emitter.off('failure', nva.fail)
emitter.off('commandPersisted', nva.purge)
emitter.off('commandPersisted', purgeWhenMatching)
})

return nva
Expand Down Expand Up @@ -783,10 +791,12 @@ namespace MachineRunnerIterableIterator {
/**
* Object to help "awaiting" next value.
*/
export type NextValueAwaiter<S extends any> = ReturnType<typeof NextValueAwaiter.make<S>>
export type NextValueAwaiter<S extends StateOpaque<any, any, any>> = ReturnType<
typeof NextValueAwaiter.make<S>
>

namespace NextValueAwaiter {
export const make = <S extends any>({
export const make = <S extends StateOpaque<any, any, any>>({
topLevelDestruction,
currentLevelDestruction,
failure,
Expand Down Expand Up @@ -831,6 +841,12 @@ namespace NextValueAwaiter {
}
}

const purgeWhenMatching = (comparedState: S) => {
if (!!store && 'state' in store && ImplStateOpaque.eq(store.state, comparedState)) {
store = null
}
}

return {
kill: () => {
if (store && 'control' in store) {
Expand Down Expand Up @@ -859,6 +875,7 @@ namespace NextValueAwaiter {
consume,
peek,
purge,
purgeWhenMatching,
}
}

Expand Down Expand Up @@ -888,6 +905,13 @@ namespace NextValueAwaiter {
}
}

type StateOpaqueInternalAccess = {
/**
* Do not use internal
*/
[ImplStateOpaque.InternalAccess]: () => StateAndFactory<any, any, any, any, any, any>
}

/**
* StateOpaque is an opaque snapshot of a MachineRunner state. A StateOpaque
* does not have direct access to the state's payload or command. In order to
Expand All @@ -904,7 +928,8 @@ export interface StateOpaque<
any,
Contained.ContainedEvent<MachineEvent.Any>[]
> = object,
> extends StateRaw<StateName, Payload> {
> extends StateOpaqueInternalAccess,
StateRaw<StateName, Payload> {
/**
* Checks if the StateOpaque's type equals to the StateFactory's type.
*
Expand Down Expand Up @@ -1096,6 +1121,16 @@ export namespace ImplStateOpaque {
export const isRunnerDestroyed = (internals: RunnerInternals.Any): boolean =>
internals.destruction.isDestroyed()

export const InternalAccess: unique symbol = Symbol()

export const eq = (a: StateOpaque<any, any, any, any>, b: StateOpaque<any, any, any, any>) =>
eqInternal(a[InternalAccess](), b[InternalAccess]())

export const eqInternal = (
a: StateAndFactory<any, any, any, any, any, any>,
b: StateAndFactory<any, any, any, any, any, any>,
): boolean => a.factory === b.factory && deepEqual(a.data, b.data)

export const make = <
SwarmProtocolName extends string,
MachineName extends string,
Expand Down Expand Up @@ -1164,6 +1199,7 @@ export namespace ImplStateOpaque {
type: stateAtSnapshot.type,
commandsAvailable: () =>
commandEnabledAtSnapshot && CommandGeneratorCriteria.allOk(commandGeneratorCriteria),
[InternalAccess]: () => ({ data: stateAtSnapshot, factory: factoryAtSnapshot }),
}
}
}
Expand Down
38 changes: 37 additions & 1 deletion machine-runner/tests/esm/runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,9 @@ describe('machine as async generator', () => {
if (!state) throw new Unreachable()

await r1.caughtUpDelay.toggle(true) // start delay
// this command call locks, nullifying state store previously populated by caughtUp event
// this command call locks
// after it is successfully persisted
// NVA state store will be nullified
const whenOffCommands = state.as(On)?.commands()
if (!whenOffCommands) throw new Unreachable()
await whenOffCommands.toggle()
Expand All @@ -628,6 +630,40 @@ describe('machine as async generator', () => {

machine.destroy()
})

it('nullification must not happen if state has changed between command issuance and persistance', async () => {
const r1 = new Runner(On, { toggleCount: 0 })
const { machine } = r1
r1.feed([], true) // caughtUp event

const state = (await machine.peekNext()).value
expect(state).toBeTruthy() // should resolve immediately
if (!state) throw new Unreachable()

await r1.caughtUpDelay.toggle(true) // start delay
// this command call locks
// and it is supposed to nullify the NVA state store
const whenOffCommands = state.as(On)?.commands()
if (!whenOffCommands) throw new Unreachable()
await whenOffCommands.toggle()

// however, a state change happens before the command is persisted
r1.feed([ToggleOff.make({}), ToggleOn.make({}), ToggleOff.make({})], true)

const newState = machine.get()

// persistence MUST not nullify stored state because the stored state is not the
// "source of the command" state anymore
await r1.eventRoundtrip.waitAllDone() // make sure all event roundtrips are done
await r1.caughtUpDelay.toggle(false) // end delay

const newState2 = machine.get()

expect(newState?.payload).toEqual(newState2?.payload)
expect(newState?.type).toEqual(newState2?.type)

machine.destroy()
})
})

describe('actual', () => {
Expand Down
8 changes: 5 additions & 3 deletions machine-visual/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 0256a83

Please sign in to comment.