diff options
Diffstat (limited to 'core/syncer')
-rw-r--r-- | core/syncer/watch-cat.go (renamed from core/syncer/terminator.go) | 87 | ||||
-rw-r--r-- | core/syncer/watch-cat_test.go (renamed from core/syncer/terminator_test.go) | 28 |
2 files changed, 61 insertions, 54 deletions
diff --git a/core/syncer/terminator.go b/core/syncer/watch-cat.go index 03c22ef..5ee7f62 100644 --- a/core/syncer/terminator.go +++ b/core/syncer/watch-cat.go @@ -31,108 +31,115 @@ type configReader interface { Configuration(round uint64) *types.Config } -// Terminator is reponsible for signaling if syncer object should be terminated. -type Terminator struct { +// WatchCat is reponsible for signaling if syncer object should be terminated. +type WatchCat struct { recovery core.Recovery configReader configReader - ping chan types.Position + pat chan types.Position polling time.Duration ctx context.Context cancel context.CancelFunc logger common.Logger } -// NewTerminator creats a new terminator object. -func NewTerminator( +// NewWatchCat creats a new WatchCat 🐱 object. +func NewWatchCat( recovery core.Recovery, configReader configReader, polling time.Duration, - logger common.Logger) *Terminator { - tt := &Terminator{ + logger common.Logger) *WatchCat { + wc := &WatchCat{ recovery: recovery, configReader: configReader, - ping: make(chan types.Position), + pat: make(chan types.Position), polling: polling, logger: logger, } - return tt + return wc } -// Ping the terminator so it won't produce the termination signal. -func (tt *Terminator) Ping(position types.Position) { - tt.ping <- position +// Feed the WatchCat so it won't produce the termination signal. +func (wc *WatchCat) Feed(position types.Position) { + wc.pat <- position } -// Start the terminator. -func (tt *Terminator) Start(timeout time.Duration) { - tt.Stop() - tt.ctx, tt.cancel = context.WithCancel(context.Background()) +// Start the WatchCat. +func (wc *WatchCat) Start(timeout time.Duration) { + wc.Stop() + wc.ctx, wc.cancel = context.WithCancel(context.Background()) go func() { var lastPos types.Position MonitorLoop: for { select { - case <-tt.ctx.Done(): + case <-wc.ctx.Done(): return default: } select { - case <-tt.ctx.Done(): + case <-wc.ctx.Done(): return - case pos := <-tt.ping: + case pos := <-wc.pat: if !pos.Newer(lastPos) { - tt.logger.Warn("Ping with older height", + wc.logger.Warn("Feed with older height", "pos", pos, "lastPos", lastPos) continue } lastPos = pos case <-time.After(timeout): - tt.logger.Info("Calling Recovery.ProposeSkipBlock", - "height", lastPos.Height) - tt.recovery.ProposeSkipBlock(lastPos.Height) break MonitorLoop } } go func() { for { select { - case <-tt.ctx.Done(): + case <-wc.ctx.Done(): return - case <-tt.ping: + case <-wc.pat: } } }() - defer tt.cancel() + defer wc.cancel() + proposed := false threshold := uint64( - utils.GetConfigWithPanic(tt.configReader, lastPos.Round, tt.logger). + utils.GetConfigWithPanic(wc.configReader, lastPos.Round, wc.logger). NotarySetSize / 2) - tt.logger.Info("Threshold for recovery", "votes", threshold) + wc.logger.Info("Threshold for recovery", "votes", threshold) ResetLoop: for { - votes, err := tt.recovery.Votes(lastPos.Height) + if !proposed { + wc.logger.Info("Calling Recovery.ProposeSkipBlock", + "height", lastPos.Height) + if err := wc.recovery.ProposeSkipBlock(lastPos.Height); err != nil { + wc.logger.Warn("Failed to proposeSkipBlock", "height", lastPos.Height, "error", err) + } else { + proposed = true + } + } + votes, err := wc.recovery.Votes(lastPos.Height) if err != nil { - tt.logger.Error("Failed to get recovery votes", "height", lastPos.Height) + wc.logger.Error("Failed to get recovery votes", "height", lastPos.Height, "error", err) } else if votes > threshold { - tt.logger.Info("Threshold for recovery reached!") + wc.logger.Info("Threshold for recovery reached!") break ResetLoop } select { - case <-tt.ctx.Done(): + case <-wc.ctx.Done(): return - case <-time.After(tt.polling): + case <-time.After(wc.polling): } } }() } -// Stop the terminator. -func (tt *Terminator) Stop() { - if tt.cancel != nil { - tt.cancel() +// Stop the WatchCat. +func (wc *WatchCat) Stop() { + if wc.cancel != nil { + wc.cancel() } } -// Terminated return a closed channel if syncer should be terminated. -func (tt *Terminator) Terminated() <-chan struct{} { - return tt.ctx.Done() +// Meow return a closed channel if syncer should be terminated. +func (wc *WatchCat) Meow() <-chan struct{} { + return wc.ctx.Done() } diff --git a/core/syncer/terminator_test.go b/core/syncer/watch-cat_test.go index ecabb7b..1dbf479 100644 --- a/core/syncer/terminator_test.go +++ b/core/syncer/watch-cat_test.go @@ -27,7 +27,7 @@ import ( "github.com/dexon-foundation/dexon-consensus/core/types" ) -type TerminatorTestSuite struct { +type WatchCatTestSuite struct { suite.Suite } @@ -59,34 +59,34 @@ func (rec *recovery) Votes(height uint64) (uint64, error) { return rec.votes[height], nil } -func (s *TerminatorTestSuite) newTerminator( - notarySetSize uint32, polling time.Duration) (*Terminator, *recovery) { +func (s *WatchCatTestSuite) newWatchCat( + notarySetSize uint32, polling time.Duration) (*WatchCat, *recovery) { cfg := &testConfigAccessor{ notarySetSize: notarySetSize, } recovery := &recovery{ votes: make(map[uint64]uint64), } - return NewTerminator(recovery, cfg, polling, &common.NullLogger{}), recovery + return NewWatchCat(recovery, cfg, polling, &common.NullLogger{}), recovery } -func (s *TerminatorTestSuite) TestBasicUsage() { +func (s *WatchCatTestSuite) TestBasicUsage() { polling := 50 * time.Millisecond timeout := 50 * time.Millisecond notarySet := uint32(24) - terminator, rec := s.newTerminator(notarySet, polling) - terminator.Start(timeout) - defer terminator.Stop() + watchCat, rec := s.newWatchCat(notarySet, polling) + watchCat.Start(timeout) + defer watchCat.Stop() pos := types.Position{ Height: 10, } for i := 0; i < 10; i++ { pos.Height++ - terminator.Ping(pos) + watchCat.Feed(pos) time.Sleep(timeout / 2) select { - case <-terminator.Terminated(): + case <-watchCat.Meow(): s.FailNow("unexpected terminated") default: } @@ -100,7 +100,7 @@ func (s *TerminatorTestSuite) TestBasicUsage() { time.Sleep(polling * 2) select { - case <-terminator.Terminated(): + case <-watchCat.Meow(): s.FailNow("unexpected terminated") default: } @@ -111,12 +111,12 @@ func (s *TerminatorTestSuite) TestBasicUsage() { time.Sleep(polling * 2) select { - case <-terminator.Terminated(): + case <-watchCat.Meow(): default: s.FailNow("expecting terminated") } } -func TestTerminator(t *testing.T) { - suite.Run(t, new(TerminatorTestSuite)) +func TestWatchCat(t *testing.T) { + suite.Run(t, new(WatchCatTestSuite)) } |