Skip to content

Commit

Permalink
simplify cancellation cause and punt to issue Graceful Pipeline Exit a…
Browse files Browse the repository at this point in the history
  • Loading branch information
Zeph Grunschlag committed Aug 11, 2023
1 parent 8c1e6fa commit 1158f75
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 21 deletions.
21 changes: 1 addition & 20 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type Pipeline interface {
Init() error
Start()
Stop()
WhyStopped() error
Error() error
Wait()
}
Expand Down Expand Up @@ -74,9 +73,6 @@ type pipelineBlock struct {
type pluginChannel chan pipelineBlock

var (
// BecauseStopMethod is the sentinel error that signals the pipeline was stopped via Stop().
BecauseStopMethod = errors.New("pipeline stopped") //nolint:revive // this is a sentinel error

errImporterCause = errors.New("importer cancelled")
errProcessorCause = errors.New("processor cancelled")
errExporterCause = errors.New("exporter cancelled")
Expand Down Expand Up @@ -400,7 +396,7 @@ func (p *pipelineImpl) Init() error {
}

func (p *pipelineImpl) Stop() {
p.ccf(BecauseStopMethod)
p.ccf(nil)
p.wg.Wait()

if p.profFile != nil {
Expand Down Expand Up @@ -433,16 +429,6 @@ func (p *pipelineImpl) Stop() {
}
}

// WhyStopped returns nil if a context was never provided or the pipeline
// is yet to have stopped. Otherwise, it returns the cause of the pipeline's
// context cancellation.
func (p *pipelineImpl) WhyStopped() error {
if p.ctx == nil {
return nil
}
return context.Cause(p.ctx)
}

func numInnerTxn(txn sdk.SignedTxnWithAD) int {
result := 0
for _, itxn := range txn.ApplyData.EvalDelta.InnerTxns {
Expand Down Expand Up @@ -502,10 +488,6 @@ func (p *pipelineImpl) importerHandler(importer importers.Importer, roundChan <-

// TODO: Verify that the block was built with a known protocol version.

// Start time currently measures operations after block fetching is complete.
// This is for backwards compatibility w/ Indexer's metrics
// run through processors

importFinish := time.Now()
pipelineBlk := pipelineBlock{
BlockData: blkData,
Expand Down Expand Up @@ -713,7 +695,6 @@ func (p *pipelineImpl) Start() {
}(p.pipelineMetadata.NextRound)

<-p.ctx.Done()
// TODO: send a prometheus observation based on WhyStopped()
}

func (p *pipelineImpl) Wait() {
Expand Down
2 changes: 1 addition & 1 deletion conduit/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func TestPipelineRun(t *testing.T) {

mock.AssertExpectationsForObjects(t, &mImporter, &mProcessor, &mExporter)

assert.ErrorIs(t, pImpl.WhyStopped(), errTestCancellation)
assert.ErrorIs(t, context.Cause(pImpl.ctx), errTestCancellation)
}

// TestPipelineCpuPidFiles tests that cpu and pid files are created when specified
Expand Down

0 comments on commit 1158f75

Please sign in to comment.