diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2018-10-04 17:26:46 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-10-04 17:26:46 +0800 |
commit | 6773c56fe29511aca0f4345e9fd3758ca05e174f (patch) | |
tree | 4cac43ea71124e9fea092397756e1b0f8ce38f01 | |
parent | 604dd7e52c8cbffd7646205c464f7333d215ceb6 (diff) | |
download | dexon-consensus-6773c56fe29511aca0f4345e9fd3758ca05e174f.tar dexon-consensus-6773c56fe29511aca0f4345e9fd3758ca05e174f.tar.gz dexon-consensus-6773c56fe29511aca0f4345e9fd3758ca05e174f.tar.bz2 dexon-consensus-6773c56fe29511aca0f4345e9fd3758ca05e174f.tar.lz dexon-consensus-6773c56fe29511aca0f4345e9fd3758ca05e174f.tar.xz dexon-consensus-6773c56fe29511aca0f4345e9fd3758ca05e174f.tar.zst dexon-consensus-6773c56fe29511aca0f4345e9fd3758ca05e174f.zip |
core: Use event to run DKG and CRS in Consensus. (#171)
-rw-r--r-- | common/event.go | 26 | ||||
-rw-r--r-- | core/consensus.go | 84 | ||||
-rw-r--r-- | core/consensus_test.go | 5 | ||||
-rw-r--r-- | core/lattice-data.go | 9 |
4 files changed, 71 insertions, 53 deletions
diff --git a/common/event.go b/common/event.go index 4c11d62..9e62d03 100644 --- a/common/event.go +++ b/common/event.go @@ -19,6 +19,7 @@ package common import ( "container/heap" + "sync" "time" ) @@ -48,7 +49,8 @@ func (h *timeEvents) Pop() interface{} { // Event implements the Observer pattern. type Event struct { - timeEvents timeEvents + timeEvents timeEvents + timeEventsLock sync.Mutex } // NewEvent creates a new event instance. @@ -62,6 +64,8 @@ func NewEvent() *Event { // RegisterTime to get notified on and after specific time. func (e *Event) RegisterTime(t time.Time, fn timeEventFn) { + e.timeEventsLock.Lock() + defer e.timeEventsLock.Unlock() heap.Push(&e.timeEvents, timeEvent{ t: t, fn: fn, @@ -70,14 +74,22 @@ func (e *Event) RegisterTime(t time.Time, fn timeEventFn) { // NotifyTime and trigger function callback. func (e *Event) NotifyTime(t time.Time) { - if len(e.timeEvents) == 0 { - return - } - for !t.Before(e.timeEvents[0].t) { - te := heap.Pop(&e.timeEvents).(timeEvent) - te.fn(t) + fns := func() (fns []timeEventFn) { + e.timeEventsLock.Lock() + defer e.timeEventsLock.Unlock() if len(e.timeEvents) == 0 { return } + for !t.Before(e.timeEvents[0].t) { + te := heap.Pop(&e.timeEvents).(timeEvent) + fns = append(fns, te.fn) + if len(e.timeEvents) == 0 { + return + } + } + return + }() + for _, fn := range fns { + fn(t) } } diff --git a/core/consensus.go b/core/consensus.go index 4bcc116..92990b0 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -184,6 +184,7 @@ type Consensus struct { lock sync.RWMutex ctx context.Context ctxCancel context.CancelFunc + event *common.Event } // NewConsensus construct an Consensus instance. @@ -249,6 +250,7 @@ func NewConsensus( ctx: ctx, ctxCancel: ctxCancel, authModule: authModule, + event: common.NewEvent(), } con.baModules = make([]*agreement, config.NumChains) @@ -278,7 +280,8 @@ func NewConsensus( // Run starts running DEXON Consensus. func (con *Consensus) Run() { go con.processMsg(con.network.ReceiveChan()) - con.runDKGTSIG() + startTime := time.Now().UTC().Add(con.currentConfig.RoundInterval / 2) + con.initialRound(startTime) func() { con.dkgReady.L.Lock() defer con.dkgReady.L.Unlock() @@ -286,13 +289,13 @@ func (con *Consensus) Run() { con.dkgReady.Wait() } }() + time.Sleep(startTime.Sub(time.Now().UTC())) ticks := make([]chan struct{}, 0, con.currentConfig.NumChains) for i := uint32(0); i < con.currentConfig.NumChains; i++ { tick := make(chan struct{}) ticks = append(ticks, tick) go con.runBA(i, tick) } - go con.runCRS() // Reset ticker. <-con.tickerObj.Tick() @@ -400,51 +403,51 @@ func (con *Consensus) runDKGTSIG() { } func (con *Consensus) runCRS() { - for { - ticker := newTicker(con.gov, con.round, TickerCRS) - select { - case <-con.ctx.Done(): - return - default: - } - <-ticker.Tick() - // Start running next round CRS. - psig, err := con.cfgModule.preparePartialSignature( - con.round, con.gov.CRS(con.round), types.TSigCRS) + // Start running next round CRS. + psig, err := con.cfgModule.preparePartialSignature( + con.round, con.gov.CRS(con.round), types.TSigCRS) + if err != nil { + log.Println(err) + } else if err = con.authModule.SignDKGPartialSignature(psig); err != nil { + log.Println(err) + } else if err = con.cfgModule.processPartialSignature(psig); err != nil { + log.Println(err) + } else { + con.network.BroadcastDKGPartialSignature(psig) + crs, err := con.cfgModule.runCRSTSig(con.round, con.gov.CRS(con.round)) if err != nil { log.Println(err) - } else if err = con.authModule.SignDKGPartialSignature(psig); err != nil { - log.Println(err) - } else if err = con.cfgModule.processPartialSignature(psig); err != nil { - log.Println(err) } else { - con.network.BroadcastDKGPartialSignature(psig) - crs, err := con.cfgModule.runCRSTSig(con.round, con.gov.CRS(con.round)) - if err != nil { - log.Println(err) - } else { - con.gov.ProposeCRS(con.round+1, crs) - } + con.gov.ProposeCRS(con.round+1, crs) } - con.cfgModule.registerDKG(con.round+1, con.currentConfig.NumDKGSet/3) - <-ticker.Tick() - select { - case <-con.ctx.Done(): - return - default: - } - // Change round. - con.round++ - con.currentConfig = con.gov.Configuration(con.round) - func() { - con.dkgReady.L.Lock() - defer con.dkgReady.L.Unlock() - con.dkgRunning = 0 - }() - con.runDKGTSIG() } } +func (con *Consensus) initialRound(startTime time.Time) { + con.currentConfig = con.gov.Configuration(con.round) + func() { + con.dkgReady.L.Lock() + defer con.dkgReady.L.Unlock() + con.dkgRunning = 0 + }() + con.runDKGTSIG() + + con.event.RegisterTime(startTime.Add(con.currentConfig.RoundInterval/2), + func(time.Time) { + go con.runCRS() + }) + con.event.RegisterTime(startTime.Add(con.currentConfig.RoundInterval/2), + func(time.Time) { + con.cfgModule.registerDKG(con.round+1, con.currentConfig.NumDKGSet/3) + }) + con.event.RegisterTime(startTime.Add(con.currentConfig.RoundInterval), + func(time.Time) { + // Change round. + con.round++ + con.initialRound(startTime.Add(con.currentConfig.RoundInterval)) + }) +} + // Stop the Consensus core. func (con *Consensus) Stop() { con.ctxCancel() @@ -533,6 +536,7 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { if err = con.db.Update(*b); err != nil { return } + go con.event.NotifyTime(b.ConsensusTimestamp) con.nbModule.BlockDelivered(*b) // TODO(mission): Find a way to safely recycle the block. // We should deliver block directly to diff --git a/core/consensus_test.go b/core/consensus_test.go index 9439adb..e8615a2 100644 --- a/core/consensus_test.go +++ b/core/consensus_test.go @@ -436,7 +436,6 @@ func (s *ConsensusTestSuite) TestDKGCRS() { gov, err := test.NewGovernance(n, lambda*time.Millisecond) s.Require().Nil(err) gov.RoundInterval = 200 * lambda * time.Millisecond - config := gov.Configuration(0) prvKeys := gov.PrivateKeys() cons := map[types.NodeID]*Consensus{} for _, key := range prvKeys { @@ -464,10 +463,6 @@ func (s *ConsensusTestSuite) TestDKGCRS() { crsFinish <- struct{}{} }(con) } - time.Sleep(config.RoundInterval * 3 / 4) - for _, con := range cons { - con.Stop() - } for range cons { <-crsFinish } diff --git a/core/lattice-data.go b/core/lattice-data.go index 7eec7f2..75447c6 100644 --- a/core/lattice-data.go +++ b/core/lattice-data.go @@ -43,7 +43,7 @@ var ( ErrAcksNotSorted = fmt.Errorf("acks not sorted") ErrInvalidBlockHeight = fmt.Errorf("invalid block height") ErrAlreadyInLattice = fmt.Errorf("block already in lattice") - ErrIncorrectBlockTime = fmt.Errorf("block timestampe is incorrect") + ErrIncorrectBlockTime = fmt.Errorf("block timestamp is incorrect") ) // Errors for method usage @@ -271,6 +271,13 @@ func (data *latticeData) prepareBlock(block *types.Block) { block.ParentHash = curBlock.Hash block.Position.Height = curBlock.Position.Height + 1 block.Witness.Height = curBlock.Witness.Height + minTimestamp := curBlock.Timestamp.Add(data.minBlockTimeInterval) + maxTimestamp := curBlock.Timestamp.Add(data.maxBlockTimeInterval) + if block.Timestamp.Before(minTimestamp) { + block.Timestamp = minTimestamp + } else if block.Timestamp.After(maxTimestamp) { + block.Timestamp = maxTimestamp + } } } block.Acks = common.NewSortedHashes(acks) |