From da7332a7317bda9003e5e8a4aaa4b1710c69b664 Mon Sep 17 00:00:00 2001 From: zelig Date: Fri, 10 Apr 2015 16:31:00 +0100 Subject: td update from node - reorg and simplify AddBlock - introduce nodeCache - TestPeerPromotionByTdOnBlock unskipped and passes - move switchC/idleC channel creation around: solves deadlock (now respects the contract with section process: either can activate or complete at any one time) --- blockpool/blockpool.go | 189 +++++++++++++++++++++-------------------------- blockpool/errors_test.go | 2 +- blockpool/peers.go | 85 ++++++++++++++------- blockpool/peers_test.go | 11 +-- 4 files changed, 148 insertions(+), 139 deletions(-) (limited to 'blockpool') diff --git a/blockpool/blockpool.go b/blockpool/blockpool.go index f442259e0..3ed2e92c7 100644 --- a/blockpool/blockpool.go +++ b/blockpool/blockpool.go @@ -169,6 +169,9 @@ type BlockPool struct { // alloc-easy pool of hash slices hashSlicePool chan []common.Hash + nodeCache map[common.Hash]*node + nodeCacheLock sync.RWMutex + // waitgroup is used in tests to wait for result-critical routines // as well as in determining idle / syncing status wg sync.WaitGroup // @@ -210,6 +213,7 @@ func (self *BlockPool) Start() { self.Config.init() self.hashSlicePool = make(chan []common.Hash, 150) + self.nodeCache = make(map[common.Hash]*node) self.status = newStatus() self.quit = make(chan bool) self.pool = make(map[common.Hash]*entry) @@ -615,127 +619,104 @@ LOOP: If the block received is the head block of the current best peer, signal it to the head section process */ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { - hash := block.Hash() - - sender, _ := self.peers.getPeer(peerId) - if sender == nil { - return - } self.status.lock.Lock() self.status.activePeers[peerId]++ self.status.lock.Unlock() - entry := self.get(hash) - blockIsCurrentHead := false - sender.lock.RLock() - currentBlockHash := sender.currentBlockHash - currentBlock := sender.currentBlock - currentBlockC := sender.currentBlockC - switchC := sender.switchC - sender.lock.RUnlock() - - // a peer's current head block is appearing the first time - if hash == currentBlockHash { - // this happens when block came in a newblock message but - // also if sent in a blockmsg (for instance, if we requested, only if we - // dont apply on blockrequests the restriction of flood control) - blockIsCurrentHead = true - if currentBlock == nil { - sender.lock.Lock() - sender.setChainInfoFromBlock(block) - sender.lock.Unlock() - - self.status.lock.Lock() - self.status.values.BlockHashes++ - self.status.values.Blocks++ - self.status.values.BlocksInPool++ - self.status.lock.Unlock() - // signal to head section process - select { - case currentBlockC <- block: - case <-switchC: - } - } else { - plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), peerId, hex(currentBlockHash)) - } - } else { - - plog.DebugDetailf("AddBlock: block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(currentBlockHash)) - - /* @zelig !!! - requested 5 hashes from both A & B. A responds sooner then B, process blocks. Close section. - delayed B sends you block ... UNREQUESTED. Blocked - if entry == nil { - plog.DebugDetailf("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) - sender.addError(ErrUnrequestedBlock, "%x", hash) - - self.status.lock.Lock() - self.status.badPeers[peerId]++ - self.status.lock.Unlock() - return - } - */ - } + hash := block.Hash() - if entry == nil { - // FIXME: here check the cache find or create node - - // put peer as blockBy! + // check if block is already inserted in the blockchain + if self.hasBlock(hash) { return } - node := entry.node - node.lock.Lock() - defer node.lock.Unlock() - - // register peer on node as source - if node.peers == nil { - node.peers = make(map[string]bool) - } - FoundBlockCurrentHead, found := node.peers[sender.id] - if !found || FoundBlockCurrentHead { - // if found but not FoundBlockCurrentHead, then no update - // necessary (||) - node.peers[sender.id] = blockIsCurrentHead - // for those that are false, TD will update their head - // for those that are true, TD is checked ! - // this is checked at the time of TD calculation in checkTD - } - // check if block already received - if node.block != nil { - plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), node.blockBy) - } - - // check if block is already inserted in the blockchain - if self.hasBlock(hash) { - plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already in the blockchain", hex(hash), peerId, hex(sender.currentBlockHash)) + sender, _ := self.peers.getPeer(peerId) + if sender == nil { return } + tdFromCurrentHead, currentBlockHash := sender.setChainInfoFromBlock(block) - /* - @zelig needs discussing - Viktor: pow check can be delayed in a go routine and therefore cache - creation is not blocking - // validate block for PoW - if !self.verifyPoW(block) { - plog.Warnf("AddBlock: invalid PoW on block %s from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) - sender.addError(ErrInvalidPoW, "%x", hash) + entry := self.get(hash) - self.status.lock.Lock() - self.status.badPeers[peerId]++ - self.status.lock.Unlock() + /* @zelig !!! + requested 5 hashes from both A & B. A responds sooner then B, process blocks. Close section. + delayed B sends you block ... UNREQUESTED. Blocked + if entry == nil { + plog.DebugDetailf("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) + sender.addError(ErrUnrequestedBlock, "%x", hash) + + self.status.lock.Lock() + self.status.badPeers[peerId]++ + self.status.lock.Unlock() + return + } + */ - return + var bnode *node + if entry == nil { + self.nodeCacheLock.Lock() + bnode, _ = self.nodeCache[hash] + if bnode == nil { + bnode = &node{ + hash: currentBlockHash, + block: block, + hashBy: peerId, + blockBy: peerId, + td: tdFromCurrentHead, } - */ + self.nodeCache[hash] = bnode + } + self.nodeCacheLock.Unlock() + } else { + bnode = entry.node + } - node.block = block - node.blockBy = peerId + bnode.lock.Lock() + defer bnode.lock.Unlock() - self.status.lock.Lock() - self.status.values.Blocks++ - self.status.values.BlocksInPool++ - self.status.lock.Unlock() + // check if block already received + if bnode.block != nil { + plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), bnode.blockBy) + // register peer on node as source + if bnode.peers == nil { + bnode.peers = make(map[string]bool) + } + foundBlockCurrentHead, found := bnode.peers[sender.id] + if !found || foundBlockCurrentHead { + // if found but not FoundBlockCurrentHead, then no update + // necessary (||) + bnode.peers[sender.id] = (currentBlockHash == hash) + // for those that are false, TD will update their head + // for those that are true, TD is checked ! + // this is checked at the time of TD calculation in checkTD + } + sender.setChainInfoFromNode(bnode) + } else { + /* + @zelig needs discussing + Viktor: pow check can be delayed in a go routine and therefore cache + creation is not blocking + // validate block for PoW + if !self.verifyPoW(block) { + plog.Warnf("AddBlock: invalid PoW on block %s from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) + sender.addError(ErrInvalidPoW, "%x", hash) + + self.status.lock.Lock() + self.status.badPeers[peerId]++ + self.status.lock.Unlock() + + return + } + */ + bnode.block = block + bnode.blockBy = peerId + bnode.td = tdFromCurrentHead + self.status.lock.Lock() + self.status.values.Blocks++ + self.status.values.BlocksInPool++ + self.status.lock.Unlock() + } } diff --git a/blockpool/errors_test.go b/blockpool/errors_test.go index c9bf79a23..0fbf94d7d 100644 --- a/blockpool/errors_test.go +++ b/blockpool/errors_test.go @@ -128,7 +128,7 @@ func TestErrInsufficientChainInfo(t *testing.T) { } func TestIncorrectTD(t *testing.T) { - t.Skip() // td not tested atm + t.Skip("skipping TD check until network is healthy") test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.blockChain[0] = nil diff --git a/blockpool/peers.go b/blockpool/peers.go index e5d6a16d6..90dc5daef 100644 --- a/blockpool/peers.go +++ b/blockpool/peers.go @@ -18,6 +18,7 @@ type peer struct { // last known blockchain status td *big.Int + tdAdvertised bool currentBlockHash common.Hash currentBlock *types.Block parentHash common.Hash @@ -135,21 +136,52 @@ func (self *peer) addError(code int, format string, params ...interface{}) { } // caller must hold peer lock -func (self *peer) setChainInfo(td *big.Int, c common.Hash) { - self.td = td - self.currentBlockHash = c - self.currentBlock = nil - self.parentHash = common.Hash{} - self.headSection = nil +func (self *peer) setChainInfo(td *big.Int, currentBlockHash common.Hash) { + self.lock.Lock() + defer self.lock.Unlock() + if self.currentBlockHash != currentBlockHash { + previousBlockHash := self.currentBlockHash + plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", self.id, td, hex(currentBlockHash), hex(previousBlockHash)) + self.td = td + self.currentBlockHash = currentBlockHash + self.currentBlock = nil + self.parentHash = common.Hash{} + self.headSection = nil + } + self.tdAdvertised = true } -// caller must hold peer lock -func (self *peer) setChainInfoFromBlock(block *types.Block) { - // use the optional TD to update peer td, this helps second best peer selection +func (self *peer) setChainInfoFromBlock(block *types.Block) (td *big.Int, currentBlockHash common.Hash) { + self.lock.Lock() + defer self.lock.Unlock() + hash := block.Hash() + // this happens when block came in a newblock message but + // also if sent in a blockmsg (for instance, if we requested, only if we + // dont apply on blockrequests the restriction of flood control) + currentBlockHash = self.currentBlockHash + if currentBlockHash == hash && self.currentBlock == nil { + // signal to head section process + plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) received\n", hex(hash), self.id, hex(currentBlockHash)) + select { + case self.currentBlockC <- block: + case <-self.switchC: + } + return self.td, currentBlockHash + } else { + plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), self.id, hex(currentBlockHash)) + return nil, currentBlockHash + } +} + +// this will use the TD given by the first peer to update peer td, this helps second best peer selection +// :FIXME: node +func (self *peer) setChainInfoFromNode(n *node) { // in case best peer is lost - if block.Td != nil && block.Td.Cmp(self.td) > 0 { - plog.DebugDetailf("setChainInfoFromBlock: update <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(block.Hash()), self.td, block.Td) - self.td = block.Td + block := n.block + hash := block.Hash() + if n.td != nil && n.td.Cmp(self.td) > 0 { + plog.DebugDetailf("AddBlock: update peer <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(hash), self.td, n.td) + self.td = n.td self.currentBlockHash = block.Hash() self.parentHash = block.ParentHash() self.currentBlock = block @@ -218,17 +250,11 @@ func (self *peers) addPeer( if found { // when called on an already connected peer, it means a newBlockMsg is received // peer head info is updated - p.lock.Lock() - if p.currentBlockHash != currentBlockHash { - previousBlockHash = p.currentBlockHash - plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", id, td, hex(currentBlockHash), hex(previousBlockHash)) - p.setChainInfo(td, currentBlockHash) - - self.status.lock.Lock() - self.status.values.NewBlocks++ - self.status.lock.Unlock() - } - p.lock.Unlock() + p.setChainInfo(td, currentBlockHash) + // FIXME: only count the same block once + self.status.lock.Lock() + self.status.values.NewBlocks++ + self.status.lock.Unlock() } else { p = self.newPeer(td, currentBlockHash, id, requestBlockHashes, requestBlocks, peerError) @@ -333,8 +359,8 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) { close(oldp.switchC) } if newp != nil { - newp.idleC = make(chan bool) - newp.switchC = make(chan bool) + // newp.idleC = make(chan bool) + // newp.switchC = make(chan bool) // if new best peer has no head section yet, create it and run it // otherwise head section is an element of peer.sections if newp.headSection == nil { @@ -354,6 +380,9 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) { } }() + } else { + newp.idleC = make(chan bool) + newp.switchC = make(chan bool) } var connected = make(map[common.Hash]*section) @@ -528,10 +557,12 @@ func (self *peer) getBlockHashes() bool { // main loop for head section process func (self *peer) run() { - self.lock.RLock() + self.lock.Lock() + self.switchC = make(chan bool) + self.idleC = make(chan bool) switchC := self.switchC plog.Debugf("HeadSection: <%s> section process for head %s started", self.id, hex(self.currentBlockHash)) - self.lock.RUnlock() + self.lock.Unlock() self.blockHashesRequestTimer = nil diff --git a/blockpool/peers_test.go b/blockpool/peers_test.go index e32bb6fc8..e5788e379 100644 --- a/blockpool/peers_test.go +++ b/blockpool/peers_test.go @@ -145,7 +145,6 @@ func TestAddPeer(t *testing.T) { } func TestPeerPromotionByTdOnBlock(t *testing.T) { - t.Skip() test.LogInit() _, blockPool, blockPoolTester := newTestBlockPool(t) blockPoolTester.blockChain[0] = nil @@ -155,28 +154,26 @@ func TestPeerPromotionByTdOnBlock(t *testing.T) { peer2 := blockPoolTester.newPeer("peer2", 4, 4) blockPool.Start() - blockPoolTester.tds = make(map[int]int) - blockPoolTester.tds[3] = 3 - // pool peer0.AddPeer() peer0.serveBlocks(1, 2) best := peer1.AddPeer() // this tests that peer1 is not promoted over peer0 yet if best { t.Errorf("peer1 (TD=1) should not be set as best") + return } best = peer2.AddPeer() peer2.serveBlocks(3, 4) peer2.serveBlockHashes(4, 3, 2, 1) - // hashes := blockPoolTester.hashPool.IndexesToHashes([]int{2, 3}) - peer1.serveBlocks(2, 3) + peer1.sendBlocks(3, 4) blockPool.RemovePeer("peer2") if blockPool.peers.best.id != "peer1" { t.Errorf("peer1 (TD=3) should be set as best") + return } - peer1.serveBlocks(0, 1, 2) + peer1.serveBlocks(0, 1, 2, 3) blockPool.Wait(waitTimeout) blockPool.Stop() -- cgit v1.2.3