From 6a127c42323b9b5cdde1cdb17e385d22ef9dfd10 Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Fri, 15 Mar 2019 11:50:43 +0800 Subject: core/syncer: rename terminator to watchcat (#491) * core/syncer: rename terminator to watchcat * Add error log * Rename Pat to Feed --- core/syncer/terminator.go | 138 --------------------------------------- core/syncer/terminator_test.go | 122 ---------------------------------- core/syncer/watch-cat.go | 145 +++++++++++++++++++++++++++++++++++++++++ core/syncer/watch-cat_test.go | 122 ++++++++++++++++++++++++++++++++++ 4 files changed, 267 insertions(+), 260 deletions(-) delete mode 100644 core/syncer/terminator.go delete mode 100644 core/syncer/terminator_test.go create mode 100644 core/syncer/watch-cat.go create mode 100644 core/syncer/watch-cat_test.go diff --git a/core/syncer/terminator.go b/core/syncer/terminator.go deleted file mode 100644 index 03c22ef..0000000 --- a/core/syncer/terminator.go +++ /dev/null @@ -1,138 +0,0 @@ -// Copyright 2019 The dexon-consensus Authors -// This file is part of the dexon-consensus-core library. -// -// The dexon-consensus-core library is free software: you can redistribute it -// and/or modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// The dexon-consensus-core library is distributed in the hope that it will be -// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser -// General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus-core library. If not, see -// . - -package syncer - -import ( - "context" - "time" - - "github.com/dexon-foundation/dexon-consensus/common" - "github.com/dexon-foundation/dexon-consensus/core" - "github.com/dexon-foundation/dexon-consensus/core/types" - "github.com/dexon-foundation/dexon-consensus/core/utils" -) - -type configReader interface { - Configuration(round uint64) *types.Config -} - -// Terminator is reponsible for signaling if syncer object should be terminated. -type Terminator struct { - recovery core.Recovery - configReader configReader - ping chan types.Position - polling time.Duration - ctx context.Context - cancel context.CancelFunc - logger common.Logger -} - -// NewTerminator creats a new terminator object. -func NewTerminator( - recovery core.Recovery, - configReader configReader, - polling time.Duration, - logger common.Logger) *Terminator { - tt := &Terminator{ - recovery: recovery, - configReader: configReader, - ping: make(chan types.Position), - polling: polling, - logger: logger, - } - return tt -} - -// Ping the terminator so it won't produce the termination signal. -func (tt *Terminator) Ping(position types.Position) { - tt.ping <- position -} - -// Start the terminator. -func (tt *Terminator) Start(timeout time.Duration) { - tt.Stop() - tt.ctx, tt.cancel = context.WithCancel(context.Background()) - go func() { - var lastPos types.Position - MonitorLoop: - for { - select { - case <-tt.ctx.Done(): - return - default: - } - select { - case <-tt.ctx.Done(): - return - case pos := <-tt.ping: - if !pos.Newer(lastPos) { - tt.logger.Warn("Ping with older height", - "pos", pos, "lastPos", lastPos) - continue - } - lastPos = pos - case <-time.After(timeout): - tt.logger.Info("Calling Recovery.ProposeSkipBlock", - "height", lastPos.Height) - tt.recovery.ProposeSkipBlock(lastPos.Height) - break MonitorLoop - } - } - go func() { - for { - select { - case <-tt.ctx.Done(): - return - case <-tt.ping: - } - } - }() - defer tt.cancel() - threshold := uint64( - utils.GetConfigWithPanic(tt.configReader, lastPos.Round, tt.logger). - NotarySetSize / 2) - tt.logger.Info("Threshold for recovery", "votes", threshold) - ResetLoop: - for { - votes, err := tt.recovery.Votes(lastPos.Height) - if err != nil { - tt.logger.Error("Failed to get recovery votes", "height", lastPos.Height) - } else if votes > threshold { - tt.logger.Info("Threshold for recovery reached!") - break ResetLoop - } - select { - case <-tt.ctx.Done(): - return - case <-time.After(tt.polling): - } - } - }() -} - -// Stop the terminator. -func (tt *Terminator) Stop() { - if tt.cancel != nil { - tt.cancel() - } -} - -// Terminated return a closed channel if syncer should be terminated. -func (tt *Terminator) Terminated() <-chan struct{} { - return tt.ctx.Done() -} diff --git a/core/syncer/terminator_test.go b/core/syncer/terminator_test.go deleted file mode 100644 index ecabb7b..0000000 --- a/core/syncer/terminator_test.go +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright 2019 The dexon-consensus Authors -// This file is part of the dexon-consensus library. -// -// The dexon-consensus library is free software: you can redistribute it -// and/or modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. // -// The dexon-consensus library is distributed in the hope that it will be -// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser -// General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus library. If not, see -// . - -package syncer - -import ( - "sync" - "testing" - "time" - - "github.com/stretchr/testify/suite" - - "github.com/dexon-foundation/dexon-consensus/common" - "github.com/dexon-foundation/dexon-consensus/core/types" -) - -type TerminatorTestSuite struct { - suite.Suite -} - -type testConfigAccessor struct { - notarySetSize uint32 -} - -func (cfg *testConfigAccessor) Configuration(uint64) *types.Config { - return &types.Config{ - NotarySetSize: cfg.notarySetSize, - } -} - -type recovery struct { - lock sync.RWMutex - votes map[uint64]uint64 -} - -func (rec *recovery) ProposeSkipBlock(height uint64) error { - rec.lock.Lock() - defer rec.lock.Unlock() - rec.votes[height]++ - return nil -} - -func (rec *recovery) Votes(height uint64) (uint64, error) { - rec.lock.RLock() - defer rec.lock.RUnlock() - return rec.votes[height], nil -} - -func (s *TerminatorTestSuite) newTerminator( - notarySetSize uint32, polling time.Duration) (*Terminator, *recovery) { - cfg := &testConfigAccessor{ - notarySetSize: notarySetSize, - } - recovery := &recovery{ - votes: make(map[uint64]uint64), - } - return NewTerminator(recovery, cfg, polling, &common.NullLogger{}), recovery -} - -func (s *TerminatorTestSuite) TestBasicUsage() { - polling := 50 * time.Millisecond - timeout := 50 * time.Millisecond - notarySet := uint32(24) - terminator, rec := s.newTerminator(notarySet, polling) - terminator.Start(timeout) - defer terminator.Stop() - pos := types.Position{ - Height: 10, - } - - for i := 0; i < 10; i++ { - pos.Height++ - terminator.Ping(pos) - time.Sleep(timeout / 2) - select { - case <-terminator.Terminated(): - s.FailNow("unexpected terminated") - default: - } - } - - time.Sleep(timeout) - rec.lock.RLock() - s.Require().Equal(1, len(rec.votes)) - s.Require().Equal(uint64(1), rec.votes[pos.Height]) - rec.lock.RUnlock() - - time.Sleep(polling * 2) - select { - case <-terminator.Terminated(): - s.FailNow("unexpected terminated") - default: - } - - rec.lock.Lock() - rec.votes[pos.Height] = uint64(notarySet/2 + 1) - rec.lock.Unlock() - - time.Sleep(polling * 2) - select { - case <-terminator.Terminated(): - default: - s.FailNow("expecting terminated") - } -} - -func TestTerminator(t *testing.T) { - suite.Run(t, new(TerminatorTestSuite)) -} diff --git a/core/syncer/watch-cat.go b/core/syncer/watch-cat.go new file mode 100644 index 0000000..5ee7f62 --- /dev/null +++ b/core/syncer/watch-cat.go @@ -0,0 +1,145 @@ +// Copyright 2019 The dexon-consensus Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// . + +package syncer + +import ( + "context" + "time" + + "github.com/dexon-foundation/dexon-consensus/common" + "github.com/dexon-foundation/dexon-consensus/core" + "github.com/dexon-foundation/dexon-consensus/core/types" + "github.com/dexon-foundation/dexon-consensus/core/utils" +) + +type configReader interface { + Configuration(round uint64) *types.Config +} + +// WatchCat is reponsible for signaling if syncer object should be terminated. +type WatchCat struct { + recovery core.Recovery + configReader configReader + pat chan types.Position + polling time.Duration + ctx context.Context + cancel context.CancelFunc + logger common.Logger +} + +// NewWatchCat creats a new WatchCat 🐱 object. +func NewWatchCat( + recovery core.Recovery, + configReader configReader, + polling time.Duration, + logger common.Logger) *WatchCat { + wc := &WatchCat{ + recovery: recovery, + configReader: configReader, + pat: make(chan types.Position), + polling: polling, + logger: logger, + } + return wc +} + +// Feed the WatchCat so it won't produce the termination signal. +func (wc *WatchCat) Feed(position types.Position) { + wc.pat <- position +} + +// Start the WatchCat. +func (wc *WatchCat) Start(timeout time.Duration) { + wc.Stop() + wc.ctx, wc.cancel = context.WithCancel(context.Background()) + go func() { + var lastPos types.Position + MonitorLoop: + for { + select { + case <-wc.ctx.Done(): + return + default: + } + select { + case <-wc.ctx.Done(): + return + case pos := <-wc.pat: + if !pos.Newer(lastPos) { + wc.logger.Warn("Feed with older height", + "pos", pos, "lastPos", lastPos) + continue + } + lastPos = pos + case <-time.After(timeout): + break MonitorLoop + } + } + go func() { + for { + select { + case <-wc.ctx.Done(): + return + case <-wc.pat: + } + } + }() + defer wc.cancel() + proposed := false + threshold := uint64( + utils.GetConfigWithPanic(wc.configReader, lastPos.Round, wc.logger). + NotarySetSize / 2) + wc.logger.Info("Threshold for recovery", "votes", threshold) + ResetLoop: + for { + if !proposed { + wc.logger.Info("Calling Recovery.ProposeSkipBlock", + "height", lastPos.Height) + if err := wc.recovery.ProposeSkipBlock(lastPos.Height); err != nil { + wc.logger.Warn("Failed to proposeSkipBlock", "height", lastPos.Height, "error", err) + } else { + proposed = true + } + } + votes, err := wc.recovery.Votes(lastPos.Height) + if err != nil { + wc.logger.Error("Failed to get recovery votes", "height", lastPos.Height, "error", err) + } else if votes > threshold { + wc.logger.Info("Threshold for recovery reached!") + break ResetLoop + } + select { + case <-wc.ctx.Done(): + return + case <-time.After(wc.polling): + } + } + }() +} + +// Stop the WatchCat. +func (wc *WatchCat) Stop() { + if wc.cancel != nil { + wc.cancel() + } +} + +// Meow return a closed channel if syncer should be terminated. +func (wc *WatchCat) Meow() <-chan struct{} { + return wc.ctx.Done() +} diff --git a/core/syncer/watch-cat_test.go b/core/syncer/watch-cat_test.go new file mode 100644 index 0000000..1dbf479 --- /dev/null +++ b/core/syncer/watch-cat_test.go @@ -0,0 +1,122 @@ +// Copyright 2019 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. // +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// . + +package syncer + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/suite" + + "github.com/dexon-foundation/dexon-consensus/common" + "github.com/dexon-foundation/dexon-consensus/core/types" +) + +type WatchCatTestSuite struct { + suite.Suite +} + +type testConfigAccessor struct { + notarySetSize uint32 +} + +func (cfg *testConfigAccessor) Configuration(uint64) *types.Config { + return &types.Config{ + NotarySetSize: cfg.notarySetSize, + } +} + +type recovery struct { + lock sync.RWMutex + votes map[uint64]uint64 +} + +func (rec *recovery) ProposeSkipBlock(height uint64) error { + rec.lock.Lock() + defer rec.lock.Unlock() + rec.votes[height]++ + return nil +} + +func (rec *recovery) Votes(height uint64) (uint64, error) { + rec.lock.RLock() + defer rec.lock.RUnlock() + return rec.votes[height], nil +} + +func (s *WatchCatTestSuite) newWatchCat( + notarySetSize uint32, polling time.Duration) (*WatchCat, *recovery) { + cfg := &testConfigAccessor{ + notarySetSize: notarySetSize, + } + recovery := &recovery{ + votes: make(map[uint64]uint64), + } + return NewWatchCat(recovery, cfg, polling, &common.NullLogger{}), recovery +} + +func (s *WatchCatTestSuite) TestBasicUsage() { + polling := 50 * time.Millisecond + timeout := 50 * time.Millisecond + notarySet := uint32(24) + watchCat, rec := s.newWatchCat(notarySet, polling) + watchCat.Start(timeout) + defer watchCat.Stop() + pos := types.Position{ + Height: 10, + } + + for i := 0; i < 10; i++ { + pos.Height++ + watchCat.Feed(pos) + time.Sleep(timeout / 2) + select { + case <-watchCat.Meow(): + s.FailNow("unexpected terminated") + default: + } + } + + time.Sleep(timeout) + rec.lock.RLock() + s.Require().Equal(1, len(rec.votes)) + s.Require().Equal(uint64(1), rec.votes[pos.Height]) + rec.lock.RUnlock() + + time.Sleep(polling * 2) + select { + case <-watchCat.Meow(): + s.FailNow("unexpected terminated") + default: + } + + rec.lock.Lock() + rec.votes[pos.Height] = uint64(notarySet/2 + 1) + rec.lock.Unlock() + + time.Sleep(polling * 2) + select { + case <-watchCat.Meow(): + default: + s.FailNow("expecting terminated") + } +} + +func TestWatchCat(t *testing.T) { + suite.Run(t, new(WatchCatTestSuite)) +} -- cgit v1.2.3