From 42fb9652f56321d2752ffe7773806df11f3087b8 Mon Sep 17 00:00:00 2001 From: zelig <viktor.tron@gmail.com> Date: Tue, 7 Apr 2015 18:53:05 +0100 Subject: fix blockpool deadlock - do not break from headsection on error [remove peer after protocol quit will close switchC, until then head block can arrive and block on channel while keeping peers lock causing a deadlock.] - more careful locking in AddBlock --- blockpool/blockpool.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) (limited to 'blockpool/blockpool.go') diff --git a/blockpool/blockpool.go b/blockpool/blockpool.go index 3b3de928d..9871c5036 100644 --- a/blockpool/blockpool.go +++ b/blockpool/blockpool.go @@ -624,6 +624,7 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { entry := self.get(hash) // a peer's current head block is appearing the first time + sender.lock.Lock() if hash == sender.currentBlockHash { if sender.currentBlock == nil { plog.Debugf("AddBlock: add head block %s for peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) @@ -634,16 +635,28 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { self.status.values.Blocks++ self.status.values.BlocksInPool++ self.status.lock.Unlock() + select { + case sender.currentBlockC <- block: + case <-sender.switchC: + } } else { plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), peerId, hex(sender.currentBlockHash)) // signal to head section process - sender.currentBlockC <- block } + // self.wg.Add(1) + // go func() { + // timeout := time.After(1 * time.Second) + // select { + // case sender.currentBlockC <- block: + // case <-timeout: + // } + // self.wg.Done() + // }() + } else { plog.DebugDetailf("AddBlock: block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) - sender.lock.Lock() // update peer chain info if more recent than what we registered if block.Td != nil && 
block.Td.Cmp(sender.td) > 0 { sender.td = block.Td @@ -652,7 +665,6 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { sender.currentBlock = block sender.headSection = nil } - sender.lock.Unlock() /* @zelig !!! requested 5 hashes from both A & B. A responds sooner then B, process blocks. Close section. @@ -668,6 +680,8 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { } */ } + sender.lock.Unlock() + if entry == nil { return } -- cgit v1.2.3 From f546b486bf444d9601cf97b934b2974a9b4d58f8 Mon Sep 17 00:00:00 2001 From: zelig <viktor.tron@gmail.com> Date: Wed, 8 Apr 2015 03:34:20 +0100 Subject: introduce peers registry on nodes - TestPeerPromotionByTdOnBlock renamed and skipped for now test should pass iff if TD is updated based on an agreement - senders register in AddBlock, flag records if they are coming from newblock message (and therefore advertise their TD with the block) or block message (TODO: latter are stored on the cache and updated by checkTD call; protocol should also call AddBlock on newblock messages by non-best peers) - remove TD update from optional TD field in addBlock: this is no longer part of the eth protocol spec -> TODO: reflect in wiki - only initialise peer map if at least two --- blockpool/blockpool.go | 72 ++++++++++++++++++++++++++------------------------ 1 file changed, 37 insertions(+), 35 deletions(-) (limited to 'blockpool/blockpool.go') diff --git a/blockpool/blockpool.go b/blockpool/blockpool.go index 9871c5036..f9c8a64ab 100644 --- a/blockpool/blockpool.go +++ b/blockpool/blockpool.go @@ -132,6 +132,7 @@ type node struct { block *types.Block hashBy string blockBy string + peers map[string]bool td *big.Int } @@ -396,6 +397,7 @@ func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId st if entry := self.get(bestpeer.currentBlockHash); entry == nil { plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) head section starting from [%s] ", peerId, 
hex(bestpeer.currentBlockHash), hex(bestpeer.parentHash)) // if head block is not yet in the pool, create entry and start node list for section + node := &node{ hash: bestpeer.currentBlockHash, block: bestpeer.currentBlock, @@ -622,10 +624,14 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { self.status.lock.Unlock() entry := self.get(hash) - - // a peer's current head block is appearing the first time + blockIsCurrentHead := false sender.lock.Lock() + // a peer's current head block is appearing the first time if hash == sender.currentBlockHash { + // this happens when block came in a newblock message but + // also if sent in a blockmsg (for instance, if we requested, only if we + // dont apply on blockrequests the restriction of flood control) + blockIsCurrentHead = true if sender.currentBlock == nil { plog.Debugf("AddBlock: add head block %s for peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) sender.setChainInfoFromBlock(block) @@ -643,46 +649,29 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), peerId, hex(sender.currentBlockHash)) // signal to head section process } - // self.wg.Add(1) - // go func() { - // timeout := time.After(1 * time.Second) - // select { - // case sender.currentBlockC <- block: - // case <-timeout: - // } - // self.wg.Done() - // }() - } else { plog.DebugDetailf("AddBlock: block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) - // update peer chain info if more recent than what we registered - if block.Td != nil && block.Td.Cmp(sender.td) > 0 { - sender.td = block.Td - sender.currentBlockHash = block.Hash() - sender.parentHash = block.ParentHash() - sender.currentBlock = block - sender.headSection = nil - } - /* @zelig !!! - requested 5 hashes from both A & B. A responds sooner then B, process blocks. Close section. 
- delayed B sends you block ... UNREQUESTED. Blocked - if entry == nil { - plog.DebugDetailf("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) - sender.addError(ErrUnrequestedBlock, "%x", hash) - - self.status.lock.Lock() - self.status.badPeers[peerId]++ - self.status.lock.Unlock() - return - } + requested 5 hashes from both A & B. A responds sooner then B, process blocks. Close section. + delayed B sends you block ... UNREQUESTED. Blocked + if entry == nil { + plog.DebugDetailf("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) + sender.addError(ErrUnrequestedBlock, "%x", hash) + + self.status.lock.Lock() + self.status.badPeers[peerId]++ + self.status.lock.Unlock() + return + } */ } sender.lock.Unlock() if entry == nil { + // FIXME: here check the cache find or create node - + // put peer as blockBy! return } @@ -690,10 +679,22 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { node.lock.Lock() defer node.lock.Unlock() + // register peer on node as source + if node.peers == nil { + node.peers = make(map[string]bool) + } + FoundBlockCurrentHead, found := node.peers[sender.id] + if !found || FoundBlockCurrentHead { + // if found but not FoundBlockCurrentHead, then no update + // necessary (||) + node.peers[sender.id] = blockIsCurrentHead + // for those that are false, TD will update their head + // for those that are true, TD is checked ! 
+ // this is checked at the time of TD calculation in checkTD + } // check if block already received if node.block != nil { plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), node.blockBy) - return } // check if block is already inserted in the blockchain @@ -704,6 +705,8 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { /* @zelig needs discussing + Viktor: pow check can be delayed in a go routine and therefore cache + creation is not blocking // validate block for PoW if !self.verifyPoW(block) { plog.Warnf("AddBlock: invalid PoW on block %s from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) @@ -718,8 +721,7 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { */ node.block = block - node.blockBy = peerId - node.td = block.Td // optional field + // node.blockBy = peerId self.status.lock.Lock() self.status.values.Blocks++ -- cgit v1.2.3 From cbd0b42060d537d4d899b593be1ecd5ffdbd301a Mon Sep 17 00:00:00 2001 From: zelig <viktor.tron@gmail.com> Date: Wed, 8 Apr 2015 03:50:34 +0100 Subject: put back checkTD and unskip incorrectTD test --- blockpool/blockpool.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'blockpool/blockpool.go') diff --git a/blockpool/blockpool.go b/blockpool/blockpool.go index f9c8a64ab..d823d9898 100644 --- a/blockpool/blockpool.go +++ b/blockpool/blockpool.go @@ -785,10 +785,10 @@ func (self *BlockPool) checkTD(nodes ...*node) { if n.td != nil { plog.DebugDetailf("peer td %v =?= block td %v", n.td, n.block.Td) if n.td.Cmp(n.block.Td) != 0 { - //self.peers.peerError(n.blockBy, ErrIncorrectTD, "on block %x", n.hash) - //self.status.lock.Lock() - //self.status.badPeers[n.blockBy]++ - //self.status.lock.Unlock() + self.peers.peerError(n.blockBy, ErrIncorrectTD, "on block %x", n.hash) + self.status.lock.Lock() + self.status.badPeers[n.blockBy]++ + self.status.lock.Unlock() } 
} } -- cgit v1.2.3 From 262714fc6c269e0a3aa39892954b03db9418e649 Mon Sep 17 00:00:00 2001 From: zelig <viktor.tron@gmail.com> Date: Wed, 8 Apr 2015 12:43:55 +0100 Subject: future queued block support - queued bool // flag for blockpool to skip TD check - set to true when future block queued - in checkTD: skip check if queued - TODO: add test (insertchain sets future block) --- blockpool/blockpool.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'blockpool/blockpool.go') diff --git a/blockpool/blockpool.go b/blockpool/blockpool.go index d823d9898..e1891f5f4 100644 --- a/blockpool/blockpool.go +++ b/blockpool/blockpool.go @@ -782,7 +782,8 @@ LOOP: // check if block's actual TD (calculated after successful insertChain) is identical to TD advertised for peer's head block. func (self *BlockPool) checkTD(nodes ...*node) { for _, n := range nodes { - if n.td != nil { + // skip check if queued future block + if n.td != nil && !n.block.Queued() { plog.DebugDetailf("peer td %v =?= block td %v", n.td, n.block.Td) if n.td.Cmp(n.block.Td) != 0 { self.peers.peerError(n.blockBy, ErrIncorrectTD, "on block %x", n.hash) -- cgit v1.2.3 From e55747a074c7f280029b94c94418d60dff5d6773 Mon Sep 17 00:00:00 2001 From: zelig <viktor.tron@gmail.com> Date: Wed, 8 Apr 2015 20:33:54 +0100 Subject: fix deadlock issue in AddBlock - add peer switch channel arg to activateChain - no peer locking within - proper locking in AddBlock - fixes deadlock issue - comment out TD check and skip incorrect TD test again for hotfix --- blockpool/blockpool.go | 50 +++++++++++++++++++++++++++----------------------- 1 file changed, 27 insertions(+), 23 deletions(-) (limited to 'blockpool/blockpool.go') diff --git a/blockpool/blockpool.go b/blockpool/blockpool.go index e1891f5f4..0a130773d 100644 --- a/blockpool/blockpool.go +++ b/blockpool/blockpool.go @@ -377,7 +377,7 @@ func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId st var nodes []*node hash, ok = next() - 
bestpeer.lock.Lock() + bestpeer.lock.RLock() plog.Debugf("AddBlockHashes: peer <%s> starting from [%s] (peer head: %s)", peerId, hex(bestpeer.parentHash), hex(bestpeer.currentBlockHash)) @@ -423,7 +423,7 @@ func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId st } // the switch channel signals peerswitch event switchC := bestpeer.switchC - bestpeer.lock.Unlock() + bestpeer.lock.RUnlock() // iterate over hashes coming from peer (first round we have hash set above) LOOP: @@ -549,8 +549,10 @@ LOOP: In this case no activation should happen */ if parent != nil && !peerswitch { - self.activateChain(parent, bestpeer, nil) + bestpeer.lock.RLock() + self.activateChain(parent, bestpeer, bestpeer.switchC, nil) plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): parent section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(parent)) + bestpeer.lock.RUnlock() } /* @@ -625,33 +627,40 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { entry := self.get(hash) blockIsCurrentHead := false - sender.lock.Lock() + sender.lock.RLock() + currentBlockHash := sender.currentBlockHash + currentBlock := sender.currentBlock + currentBlockC := sender.currentBlockC + switchC := sender.switchC + sender.lock.RUnlock() + // a peer's current head block is appearing the first time - if hash == sender.currentBlockHash { + if hash == currentBlockHash { // this happens when block came in a newblock message but // also if sent in a blockmsg (for instance, if we requested, only if we // dont apply on blockrequests the restriction of flood control) blockIsCurrentHead = true - if sender.currentBlock == nil { - plog.Debugf("AddBlock: add head block %s for peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) + if currentBlock == nil { + sender.lock.Lock() sender.setChainInfoFromBlock(block) + sender.lock.Unlock() self.status.lock.Lock() self.status.values.BlockHashes++ self.status.values.Blocks++ self.status.values.BlocksInPool++ 
self.status.lock.Unlock() + // signal to head section process select { - case sender.currentBlockC <- block: - case <-sender.switchC: + case currentBlockC <- block: + case <-switchC: } } else { - plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), peerId, hex(sender.currentBlockHash)) - // signal to head section process + plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), peerId, hex(currentBlockHash)) } } else { - plog.DebugDetailf("AddBlock: block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) + plog.DebugDetailf("AddBlock: block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(currentBlockHash)) /* @zelig !!! requested 5 hashes from both A & B. A responds sooner then B, process blocks. Close section. @@ -667,7 +676,6 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { } */ } - sender.lock.Unlock() if entry == nil { // FIXME: here check the cache find or create node - @@ -721,7 +729,7 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { */ node.block = block - // node.blockBy = peerId + node.blockBy = peerId self.status.lock.Lock() self.status.values.Blocks++ @@ -735,11 +743,7 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { It activates the section process on incomplete sections with peer. It relinks orphaned sections with their parent if root block (and its parent hash) is known. 
*/ -func (self *BlockPool) activateChain(sec *section, p *peer, connected map[common.Hash]*section) { - - p.lock.RLock() - switchC := p.switchC - p.lock.RUnlock() +func (self *BlockPool) activateChain(sec *section, p *peer, switchC chan bool, connected map[common.Hash]*section) { var i int @@ -786,10 +790,10 @@ func (self *BlockPool) checkTD(nodes ...*node) { if n.td != nil && !n.block.Queued() { plog.DebugDetailf("peer td %v =?= block td %v", n.td, n.block.Td) if n.td.Cmp(n.block.Td) != 0 { - self.peers.peerError(n.blockBy, ErrIncorrectTD, "on block %x", n.hash) - self.status.lock.Lock() - self.status.badPeers[n.blockBy]++ - self.status.lock.Unlock() + // self.peers.peerError(n.blockBy, ErrIncorrectTD, "on block %x", n.hash) + // self.status.lock.Lock() + // self.status.badPeers[n.blockBy]++ + // self.status.lock.Unlock() } } } -- cgit v1.2.3 From 0e2bc23148731a2e9fbb22885aced057e308335a Mon Sep 17 00:00:00 2001 From: zelig <viktor.tron@gmail.com> Date: Wed, 8 Apr 2015 21:14:49 +0100 Subject: uncomment future block TD check, add test for skipping TD check on future block --- blockpool/blockpool.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'blockpool/blockpool.go') diff --git a/blockpool/blockpool.go b/blockpool/blockpool.go index 0a130773d..2340eadae 100644 --- a/blockpool/blockpool.go +++ b/blockpool/blockpool.go @@ -790,10 +790,10 @@ func (self *BlockPool) checkTD(nodes ...*node) { if n.td != nil && !n.block.Queued() { plog.DebugDetailf("peer td %v =?= block td %v", n.td, n.block.Td) if n.td.Cmp(n.block.Td) != 0 { - // self.peers.peerError(n.blockBy, ErrIncorrectTD, "on block %x", n.hash) - // self.status.lock.Lock() - // self.status.badPeers[n.blockBy]++ - // self.status.lock.Unlock() + self.peers.peerError(n.blockBy, ErrIncorrectTD, "on block %x", n.hash) + self.status.lock.Lock() + self.status.badPeers[n.blockBy]++ + self.status.lock.Unlock() } } } -- cgit v1.2.3 From a009132c2429adca5ba058f46c0a460b287a4407 Mon Sep 17 00:00:00 
2001 From: zelig <viktor.tron@gmail.com> Date: Thu, 9 Apr 2015 06:31:06 +0100 Subject: oops peer unlocked before return - fixes deadlock --- blockpool/blockpool.go | 1 + 1 file changed, 1 insertion(+) (limited to 'blockpool/blockpool.go') diff --git a/blockpool/blockpool.go b/blockpool/blockpool.go index 2340eadae..7a65768c7 100644 --- a/blockpool/blockpool.go +++ b/blockpool/blockpool.go @@ -384,6 +384,7 @@ func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId st // first check if we are building the head section of a peer's chain if bestpeer.parentHash == hash { if self.hasBlock(bestpeer.currentBlockHash) { + bestpeer.lock.RUnlock() return } /* -- cgit v1.2.3 From 764a802eaa33892447b82069ee1b1bb5a478837b Mon Sep 17 00:00:00 2001 From: obscuren <geffobscura@gmail.com> Date: Thu, 9 Apr 2015 17:27:43 +0200 Subject: Disabled TD check @zelig: Temporarily commented out TD check until the rest of the network has been fixed. --- blockpool/blockpool.go | 2 ++ 1 file changed, 2 insertions(+) (limited to 'blockpool/blockpool.go') diff --git a/blockpool/blockpool.go b/blockpool/blockpool.go index 7a65768c7..f442259e0 100644 --- a/blockpool/blockpool.go +++ b/blockpool/blockpool.go @@ -790,12 +790,14 @@ func (self *BlockPool) checkTD(nodes ...*node) { // skip check if queued future block if n.td != nil && !n.block.Queued() { plog.DebugDetailf("peer td %v =?= block td %v", n.td, n.block.Td) + /* @zelig: Commented out temp untill the rest of the network has been fixed. if n.td.Cmp(n.block.Td) != 0 { self.peers.peerError(n.blockBy, ErrIncorrectTD, "on block %x", n.hash) self.status.lock.Lock() self.status.badPeers[n.blockBy]++ self.status.lock.Unlock() } + */ } } } -- cgit v1.2.3