From e9184bbc5411cec4667515d14ad5dadae22fc5ec Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Wed, 18 Dec 2024 15:25:13 -0500 Subject: [PATCH] feat(workflow): Propagate otel context when scheduling local activities (#1577) --- .../src/workflow/index.ts | 18 ++++++++++++++++++ packages/test/src/test-otel.ts | 14 ++++++++++++++ packages/test/src/workflows/smorgasbord.ts | 17 ++++++++++++++++- 3 files changed, 48 insertions(+), 1 deletion(-) diff --git a/packages/interceptors-opentelemetry/src/workflow/index.ts b/packages/interceptors-opentelemetry/src/workflow/index.ts index 70c9413a2..e413ffc51 100644 --- a/packages/interceptors-opentelemetry/src/workflow/index.ts +++ b/packages/interceptors-opentelemetry/src/workflow/index.ts @@ -9,6 +9,7 @@ import { ContinueAsNewInput, DisposeInput, GetLogAttributesInput, + LocalActivityInput, Next, SignalInput, SignalWorkflowInput, @@ -104,6 +105,23 @@ export class OpenTelemetryOutboundInterceptor implements WorkflowOutboundCallsIn }); } + public async scheduleLocalActivity( + input: LocalActivityInput, + next: Next + ): Promise { + return await instrument({ + tracer: this.tracer, + spanName: `${SpanName.ACTIVITY_START}${SPAN_DELIMITER}${input.activityType}`, + fn: async () => { + const headers = await headersWithContext(input.headers); + return next({ + ...input, + headers, + }); + }, + }); + } + public async startChildWorkflowExecution( input: StartChildWorkflowExecutionInput, next: Next diff --git a/packages/test/src/test-otel.ts b/packages/test/src/test-otel.ts index 077991508..5cdf77383 100644 --- a/packages/test/src/test-otel.ts +++ b/packages/test/src/test-otel.ts @@ -258,6 +258,20 @@ if (RUN_INTEGRATION_TESTS) { ); t.true(signalChildWithUnblockSpan !== undefined); + const localActivityStartSpan = spans.find( + ({ name, parentSpanId }) => + name === `${SpanName.ACTIVITY_START}${SPAN_DELIMITER}echo` && + parentSpanId === parentExecuteSpan?.spanContext().spanId + ); + t.true(localActivityStartSpan !== undefined); + + const localActivityExecuteSpan = spans.find( + ({ name, parentSpanId }) => + name === `${SpanName.ACTIVITY_EXECUTE}${SPAN_DELIMITER}echo` && + parentSpanId === localActivityStartSpan?.spanContext().spanId + ); + t.true(localActivityExecuteSpan !== undefined); + const activityStartedSignalSpan = spans.find( ({ name, parentSpanId }) => name === `${SpanName.WORKFLOW_SIGNAL}${SPAN_DELIMITER}activityStarted` && diff --git a/packages/test/src/workflows/smorgasbord.ts b/packages/test/src/workflows/smorgasbord.ts index 23beccecf..b71c714dc 100644 --- a/packages/test/src/workflows/smorgasbord.ts +++ b/packages/test/src/workflows/smorgasbord.ts @@ -12,6 +12,7 @@ import { setHandler, condition, continueAsNew, + proxyLocalActivities, } from '@temporalio/workflow'; import * as activities from '../activities'; import { signalTarget } from './signal-target'; @@ -22,6 +23,11 @@ const { fakeProgress, queryOwnWf } = proxyActivities({ cancellationType: ActivityCancellationType.WAIT_CANCELLATION_COMPLETED, }); +const { echo } = proxyLocalActivities({ + startToCloseTimeout: '1m', + cancellationType: ActivityCancellationType.WAIT_CANCELLATION_COMPLETED, +}); + export const stepQuery = defineQuery('step'); export async function smorgasbord(iteration = 0): Promise { @@ -42,11 +48,20 @@ export async function smorgasbord(iteration = 0): Promise { await childWf.result(); })(); + const localActivityPromise = echo('local-activity'); + if (iteration === 0) { CancellationScope.current().cancel(); } - await Promise.all([activityPromise, queryActPromise, timerPromise, childWfPromise, condition(() => unblocked)]); + await Promise.all([ + activityPromise, + queryActPromise, + timerPromise, + childWfPromise, + localActivityPromise, + condition(() => unblocked), + ]); }); } catch (e) { if (iteration !== 0 || !isCancellation(e)) {