Skip to content

Commit

Permalink
feat(workflow): Propagate otel context when scheduling local activiti…
Browse files Browse the repository at this point in the history
…es (#1577)
  • Loading branch information
THardy98 authored Dec 18, 2024
1 parent 0cd8980 commit e9184bb
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 1 deletion.
18 changes: 18 additions & 0 deletions packages/interceptors-opentelemetry/src/workflow/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
ContinueAsNewInput,
DisposeInput,
GetLogAttributesInput,
LocalActivityInput,
Next,
SignalInput,
SignalWorkflowInput,
Expand Down Expand Up @@ -104,6 +105,23 @@ export class OpenTelemetryOutboundInterceptor implements WorkflowOutboundCallsIn
});
}

public async scheduleLocalActivity(
input: LocalActivityInput,
next: Next<WorkflowOutboundCallsInterceptor, 'scheduleLocalActivity'>
): Promise<unknown> {
return await instrument({
tracer: this.tracer,
spanName: `${SpanName.ACTIVITY_START}${SPAN_DELIMITER}${input.activityType}`,
fn: async () => {
const headers = await headersWithContext(input.headers);
return next({
...input,
headers,
});
},
});
}

public async startChildWorkflowExecution(
input: StartChildWorkflowExecutionInput,
next: Next<WorkflowOutboundCallsInterceptor, 'startChildWorkflowExecution'>
Expand Down
14 changes: 14 additions & 0 deletions packages/test/src/test-otel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,20 @@ if (RUN_INTEGRATION_TESTS) {
);
t.true(signalChildWithUnblockSpan !== undefined);

const localActivityStartSpan = spans.find(
({ name, parentSpanId }) =>
name === `${SpanName.ACTIVITY_START}${SPAN_DELIMITER}echo` &&
parentSpanId === parentExecuteSpan?.spanContext().spanId
);
t.true(localActivityStartSpan !== undefined);

const localActivityExecuteSpan = spans.find(
({ name, parentSpanId }) =>
name === `${SpanName.ACTIVITY_EXECUTE}${SPAN_DELIMITER}echo` &&
parentSpanId === localActivityStartSpan?.spanContext().spanId
);
t.true(localActivityExecuteSpan !== undefined);

const activityStartedSignalSpan = spans.find(
({ name, parentSpanId }) =>
name === `${SpanName.WORKFLOW_SIGNAL}${SPAN_DELIMITER}activityStarted` &&
Expand Down
17 changes: 16 additions & 1 deletion packages/test/src/workflows/smorgasbord.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
setHandler,
condition,
continueAsNew,
proxyLocalActivities,
} from '@temporalio/workflow';
import * as activities from '../activities';
import { signalTarget } from './signal-target';
Expand All @@ -22,6 +23,11 @@ const { fakeProgress, queryOwnWf } = proxyActivities<typeof activities>({
cancellationType: ActivityCancellationType.WAIT_CANCELLATION_COMPLETED,
});

const { echo } = proxyLocalActivities<typeof activities>({
startToCloseTimeout: '1m',
cancellationType: ActivityCancellationType.WAIT_CANCELLATION_COMPLETED,
});

export const stepQuery = defineQuery<number>('step');

export async function smorgasbord(iteration = 0): Promise<void> {
Expand All @@ -42,11 +48,20 @@ export async function smorgasbord(iteration = 0): Promise<void> {
await childWf.result();
})();

const localActivityPromise = echo('local-activity');

if (iteration === 0) {
CancellationScope.current().cancel();
}

await Promise.all([activityPromise, queryActPromise, timerPromise, childWfPromise, condition(() => unblocked)]);
await Promise.all([
activityPromise,
queryActPromise,
timerPromise,
childWfPromise,
localActivityPromise,
condition(() => unblocked),
]);
});
} catch (e) {
if (iteration !== 0 || !isCancellation(e)) {
Expand Down

0 comments on commit e9184bb

Please sign in to comment.