aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorzelig <viktor.tron@gmail.com>2015-04-10 23:31:00 +0800
committerzelig <viktor.tron@gmail.com>2015-04-10 23:36:40 +0800
commitda7332a7317bda9003e5e8a4aaa4b1710c69b664 (patch)
tree7c4fc11a84c18946b105bf778aa7243dd70e54ec
parentfc1d1f9afd155fab1f614c6a0340233f90afafd6 (diff)
downloaddexon-da7332a7317bda9003e5e8a4aaa4b1710c69b664.tar
dexon-da7332a7317bda9003e5e8a4aaa4b1710c69b664.tar.gz
dexon-da7332a7317bda9003e5e8a4aaa4b1710c69b664.tar.bz2
dexon-da7332a7317bda9003e5e8a4aaa4b1710c69b664.tar.lz
dexon-da7332a7317bda9003e5e8a4aaa4b1710c69b664.tar.xz
dexon-da7332a7317bda9003e5e8a4aaa4b1710c69b664.tar.zst
dexon-da7332a7317bda9003e5e8a4aaa4b1710c69b664.zip
td update from node
- reorg and simplify AddBlock - introduce nodeCache - TestPeerPromotionByTdOnBlock unskipped and passes - move switchC/idleC channel creation around: solves deadlock (now respects the contract with section process: either can activate or complete at any one time)
-rw-r--r--blockpool/blockpool.go189
-rw-r--r--blockpool/errors_test.go2
-rw-r--r--blockpool/peers.go85
-rw-r--r--blockpool/peers_test.go11
4 files changed, 148 insertions, 139 deletions
diff --git a/blockpool/blockpool.go b/blockpool/blockpool.go
index f442259e0..3ed2e92c7 100644
--- a/blockpool/blockpool.go
+++ b/blockpool/blockpool.go
@@ -169,6 +169,9 @@ type BlockPool struct {
// alloc-easy pool of hash slices
hashSlicePool chan []common.Hash
+ nodeCache map[common.Hash]*node
+ nodeCacheLock sync.RWMutex
+
// waitgroup is used in tests to wait for result-critical routines
// as well as in determining idle / syncing status
wg sync.WaitGroup //
@@ -210,6 +213,7 @@ func (self *BlockPool) Start() {
self.Config.init()
self.hashSlicePool = make(chan []common.Hash, 150)
+ self.nodeCache = make(map[common.Hash]*node)
self.status = newStatus()
self.quit = make(chan bool)
self.pool = make(map[common.Hash]*entry)
@@ -615,127 +619,104 @@ LOOP:
If the block received is the head block of the current best peer, signal it to the head section process
*/
func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
- hash := block.Hash()
-
- sender, _ := self.peers.getPeer(peerId)
- if sender == nil {
- return
- }
self.status.lock.Lock()
self.status.activePeers[peerId]++
self.status.lock.Unlock()
- entry := self.get(hash)
- blockIsCurrentHead := false
- sender.lock.RLock()
- currentBlockHash := sender.currentBlockHash
- currentBlock := sender.currentBlock
- currentBlockC := sender.currentBlockC
- switchC := sender.switchC
- sender.lock.RUnlock()
-
- // a peer's current head block is appearing the first time
- if hash == currentBlockHash {
- // this happens when block came in a newblock message but
- // also if sent in a blockmsg (for instance, if we requested, only if we
- // dont apply on blockrequests the restriction of flood control)
- blockIsCurrentHead = true
- if currentBlock == nil {
- sender.lock.Lock()
- sender.setChainInfoFromBlock(block)
- sender.lock.Unlock()
-
- self.status.lock.Lock()
- self.status.values.BlockHashes++
- self.status.values.Blocks++
- self.status.values.BlocksInPool++
- self.status.lock.Unlock()
- // signal to head section process
- select {
- case currentBlockC <- block:
- case <-switchC:
- }
- } else {
- plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), peerId, hex(currentBlockHash))
- }
- } else {
-
- plog.DebugDetailf("AddBlock: block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(currentBlockHash))
-
- /* @zelig !!!
- requested 5 hashes from both A & B. A responds sooner then B, process blocks. Close section.
- delayed B sends you block ... UNREQUESTED. Blocked
- if entry == nil {
- plog.DebugDetailf("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
- sender.addError(ErrUnrequestedBlock, "%x", hash)
-
- self.status.lock.Lock()
- self.status.badPeers[peerId]++
- self.status.lock.Unlock()
- return
- }
- */
- }
+ hash := block.Hash()
- if entry == nil {
- // FIXME: here check the cache find or create node -
- // put peer as blockBy!
+ // check if block is already inserted in the blockchain
+ if self.hasBlock(hash) {
return
}
- node := entry.node
- node.lock.Lock()
- defer node.lock.Unlock()
-
- // register peer on node as source
- if node.peers == nil {
- node.peers = make(map[string]bool)
- }
- FoundBlockCurrentHead, found := node.peers[sender.id]
- if !found || FoundBlockCurrentHead {
- // if found but not FoundBlockCurrentHead, then no update
- // necessary (||)
- node.peers[sender.id] = blockIsCurrentHead
- // for those that are false, TD will update their head
- // for those that are true, TD is checked !
- // this is checked at the time of TD calculation in checkTD
- }
- // check if block already received
- if node.block != nil {
- plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), node.blockBy)
- }
-
- // check if block is already inserted in the blockchain
- if self.hasBlock(hash) {
- plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already in the blockchain", hex(hash), peerId, hex(sender.currentBlockHash))
+ sender, _ := self.peers.getPeer(peerId)
+ if sender == nil {
return
}
+ tdFromCurrentHead, currentBlockHash := sender.setChainInfoFromBlock(block)
- /*
- @zelig needs discussing
- Viktor: pow check can be delayed in a go routine and therefore cache
- creation is not blocking
- // validate block for PoW
- if !self.verifyPoW(block) {
- plog.Warnf("AddBlock: invalid PoW on block %s from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
- sender.addError(ErrInvalidPoW, "%x", hash)
+ entry := self.get(hash)
- self.status.lock.Lock()
- self.status.badPeers[peerId]++
- self.status.lock.Unlock()
+ /* @zelig !!!
+ requested 5 hashes from both A & B. A responds sooner then B, process blocks. Close section.
+ delayed B sends you block ... UNREQUESTED. Blocked
+ if entry == nil {
+ plog.DebugDetailf("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
+ sender.addError(ErrUnrequestedBlock, "%x", hash)
+
+ self.status.lock.Lock()
+ self.status.badPeers[peerId]++
+ self.status.lock.Unlock()
+ return
+ }
+ */
- return
+ var bnode *node
+ if entry == nil {
+ self.nodeCacheLock.Lock()
+ bnode, _ = self.nodeCache[hash]
+ if bnode == nil {
+ bnode = &node{
+ hash: currentBlockHash,
+ block: block,
+ hashBy: peerId,
+ blockBy: peerId,
+ td: tdFromCurrentHead,
}
- */
+ self.nodeCache[hash] = bnode
+ }
+ self.nodeCacheLock.Unlock()
+ } else {
+ bnode = entry.node
+ }
- node.block = block
- node.blockBy = peerId
+ bnode.lock.Lock()
+ defer bnode.lock.Unlock()
- self.status.lock.Lock()
- self.status.values.Blocks++
- self.status.values.BlocksInPool++
- self.status.lock.Unlock()
+ // check if block already received
+ if bnode.block != nil {
+ plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), bnode.blockBy)
+ // register peer on node as source
+ if bnode.peers == nil {
+ bnode.peers = make(map[string]bool)
+ }
+ foundBlockCurrentHead, found := bnode.peers[sender.id]
+ if !found || foundBlockCurrentHead {
+ // if found but not FoundBlockCurrentHead, then no update
+ // necessary (||)
+ bnode.peers[sender.id] = (currentBlockHash == hash)
+ // for those that are false, TD will update their head
+ // for those that are true, TD is checked !
+ // this is checked at the time of TD calculation in checkTD
+ }
+ sender.setChainInfoFromNode(bnode)
+ } else {
+ /*
+ @zelig needs discussing
+ Viktor: pow check can be delayed in a go routine and therefore cache
+ creation is not blocking
+ // validate block for PoW
+ if !self.verifyPoW(block) {
+ plog.Warnf("AddBlock: invalid PoW on block %s from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
+ sender.addError(ErrInvalidPoW, "%x", hash)
+
+ self.status.lock.Lock()
+ self.status.badPeers[peerId]++
+ self.status.lock.Unlock()
+
+ return
+ }
+ */
+ bnode.block = block
+ bnode.blockBy = peerId
+ bnode.td = tdFromCurrentHead
+ self.status.lock.Lock()
+ self.status.values.Blocks++
+ self.status.values.BlocksInPool++
+ self.status.lock.Unlock()
+ }
}
diff --git a/blockpool/errors_test.go b/blockpool/errors_test.go
index c9bf79a23..0fbf94d7d 100644
--- a/blockpool/errors_test.go
+++ b/blockpool/errors_test.go
@@ -128,7 +128,7 @@ func TestErrInsufficientChainInfo(t *testing.T) {
}
func TestIncorrectTD(t *testing.T) {
- t.Skip() // td not tested atm
+ t.Skip("skipping TD check until network is healthy")
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
diff --git a/blockpool/peers.go b/blockpool/peers.go
index e5d6a16d6..90dc5daef 100644
--- a/blockpool/peers.go
+++ b/blockpool/peers.go
@@ -18,6 +18,7 @@ type peer struct {
// last known blockchain status
td *big.Int
+ tdAdvertised bool
currentBlockHash common.Hash
currentBlock *types.Block
parentHash common.Hash
@@ -135,21 +136,52 @@ func (self *peer) addError(code int, format string, params ...interface{}) {
}
// caller must hold peer lock
-func (self *peer) setChainInfo(td *big.Int, c common.Hash) {
- self.td = td
- self.currentBlockHash = c
- self.currentBlock = nil
- self.parentHash = common.Hash{}
- self.headSection = nil
+func (self *peer) setChainInfo(td *big.Int, currentBlockHash common.Hash) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ if self.currentBlockHash != currentBlockHash {
+ previousBlockHash := self.currentBlockHash
+ plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", self.id, td, hex(currentBlockHash), hex(previousBlockHash))
+ self.td = td
+ self.currentBlockHash = currentBlockHash
+ self.currentBlock = nil
+ self.parentHash = common.Hash{}
+ self.headSection = nil
+ }
+ self.tdAdvertised = true
}
-// caller must hold peer lock
-func (self *peer) setChainInfoFromBlock(block *types.Block) {
- // use the optional TD to update peer td, this helps second best peer selection
+func (self *peer) setChainInfoFromBlock(block *types.Block) (td *big.Int, currentBlockHash common.Hash) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ hash := block.Hash()
+ // this happens when block came in a newblock message but
+ // also if sent in a blockmsg (for instance, if we requested, only if we
+ // dont apply on blockrequests the restriction of flood control)
+ currentBlockHash = self.currentBlockHash
+ if currentBlockHash == hash && self.currentBlock == nil {
+ // signal to head section process
+ plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) received\n", hex(hash), self.id, hex(currentBlockHash))
+ select {
+ case self.currentBlockC <- block:
+ case <-self.switchC:
+ }
+ return self.td, currentBlockHash
+ } else {
+ plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), self.id, hex(currentBlockHash))
+ return nil, currentBlockHash
+ }
+}
+
+// this will use the TD given by the first peer to update peer td, this helps second best peer selection
+// :FIXME: node
+func (self *peer) setChainInfoFromNode(n *node) {
// in case best peer is lost
- if block.Td != nil && block.Td.Cmp(self.td) > 0 {
- plog.DebugDetailf("setChainInfoFromBlock: update <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(block.Hash()), self.td, block.Td)
- self.td = block.Td
+ block := n.block
+ hash := block.Hash()
+ if n.td != nil && n.td.Cmp(self.td) > 0 {
+ plog.DebugDetailf("AddBlock: update peer <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(hash), self.td, n.td)
+ self.td = n.td
self.currentBlockHash = block.Hash()
self.parentHash = block.ParentHash()
self.currentBlock = block
@@ -218,17 +250,11 @@ func (self *peers) addPeer(
if found {
// when called on an already connected peer, it means a newBlockMsg is received
// peer head info is updated
- p.lock.Lock()
- if p.currentBlockHash != currentBlockHash {
- previousBlockHash = p.currentBlockHash
- plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", id, td, hex(currentBlockHash), hex(previousBlockHash))
- p.setChainInfo(td, currentBlockHash)
-
- self.status.lock.Lock()
- self.status.values.NewBlocks++
- self.status.lock.Unlock()
- }
- p.lock.Unlock()
+ p.setChainInfo(td, currentBlockHash)
+ // FIXME: only count the same block once
+ self.status.lock.Lock()
+ self.status.values.NewBlocks++
+ self.status.lock.Unlock()
} else {
p = self.newPeer(td, currentBlockHash, id, requestBlockHashes, requestBlocks, peerError)
@@ -333,8 +359,8 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) {
close(oldp.switchC)
}
if newp != nil {
- newp.idleC = make(chan bool)
- newp.switchC = make(chan bool)
+ // newp.idleC = make(chan bool)
+ // newp.switchC = make(chan bool)
// if new best peer has no head section yet, create it and run it
// otherwise head section is an element of peer.sections
if newp.headSection == nil {
@@ -354,6 +380,9 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) {
}
}()
+ } else {
+ newp.idleC = make(chan bool)
+ newp.switchC = make(chan bool)
}
var connected = make(map[common.Hash]*section)
@@ -528,10 +557,12 @@ func (self *peer) getBlockHashes() bool {
// main loop for head section process
func (self *peer) run() {
- self.lock.RLock()
+ self.lock.Lock()
+ self.switchC = make(chan bool)
+ self.idleC = make(chan bool)
switchC := self.switchC
plog.Debugf("HeadSection: <%s> section process for head %s started", self.id, hex(self.currentBlockHash))
- self.lock.RUnlock()
+ self.lock.Unlock()
self.blockHashesRequestTimer = nil
diff --git a/blockpool/peers_test.go b/blockpool/peers_test.go
index e32bb6fc8..e5788e379 100644
--- a/blockpool/peers_test.go
+++ b/blockpool/peers_test.go
@@ -145,7 +145,6 @@ func TestAddPeer(t *testing.T) {
}
func TestPeerPromotionByTdOnBlock(t *testing.T) {
- t.Skip()
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
@@ -155,28 +154,26 @@ func TestPeerPromotionByTdOnBlock(t *testing.T) {
peer2 := blockPoolTester.newPeer("peer2", 4, 4)
blockPool.Start()
- blockPoolTester.tds = make(map[int]int)
- blockPoolTester.tds[3] = 3
- // pool
peer0.AddPeer()
peer0.serveBlocks(1, 2)
best := peer1.AddPeer()
// this tests that peer1 is not promoted over peer0 yet
if best {
t.Errorf("peer1 (TD=1) should not be set as best")
+ return
}
best = peer2.AddPeer()
peer2.serveBlocks(3, 4)
peer2.serveBlockHashes(4, 3, 2, 1)
- // hashes := blockPoolTester.hashPool.IndexesToHashes([]int{2, 3})
- peer1.serveBlocks(2, 3)
+ peer1.sendBlocks(3, 4)
blockPool.RemovePeer("peer2")
if blockPool.peers.best.id != "peer1" {
t.Errorf("peer1 (TD=3) should be set as best")
+ return
}
- peer1.serveBlocks(0, 1, 2)
+ peer1.serveBlocks(0, 1, 2, 3)
blockPool.Wait(waitTimeout)
blockPool.Stop()