Skip to content

Commit

Permalink
Merge pull request #3873 from oasisprotocol/kostko/stable/20.12.x/con…
Browse files Browse the repository at this point in the history
…s-graceful-halt

[BACKPORT/20.12.x] go/consensus: Gracefully handle halt
  • Loading branch information
kostko authored Apr 22, 2021
2 parents 76f75f2 + 2098172 commit 9d6f466
Show file tree
Hide file tree
Showing 15 changed files with 158 additions and 50 deletions.
1 change: 1 addition & 0 deletions .changelog/3755.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/consensus: Move upgrade logic to governance, gracefully handle halt
1 change: 1 addition & 0 deletions .changelog/3873.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/oasis-node: Dump correct block height on halt
1 change: 1 addition & 0 deletions .changelog/3877.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go: Ignore jwt-go vulns since we're not using the features
1 change: 1 addition & 0 deletions go/.nancy-ignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ CVE-2018-17848
CVE-2020-15114
CVE-2020-15136
CVE-2020-15115
CVE-2020-26160 # Until viper and etcd/prometheus are upgraded to not need jwt-go.
16 changes: 15 additions & 1 deletion go/common/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (

maxRecvMsgSize = 104857600 // 100 MiB
maxSendMsgSize = 104857600 // 100 MiB

gracefulStopWaitPeriod = 5 * time.Second
)

var (
Expand Down Expand Up @@ -506,7 +508,19 @@ func (s *Server) Stop() {
}
default:
}
s.server.GracefulStop() // Repeated calls are ok.

// Attempt to stop gracefully, but if that doesn't work, stop forcibly.
gracefulCh := make(chan struct{})
go func() {
s.server.GracefulStop()
close(gracefulCh)
}()
select {
case <-gracefulCh:
case <-time.After(gracefulStopWaitPeriod):
s.Logger.Warn("graceful stop failed, forcing stop")
s.server.Stop()
}
s.server = nil
}
}
Expand Down
8 changes: 5 additions & 3 deletions go/consensus/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,16 +210,18 @@ type Backend interface {
GetAddresses() ([]node.ConsensusAddress, error)
}

// HaltHook is a function that gets called when consensus needs to halt for some reason.
type HaltHook func(ctx context.Context, blockHeight int64, epoch epochtime.EpochTime, err error)

