diff --git a/service/history/api/create_workflow_util.go b/service/history/api/create_workflow_util.go index 84dd11f41bc..62b2efbd204 100644 --- a/service/history/api/create_workflow_util.go +++ b/service/history/api/create_workflow_util.go @@ -54,7 +54,11 @@ type ( RunID string LastWriteVersion int64 } - CreateOrUpdateLeaseFunc func(WorkflowLease, shard.Context, workflow.MutableState) (WorkflowLease, error) + CreateOrUpdateLeaseFunc func( + WorkflowLease, + shard.Context, + workflow.MutableState, + ) (WorkflowLease, error) ) func NewWorkflowWithSignal( diff --git a/service/history/api/multioperation/api.go b/service/history/api/multioperation/api.go index ddefb7a42e3..a28a628d271 100644 --- a/service/history/api/multioperation/api.go +++ b/service/history/api/multioperation/api.go @@ -37,6 +37,7 @@ import ( "go.temporal.io/server/common" "go.temporal.io/server/common/definition" "go.temporal.io/server/common/locks" + "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/service/history/api" "go.temporal.io/server/service/history/api/startworkflow" @@ -56,6 +57,20 @@ type ( noStartError struct{ startworkflow.StartOutcome } ) +type ( + multiOp struct { + shardContext shard.Context + namespaceId namespace.ID + consistencyChecker api.WorkflowConsistencyChecker + + updateReq *historyservice.UpdateWorkflowExecutionRequest + startReq *historyservice.StartWorkflowExecutionRequest + + updater *updateworkflow.Updater + starter *startworkflow.Starter + } +) + func Invoke( ctx context.Context, req *historyservice.ExecuteMultiOperationRequest, @@ -73,83 +88,54 @@ func Invoke( if updateReq == nil { return nil, serviceerror.NewInvalidArgument("expected second operation to be Update Workflow") } - updater := updateworkflow.NewUpdater( - shardContext, - workflowConsistencyChecker, - matchingClient, - updateReq, - ) startReq := req.Operations[0].GetStartWorkflow() if startReq == nil { return nil, serviceerror.NewInvalidArgument("expected first operation to be Start Workflow") } - starter, err := startworkflow.NewStarter( + + mo := &multiOp{ + shardContext: shardContext, + namespaceId: namespace.ID(req.NamespaceId), + consistencyChecker: workflowConsistencyChecker, + updateReq: updateReq, + startReq: startReq, + } + + var err error + mo.starter, err = startworkflow.NewStarter( shardContext, workflowConsistencyChecker, tokenSerializer, visibilityManager, startReq, - func( - existingLease api.WorkflowLease, - shardContext shard.Context, - ms workflow.MutableState, - ) (api.WorkflowLease, error) { - var res api.WorkflowLease - - if existingLease == nil { - // Create a new *locked* workflow context. This is important since without the lock, task processing - // would try to modify the mutable state concurrently. Once the Starter completes, it will release the lock. - // - // The cache write needs to happen *before* the persistence write because a failed cache write means an - // early error response that aborts the entire MultiOperation request. And it allows for a simple retry, too - - // whereas if the cache write happened and failed *after* a successful persistence write, - // it would leave behind a started workflow that will never receive the update. - workflowContext, releaseFunc, err := workflowConsistencyChecker.GetWorkflowCache().GetOrCreateWorkflowExecution( - ctx, - shardContext, - ms.GetNamespaceEntry().ID(), - &commonpb.WorkflowExecution{WorkflowId: ms.GetExecutionInfo().WorkflowId, RunId: ms.GetExecutionState().RunId}, - locks.PriorityHigh, - ) - if err != nil { - return nil, err - } - res = api.NewWorkflowLease(workflowContext, releaseFunc, ms) - } else { - // TODO(stephanos): remove this hack - // If the lease already exists, but the update needs to be re-applied since it was aborted due to a conflict. - res = existingLease - ms = existingLease.GetMutableState() - } - - // If MutableState isn't set here, the next request for it will load it from the database - // - but receive a new instance that won't have the in-memory Update registry. - res.GetContext().(*workflow.ContextImpl).MutableState = ms - - // Add the Update. - // NOTE: UpdateWorkflowAction return value is ignored since ther Starter will always create a WFT. - updateReg := res.GetContext().UpdateRegistry(ctx) - if _, err := updater.ApplyRequest(ctx, updateReg, ms); err != nil { - // Wrapping the error so Update and Start errors can be distinguished later. - return nil, updateError{err} - } - return res, nil - }, + mo.workflowLeaseCallback(ctx), ) if err != nil { return nil, newMultiOpError(err, multiOpAbortedErr) } + mo.updater = updateworkflow.NewUpdater( + mo.shardContext, + mo.consistencyChecker, + matchingClient, + mo.updateReq, + ) + + return mo.Invoke(ctx) +} + +func (mo *multiOp) Invoke(ctx context.Context) (*historyservice.ExecuteMultiOperationResponse, error) { // For workflow id conflict policy terminate-existing, always attempt a start // since that works when the workflow is already running *and* when it's not running. - conflictPolicy := startReq.StartRequest.WorkflowIdConflictPolicy + conflictPolicy := mo.startReq.StartRequest.WorkflowIdConflictPolicy if conflictPolicy == enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING { - resp, err := startAndUpdateWorkflow(ctx, starter, updater) + resp, err := mo.startAndUpdateWorkflow(ctx) var noStartErr *noStartError switch { case errors.As(err, &noStartErr): - // The start request was deduped. Continue below and only send the update. + // The start request was deduped, no termination is needed. + // Continue below by only sending the update. case err != nil: return nil, err default: @@ -157,62 +143,22 @@ func Invoke( } } - currentWorkflowLease, err := workflowConsistencyChecker.GetWorkflowLease( - ctx, - nil, - definition.NewWorkflowKey(req.NamespaceId, startReq.StartRequest.WorkflowId, ""), - locks.PriorityHigh, - ) - var notFound *serviceerror.NotFound - if errors.As(err, ¬Found) { - currentWorkflowLease = nil - } else if err != nil { - return nil, newMultiOpError(err, multiOpAbortedErr) + runningWorkflowLease, err := mo.getRunningWorkflowLease(ctx) + if err != nil { + return nil, err } - // workflow was already started, ... - if currentWorkflowLease != nil { - if !currentWorkflowLease.GetMutableState().IsWorkflowExecutionRunning() { - // ... but cannot receive an update since it's not running. - currentWorkflowLease.GetReleaseFn()(nil) - } else { - // and is running, ... - switch conflictPolicy { - case enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING: - // ... skip the start and only send the update - // NOTE: currentWorkflowLease will be released by the function - return updateWorkflow(ctx, shardContext, currentWorkflowLease, updater) - - case enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL: - // ... if same request ID, just send update - // NOTE: currentWorkflowLease will be released by the function - if dedup(startReq, currentWorkflowLease) { - return updateWorkflow(ctx, shardContext, currentWorkflowLease, updater) - } - - // ... otherwise, abort the entire operation - currentWorkflowLease.GetReleaseFn()(nil) // nil since nothing was modified - wfKey := currentWorkflowLease.GetContext().GetWorkflowKey() - err = serviceerror.NewWorkflowExecutionAlreadyStarted( - fmt.Sprintf("Workflow execution is already running. WorkflowId: %v, RunId: %v.", wfKey.WorkflowID, wfKey.RunID), - startReq.StartRequest.RequestId, - wfKey.RunID, - ) - return nil, newMultiOpError(err, multiOpAbortedErr) - - case enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING: - return updateWorkflow(ctx, shardContext, currentWorkflowLease, updater) - - case enumspb.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED: - // ... fail since this policy is invalid - currentWorkflowLease.GetReleaseFn()(nil) // nil since nothing was modified - return nil, serviceerror.NewInvalidArgument("unhandled workflow id conflict policy: unspecified") - } + // Workflow was already started ... + if runningWorkflowLease != nil { + if err = mo.allowUpdateWorkflow(ctx, runningWorkflowLease, conflictPolicy); err != nil { + runningWorkflowLease.GetReleaseFn()(nil) // nil since nothing was modified + return nil, err } + return mo.updateWorkflow(ctx, runningWorkflowLease) } - // workflow isn't running: start and then apply update - resp, err := startAndUpdateWorkflow(ctx, starter, updater) + // Workflow hasn't been started yet ... + resp, err := mo.startAndUpdateWorkflow(ctx) var noStartErr *noStartError if errors.As(err, &noStartErr) { // The workflow was meant to be started - but was actually *not* started. @@ -229,64 +175,165 @@ func Invoke( return resp, err } -func updateWorkflow( +func (mo *multiOp) workflowLeaseCallback( + ctx context.Context, +) api.CreateOrUpdateLeaseFunc { + return func( + existingLease api.WorkflowLease, + shardContext shard.Context, + ms workflow.MutableState, + ) (api.WorkflowLease, error) { + var res api.WorkflowLease + + if existingLease == nil { + // Create a new *locked* workflow context. This is important since without the lock, task processing + // would try to modify the mutable state concurrently. Once the Starter completes, it will release the lock. + // + // The cache write needs to happen *before* the persistence write because a failed cache write means an + // early error response that aborts the entire MultiOperation request. And it allows for a simple retry, too - + // whereas if the cache write happened and failed *after* a successful persistence write, + // it would leave behind a started workflow that will never receive the update. + workflowContext, releaseFunc, err := mo.consistencyChecker.GetWorkflowCache().GetOrCreateWorkflowExecution( + ctx, + shardContext, + ms.GetNamespaceEntry().ID(), + &commonpb.WorkflowExecution{WorkflowId: ms.GetExecutionInfo().WorkflowId, RunId: ms.GetExecutionState().RunId}, + locks.PriorityHigh, + ) + if err != nil { + return nil, err + } + res = api.NewWorkflowLease(workflowContext, releaseFunc, ms) + } else { + // TODO(stephanos): remove this hack + // If the lease already exists, but the update needs to be re-applied since it was aborted due to a conflict. + res = existingLease + ms = existingLease.GetMutableState() + } + + // If MutableState isn't set here, the next request for it will load it from the database + // - but receive a new instance that won't have the in-memory Update registry. + res.GetContext().(*workflow.ContextImpl).MutableState = ms + + // Add the Update. + // NOTE: UpdateWorkflowAction return value is ignored since ther Starter will always create a WFT. + updateReg := res.GetContext().UpdateRegistry(ctx) + if _, err := mo.updater.ApplyRequest(ctx, updateReg, ms); err != nil { + // Wrapping the error so Update and Start errors can be distinguished later. + return nil, updateError{err} + } + return res, nil + } +} + +func (mo *multiOp) getRunningWorkflowLease(ctx context.Context) (api.WorkflowLease, error) { + runningWorkflowLease, err := mo.consistencyChecker.GetWorkflowLease( + ctx, + nil, + definition.NewWorkflowKey(mo.namespaceId.String(), mo.startReq.StartRequest.WorkflowId, ""), + locks.PriorityHigh, + ) + var notFound *serviceerror.NotFound + if errors.As(err, ¬Found) { + return nil, nil + } + if err != nil { + return nil, newMultiOpError(err, multiOpAbortedErr) + } + + if runningWorkflowLease == nil { + return nil, nil + } + + if !runningWorkflowLease.GetMutableState().IsWorkflowExecutionRunning() { + runningWorkflowLease.GetReleaseFn()(nil) + return nil, nil + } + + return runningWorkflowLease, nil +} + +func (mo *multiOp) allowUpdateWorkflow( + ctx context.Context, + currentWorkflowLease api.WorkflowLease, + conflictPolicy enumspb.WorkflowIdConflictPolicy, +) error { + switch conflictPolicy { + case enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING: + // Allow sending the update. + return nil + + case enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL: + // If it's the same request ID, allow sending the update. + if canDedup(mo.startReq, currentWorkflowLease) { + return nil + } + + // Otherwise, don't allow sending the update. + wfKey := currentWorkflowLease.GetContext().GetWorkflowKey() + err := serviceerror.NewWorkflowExecutionAlreadyStarted( + fmt.Sprintf("Workflow execution is already running. WorkflowId: %v, RunId: %v.", wfKey.WorkflowID, wfKey.RunID), + mo.startReq.StartRequest.RequestId, + wfKey.RunID, + ) + return newMultiOpError(err, multiOpAbortedErr) + + case enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING: + // Allow sending the update since the termination was deduped earlier. + return nil + + case enumspb.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED: + // Don't allow sending the update as the policy is invalid. + // This should never happen as it should be validated by the frontend. + return serviceerror.NewInvalidArgument("unhandled workflow id conflict policy: unspecified") + + default: + // Don't allow sending the update as the policy is invalid. + // This should never happen as it should be validated by the frontend. + return serviceerror.NewInternal("unhandled workflow id conflict policy") + } +} + +func (mo *multiOp) updateWorkflow( ctx context.Context, - shardContext shard.Context, currentWorkflowLease api.WorkflowLease, - updater *updateworkflow.Updater, ) (*historyservice.ExecuteMultiOperationResponse, error) { - // apply update to workflow + // Apply the update to the workflow. err := api.UpdateWorkflowWithNew( - shardContext, + mo.shardContext, ctx, currentWorkflowLease, func(lease api.WorkflowLease) (*api.UpdateWorkflowAction, error) { ms := lease.GetMutableState() updateReg := lease.GetContext().UpdateRegistry(ctx) - return updater.ApplyRequest(ctx, updateReg, ms) + return mo.updater.ApplyRequest(ctx, updateReg, ms) }, nil, ) - // release lock since all changes to workflow have been completed now + // Release lock here since all changes to the workflow have been completed now. currentWorkflowLease.GetReleaseFn()(err) if err != nil { return nil, newMultiOpError(multiOpAbortedErr, err) } - // wait for the update to complete - updateResp, err := updater.OnSuccess(ctx) + // Wait for the update to complete. + updateResp, err := mo.updater.OnSuccess(ctx) if err != nil { return nil, newMultiOpError(multiOpAbortedErr, err) } - return &historyservice.ExecuteMultiOperationResponse{ - Responses: []*historyservice.ExecuteMultiOperationResponse_Response{ - { - Response: &historyservice.ExecuteMultiOperationResponse_Response_StartWorkflow{ - StartWorkflow: &historyservice.StartWorkflowExecutionResponse{ - RunId: currentWorkflowLease.GetContext().GetWorkflowKey().RunID, - Started: false, // set explicitly for emphasis - }, - }, - }, - { - Response: &historyservice.ExecuteMultiOperationResponse_Response_UpdateWorkflow{ - UpdateWorkflow: updateResp, - }, - }, - }, - }, nil + startResp := &historyservice.StartWorkflowExecutionResponse{ + RunId: currentWorkflowLease.GetContext().GetWorkflowKey().RunID, + Started: false, // set explicitly for emphasis + } + + return makeResponse(startResp, updateResp), nil } -// NOTE: Returns a `noStartError` if the start was unexpectedly reused/deduped. -func startAndUpdateWorkflow( - ctx context.Context, - starter *startworkflow.Starter, - updater *updateworkflow.Updater, -) (*historyservice.ExecuteMultiOperationResponse, error) { - startResp, startOutcome, err := starter.Invoke(ctx) +func (mo *multiOp) startAndUpdateWorkflow(ctx context.Context) (*historyservice.ExecuteMultiOperationResponse, error) { + startResp, startOutcome, err := mo.starter.Invoke(ctx) if err != nil { // An update error occurred. if errors.As(err, &updateError{}) { @@ -301,12 +348,19 @@ func startAndUpdateWorkflow( return nil, &noStartError{startOutcome} } - // wait for the update to complete - updateResp, err := updater.OnSuccess(ctx) + // Wait for the update to complete. + updateResp, err := mo.updater.OnSuccess(ctx) if err != nil { return nil, newMultiOpError(nil, err) // `nil` for start since it succeeded } + return makeResponse(startResp, updateResp), nil +} + +func makeResponse( + startResp *historyservice.StartWorkflowExecutionResponse, + updateResp *historyservice.UpdateWorkflowExecutionResponse, +) *historyservice.ExecuteMultiOperationResponse { return &historyservice.ExecuteMultiOperationResponse{ Responses: []*historyservice.ExecuteMultiOperationResponse_Response{ { @@ -320,7 +374,7 @@ func startAndUpdateWorkflow( }, }, }, - }, nil + } } func newMultiOpError(startErr, updateErr error) error { @@ -338,7 +392,7 @@ func newMultiOpError(startErr, updateErr error) error { []error{startErr, updateErr}) } -func dedup(startReq *historyservice.StartWorkflowExecutionRequest, currentWorkflowLease api.WorkflowLease) bool { +func canDedup(startReq *historyservice.StartWorkflowExecutionRequest, currentWorkflowLease api.WorkflowLease) bool { return startReq.StartRequest.RequestId == currentWorkflowLease.GetMutableState().GetExecutionState().GetCreateRequestId() }