aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2018-12-26 14:24:16 +0800
committerGitHub <noreply@github.com>2018-12-26 14:24:16 +0800
commit00416c9df2fec5398389863fb6f885a1fe11a6cc (patch)
tree74d260ac8a8c0e2ab782be1814b49b4232f56dc8 /core
parentd333dc1a24df26ae8e8e3ffa2d700c1116a93ba2 (diff)
downloadtangerine-consensus-00416c9df2fec5398389863fb6f885a1fe11a6cc.tar
tangerine-consensus-00416c9df2fec5398389863fb6f885a1fe11a6cc.tar.gz
tangerine-consensus-00416c9df2fec5398389863fb6f885a1fe11a6cc.tar.bz2
tangerine-consensus-00416c9df2fec5398389863fb6f885a1fe11a6cc.tar.lz
tangerine-consensus-00416c9df2fec5398389863fb6f885a1fe11a6cc.tar.xz
tangerine-consensus-00416c9df2fec5398389863fb6f885a1fe11a6cc.tar.zst
tangerine-consensus-00416c9df2fec5398389863fb6f885a1fe11a6cc.zip
core: pull block random (#384)
* Add PullRandomness to interface * Add pendingBlocksWithoutRandomness to compactionChain * Pull randomness every 1 second
Diffstat (limited to 'core')
-rw-r--r--core/compaction-chain.go12
-rw-r--r--core/compaction-chain_test.go8
-rw-r--r--core/consensus.go56
-rw-r--r--core/consensus_test.go4
-rw-r--r--core/interfaces.go3
-rw-r--r--core/test/network.go178
6 files changed, 206 insertions, 55 deletions
diff --git a/core/compaction-chain.go b/core/compaction-chain.go
index 14e3b26..8e04429 100644
--- a/core/compaction-chain.go
+++ b/core/compaction-chain.go
@@ -257,3 +257,15 @@ func (cc *compactionChain) lastPendingBlock() *types.Block {
}
return nil
}
+
+func (cc *compactionChain) pendingBlocksWithoutRandomness() (
+ hashes common.Hashes) {
+ cc.lock.RLock()
+ defer cc.lock.RUnlock()
+ for _, block := range cc.pendingBlocks {
+ if _, exist := cc.blockRandomness[block.Hash]; !exist {
+ hashes = append(hashes, block.Hash)
+ }
+ }
+ return
+}
diff --git a/core/compaction-chain_test.go b/core/compaction-chain_test.go
index e944e4d..ca88734 100644
--- a/core/compaction-chain_test.go
+++ b/core/compaction-chain_test.go
@@ -201,6 +201,7 @@ func (s *CompactionChainTestSuite) TestMissedRandomness() {
cc.registerBlock(blocks[idx])
s.Require().True(cc.blockRegistered(blocks[idx].Hash))
}
+ noRandBlocks := common.Hashes{}
// Block#4, #5, contains randomness.
for i := range blocks {
s.Require().NoError(cc.processBlock(blocks[i]))
@@ -212,8 +213,11 @@ func (s *CompactionChainTestSuite) TestMissedRandomness() {
Position: blocks[i].Position,
Randomness: h[:],
}))
+ } else {
+ noRandBlocks = append(noRandBlocks, blocks[i].Hash)
}
}
+ s.Equal(noRandBlocks, cc.pendingBlocksWithoutRandomness())
s.Require().Len(cc.extractBlocks(), 0)
// Give compactionChain module randomnessResult via finalized block
// #0, #1, #2, #3, #4.
@@ -227,6 +231,9 @@ func (s *CompactionChainTestSuite) TestMissedRandomness() {
block.Finalization.Height = uint64(i + 1)
cc.processFinalizedBlock(block)
}
+ // Block #0-3 has randomness result.
+ noRandBlocks = noRandBlocks[4:]
+ s.Equal(noRandBlocks, cc.pendingBlocksWithoutRandomness())
delivered := cc.extractBlocks()
s.Require().Len(delivered, 6)
// Give compactionChain module randomnessResult#6-9.
@@ -239,6 +246,7 @@ func (s *CompactionChainTestSuite) TestMissedRandomness() {
Randomness: h[:],
}))
}
+ s.Len(cc.pendingBlocksWithoutRandomness(), 0)
delivered = append(delivered, cc.extractBlocks()...)
s.Require().Len(delivered, 10)
// The delivered order should be the same as processing order.
diff --git a/core/consensus.go b/core/consensus.go
index 0d4a38a..7f77df9 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -387,6 +387,7 @@ type Consensus struct {
event *common.Event
logger common.Logger
nonFinalizedBlockDelivered bool
+ resetRandomnessTicker chan struct{}
}
// NewConsensus construct an Consensus instance.
@@ -466,22 +467,23 @@ func newConsensus(
}
// Construct Consensus instance.
con := &Consensus{
- ID: ID,
- ccModule: newCompactionChain(gov),
- lattice: lattice,
- app: appModule,
- debugApp: debugApp,
- gov: gov,
- db: db,
- network: network,
- baConfirmedBlock: make(map[common.Hash]chan<- *types.Block),
- dkgReady: sync.NewCond(&sync.Mutex{}),
- cfgModule: cfgModule,
- dMoment: dMoment,
- nodeSetCache: nodeSetCache,
- signer: signer,
- event: common.NewEvent(),
- logger: logger,
+ ID: ID,
+ ccModule: newCompactionChain(gov),
+ lattice: lattice,
+ app: appModule,
+ debugApp: debugApp,
+ gov: gov,
+ db: db,
+ network: network,
+ baConfirmedBlock: make(map[common.Hash]chan<- *types.Block),
+ dkgReady: sync.NewCond(&sync.Mutex{}),
+ cfgModule: cfgModule,
+ dMoment: dMoment,
+ nodeSetCache: nodeSetCache,
+ signer: signer,
+ event: common.NewEvent(),
+ logger: logger,
+ resetRandomnessTicker: make(chan struct{}),
}
con.ctx, con.ctxCancel = context.WithCancel(context.Background())
con.baMgr = newAgreementMgr(con, round, dMoment)
@@ -634,6 +636,9 @@ func (con *Consensus) Run() {
go con.processMsg(con.network.ReceiveChan())
// Sleep until dMoment come.
time.Sleep(con.dMoment.Sub(time.Now().UTC()))
+ // Take some time to bootstrap.
+ time.Sleep(3 * time.Second)
+ go con.pullRandomness()
// Block until done.
select {
case <-con.ctx.Done():
@@ -1051,8 +1056,27 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
return
}
+func (con *Consensus) pullRandomness() {
+ for {
+ select {
+ case <-con.ctx.Done():
+ return
+ case <-con.resetRandomnessTicker:
+ case <-time.After(1 * time.Second):
+ // TODO(jimmy): pulling period should be related to lambdaBA.
+ hashes := con.ccModule.pendingBlocksWithoutRandomness()
+ con.logger.Debug("Calling Network.PullRandomness", "blocks", hashes)
+ con.network.PullRandomness(hashes)
+ }
+ }
+}
+
// deliverBlock deliver a block to application layer.
func (con *Consensus) deliverBlock(b *types.Block) {
+ select {
+ case con.resetRandomnessTicker <- struct{}{}:
+ default:
+ }
if err := con.db.UpdateBlock(*b); err != nil {
panic(err)
}
diff --git a/core/consensus_test.go b/core/consensus_test.go
index 40ee34b..d26e102 100644
--- a/core/consensus_test.go
+++ b/core/consensus_test.go
@@ -48,6 +48,10 @@ func (n *network) PullBlocks(common.Hashes) {
func (n *network) PullVotes(types.Position) {
}
+// PullRandomness tries to pull randomness from the DEXON network.
+func (n *network) PullRandomness(common.Hashes) {
+}
+
// BroadcastVote broadcasts vote to all nodes in DEXON network.
func (n *network) BroadcastVote(vote *types.Vote) {
n.conn.broadcast(n.nID, vote)
diff --git a/core/interfaces.go b/core/interfaces.go
index fc3bf09..2077032 100644
--- a/core/interfaces.go
+++ b/core/interfaces.go
@@ -67,6 +67,9 @@ type Network interface {
// PullVotes tries to pull votes from the DEXON network.
PullVotes(position types.Position)
+ // PullRandomness tries to pull randomness from the DEXON network.
+ PullRandomness(hashes common.Hashes)
+
// BroadcastVote broadcasts vote to all nodes in DEXON network.
BroadcastVote(vote *types.Vote)
diff --git a/core/test/network.go b/core/test/network.go
index 207d8a4..a79898e 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -125,34 +125,38 @@ func (req *PullRequest) UnmarshalJSON(data []byte) (err error) {
// Network implements core.Network interface based on TransportClient.
type Network struct {
- ID types.NodeID
- config NetworkConfig
- ctx context.Context
- ctxCancel context.CancelFunc
- trans TransportClient
- fromTransport <-chan *TransportEnvelope
- toConsensus chan interface{}
- toNode chan interface{}
- sentRandomnessLock sync.Mutex
- sentRandomness map[common.Hash]struct{}
- sentAgreementLock sync.Mutex
- sentAgreement map[common.Hash]struct{}
- blockCacheLock sync.RWMutex
- blockCache map[common.Hash]*types.Block
- voteCacheLock sync.RWMutex
- voteCache map[types.Position]map[types.VoteHeader]*types.Vote
- voteCacheSize int
- votePositions []types.Position
- stateModule *State
- peers map[types.NodeID]struct{}
- unreceivedBlocksLock sync.RWMutex
- unreceivedBlocks map[common.Hash]chan<- common.Hash
- latencyModel LatencyModel
- cache *utils.NodeSetCache
- notarySetCachesLock sync.Mutex
- notarySetCaches map[uint64]map[uint32]map[types.NodeID]struct{}
- dkgSetCachesLock sync.Mutex
- dkgSetCaches map[uint64]map[types.NodeID]struct{}
+ ID types.NodeID
+ config NetworkConfig
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ trans TransportClient
+ fromTransport <-chan *TransportEnvelope
+ toConsensus chan interface{}
+ toNode chan interface{}
+ sentRandomnessLock sync.Mutex
+ sentRandomness map[common.Hash]struct{}
+ sentAgreementLock sync.Mutex
+ sentAgreement map[common.Hash]struct{}
+ blockCacheLock sync.RWMutex
+ blockCache map[common.Hash]*types.Block
+ voteCacheLock sync.RWMutex
+ voteCache map[types.Position]map[types.VoteHeader]*types.Vote
+ voteCacheSize int
+ votePositions []types.Position
+ randomnessCacheLock sync.RWMutex
+ randomnessCache map[common.Hash]*types.BlockRandomnessResult
+ stateModule *State
+ peers map[types.NodeID]struct{}
+ unreceivedBlocksLock sync.RWMutex
+ unreceivedBlocks map[common.Hash]chan<- common.Hash
+ unreceivedRandomnessLock sync.RWMutex
+ unreceivedRandomness map[common.Hash]chan<- common.Hash
+ latencyModel LatencyModel
+ cache *utils.NodeSetCache
+ notarySetCachesLock sync.Mutex
+ notarySetCaches map[uint64]map[uint32]map[types.NodeID]struct{}
+ dkgSetCachesLock sync.Mutex
+ dkgSetCaches map[uint64]map[types.NodeID]struct{}
}
// NewNetwork setup network stuffs for nodes, which provides an
@@ -161,17 +165,19 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel,
marshaller Marshaller, config NetworkConfig) (n *Network) {
// Construct basic network instance.
n = &Network{
- ID: types.NewNodeID(pubKey),
- config: config,
- toConsensus: make(chan interface{}, 1000),
- toNode: make(chan interface{}, 1000),
- sentRandomness: make(map[common.Hash]struct{}),
- sentAgreement: make(map[common.Hash]struct{}),
- blockCache: make(map[common.Hash]*types.Block),
- unreceivedBlocks: make(map[common.Hash]chan<- common.Hash),
- latencyModel: latency,
- notarySetCaches: make(map[uint64]map[uint32]map[types.NodeID]struct{}),
- dkgSetCaches: make(map[uint64]map[types.NodeID]struct{}),
+ ID: types.NewNodeID(pubKey),
+ config: config,
+ toConsensus: make(chan interface{}, 1000),
+ toNode: make(chan interface{}, 1000),
+ sentRandomness: make(map[common.Hash]struct{}),
+ sentAgreement: make(map[common.Hash]struct{}),
+ blockCache: make(map[common.Hash]*types.Block),
+ randomnessCache: make(map[common.Hash]*types.BlockRandomnessResult),
+ unreceivedBlocks: make(map[common.Hash]chan<- common.Hash),
+ unreceivedRandomness: make(map[common.Hash]chan<- common.Hash),
+ latencyModel: latency,
+ notarySetCaches: make(map[uint64]map[uint32]map[types.NodeID]struct{}),
+ dkgSetCaches: make(map[uint64]map[types.NodeID]struct{}),
voteCache: make(
map[types.Position]map[types.VoteHeader]*types.Vote),
}
@@ -200,6 +206,11 @@ func (n *Network) PullVotes(pos types.Position) {
go n.pullVotesAsync(pos)
}
+// PullRandomness implememnts core.Network interface.
+func (n *Network) PullRandomness(hashes common.Hashes) {
+ go n.pullRandomnessAsync(hashes)
+}
+
// BroadcastVote implements core.Network interface.
func (n *Network) BroadcastVote(vote *types.Vote) {
n.broadcastToSet(
@@ -256,6 +267,7 @@ func (n *Network) BroadcastRandomnessResult(
if err := n.trans.Broadcast(randResult); err != nil {
panic(err)
}
+ n.addRandomnessToCache(randResult)
}
// SendDKGPrivateShare implements core.Network interface.
@@ -380,6 +392,27 @@ func (n *Network) handlePullRequest(req *PullRequest) {
}
}
}()
+ case "randomness":
+ hashes := req.Identity.(common.Hashes)
+ func() {
+ n.randomnessCacheLock.Lock()
+ defer n.randomnessCacheLock.Unlock()
+ All:
+ for _, h := range hashes {
+ r, exists := n.randomnessCache[h]
+ if !exists {
+ continue
+ }
+ select {
+ case <-n.ctx.Done():
+ break All
+ default:
+ }
+ if err := n.trans.Send(req.Requester, r); err != nil {
+ log.Println("unable to send randomness", req.Requester, err)
+ }
+ }
+ }()
default:
panic(fmt.Errorf("unknown type of pull request: %v", req.Type))
}
@@ -539,6 +572,60 @@ func (n *Network) pullVotesAsync(pos types.Position) {
}
}
+func (n *Network) pullRandomnessAsync(hashes common.Hashes) {
+ // Setup notification channels for each block hash.
+ notYetReceived := make(map[common.Hash]struct{})
+ ch := make(chan common.Hash, len(hashes))
+ func() {
+ n.unreceivedRandomnessLock.Lock()
+ defer n.unreceivedRandomnessLock.Unlock()
+ for _, h := range hashes {
+ if _, exists := n.unreceivedRandomness[h]; exists {
+ continue
+ }
+ n.unreceivedRandomness[h] = ch
+ notYetReceived[h] = struct{}{}
+ }
+ }()
+ req := &PullRequest{
+ Requester: n.ID,
+ Type: "randomness",
+ Identity: hashes,
+ }
+ // Randomly pick peers to send pull requests.
+Loop:
+ for nID := range n.peers {
+ if nID == n.ID {
+ continue
+ }
+ if err := n.trans.Send(nID, req); err != nil {
+ // Try next peer.
+ continue
+ }
+ select {
+ case <-n.ctx.Done():
+ break Loop
+ case <-time.After(2 * n.latencyModel.Delay()):
+ // Consume everything in the notification channel.
+ for {
+ select {
+ case h, ok := <-ch:
+ if !ok {
+ // This network module is closed.
+ break Loop
+ }
+ delete(notYetReceived, h)
+ if len(notYetReceived) == 0 {
+ break Loop
+ }
+ default:
+ continue Loop
+ }
+ }
+ }
+ }
+}
+
func (n *Network) addBlockToCache(b *types.Block) {
n.blockCacheLock.Lock()
defer n.blockCacheLock.Unlock()
@@ -573,6 +660,19 @@ func (n *Network) addVoteToCache(v *types.Vote) {
n.voteCacheSize++
}
+func (n *Network) addRandomnessToCache(rand *types.BlockRandomnessResult) {
+ n.randomnessCacheLock.Lock()
+ defer n.randomnessCacheLock.Unlock()
+ if len(n.randomnessCache) > 1000 {
+ // Randomly purge one randomness from cache.
+ for k := range n.randomnessCache {
+ delete(n.randomnessCache, k)
+ break
+ }
+ }
+ n.randomnessCache[rand.BlockHash] = rand
+}
+
func (n *Network) cloneForFake(v interface{}) interface{} {
if n.config.Type != NetworkTypeFake {
return v