aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2018-10-04 17:26:46 +0800
committerGitHub <noreply@github.com>2018-10-04 17:26:46 +0800
commit6773c56fe29511aca0f4345e9fd3758ca05e174f (patch)
tree4cac43ea71124e9fea092397756e1b0f8ce38f01
parent604dd7e52c8cbffd7646205c464f7333d215ceb6 (diff)
downloaddexon-consensus-6773c56fe29511aca0f4345e9fd3758ca05e174f.tar
dexon-consensus-6773c56fe29511aca0f4345e9fd3758ca05e174f.tar.gz
dexon-consensus-6773c56fe29511aca0f4345e9fd3758ca05e174f.tar.bz2
dexon-consensus-6773c56fe29511aca0f4345e9fd3758ca05e174f.tar.lz
dexon-consensus-6773c56fe29511aca0f4345e9fd3758ca05e174f.tar.xz
dexon-consensus-6773c56fe29511aca0f4345e9fd3758ca05e174f.tar.zst
dexon-consensus-6773c56fe29511aca0f4345e9fd3758ca05e174f.zip
core: Use event to run DKG and CRS in Consensus. (#171)
-rw-r--r--common/event.go26
-rw-r--r--core/consensus.go84
-rw-r--r--core/consensus_test.go5
-rw-r--r--core/lattice-data.go9
4 files changed, 71 insertions, 53 deletions
diff --git a/common/event.go b/common/event.go
index 4c11d62..9e62d03 100644
--- a/common/event.go
+++ b/common/event.go
@@ -19,6 +19,7 @@ package common
import (
"container/heap"
+ "sync"
"time"
)
@@ -48,7 +49,8 @@ func (h *timeEvents) Pop() interface{} {
// Event implements the Observer pattern.
type Event struct {
- timeEvents timeEvents
+ timeEvents timeEvents
+ timeEventsLock sync.Mutex
}
// NewEvent creates a new event instance.
@@ -62,6 +64,8 @@ func NewEvent() *Event {
// RegisterTime to get notified on and after specific time.
func (e *Event) RegisterTime(t time.Time, fn timeEventFn) {
+ e.timeEventsLock.Lock()
+ defer e.timeEventsLock.Unlock()
heap.Push(&e.timeEvents, timeEvent{
t: t,
fn: fn,
@@ -70,14 +74,22 @@ func (e *Event) RegisterTime(t time.Time, fn timeEventFn) {
// NotifyTime and trigger function callback.
func (e *Event) NotifyTime(t time.Time) {
- if len(e.timeEvents) == 0 {
- return
- }
- for !t.Before(e.timeEvents[0].t) {
- te := heap.Pop(&e.timeEvents).(timeEvent)
- te.fn(t)
+ fns := func() (fns []timeEventFn) {
+ e.timeEventsLock.Lock()
+ defer e.timeEventsLock.Unlock()
if len(e.timeEvents) == 0 {
return
}
+ for !t.Before(e.timeEvents[0].t) {
+ te := heap.Pop(&e.timeEvents).(timeEvent)
+ fns = append(fns, te.fn)
+ if len(e.timeEvents) == 0 {
+ return
+ }
+ }
+ return
+ }()
+ for _, fn := range fns {
+ fn(t)
}
}
diff --git a/core/consensus.go b/core/consensus.go
index 4bcc116..92990b0 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -184,6 +184,7 @@ type Consensus struct {
lock sync.RWMutex
ctx context.Context
ctxCancel context.CancelFunc
+ event *common.Event
}
// NewConsensus construct an Consensus instance.
@@ -249,6 +250,7 @@ func NewConsensus(
ctx: ctx,
ctxCancel: ctxCancel,
authModule: authModule,
+ event: common.NewEvent(),
}
con.baModules = make([]*agreement, config.NumChains)
@@ -278,7 +280,8 @@ func NewConsensus(
// Run starts running DEXON Consensus.
func (con *Consensus) Run() {
go con.processMsg(con.network.ReceiveChan())
- con.runDKGTSIG()
+ startTime := time.Now().UTC().Add(con.currentConfig.RoundInterval / 2)
+ con.initialRound(startTime)
func() {
con.dkgReady.L.Lock()
defer con.dkgReady.L.Unlock()
@@ -286,13 +289,13 @@ func (con *Consensus) Run() {
con.dkgReady.Wait()
}
}()
+ time.Sleep(startTime.Sub(time.Now().UTC()))
ticks := make([]chan struct{}, 0, con.currentConfig.NumChains)
for i := uint32(0); i < con.currentConfig.NumChains; i++ {
tick := make(chan struct{})
ticks = append(ticks, tick)
go con.runBA(i, tick)
}
- go con.runCRS()
// Reset ticker.
<-con.tickerObj.Tick()
@@ -400,51 +403,51 @@ func (con *Consensus) runDKGTSIG() {
}
func (con *Consensus) runCRS() {
- for {
- ticker := newTicker(con.gov, con.round, TickerCRS)
- select {
- case <-con.ctx.Done():
- return
- default:
- }
- <-ticker.Tick()
- // Start running next round CRS.
- psig, err := con.cfgModule.preparePartialSignature(
- con.round, con.gov.CRS(con.round), types.TSigCRS)
+ // Start running next round CRS.
+ psig, err := con.cfgModule.preparePartialSignature(
+ con.round, con.gov.CRS(con.round), types.TSigCRS)
+ if err != nil {
+ log.Println(err)
+ } else if err = con.authModule.SignDKGPartialSignature(psig); err != nil {
+ log.Println(err)
+ } else if err = con.cfgModule.processPartialSignature(psig); err != nil {
+ log.Println(err)
+ } else {
+ con.network.BroadcastDKGPartialSignature(psig)
+ crs, err := con.cfgModule.runCRSTSig(con.round, con.gov.CRS(con.round))
if err != nil {
log.Println(err)
- } else if err = con.authModule.SignDKGPartialSignature(psig); err != nil {
- log.Println(err)
- } else if err = con.cfgModule.processPartialSignature(psig); err != nil {
- log.Println(err)
} else {
- con.network.BroadcastDKGPartialSignature(psig)
- crs, err := con.cfgModule.runCRSTSig(con.round, con.gov.CRS(con.round))
- if err != nil {
- log.Println(err)
- } else {
- con.gov.ProposeCRS(con.round+1, crs)
- }
+ con.gov.ProposeCRS(con.round+1, crs)
}
- con.cfgModule.registerDKG(con.round+1, con.currentConfig.NumDKGSet/3)
- <-ticker.Tick()
- select {
- case <-con.ctx.Done():
- return
- default:
- }
- // Change round.
- con.round++
- con.currentConfig = con.gov.Configuration(con.round)
- func() {
- con.dkgReady.L.Lock()
- defer con.dkgReady.L.Unlock()
- con.dkgRunning = 0
- }()
- con.runDKGTSIG()
}
}
+func (con *Consensus) initialRound(startTime time.Time) {
+ con.currentConfig = con.gov.Configuration(con.round)
+ func() {
+ con.dkgReady.L.Lock()
+ defer con.dkgReady.L.Unlock()
+ con.dkgRunning = 0
+ }()
+ con.runDKGTSIG()
+
+ con.event.RegisterTime(startTime.Add(con.currentConfig.RoundInterval/2),
+ func(time.Time) {
+ go con.runCRS()
+ })
+ con.event.RegisterTime(startTime.Add(con.currentConfig.RoundInterval/2),
+ func(time.Time) {
+ con.cfgModule.registerDKG(con.round+1, con.currentConfig.NumDKGSet/3)
+ })
+ con.event.RegisterTime(startTime.Add(con.currentConfig.RoundInterval),
+ func(time.Time) {
+ // Change round.
+ con.round++
+ con.initialRound(startTime.Add(con.currentConfig.RoundInterval))
+ })
+}
+
// Stop the Consensus core.
func (con *Consensus) Stop() {
con.ctxCancel()
@@ -533,6 +536,7 @@ func (con *Consensus) processBlock(block *types.Block) (err error) {
if err = con.db.Update(*b); err != nil {
return
}
+ go con.event.NotifyTime(b.ConsensusTimestamp)
con.nbModule.BlockDelivered(*b)
// TODO(mission): Find a way to safely recycle the block.
// We should deliver block directly to
diff --git a/core/consensus_test.go b/core/consensus_test.go
index 9439adb..e8615a2 100644
--- a/core/consensus_test.go
+++ b/core/consensus_test.go
@@ -436,7 +436,6 @@ func (s *ConsensusTestSuite) TestDKGCRS() {
gov, err := test.NewGovernance(n, lambda*time.Millisecond)
s.Require().Nil(err)
gov.RoundInterval = 200 * lambda * time.Millisecond
- config := gov.Configuration(0)
prvKeys := gov.PrivateKeys()
cons := map[types.NodeID]*Consensus{}
for _, key := range prvKeys {
@@ -464,10 +463,6 @@ func (s *ConsensusTestSuite) TestDKGCRS() {
crsFinish <- struct{}{}
}(con)
}
- time.Sleep(config.RoundInterval * 3 / 4)
- for _, con := range cons {
- con.Stop()
- }
for range cons {
<-crsFinish
}
diff --git a/core/lattice-data.go b/core/lattice-data.go
index 7eec7f2..75447c6 100644
--- a/core/lattice-data.go
+++ b/core/lattice-data.go
@@ -43,7 +43,7 @@ var (
ErrAcksNotSorted = fmt.Errorf("acks not sorted")
ErrInvalidBlockHeight = fmt.Errorf("invalid block height")
ErrAlreadyInLattice = fmt.Errorf("block already in lattice")
- ErrIncorrectBlockTime = fmt.Errorf("block timestampe is incorrect")
+ ErrIncorrectBlockTime = fmt.Errorf("block timestamp is incorrect")
)
// Errors for method usage
@@ -271,6 +271,13 @@ func (data *latticeData) prepareBlock(block *types.Block) {
block.ParentHash = curBlock.Hash
block.Position.Height = curBlock.Position.Height + 1
block.Witness.Height = curBlock.Witness.Height
+ minTimestamp := curBlock.Timestamp.Add(data.minBlockTimeInterval)
+ maxTimestamp := curBlock.Timestamp.Add(data.maxBlockTimeInterval)
+ if block.Timestamp.Before(minTimestamp) {
+ block.Timestamp = minTimestamp
+ } else if block.Timestamp.After(maxTimestamp) {
+ block.Timestamp = maxTimestamp
+ }
}
}
block.Acks = common.NewSortedHashes(acks)