// ServicesBackend is an interface for consensus backends which indicate support for
// communicating with consensus services.
//
// In case the feature is absent, these methods may return nil or ErrUnsupported.
type ServicesBackend interface {
ClientBackend

// RegisterHaltHook registers a function to be called when the
// consensus Halt epoch height is reached.
RegisterHaltHook(func(ctx context.Context, blockHeight int64, epoch epochtime.EpochTime))
// RegisterHaltHook registers a function to be called when the consensus needs to halt.
RegisterHaltHook(hook HaltHook)

// SubmissionManager returns the transaction submission manager.
SubmissionManager() SubmissionManager
Expand Down
32 changes: 16 additions & 16 deletions go/consensus/tendermint/abci/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (a *ApplicationServer) Register(app api.Application) error {

// RegisterHaltHook registers a function to be called when the
// consensus Halt epoch height is reached.
func (a *ApplicationServer) RegisterHaltHook(hook func(ctx context.Context, blockHeight int64, epoch epochtime.EpochTime)) {
func (a *ApplicationServer) RegisterHaltHook(hook consensus.HaltHook) {
a.mux.registerHaltHook(hook)
}

Expand Down Expand Up @@ -216,7 +216,7 @@ type abciMux struct {
lastBeginBlock int64
currentTime time.Time

haltHooks []func(context.Context, int64, epochtime.EpochTime)
haltHooks []consensus.HaltHook

// invalidatedTxs maps transaction hashes (hash.Hash) to a subscriber
// waiting for that transaction to become invalid.
Expand Down Expand Up @@ -255,7 +255,7 @@ func (mux *abciMux) watchInvalidatedTx(txHash hash.Hash) (<-chan error, pubsub.C
return resultCh, sub, nil
}

func (mux *abciMux) registerHaltHook(hook func(context.Context, int64, epochtime.EpochTime)) {
func (mux *abciMux) registerHaltHook(hook consensus.HaltHook) {
mux.Lock()
defer mux.Unlock()

Expand Down Expand Up @@ -358,6 +358,13 @@ func (mux *abciMux) InitChain(req types.RequestInitChain) types.ResponseInitChai
return resp
}

func (mux *abciMux) dispatchHaltHooks(blockHeight int64, currentEpoch epochtime.EpochTime, err error) {
for _, hook := range mux.haltHooks {
hook(mux.state.ctx, blockHeight, currentEpoch, err)
}
mux.logger.Debug("halt hook dispatch complete")
}

func (mux *abciMux) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginBlock {
blockHeight := mux.state.BlockHeight()

Expand Down Expand Up @@ -396,14 +403,10 @@ func (mux *abciMux) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginB
case nil:
// Everything ok.
case upgrade.ErrStopForUpgrade:
// Stop for upgrade -- but dispatch halt hooks first.
mux.logger.Debug("dispatching halt hooks before stopping for upgrade")
for _, hook := range mux.haltHooks {
hook(mux.state.ctx, blockHeight, currentEpoch)
}
mux.logger.Debug("halt hook dispatch complete")

panic("mux: reached upgrade epoch")
// Signal graceful stop for upgrade.
mux.logger.Debug("dispatching halt hooks for upgrade")
mux.dispatchHaltHooks(blockHeight, currentEpoch, err)
panic(err)
default:
panic(fmt.Sprintf("mux: error while trying to perform consensus upgrade: %v", err))
}
Expand All @@ -418,11 +421,8 @@ func (mux *abciMux) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginB
"block_height", blockHeight,
"epoch", mux.state.haltEpochHeight,
)
mux.logger.Debug("Dispatching halt hooks")
for _, hook := range mux.haltHooks {
hook(mux.state.ctx, blockHeight, mux.state.haltEpochHeight)
}
mux.logger.Debug("Halt hook dispatch complete")
mux.logger.Debug("dispatching halt hooks before halt epoch")
mux.dispatchHaltHooks(blockHeight, currentEpoch, nil)
return types.ResponseBeginBlock{}
case true:
if !mux.state.afterHaltEpoch(ctx) {
Expand Down
80 changes: 66 additions & 14 deletions go/consensus/tendermint/full/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"context"
"fmt"
"math/rand"
"path/filepath"
"strings"
"sync"
Expand Down Expand Up @@ -132,6 +133,9 @@ const (
// NOTE: this is only used during the initial sync.
syncWorkerLastBlockTimeDiffThreshold = 1 * time.Minute

minUpgradeStopWaitPeriod = 5 * time.Second
upgradeStopDelay = 30 * time.Second

// tmSubscriberID is the subscriber identifier used for all internal Tendermint pubsub
// subscriptions. If any other subscriber IDs need to be derived they will be under this prefix.
tmSubscriberID = "oasis-core"
Expand Down Expand Up @@ -181,8 +185,10 @@ type fullService struct { // nolint: maligned
isInitialized, isStarted bool
startedCh chan struct{}
syncedCh chan struct{}
quitCh chan struct{}

startFn func() error
startFn func() error
stopOnce sync.Once

nextSubscriberID uint64
}
Expand Down Expand Up @@ -219,6 +225,19 @@ func (t *fullService) Start() error {
return fmt.Errorf("tendermint: failed to start service: %w", err)
}

// Make sure the quit channel is closed when the node shuts down.
go func() {
select {
case <-t.quitCh:
case <-t.node.Quit():
select {
case <-t.quitCh:
default:
close(t.quitCh)
}
}
}()

// Start event dispatchers for all the service clients.
t.serviceClientsWg.Add(len(t.serviceClients))
for _, svc := range t.serviceClients {
Expand Down Expand Up @@ -247,11 +266,7 @@ func (t *fullService) Start() error {

// Implements service.BackgroundService.
func (t *fullService) Quit() <-chan struct{} {
if !t.started() {
return make(chan struct{})
}

return t.node.Quit()
return t.quitCh
}

// Implements service.BackgroundService.
Expand All @@ -266,14 +281,15 @@ func (t *fullService) Stop() {
return
}

t.failMonitor.markCleanShutdown()
if err := t.node.Stop(); err != nil {
t.Logger.Error("Error on stopping node", err)
}
t.stopOnce.Do(func() {
t.failMonitor.markCleanShutdown()
if err := t.node.Stop(); err != nil {
t.Logger.Error("Error on stopping node", err)
}

t.svcMgr.Stop()
t.mux.Stop()
t.node.Wait()
t.svcMgr.Stop()
t.mux.Stop()
})
}

func (t *fullService) Started() <-chan struct{} {
Expand Down Expand Up @@ -401,7 +417,7 @@ func (t *fullService) GetGenesisDocument(ctx context.Context) (*genesisAPI.Docum
return t.genesis, nil
}

func (t *fullService) RegisterHaltHook(hook func(context.Context, int64, epochtimeAPI.EpochTime)) {
func (t *fullService) RegisterHaltHook(hook consensusAPI.HaltHook) {
if !t.initialized() {
return
}
Expand Down Expand Up @@ -1301,6 +1317,41 @@ func (t *fullService) lazyInit() error {
t.client = tmcli.New(t.node)
t.failMonitor = newFailMonitor(t.ctx, t.Logger, t.node.ConsensusState().Wait)

// Register a halt hook that handles upgrades gracefully.
t.RegisterHaltHook(func(ctx context.Context, blockHeight int64, epoch epochtimeAPI.EpochTime, err error) {
if !errors.Is(err, upgradeAPI.ErrStopForUpgrade) {
return
}

// Mark this as a clean shutdown and request the node to stop gracefully.
t.failMonitor.markCleanShutdown()

// Wait before stopping to give time for P2P messages to propagate. Sleep for at least
// minUpgradeStopWaitPeriod or the configured commit timeout.
t.Logger.Info("waiting a bit before stopping the node for upgrade")
waitPeriod := minUpgradeStopWaitPeriod
if tc := t.genesis.Consensus.Parameters.TimeoutCommit; tc > waitPeriod {
waitPeriod = tc
}
time.Sleep(waitPeriod)

go func() {
// Sleep another period so there is some time between when consensus shuts down and
// when all the other services start shutting down.
//
// Randomize the period so that not all nodes shut down at the same time.
delay := getRandomValueFromInterval(0.5, rand.Float64(), upgradeStopDelay)
time.Sleep(delay)

t.Logger.Info("stopping the node for upgrade")
t.Stop()

// Close the quit channel early to force the node to stop. This is needed because
// the Tendermint node will otherwise never quit.
close(t.quitCh)
}()
})

return nil
}

Expand Down Expand Up @@ -1462,6 +1513,7 @@ func New(
dataDir: dataDir,
startedCh: make(chan struct{}),
syncedCh: make(chan struct{}),
quitCh: make(chan struct{}),
}

t.Logger.Info("starting a full consensus node")
Expand Down
18 changes: 18 additions & 0 deletions go/consensus/tendermint/full/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package full

import "time"

// Borrowed from https://github.com/cenkalti/backoff.

// Returns a random value from the following interval:
// [currentInterval - randomizationFactor * currentInterval, currentInterval + randomizationFactor * currentInterval].
func getRandomValueFromInterval(randomizationFactor, random float64, currentInterval time.Duration) time.Duration {
delta := randomizationFactor * float64(currentInterval)
minInterval := float64(currentInterval) - delta
maxInterval := float64(currentInterval) + delta

// Get a random value from the range [minInterval, maxInterval].
// The formula used below has a +1 because if the minInterval is 1 and the maxInterval is 3 then
// we want a 33% chance for selecting either 1, 2 or 3.
return time.Duration(minInterval + (random * (maxInterval - minInterval + 1)))
}
2 changes: 1 addition & 1 deletion go/consensus/tendermint/seed/seed.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (srv *seedService) SubmitTxNoWait(ctx context.Context, tx *transaction.Sign
}

// Implements Backend.
func (srv *seedService) RegisterHaltHook(func(ctx context.Context, blockHeight int64, epoch epochtime.EpochTime)) {
func (srv *seedService) RegisterHaltHook(consensus.HaltHook) {
panic(consensus.ErrUnsupported)
}

Expand Down
3 changes: 3 additions & 0 deletions go/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
module github.com/oasisprotocol/oasis-core/go

replace (
// CVE-2021-3121
github.com/gogo/protobuf => github.com/gogo/protobuf v1.3.2

// Updates the version used in spf13/cobra (dependency via tendermint) as
// there is no release yet with the fix. Remove once an updated release of
// spf13/cobra exists and tendermint is updated to include it.
Expand Down
Loading

0 comments on commit 9d6f466

Please sign in to comment.