aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2019-03-14 19:57:53 +0800
committerJimmy Hu <jimmy.hu@dexon.org>2019-03-15 17:36:45 +0800
commit2dc5d2af481fbad9edaad0ed536a53f7b17542f3 (patch)
treed60a07ab1ef84d31ef9aa97643548a5d60bc4168
parent651282c0790c4d64ecdde2a7174a8f4f77a67e1c (diff)
downloaddexon-consensus-2dc5d2af481fbad9edaad0ed536a53f7b17542f3.tar
dexon-consensus-2dc5d2af481fbad9edaad0ed536a53f7b17542f3.tar.gz
dexon-consensus-2dc5d2af481fbad9edaad0ed536a53f7b17542f3.tar.bz2
dexon-consensus-2dc5d2af481fbad9edaad0ed536a53f7b17542f3.tar.lz
dexon-consensus-2dc5d2af481fbad9edaad0ed536a53f7b17542f3.tar.xz
dexon-consensus-2dc5d2af481fbad9edaad0ed536a53f7b17542f3.tar.zst
dexon-consensus-2dc5d2af481fbad9edaad0ed536a53f7b17542f3.zip
core: Add Recovery Interface (#463)
* core: Add Recovery Interface * core/syncer: modify recovery interface
-rw-r--r--core/interfaces.go9
-rw-r--r--core/syncer/terminator.go138
-rw-r--r--core/syncer/terminator_test.go122
3 files changed, 269 insertions, 0 deletions
diff --git a/core/interfaces.go b/core/interfaces.go
index 45a1fc7..7accac2 100644
--- a/core/interfaces.go
+++ b/core/interfaces.go
@@ -162,3 +162,12 @@ type Ticker interface {
// Retart the ticker and clear all internal data.
Restart()
}
+
+// Recovery interface for interacting with recovery information.
+type Recovery interface {
+ // ProposeSkipBlock proposes a skip block.
+ ProposeSkipBlock(height uint64) error
+
+ // Votes gets the number of votes of given height.
+ Votes(height uint64) (uint64, error)
+}
diff --git a/core/syncer/terminator.go b/core/syncer/terminator.go
new file mode 100644
index 0000000..03c22ef
--- /dev/null
+++ b/core/syncer/terminator.go
@@ -0,0 +1,138 @@
+// Copyright 2019 The dexon-consensus Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package syncer
+
+import (
+ "context"
+ "time"
+
+ "github.com/dexon-foundation/dexon-consensus/common"
+ "github.com/dexon-foundation/dexon-consensus/core"
+ "github.com/dexon-foundation/dexon-consensus/core/types"
+ "github.com/dexon-foundation/dexon-consensus/core/utils"
+)
+
+type configReader interface {
+ Configuration(round uint64) *types.Config
+}
+
+// Terminator is reponsible for signaling if syncer object should be terminated.
+type Terminator struct {
+ recovery core.Recovery
+ configReader configReader
+ ping chan types.Position
+ polling time.Duration
+ ctx context.Context
+ cancel context.CancelFunc
+ logger common.Logger
+}
+
+// NewTerminator creats a new terminator object.
+func NewTerminator(
+ recovery core.Recovery,
+ configReader configReader,
+ polling time.Duration,
+ logger common.Logger) *Terminator {
+ tt := &Terminator{
+ recovery: recovery,
+ configReader: configReader,
+ ping: make(chan types.Position),
+ polling: polling,
+ logger: logger,
+ }
+ return tt
+}
+
+// Ping the terminator so it won't produce the termination signal.
+func (tt *Terminator) Ping(position types.Position) {
+ tt.ping <- position
+}
+
+// Start the terminator.
+func (tt *Terminator) Start(timeout time.Duration) {
+ tt.Stop()
+ tt.ctx, tt.cancel = context.WithCancel(context.Background())
+ go func() {
+ var lastPos types.Position
+ MonitorLoop:
+ for {
+ select {
+ case <-tt.ctx.Done():
+ return
+ default:
+ }
+ select {
+ case <-tt.ctx.Done():
+ return
+ case pos := <-tt.ping:
+ if !pos.Newer(lastPos) {
+ tt.logger.Warn("Ping with older height",
+ "pos", pos, "lastPos", lastPos)
+ continue
+ }
+ lastPos = pos
+ case <-time.After(timeout):
+ tt.logger.Info("Calling Recovery.ProposeSkipBlock",
+ "height", lastPos.Height)
+ tt.recovery.ProposeSkipBlock(lastPos.Height)
+ break MonitorLoop
+ }
+ }
+ go func() {
+ for {
+ select {
+ case <-tt.ctx.Done():
+ return
+ case <-tt.ping:
+ }
+ }
+ }()
+ defer tt.cancel()
+ threshold := uint64(
+ utils.GetConfigWithPanic(tt.configReader, lastPos.Round, tt.logger).
+ NotarySetSize / 2)
+ tt.logger.Info("Threshold for recovery", "votes", threshold)
+ ResetLoop:
+ for {
+ votes, err := tt.recovery.Votes(lastPos.Height)
+ if err != nil {
+ tt.logger.Error("Failed to get recovery votes", "height", lastPos.Height)
+ } else if votes > threshold {
+ tt.logger.Info("Threshold for recovery reached!")
+ break ResetLoop
+ }
+ select {
+ case <-tt.ctx.Done():
+ return
+ case <-time.After(tt.polling):
+ }
+ }
+ }()
+}
+
+// Stop the terminator.
+func (tt *Terminator) Stop() {
+ if tt.cancel != nil {
+ tt.cancel()
+ }
+}
+
+// Terminated return a closed channel if syncer should be terminated.
+func (tt *Terminator) Terminated() <-chan struct{} {
+ return tt.ctx.Done()
+}
diff --git a/core/syncer/terminator_test.go b/core/syncer/terminator_test.go
new file mode 100644
index 0000000..ecabb7b
--- /dev/null
+++ b/core/syncer/terminator_test.go
@@ -0,0 +1,122 @@
+// Copyright 2019 The dexon-consensus Authors
+// This file is part of the dexon-consensus library.
+//
+// The dexon-consensus library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version. //
+// The dexon-consensus library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package syncer
+
+import (
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/suite"
+
+ "github.com/dexon-foundation/dexon-consensus/common"
+ "github.com/dexon-foundation/dexon-consensus/core/types"
+)
+
+type TerminatorTestSuite struct {
+ suite.Suite
+}
+
+type testConfigAccessor struct {
+ notarySetSize uint32
+}
+
+func (cfg *testConfigAccessor) Configuration(uint64) *types.Config {
+ return &types.Config{
+ NotarySetSize: cfg.notarySetSize,
+ }
+}
+
+type recovery struct {
+ lock sync.RWMutex
+ votes map[uint64]uint64
+}
+
+func (rec *recovery) ProposeSkipBlock(height uint64) error {
+ rec.lock.Lock()
+ defer rec.lock.Unlock()
+ rec.votes[height]++
+ return nil
+}
+
+func (rec *recovery) Votes(height uint64) (uint64, error) {
+ rec.lock.RLock()
+ defer rec.lock.RUnlock()
+ return rec.votes[height], nil
+}
+
+func (s *TerminatorTestSuite) newTerminator(
+ notarySetSize uint32, polling time.Duration) (*Terminator, *recovery) {
+ cfg := &testConfigAccessor{
+ notarySetSize: notarySetSize,
+ }
+ recovery := &recovery{
+ votes: make(map[uint64]uint64),
+ }
+ return NewTerminator(recovery, cfg, polling, &common.NullLogger{}), recovery
+}
+
+func (s *TerminatorTestSuite) TestBasicUsage() {
+ polling := 50 * time.Millisecond
+ timeout := 50 * time.Millisecond
+ notarySet := uint32(24)
+ terminator, rec := s.newTerminator(notarySet, polling)
+ terminator.Start(timeout)
+ defer terminator.Stop()
+ pos := types.Position{
+ Height: 10,
+ }
+
+ for i := 0; i < 10; i++ {
+ pos.Height++
+ terminator.Ping(pos)
+ time.Sleep(timeout / 2)
+ select {
+ case <-terminator.Terminated():
+ s.FailNow("unexpected terminated")
+ default:
+ }
+ }
+
+ time.Sleep(timeout)
+ rec.lock.RLock()
+ s.Require().Equal(1, len(rec.votes))
+ s.Require().Equal(uint64(1), rec.votes[pos.Height])
+ rec.lock.RUnlock()
+
+ time.Sleep(polling * 2)
+ select {
+ case <-terminator.Terminated():
+ s.FailNow("unexpected terminated")
+ default:
+ }
+
+ rec.lock.Lock()
+ rec.votes[pos.Height] = uint64(notarySet/2 + 1)
+ rec.lock.Unlock()
+
+ time.Sleep(polling * 2)
+ select {
+ case <-terminator.Terminated():
+ default:
+ s.FailNow("expecting terminated")
+ }
+}
+
+func TestTerminator(t *testing.T) {
+ suite.Run(t, new(TerminatorTestSuite))
+}