// Copyright 2019 The dexon-consensus Authors
// This file is part of the dexon-consensus-core library.
//
// The dexon-consensus-core library is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The dexon-consensus-core library is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the dexon-consensus-core library. If not, see
// <http://www.gnu.org/licenses/>.
package syncer
import (
"context"
"time"
"github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core"
"github.com/dexon-foundation/dexon-consensus/core/types"
"github.com/dexon-foundation/dexon-consensus/core/utils"
)
type configReader interface {
Configuration(round uint64) *types.Config
}
// WatchCat is reponsible for signaling if syncer object should be terminated.
type WatchCat struct {
recovery core.Recovery
configReader configReader
pat chan types.Position
polling time.Duration
ctx context.Context
cancel context.CancelFunc
logger common.Logger
}
// NewWatchCat creats a new WatchCat 🐱 object.
func NewWatchCat(
recovery core.Recovery,
configReader configReader,
polling time.Duration,
logger common.Logger) *WatchCat {
wc := &WatchCat{
recovery: recovery,
configReader: configReader,
pat: make(chan types.Position),
polling: polling,
logger: logger,
}
return wc
}
// Feed the WatchCat so it won't produce the termination signal.
func (wc *WatchCat) Feed(position types.Position) {
wc.pat <- position
}
// Start the WatchCat.
func (wc *WatchCat) Start(timeout time.Duration) {
wc.Stop()
wc.ctx, wc.cancel = context.WithCancel(context.Background())
go func() {
var lastPos types.Position
MonitorLoop:
for {
select {
case <-wc.ctx.Done():
return
default:
}
select {
case <-wc.ctx.Done():
return
case pos := <-wc.pat:
if !pos.Newer(lastPos) {
wc.logger.Warn("Feed with older height",
"pos", pos, "lastPos", lastPos)
continue
}
lastPos = pos
case <-time.After(timeout):
break MonitorLoop
}
}
go func() {
for {
select {
case <-wc.ctx.Done():
return
case <-wc.pat:
}
}
}()
defer wc.cancel()
proposed := false
threshold := uint64(
utils.GetConfigWithPanic(wc.configReader, lastPos.Round, wc.logger).
NotarySetSize / 2)
wc.logger.Info("Threshold for recovery", "votes", threshold)
ResetLoop:
for {
if !proposed {
wc.logger.Info("Calling Recovery.ProposeSkipBlock",
"height", lastPos.Height)
if err := wc.recovery.ProposeSkipBlock(lastPos.Height); err != nil {
wc.logger.Warn("Failed to proposeSkipBlock", "height", lastPos.Height, "error", err)
} else {
proposed = true
}
}
votes, err := wc.recovery.Votes(lastPos.Height)
if err != nil {
wc.logger.Error("Failed to get recovery votes", "height", lastPos.Height, "error", err)
} else if votes > threshold {
wc.logger.Info("Threshold for recovery reached!")
break ResetLoop
}
select {
case <-wc.ctx.Done():
return
case <-time.After(wc.polling):
}
}
}()
}
// Stop the WatchCat.
func (wc *WatchCat) Stop() {
if wc.cancel != nil {
wc.cancel()
}
}
// Meow return a closed channel if syncer should be terminated.
func (wc *WatchCat) Meow() <-chan struct{} {
return wc.ctx.Done()
}