diff options
-rw-r--r-- | core/syncer/watch-cat.go | 17 | ||||
-rw-r--r-- | core/syncer/watch-cat_test.go | 9 |
2 files changed, 15 insertions, 11 deletions
diff --git a/core/syncer/watch-cat.go b/core/syncer/watch-cat.go index 5ee7f62..d08bff9 100644 --- a/core/syncer/watch-cat.go +++ b/core/syncer/watch-cat.go @@ -34,8 +34,9 @@ type configReader interface { // WatchCat is reponsible for signaling if syncer object should be terminated. type WatchCat struct { recovery core.Recovery + timeout time.Duration configReader configReader - pat chan types.Position + feed chan types.Position polling time.Duration ctx context.Context cancel context.CancelFunc @@ -47,11 +48,13 @@ func NewWatchCat( recovery core.Recovery, configReader configReader, polling time.Duration, + timeout time.Duration, logger common.Logger) *WatchCat { wc := &WatchCat{ recovery: recovery, + timeout: timeout, configReader: configReader, - pat: make(chan types.Position), + feed: make(chan types.Position), polling: polling, logger: logger, } @@ -60,11 +63,11 @@ func NewWatchCat( // Feed the WatchCat so it won't produce the termination signal. func (wc *WatchCat) Feed(position types.Position) { - wc.pat <- position + wc.feed <- position } // Start the WatchCat. -func (wc *WatchCat) Start(timeout time.Duration) { +func (wc *WatchCat) Start() { wc.Stop() wc.ctx, wc.cancel = context.WithCancel(context.Background()) go func() { @@ -79,14 +82,14 @@ func (wc *WatchCat) Start(timeout time.Duration) { select { case <-wc.ctx.Done(): return - case pos := <-wc.pat: + case pos := <-wc.feed: if !pos.Newer(lastPos) { wc.logger.Warn("Feed with older height", "pos", pos, "lastPos", lastPos) continue } lastPos = pos - case <-time.After(timeout): + case <-time.After(wc.timeout): break MonitorLoop } } @@ -95,7 +98,7 @@ func (wc *WatchCat) Start(timeout time.Duration) { select { case <-wc.ctx.Done(): return - case <-wc.pat: + case <-wc.feed: } } }() diff --git a/core/syncer/watch-cat_test.go b/core/syncer/watch-cat_test.go index 1dbf479..103200d 100644 --- a/core/syncer/watch-cat_test.go +++ b/core/syncer/watch-cat_test.go @@ -60,22 +60,23 @@ func (rec *recovery) Votes(height uint64) (uint64, error) { } func (s *WatchCatTestSuite) newWatchCat( - notarySetSize uint32, polling time.Duration) (*WatchCat, *recovery) { + notarySetSize uint32, polling, timeout time.Duration) (*WatchCat, *recovery) { cfg := &testConfigAccessor{ notarySetSize: notarySetSize, } recovery := &recovery{ votes: make(map[uint64]uint64), } - return NewWatchCat(recovery, cfg, polling, &common.NullLogger{}), recovery + wc := NewWatchCat(recovery, cfg, polling, timeout, &common.NullLogger{}) + return wc, recovery } func (s *WatchCatTestSuite) TestBasicUsage() { polling := 50 * time.Millisecond timeout := 50 * time.Millisecond notarySet := uint32(24) - watchCat, rec := s.newWatchCat(notarySet, polling) - watchCat.Start(timeout) + watchCat, rec := s.newWatchCat(notarySet, polling, timeout) + watchCat.Start() defer watchCat.Stop() pos := types.Position{ Height: 10, |