aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/syncer/watch-cat.go17
-rw-r--r--core/syncer/watch-cat_test.go9
2 files changed, 15 insertions, 11 deletions
diff --git a/core/syncer/watch-cat.go b/core/syncer/watch-cat.go
index 5ee7f62..d08bff9 100644
--- a/core/syncer/watch-cat.go
+++ b/core/syncer/watch-cat.go
@@ -34,8 +34,9 @@ type configReader interface {
// WatchCat is reponsible for signaling if syncer object should be terminated.
type WatchCat struct {
recovery core.Recovery
+ timeout time.Duration
configReader configReader
- pat chan types.Position
+ feed chan types.Position
polling time.Duration
ctx context.Context
cancel context.CancelFunc
@@ -47,11 +48,13 @@ func NewWatchCat(
recovery core.Recovery,
configReader configReader,
polling time.Duration,
+ timeout time.Duration,
logger common.Logger) *WatchCat {
wc := &WatchCat{
recovery: recovery,
+ timeout: timeout,
configReader: configReader,
- pat: make(chan types.Position),
+ feed: make(chan types.Position),
polling: polling,
logger: logger,
}
@@ -60,11 +63,11 @@ func NewWatchCat(
// Feed the WatchCat so it won't produce the termination signal.
func (wc *WatchCat) Feed(position types.Position) {
- wc.pat <- position
+ wc.feed <- position
}
// Start the WatchCat.
-func (wc *WatchCat) Start(timeout time.Duration) {
+func (wc *WatchCat) Start() {
wc.Stop()
wc.ctx, wc.cancel = context.WithCancel(context.Background())
go func() {
@@ -79,14 +82,14 @@ func (wc *WatchCat) Start(timeout time.Duration) {
select {
case <-wc.ctx.Done():
return
- case pos := <-wc.pat:
+ case pos := <-wc.feed:
if !pos.Newer(lastPos) {
wc.logger.Warn("Feed with older height",
"pos", pos, "lastPos", lastPos)
continue
}
lastPos = pos
- case <-time.After(timeout):
+ case <-time.After(wc.timeout):
break MonitorLoop
}
}
@@ -95,7 +98,7 @@ func (wc *WatchCat) Start(timeout time.Duration) {
select {
case <-wc.ctx.Done():
return
- case <-wc.pat:
+ case <-wc.feed:
}
}
}()
diff --git a/core/syncer/watch-cat_test.go b/core/syncer/watch-cat_test.go
index 1dbf479..103200d 100644
--- a/core/syncer/watch-cat_test.go
+++ b/core/syncer/watch-cat_test.go
@@ -60,22 +60,23 @@ func (rec *recovery) Votes(height uint64) (uint64, error) {
}
func (s *WatchCatTestSuite) newWatchCat(
- notarySetSize uint32, polling time.Duration) (*WatchCat, *recovery) {
+ notarySetSize uint32, polling, timeout time.Duration) (*WatchCat, *recovery) {
cfg := &testConfigAccessor{
notarySetSize: notarySetSize,
}
recovery := &recovery{
votes: make(map[uint64]uint64),
}
- return NewWatchCat(recovery, cfg, polling, &common.NullLogger{}), recovery
+ wc := NewWatchCat(recovery, cfg, polling, timeout, &common.NullLogger{})
+ return wc, recovery
}
func (s *WatchCatTestSuite) TestBasicUsage() {
polling := 50 * time.Millisecond
timeout := 50 * time.Millisecond
notarySet := uint32(24)
- watchCat, rec := s.newWatchCat(notarySet, polling)
- watchCat.Start(timeout)
+ watchCat, rec := s.newWatchCat(notarySet, polling, timeout)
+ watchCat.Start()
defer watchCat.Stop()
pos := types.Position{
Height: 10,