// Copyright 2018 The dexon-consensus Authors
// This file is part of the dexon-consensus library.
//
// The dexon-consensus library is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The dexon-consensus library is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the dexon-consensus library. If not, see
// <http://www.gnu.org/licenses/>.
package integration
import (
"fmt"
"time"
"github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core"
"github.com/dexon-foundation/dexon-consensus/core/crypto"
"github.com/dexon-foundation/dexon-consensus/core/db"
"github.com/dexon-foundation/dexon-consensus/core/test"
"github.com/dexon-foundation/dexon-consensus/core/types"
"github.com/dexon-foundation/dexon-consensus/core/utils"
)
type consensusEventType int
const (
evtProposeBlock consensusEventType = iota
evtReceiveBlock
)
type consensusEventPayload struct {
Type consensusEventType
PiggyBack interface{}
}
// newProposeBlockEvent constructs an test.Event that would trigger
// block proposing.
func newProposeBlockEvent(nID types.NodeID,
roundID uint64, chainID uint32, when time.Time) *test.Event {
return test.NewEvent(nID, when, &consensusEventPayload{
Type: evtProposeBlock,
PiggyBack: &struct {
round uint64
chain uint32
}{roundID, chainID},
})
}
// newReceiveBlockEvent constructs an test.Event that would trigger
// block received.
func newReceiveBlockEvent(
nID types.NodeID, when time.Time, block *types.Block) *test.Event {
return test.NewEvent(nID, when, &consensusEventPayload{
Type: evtReceiveBlock,
PiggyBack: block,
})
}
// Node is designed to work with test.Scheduler.
type Node struct {
ID types.NodeID
ownChains []uint32
roundEndTimes []time.Time
roundToNotify uint64
lattice *core.Lattice
appModule *test.App
stateModule *test.State
govModule *test.Governance
dbModule db.Database
broadcastTargets map[types.NodeID]struct{}
networkLatency test.LatencyModel
proposingLatency test.LatencyModel
prevFinalHeight uint64
pendings []*types.Block
prevHash common.Hash
// This variable caches the maximum NumChains seen by this node when
// it's notified for round switching.
latticeMaxNumChains uint32
}
// newNode constructs an instance of Node.
func newNode(
gov *test.Governance,
privateKey crypto.PrivateKey,
dMoment time.Time,
ownChains []uint32,
networkLatency test.LatencyModel,
proposingLatency test.LatencyModel) (*Node, error) {
// Load all configs prepared in core.Governance into core.Lattice.
copiedGov := gov.Clone()
configs := loadAllConfigs(copiedGov)
// Setup db.
dbInst, err := db.NewMemBackedDB()
if err != nil {
return nil, err
}
// Setup test.App
app := test.NewApp(0, copiedGov)
// Setup lattice instance.
lattice := core.NewLattice(
dMoment,
0,
configs[0],
utils.NewSigner(privateKey),
app,
app,
dbInst,
&common.NullLogger{})
n := &Node{
ID: types.NewNodeID(privateKey.PublicKey()),
ownChains: ownChains,
roundEndTimes: genRoundEndTimes(configs, dMoment),
roundToNotify: 2,
networkLatency: networkLatency,
proposingLatency: proposingLatency,
appModule: app,
stateModule: copiedGov.State(),
dbModule: dbInst,
govModule: copiedGov,
lattice: lattice,
latticeMaxNumChains: configs[0].NumChains,
}
for idx, config := range configs[1:] {
if err := lattice.AppendConfig(uint64(idx+1), config); err != nil {
return nil, err
}
if config.NumChains > n.latticeMaxNumChains {
n.latticeMaxNumChains = config.NumChains
}
}
return n, nil
}
// Handle implements test.EventHandler interface.
func (n *Node) Handle(e *test.Event) (events []*test.Event) {
payload := e.Payload.(*consensusEventPayload)
switch payload.Type {
case evtProposeBlock:
events, e.ExecError = n.handleProposeBlock(e.Time, payload.PiggyBack)
case evtReceiveBlock:
events, e.ExecError = n.handleReceiveBlock(payload.PiggyBack)
default:
panic(fmt.Errorf("unknown consensus event type: %v", payload.Type))
}
return
}
func (n *Node) handleProposeBlock(when time.Time, payload interface{}) (
events []*test.Event, err error) {
pos := payload.(*struct {
round uint64
chain uint32
})
b, err := n.prepareBlock(pos.round, pos.chain, when)
if err != nil {
if err == utils.ErrInvalidChainID {
// This chain is not included in this round, retry in next round.
events = append(events, newProposeBlockEvent(
n.ID, b.Position.Round+1, b.Position.ChainID,
n.roundEndTimes[b.Position.Round]))
}
return
}
if events, err = n.processBlock(b); err != nil {
// It's shouldn't be error when prepared.
panic(err)
}
// Create 'block received' event for each other nodes.
for nID := range n.broadcastTargets {
events = append(events, newReceiveBlockEvent(
nID, when.Add(n.networkLatency.Delay()), b.Clone()))
}
// Create next 'block proposing' event for this nodes.
events = append(events, newProposeBlockEvent(n.ID,
b.Position.Round,
b.Position.ChainID,
when.Add(n.proposingLatency.Delay())))
return
}
func (n *Node) handleReceiveBlock(piggyback interface{}) (
events []*test.Event, err error) {
events, err = n.processBlock(piggyback.(*types.Block))
if err != nil {
panic(err)
}
return
}
func (n *Node) prepareBlock(
round uint64, chainID uint32, when time.Time) (b *types.Block, err error) {
b = &types.Block{
Position: types.Position{
Round: round,
ChainID: chainID,
}}
if err = n.lattice.PrepareBlock(b, when); err != nil {
if err == core.ErrRoundNotSwitch {
b.Position.Round++
err = n.lattice.PrepareBlock(b, when)
}
}
return
}
func (n *Node) processBlock(b *types.Block) (events []*test.Event, err error) {
// TODO(mission): this segment of code is identical to testLatticeMgr in
// core/lattice_test.go, except the compaction-chain part.
var (
delivered []*types.Block
)
n.pendings = append([]*types.Block{b}, n.pendings...)
for {
var (
newPendings []*types.Block
tmpDelivered []*types.Block
tmpErr error
)
updated := false
for _, p := range n.pendings {
if tmpErr = n.lattice.SanityCheck(p, false); tmpErr != nil {
if tmpErr == core.ErrRetrySanityCheckLater {
newPendings = append(newPendings, p)
} else {
// Those blocks are prepared by lattice module, they should
// not be wrong.
panic(tmpErr)
}
continue
}
if tmpDelivered, tmpErr =
n.lattice.ProcessBlock(p); tmpErr != nil {
// It's not allowed that sanity checked block failed to
// be added to lattice.
panic(tmpErr)
}
delivered = append(delivered, tmpDelivered...)
updated = true
}
n.pendings = newPendings
if !updated {
break
}
}
// Deliver blocks.
for _, b = range delivered {
b.Finalization.Height = n.prevFinalHeight + 1
b.Finalization.ParentHash = n.prevHash
n.appModule.BlockDelivered(b.Hash, b.Position, b.Finalization)
n.prevFinalHeight++
n.prevHash = b.Hash
events = append(events, n.checkRoundSwitch(b)...)
}
if err = n.lattice.PurgeBlocks(delivered); err != nil {
panic(err)
}
return
}
func (n *Node) checkRoundSwitch(b *types.Block) (evts []*test.Event) {
if !b.Timestamp.After(n.roundEndTimes[b.Position.Round]) {
return
}
if b.Position.Round+2 != n.roundToNotify {
return
}
// Handle round switching logic.
n.govModule.NotifyRound(n.roundToNotify)
if n.roundToNotify == uint64(len(n.roundEndTimes)) {
config := n.govModule.Configuration(n.roundToNotify)
if config == nil {
panic(fmt.Errorf(
"config is not ready for round: %v", n.roundToNotify-1))
}
// Cache round ended time for each round.
n.roundEndTimes = append(n.roundEndTimes,
n.roundEndTimes[len(n.roundEndTimes)-1].Add(
config.RoundInterval))
// Add new config to lattice module.
if err := n.lattice.AppendConfig(n.roundToNotify, config); err != nil {
panic(err)
}
if config.NumChains > n.latticeMaxNumChains {
// We can be sure that lattice module can support this number of
// chains.
for _, chainID := range n.ownChains {
if chainID < n.latticeMaxNumChains {
continue
}
if chainID >= config.NumChains {
continue
}
// For newly added chains, add block proposing seed event.
evts = append(evts, newProposeBlockEvent(n.ID, n.roundToNotify,
chainID, n.roundEndTimes[n.roundToNotify-1]))
}
n.latticeMaxNumChains = config.NumChains
}
} else if n.roundToNotify > uint64(len(n.roundEndTimes)) {
panic(fmt.Errorf(
"config notification not incremental: %v, cached configs: %v",
n.roundToNotify, len(n.roundEndTimes)))
}
n.roundToNotify++
return
}
// Bootstrap this node with block proposing event.
func (n *Node) Bootstrap(sch *test.Scheduler, now time.Time) (err error) {
sch.RegisterEventHandler(n.ID, n)
for _, chainID := range n.ownChains {
if chainID >= n.latticeMaxNumChains {
continue
}
err = sch.Seed(newProposeBlockEvent(n.ID, 0, chainID, now))
if err != nil {
return
}
}
return
}
func (n *Node) setBroadcastTargets(targets map[types.NodeID]struct{}) {
// Clone targets, except self.
targetsCopy := make(map[types.NodeID]struct{})
for nID := range targets {
if nID == n.ID {
continue
}
targetsCopy[nID] = struct{}{}
}
n.broadcastTargets = targetsCopy
}
func (n *Node) app() *test.App {
return n.appModule
}
func (n *Node) db() db.Database {
return n.dbModule
}
func (n *Node) gov() *test.Governance {
return n.govModule
}