Skip to content

Commit

Permalink
Merge pull request #227 from uprendis/feature/fix-inconsistent-flush
Browse files Browse the repository at this point in the history
Fix inconsistent flush
  • Loading branch information
uprendis authored Jan 28, 2022
2 parents 2430682 + 8ea01ed commit ff1d6ca
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 26 deletions.
34 changes: 18 additions & 16 deletions gossip/c_llr_callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,16 @@ func (s *Service) processBlockVotes(bvs inter.LlrSignedBlockVotes) error {
}

func (s *Service) ProcessBlockVotes(bvs inter.LlrSignedBlockVotes) error {
defer s.mayCommit(false)
return s.processBlockVotes(bvs)
s.engineMu.Lock()
defer s.engineMu.Unlock()
err := s.processBlockVotes(bvs)
if err == nil {
s.mayCommit(false)
}
return err
}

func (s *Service) processFullBlockRecord(br ibr.LlrIdxFullBlockRecord) error {
func (s *Service) ProcessFullBlockRecord(br ibr.LlrIdxFullBlockRecord) error {
// engineMu should NOT be locked here
if s.store.HasBlock(br.Idx) {
return eventcheck.ErrAlreadyProcessedBR
Expand Down Expand Up @@ -143,15 +148,11 @@ func (s *Service) processFullBlockRecord(br ibr.LlrIdxFullBlockRecord) error {
}
}
updateLowestBlockToFill(br.Idx, s.store)
s.mayCommit(false)

return nil
}

func (s *Service) ProcessFullBlockRecord(br ibr.LlrIdxFullBlockRecord) error {
defer s.mayCommit(false)
return s.processFullBlockRecord(br)
}

func (s *Service) processRawEpochVote(epoch idx.Epoch, ev hash.Hash, val idx.Validator, vals *pos.Validators, llrs *LlrState) {
newWeight := s.store.AddLlrEpochVoteWeight(epoch, ev, val, vals.Len(), vals.GetWeightByIdx(val))
if newWeight >= vals.TotalWeight()/3+1 {
Expand Down Expand Up @@ -201,11 +202,16 @@ func (s *Service) processEpochVote(ev inter.LlrSignedEpochVote) error {
}

func (s *Service) ProcessEpochVote(ev inter.LlrSignedEpochVote) error {
defer s.mayCommit(false)
return s.processEpochVote(ev)
s.engineMu.Lock()
defer s.engineMu.Unlock()
err := s.processEpochVote(ev)
if err == nil {
s.mayCommit(false)
}
return err
}

func (s *Service) processFullEpochRecord(er ier.LlrIdxFullEpochRecord) error {
func (s *Service) ProcessFullEpochRecord(er ier.LlrIdxFullEpochRecord) error {
// engineMu should NOT be locked here
if s.store.HasHistoryBlockEpochState(er.Idx) {
return eventcheck.ErrAlreadyProcessedER
Expand All @@ -226,15 +232,11 @@ func (s *Service) processFullEpochRecord(er ier.LlrIdxFullEpochRecord) error {
s.engineMu.Lock()
defer s.engineMu.Unlock()
updateLowestEpochToFill(er.Idx, s.store)
s.mayCommit(false)

return nil
}

func (s *Service) ProcessFullEpochRecord(er ier.LlrIdxFullEpochRecord) error {
defer s.mayCommit(false)
return s.processFullEpochRecord(er)
}

func updateLowestBlockToFill(block idx.Block, store *Store) {
store.ModifyLlrState(func(llrs *LlrState) {
llrs.LowestBlockToFill = idx.Block(actualizeLowestIndex(uint64(llrs.LowestBlockToFill), uint64(block), func(u uint64) bool {
Expand Down
12 changes: 2 additions & 10 deletions gossip/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,11 +514,7 @@ func (h *handler) makeBvProcessor(checkers *eventcheck.Checkers) *bvprocessor.Pr
return bvprocessor.New(datasemaphore.New(h.config.Protocol.BVsSemaphoreLimit, getSemaphoreWarningFn("BVs")), h.config.Protocol.BvProcessor, bvprocessor.Callback{
// DAG callbacks
Item: bvprocessor.ItemCallback{
Process: func(bvs inter.LlrSignedBlockVotes) error {
h.engineMu.Lock()
defer h.engineMu.Unlock()
return h.process.BVs(bvs)
},
Process: h.process.BVs,
Released: func(bvs inter.LlrSignedBlockVotes, peer string, err error) {
if eventcheck.IsBan(err) {
log.Warn("Incoming BVs rejected", "BVs", bvs.Signed.Locator.ID(), "creator", bvs.Signed.Locator.Creator, "err", err)
Expand Down Expand Up @@ -562,11 +558,7 @@ func (h *handler) makeEpProcessor(checkers *eventcheck.Checkers) *epprocessor.Pr
return epprocessor.New(datasemaphore.New(h.config.Protocol.BVsSemaphoreLimit, getSemaphoreWarningFn("BR")), h.config.Protocol.EpProcessor, epprocessor.Callback{
// DAG callbacks
Item: epprocessor.ItemCallback{
ProcessEV: func(ev inter.LlrSignedEpochVote) error {
h.engineMu.Lock()
defer h.engineMu.Unlock()
return h.process.EV(ev)
},
ProcessEV: h.process.EV,
ProcessER: h.process.ER,
ReleasedEV: func(ev inter.LlrSignedEpochVote, peer string, err error) {
if eventcheck.IsBan(err) {
Expand Down

0 comments on commit ff1d6ca

Please sign in to comment.