diff options
Diffstat (limited to 'core/syncer/watch-cat.go')
-rw-r--r-- | core/syncer/watch-cat.go | 145 |
1 files changed, 145 insertions, 0 deletions
diff --git a/core/syncer/watch-cat.go b/core/syncer/watch-cat.go new file mode 100644 index 0000000..5ee7f62 --- /dev/null +++ b/core/syncer/watch-cat.go @@ -0,0 +1,145 @@ +// Copyright 2019 The dexon-consensus Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package syncer + +import ( + "context" + "time" + + "github.com/dexon-foundation/dexon-consensus/common" + "github.com/dexon-foundation/dexon-consensus/core" + "github.com/dexon-foundation/dexon-consensus/core/types" + "github.com/dexon-foundation/dexon-consensus/core/utils" +) + +type configReader interface { + Configuration(round uint64) *types.Config +} + +// WatchCat is reponsible for signaling if syncer object should be terminated. +type WatchCat struct { + recovery core.Recovery + configReader configReader + pat chan types.Position + polling time.Duration + ctx context.Context + cancel context.CancelFunc + logger common.Logger +} + +// NewWatchCat creats a new WatchCat 🐱 object. +func NewWatchCat( + recovery core.Recovery, + configReader configReader, + polling time.Duration, + logger common.Logger) *WatchCat { + wc := &WatchCat{ + recovery: recovery, + configReader: configReader, + pat: make(chan types.Position), + polling: polling, + logger: logger, + } + return wc +} + +// Feed the WatchCat so it won't produce the termination signal. +func (wc *WatchCat) Feed(position types.Position) { + wc.pat <- position +} + +// Start the WatchCat. +func (wc *WatchCat) Start(timeout time.Duration) { + wc.Stop() + wc.ctx, wc.cancel = context.WithCancel(context.Background()) + go func() { + var lastPos types.Position + MonitorLoop: + for { + select { + case <-wc.ctx.Done(): + return + default: + } + select { + case <-wc.ctx.Done(): + return + case pos := <-wc.pat: + if !pos.Newer(lastPos) { + wc.logger.Warn("Feed with older height", + "pos", pos, "lastPos", lastPos) + continue + } + lastPos = pos + case <-time.After(timeout): + break MonitorLoop + } + } + go func() { + for { + select { + case <-wc.ctx.Done(): + return + case <-wc.pat: + } + } + }() + defer wc.cancel() + proposed := false + threshold := uint64( + utils.GetConfigWithPanic(wc.configReader, lastPos.Round, wc.logger). + NotarySetSize / 2) + wc.logger.Info("Threshold for recovery", "votes", threshold) + ResetLoop: + for { + if !proposed { + wc.logger.Info("Calling Recovery.ProposeSkipBlock", + "height", lastPos.Height) + if err := wc.recovery.ProposeSkipBlock(lastPos.Height); err != nil { + wc.logger.Warn("Failed to proposeSkipBlock", "height", lastPos.Height, "error", err) + } else { + proposed = true + } + } + votes, err := wc.recovery.Votes(lastPos.Height) + if err != nil { + wc.logger.Error("Failed to get recovery votes", "height", lastPos.Height, "error", err) + } else if votes > threshold { + wc.logger.Info("Threshold for recovery reached!") + break ResetLoop + } + select { + case <-wc.ctx.Done(): + return + case <-time.After(wc.polling): + } + } + }() +} + +// Stop the WatchCat. +func (wc *WatchCat) Stop() { + if wc.cancel != nil { + wc.cancel() + } +} + +// Meow return a closed channel if syncer should be terminated. +func (wc *WatchCat) Meow() <-chan struct{} { + return wc.ctx.Done() +} |