diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2019-03-14 19:57:53 +0800 |
---|---|---|
committer | Jimmy Hu <jimmy.hu@dexon.org> | 2019-03-15 17:36:45 +0800 |
commit | 2dc5d2af481fbad9edaad0ed536a53f7b17542f3 (patch) | |
tree | d60a07ab1ef84d31ef9aa97643548a5d60bc4168 | |
parent | 651282c0790c4d64ecdde2a7174a8f4f77a67e1c (diff) | |
download | dexon-consensus-2dc5d2af481fbad9edaad0ed536a53f7b17542f3.tar dexon-consensus-2dc5d2af481fbad9edaad0ed536a53f7b17542f3.tar.gz dexon-consensus-2dc5d2af481fbad9edaad0ed536a53f7b17542f3.tar.bz2 dexon-consensus-2dc5d2af481fbad9edaad0ed536a53f7b17542f3.tar.lz dexon-consensus-2dc5d2af481fbad9edaad0ed536a53f7b17542f3.tar.xz dexon-consensus-2dc5d2af481fbad9edaad0ed536a53f7b17542f3.tar.zst dexon-consensus-2dc5d2af481fbad9edaad0ed536a53f7b17542f3.zip |
core: Add Recovery Interface (#463)
* core: Add Recovery Interface
* core/syncer: modify recovery interface
-rw-r--r-- | core/interfaces.go | 9 | ||||
-rw-r--r-- | core/syncer/terminator.go | 138 | ||||
-rw-r--r-- | core/syncer/terminator_test.go | 122 |
3 files changed, 269 insertions, 0 deletions
diff --git a/core/interfaces.go b/core/interfaces.go index 45a1fc7..7accac2 100644 --- a/core/interfaces.go +++ b/core/interfaces.go @@ -162,3 +162,12 @@ type Ticker interface { // Retart the ticker and clear all internal data. Restart() } + +// Recovery interface for interacting with recovery information. +type Recovery interface { + // ProposeSkipBlock proposes a skip block. + ProposeSkipBlock(height uint64) error + + // Votes gets the number of votes of given height. + Votes(height uint64) (uint64, error) +} diff --git a/core/syncer/terminator.go b/core/syncer/terminator.go new file mode 100644 index 0000000..03c22ef --- /dev/null +++ b/core/syncer/terminator.go @@ -0,0 +1,138 @@ +// Copyright 2019 The dexon-consensus Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package syncer + +import ( + "context" + "time" + + "github.com/dexon-foundation/dexon-consensus/common" + "github.com/dexon-foundation/dexon-consensus/core" + "github.com/dexon-foundation/dexon-consensus/core/types" + "github.com/dexon-foundation/dexon-consensus/core/utils" +) + +type configReader interface { + Configuration(round uint64) *types.Config +} + +// Terminator is reponsible for signaling if syncer object should be terminated. +type Terminator struct { + recovery core.Recovery + configReader configReader + ping chan types.Position + polling time.Duration + ctx context.Context + cancel context.CancelFunc + logger common.Logger +} + +// NewTerminator creats a new terminator object. +func NewTerminator( + recovery core.Recovery, + configReader configReader, + polling time.Duration, + logger common.Logger) *Terminator { + tt := &Terminator{ + recovery: recovery, + configReader: configReader, + ping: make(chan types.Position), + polling: polling, + logger: logger, + } + return tt +} + +// Ping the terminator so it won't produce the termination signal. +func (tt *Terminator) Ping(position types.Position) { + tt.ping <- position +} + +// Start the terminator. +func (tt *Terminator) Start(timeout time.Duration) { + tt.Stop() + tt.ctx, tt.cancel = context.WithCancel(context.Background()) + go func() { + var lastPos types.Position + MonitorLoop: + for { + select { + case <-tt.ctx.Done(): + return + default: + } + select { + case <-tt.ctx.Done(): + return + case pos := <-tt.ping: + if !pos.Newer(lastPos) { + tt.logger.Warn("Ping with older height", + "pos", pos, "lastPos", lastPos) + continue + } + lastPos = pos + case <-time.After(timeout): + tt.logger.Info("Calling Recovery.ProposeSkipBlock", + "height", lastPos.Height) + tt.recovery.ProposeSkipBlock(lastPos.Height) + break MonitorLoop + } + } + go func() { + for { + select { + case <-tt.ctx.Done(): + return + case <-tt.ping: + } + } + }() + defer tt.cancel() + threshold := uint64( + utils.GetConfigWithPanic(tt.configReader, lastPos.Round, tt.logger). + NotarySetSize / 2) + tt.logger.Info("Threshold for recovery", "votes", threshold) + ResetLoop: + for { + votes, err := tt.recovery.Votes(lastPos.Height) + if err != nil { + tt.logger.Error("Failed to get recovery votes", "height", lastPos.Height) + } else if votes > threshold { + tt.logger.Info("Threshold for recovery reached!") + break ResetLoop + } + select { + case <-tt.ctx.Done(): + return + case <-time.After(tt.polling): + } + } + }() +} + +// Stop the terminator. +func (tt *Terminator) Stop() { + if tt.cancel != nil { + tt.cancel() + } +} + +// Terminated return a closed channel if syncer should be terminated. +func (tt *Terminator) Terminated() <-chan struct{} { + return tt.ctx.Done() +} diff --git a/core/syncer/terminator_test.go b/core/syncer/terminator_test.go new file mode 100644 index 0000000..ecabb7b --- /dev/null +++ b/core/syncer/terminator_test.go @@ -0,0 +1,122 @@ +// Copyright 2019 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. // +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// <http://www.gnu.org/licenses/>. + +package syncer + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/suite" + + "github.com/dexon-foundation/dexon-consensus/common" + "github.com/dexon-foundation/dexon-consensus/core/types" +) + +type TerminatorTestSuite struct { + suite.Suite +} + +type testConfigAccessor struct { + notarySetSize uint32 +} + +func (cfg *testConfigAccessor) Configuration(uint64) *types.Config { + return &types.Config{ + NotarySetSize: cfg.notarySetSize, + } +} + +type recovery struct { + lock sync.RWMutex + votes map[uint64]uint64 +} + +func (rec *recovery) ProposeSkipBlock(height uint64) error { + rec.lock.Lock() + defer rec.lock.Unlock() + rec.votes[height]++ + return nil +} + +func (rec *recovery) Votes(height uint64) (uint64, error) { + rec.lock.RLock() + defer rec.lock.RUnlock() + return rec.votes[height], nil +} + +func (s *TerminatorTestSuite) newTerminator( + notarySetSize uint32, polling time.Duration) (*Terminator, *recovery) { + cfg := &testConfigAccessor{ + notarySetSize: notarySetSize, + } + recovery := &recovery{ + votes: make(map[uint64]uint64), + } + return NewTerminator(recovery, cfg, polling, &common.NullLogger{}), recovery +} + +func (s *TerminatorTestSuite) TestBasicUsage() { + polling := 50 * time.Millisecond + timeout := 50 * time.Millisecond + notarySet := uint32(24) + terminator, rec := s.newTerminator(notarySet, polling) + terminator.Start(timeout) + defer terminator.Stop() + pos := types.Position{ + Height: 10, + } + + for i := 0; i < 10; i++ { + pos.Height++ + terminator.Ping(pos) + time.Sleep(timeout / 2) + select { + case <-terminator.Terminated(): + s.FailNow("unexpected terminated") + default: + } + } + + time.Sleep(timeout) + rec.lock.RLock() + s.Require().Equal(1, len(rec.votes)) + s.Require().Equal(uint64(1), rec.votes[pos.Height]) + rec.lock.RUnlock() + + time.Sleep(polling * 2) + select { + case <-terminator.Terminated(): + s.FailNow("unexpected terminated") + default: + } + + rec.lock.Lock() + rec.votes[pos.Height] = uint64(notarySet/2 + 1) + rec.lock.Unlock() + + time.Sleep(polling * 2) + select { + case <-terminator.Terminated(): + default: + s.FailNow("expecting terminated") + } +} + +func TestTerminator(t *testing.T) { + suite.Run(t, new(TerminatorTestSuite)) +} |