Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mjameswh committed Jan 17, 2025
1 parent 56f16ac commit c028018
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 90 deletions.
3 changes: 1 addition & 2 deletions packages/test/src/test-integration-split-two.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,10 +374,9 @@ test('unhandledRejection causes WFT to fail', configMacro, async (t, config) =>
t.fail();
return;
}
t.is(failure.applicationFailureInfo?.type, 'Unhandled Promise rejection: Error: unhandled rejection');
t.is(failure.message, 'Unhandled Promise rejection: Error: unhandled rejection');
t.true(failure.stackTrace?.includes(`Error: unhandled rejection`));
t.is(failure.cause?.message, 'root failure');
t.is(failure.cause?.cause?.message, 'root failure');
},
{ minTimeout: 300, factor: 1, retries: 100 }
)
Expand Down
5 changes: 0 additions & 5 deletions packages/worker/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@ export class GracefulShutdownPeriodExpiredError extends Error {}
* a Promise. To silent rejections on a specific Promise, use `promise.catch(funcThatCantThrow)`
* (e.g. `promise.catch(() => void 0)` or `promise.catch((e) => logger.error(e))`).
*/
// FIXME: At this time, this wrapper is only used for errors that could not be associated with a
// specific workflow run; it should also be used for unhandled rejections in workflow code,
// but this is not possible at the moment as we intentionally "unhandle" non-TemporalFailure
// errors happening in workflow code (i.e. ALL non-TemporalFailure errors thrown from
// workflow code becomes Unhandled Rejection at some point in our own logic)
@SymbolBasedInstanceOfError('UnhandledRejectionError')
export class UnhandledRejectionError extends Error {
constructor(
Expand Down
153 changes: 80 additions & 73 deletions packages/worker/src/workflow/vm-shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export function setUnhandledRejectionHandler(getWorkflowByRunId: (runId: string)
if (runId !== undefined) {
const workflow = getWorkflowByRunId(runId);
if (workflow !== undefined) {
workflow.setUnhandledRejection(new UnhandledRejectionError(`Unhandled Promise rejection: ${err}`, err));
workflow.setUnhandledRejection(err);
return;
}
}
Expand Down Expand Up @@ -326,97 +326,104 @@ export abstract class BaseVMWorkflow implements Workflow {
public async activate(
activation: coresdk.workflow_activation.IWorkflowActivation
): Promise<coresdk.workflow_completion.IWorkflowActivationCompletion> {
if (this.context === undefined) throw new IllegalStateError('Workflow isolate context uninitialized');
activation = coresdk.workflow_activation.WorkflowActivation.fromObject(activation);
if (!activation.jobs) throw new TypeError('Expected workflow activation jobs to be defined');

// Queries are particular in many ways, and Core guarantees that a single activation will not
// contain both queries and other jobs. So let's handle them separately.
const [queries, nonQueries] = partition(activation.jobs, ({ queryWorkflow }) => queryWorkflow != null);
if (queries.length > 0) {
if (nonQueries.length > 0) throw new TypeError('Got both queries and other jobs in a single activation');
return this.activateQueries(activation);
}
try {
if (this.context === undefined) throw new IllegalStateError('Workflow isolate context uninitialized');
activation = coresdk.workflow_activation.WorkflowActivation.fromObject(activation);
if (!activation.jobs) throw new TypeError('Expected workflow activation jobs to be defined');

// Queries are particular in many ways, and Core guarantees that a single activation will not
// contain both queries and other jobs. So let's handle them separately.
const [queries, nonQueries] = partition(activation.jobs, ({ queryWorkflow }) => queryWorkflow != null);
if (queries.length > 0) {
if (nonQueries.length > 0) throw new TypeError('Got both queries and other jobs in a single activation');
return this.activateQueries(activation);
}

// Update the activator's state in preparation for a non-query activation.
// This is done early, so that we can then rely on the activator while processing the activation.
if (activation.timestamp == null)
throw new TypeError('Expected activation.timestamp to be set for non-query activation');
this.activator.now = tsToMs(activation.timestamp);
this.activator.mutateWorkflowInfo((info) => ({
...info,
historyLength: activation.historyLength as number,
// Exact truncation for multi-petabyte histories
// historySize === 0 means WFT was generated by pre-1.20.0 server, and the history size is unknown
historySize: activation.historySizeBytes?.toNumber() ?? 0,
continueAsNewSuggested: activation.continueAsNewSuggested ?? false,
currentBuildId: activation.buildIdForCurrentTask ?? undefined,
unsafe: {
...info.unsafe,
isReplaying: activation.isReplaying ?? false,
},
}));
this.activator.addKnownFlags(activation.availableInternalFlags ?? []);
// Update the activator's state in preparation for a non-query activation.
// This is done early, so that we can then rely on the activator while processing the activation.
if (activation.timestamp == null)
throw new TypeError('Expected activation.timestamp to be set for non-query activation');
this.activator.now = tsToMs(activation.timestamp);
this.activator.mutateWorkflowInfo((info) => ({
...info,
historyLength: activation.historyLength as number,
// Exact truncation for multi-petabyte histories
// historySize === 0 means WFT was generated by pre-1.20.0 server, and the history size is unknown
historySize: activation.historySizeBytes?.toNumber() ?? 0,
continueAsNewSuggested: activation.continueAsNewSuggested ?? false,
currentBuildId: activation.buildIdForCurrentTask ?? undefined,
unsafe: {
...info.unsafe,
isReplaying: activation.isReplaying ?? false,
},
}));
this.activator.addKnownFlags(activation.availableInternalFlags ?? []);

// Initialization of the workflow must happen before anything else. Yet, keep the init job in
// place in the list as we'll use it as a marker to know when to start the workflow function.
const initWorkflowJob = activation.jobs.find((job) => job.initializeWorkflow != null)?.initializeWorkflow;
if (initWorkflowJob) this.workflowModule.initialize(initWorkflowJob);
// Initialization of the workflow must happen before anything else. Yet, keep the init job in
// place in the list as we'll use it as a marker to know when to start the workflow function.
const initWorkflowJob = activation.jobs.find((job) => job.initializeWorkflow != null)?.initializeWorkflow;
if (initWorkflowJob) this.workflowModule.initialize(initWorkflowJob);

const hasSignals = activation.jobs.some(({ signalWorkflow }) => signalWorkflow != null);
const doSingleBatch = !hasSignals || this.activator.hasFlag(SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch);
const hasSignals = activation.jobs.some(({ signalWorkflow }) => signalWorkflow != null);
const doSingleBatch = !hasSignals || this.activator.hasFlag(SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch);

const [patches, nonPatches] = partition(activation.jobs, ({ notifyHasPatch }) => notifyHasPatch != null);
for (const { notifyHasPatch } of patches) {
if (notifyHasPatch == null) throw new TypeError('Expected notifyHasPatch to be set');
this.activator.notifyHasPatch(notifyHasPatch);
}
const [patches, nonPatches] = partition(activation.jobs, ({ notifyHasPatch }) => notifyHasPatch != null);
for (const { notifyHasPatch } of patches) {
if (notifyHasPatch == null) throw new TypeError('Expected notifyHasPatch to be set');
this.activator.notifyHasPatch(notifyHasPatch);
}

if (doSingleBatch) {
// updateRandomSeed requires the same special handling as patches (before anything else, and don't
// unblock conditions after each job). Unfortunately, prior to ProcessWorkflowActivationJobsAsSingleBatch,
// they were handled as regular jobs, making it unsafe to properly handle that job above, with patches.
const [updateRandomSeed, rest] = partition(nonPatches, ({ updateRandomSeed }) => updateRandomSeed != null);
if (updateRandomSeed.length > 0)
this.activator.updateRandomSeed(updateRandomSeed[updateRandomSeed.length - 1].updateRandomSeed!);
this.workflowModule.activate(
coresdk.workflow_activation.WorkflowActivation.fromObject({ ...activation, jobs: rest })
);
this.tryUnblockConditionsAndMicrotasks();
} else {
const [signals, nonSignals] = partition(
nonPatches,
// Move signals to a first batch; all the rest goes in a second batch.
({ signalWorkflow }) => signalWorkflow != null
);

// Loop and invoke each batch, waiting for microtasks to complete after each batch.
let batchIndex = 0;
for (const jobs of [signals, nonSignals]) {
if (jobs.length === 0) continue;
if (doSingleBatch) {
// updateRandomSeed requires the same special handling as patches (before anything else, and don't
// unblock conditions after each job). Unfortunately, prior to ProcessWorkflowActivationJobsAsSingleBatch,
// they were handled as regular jobs, making it unsafe to properly handle that job above, with patches.
const [updateRandomSeed, rest] = partition(nonPatches, ({ updateRandomSeed }) => updateRandomSeed != null);
if (updateRandomSeed.length > 0)
this.activator.updateRandomSeed(updateRandomSeed[updateRandomSeed.length - 1].updateRandomSeed!);
this.workflowModule.activate(
coresdk.workflow_activation.WorkflowActivation.fromObject({ ...activation, jobs }),
batchIndex++
coresdk.workflow_activation.WorkflowActivation.fromObject({ ...activation, jobs: rest })
);
this.tryUnblockConditionsAndMicrotasks();
} else {
const [signals, nonSignals] = partition(
nonPatches,
// Move signals to a first batch; all the rest goes in a second batch.
({ signalWorkflow }) => signalWorkflow != null
);

// Loop and invoke each batch, waiting for microtasks to complete after each batch.
let batchIndex = 0;
for (const jobs of [signals, nonSignals]) {
if (jobs.length === 0) continue;
this.workflowModule.activate(
coresdk.workflow_activation.WorkflowActivation.fromObject({ ...activation, jobs }),
batchIndex++
);
this.tryUnblockConditionsAndMicrotasks();
}
}
}

const completion = this.workflowModule.concludeActivation();
const completion = this.workflowModule.concludeActivation();

// Give unhandledRejection handler a chance to be triggered.
await new Promise(setImmediate);
if (this.unhandledRejection) {
// Give unhandledRejection handler a chance to be triggered.
await new Promise(setImmediate);

if (this.unhandledRejection)
throw new UnhandledRejectionError(
`Unhandled Promise rejection: ${this.unhandledRejection}`,
this.unhandledRejection
);
return completion;
} catch (err) {
return {
runId: this.activator.info.runId,
// FIXME: Calling `activator.errorToFailure()` directly from outside the VM is unsafe, as it
// depends on the `failureConverter` and `payloadConverter`, which may be customized and
// therefore aren't guaranteed not to access `global` or to cause scheduling microtasks.
// Admitingly, the risk is very low, so we're leaving it as is for now.
failed: { failure: this.activator.errorToFailure(this.unhandledRejection) },
failed: { failure: this.activator.errorToFailure(err) },
};
}
return completion;
}

private activateQueries(
Expand Down
24 changes: 16 additions & 8 deletions packages/workflow/src/internals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,17 @@ export class Activator implements ActivationHandler {
childToParent: new Map(),
};

/**
* The error that caused the current Workflow Task to fail. Sets if a non-TemporalFailure
* error bubbles up out of the Workflow function, or out of a Signal or Update handler.
*
* Our code should do a best effort to stop processing the current activation as soon as
* possible after this field is set, but it is possible that further microtasks may get
* processed before then. Only the first captured error is preserved; subsequent errors
* will be ignored.
*/
public workflowTaskError: unknown;

public readonly rootScope = new RootCancellationScope();

/**
Expand Down Expand Up @@ -703,7 +714,7 @@ export class Activator implements ActivationHandler {
if (error instanceof TemporalFailure) {
this.rejectUpdate(protocolInstanceId, error);
} else {
throw error;
this.handleWorkflowFailure(error);
}
})
.finally(() => this.inProgressUpdates.delete(updateId));
Expand Down Expand Up @@ -941,17 +952,12 @@ export class Activator implements ActivationHandler {
* Transforms failures into a command to be sent to the server.
* Used to handle any failure emitted by the Workflow.
*/
async handleWorkflowFailure(error: unknown): Promise<void> {
handleWorkflowFailure(error: unknown): void {
if (this.cancelled && isCancellation(error)) {
this.pushCommand({ cancelWorkflowExecution: {} }, true);
} else if (error instanceof ContinueAsNew) {
this.pushCommand({ continueAsNewWorkflowExecution: error.command }, true);
} else {
if (!(error instanceof TemporalFailure)) {
// This results in an unhandled rejection which will fail the activation
// preventing it from completing.
throw error;
}
} else if (error instanceof TemporalFailure) {
// Fail the workflow. We do not want to issue unfinishedHandlers warnings. To achieve that, we
// mark all handlers as completed now.
this.inProgressSignals.clear();
Expand All @@ -964,6 +970,8 @@ export class Activator implements ActivationHandler {
},
true
);
} else {
this.workflowTaskError = error;
}
}

Expand Down
9 changes: 7 additions & 2 deletions packages/workflow/src/worker-interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,15 +188,20 @@ export function concludeActivation(): coresdk.workflow_completion.IWorkflowActiv
* @returns number of unblocked conditions.
*/
export function tryUnblockConditions(): number {
const activator = getActivator();

// If a Workflow Task error was captured, bubble it up now
if (activator.workflowTaskError) throw activator.workflowTaskError;

let numUnblocked = 0;
for (;;) {
const prevUnblocked = numUnblocked;
for (const [seq, cond] of getActivator().blockedConditions.entries()) {
for (const [seq, cond] of activator.blockedConditions.entries()) {
if (cond.fn()) {
cond.resolve();
numUnblocked++;
// It is safe to delete elements during map iteration
getActivator().blockedConditions.delete(seq);
activator.blockedConditions.delete(seq);
}
}
if (prevUnblocked === numUnblocked) {
Expand Down

0 comments on commit c028018

Please sign in to comment.