From 94e525ae12bf3455ae434feb83ad834c8dcfa1ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 26 May 2015 12:44:09 +0300 Subject: eth, eth/downloader: fix #1098, elevate empty hash errors to peer drops --- eth/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'eth/sync.go') diff --git a/eth/sync.go b/eth/sync.go index 76e137630..56084f2f0 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -109,7 +109,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) { case downloader.ErrBusy: glog.V(logger.Debug).Infof("Synchronisation already in progress") - case downloader.ErrTimeout, downloader.ErrBadPeer, downloader.ErrInvalidChain, downloader.ErrCrossCheckFailed: + case downloader.ErrTimeout, downloader.ErrBadPeer, downloader.ErrEmptyHashSet, downloader.ErrInvalidChain, downloader.ErrCrossCheckFailed: glog.V(logger.Debug).Infof("Removing peer %v: %v", peer.id, err) pm.removePeer(peer.id) -- cgit v1.2.3 From fdccce781e94819ec9dc13ef6540a33efd3b26c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 8 Jun 2015 19:24:56 +0300 Subject: eth: fetch announced hashes from origin, periodically --- eth/sync.go | 58 +++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 55 insertions(+), 3 deletions(-) (limited to 'eth/sync.go') diff --git a/eth/sync.go b/eth/sync.go index 56084f2f0..1a1cbdb47 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -5,15 +5,67 @@ import ( "sync/atomic" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" ) -// update periodically tries to synchronise with the network, both downloading -// hashes and blocks as well as retrieving cached ones. -func (pm *ProtocolManager) update() { +// blockAnnounce is the hash notification of the availability of a new block in +// the network. +type blockAnnounce struct { + hash common.Hash + peer *peer + time time.Time +} + +// fetcher is responsible for collecting hash notifications, and periodically +// checking all unknown ones and individually fetching them. +func (pm *ProtocolManager) fetcher() { + announces := make(map[common.Hash]*blockAnnounce) + request := make(map[*peer][]common.Hash) + cycle := time.Tick(notifyCheckCycle) + + // Iterate the block fetching until a quit is requested + for { + select { + case notifications := <-pm.newHashCh: + // A batch of hashes the notified, schedule them for retrieval + glog.V(logger.Detail).Infof("Scheduling %d hash announces from %s", len(notifications), notifications[0].peer.id) + for _, announce := range notifications { + announces[announce.hash] = announce + } + + case <-cycle: + // Check if any notified blocks failed to arrive + for hash, announce := range announces { + if time.Since(announce.time) > notifyArriveTimeout { + if !pm.chainman.HasBlock(hash) { + request[announce.peer] = append(request[announce.peer], hash) + } + delete(announces, hash) + } + } + if len(request) == 0 { + break + } + // Send out all block requests + for peer, hashes := range request { + glog.V(logger.Detail).Infof("Fetching specific %d blocks from %s", len(hashes), peer.id) + peer.requestBlocks(hashes) + } + request = make(map[*peer][]common.Hash) + + case <-pm.quitSync: + return + } + } +} + +// syncer is responsible for periodically synchronising with the network, both +// downloading hashes and blocks as well as retrieving cached ones. +func (pm *ProtocolManager) syncer() { forceSync := time.Tick(forceSyncCycle) blockProc := time.Tick(blockProcCycle) blockProcPend := int32(0) -- cgit v1.2.3 From 9ed166c196b07047299579e5ea2b6ece26aec5c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 8 Jun 2015 20:38:39 +0300 Subject: eth: split and handle explicitly vs. download requested blocks --- eth/sync.go | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) (limited to 'eth/sync.go') diff --git a/eth/sync.go b/eth/sync.go index 1a1cbdb47..f761f3cd1 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -12,6 +12,16 @@ import ( "github.com/ethereum/go-ethereum/logger/glog" ) +const ( + forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available + blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process + notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching + notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested + notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block + minDesiredPeerCount = 5 // Amount of peers desired to start syncing + blockProcAmount = 256 +) + // blockAnnounce is the hash notification of the availability of a new block in // the network. type blockAnnounce struct { @@ -25,6 +35,7 @@ type blockAnnounce struct { func (pm *ProtocolManager) fetcher() { announces := make(map[common.Hash]*blockAnnounce) request := make(map[*peer][]common.Hash) + pending := make(map[common.Hash]*blockAnnounce) cycle := time.Tick(notifyCheckCycle) // Iterate the block fetching until a quit is requested @@ -38,11 +49,18 @@ func (pm *ProtocolManager) fetcher() { } case <-cycle: + // Clean up any expired block fetches + for hash, announce := range pending { + if time.Since(announce.time) > notifyFetchTimeout { + delete(pending, hash) + } + } // Check if any notified blocks failed to arrive for hash, announce := range announces { if time.Since(announce.time) > notifyArriveTimeout { if !pm.chainman.HasBlock(hash) { request[announce.peer] = append(request[announce.peer], hash) + pending[hash] = announce } delete(announces, hash) } @@ -57,6 +75,44 @@ func (pm *ProtocolManager) fetcher() { } request = make(map[*peer][]common.Hash) + case filter := <-pm.newBlockCh: + // Blocks arrived, extract any explicit requests, return all else + var blocks types.Blocks + select { + case blocks = <-filter: + case <-pm.quitSync: + return + } + + fetch, sync := []*types.Block{}, []*types.Block{} + for _, block := range blocks { + hash := block.Hash() + if _, ok := pending[hash]; ok { + fetch = append(fetch, block) + } else { + sync = append(sync, block) + } + } + + select { + case filter <- sync: + case <-pm.quitSync: + return + } + // If any explicit fetches were replied to, import them + if len(fetch) > 0 { + go func() { + for _, block := range fetch { + if announce := pending[block.Hash()]; announce != nil { + if err := pm.importBlock(announce.peer, block, nil); err != nil { + glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err) + return + } + } + } + }() + } + case <-pm.quitSync: return } -- cgit v1.2.3 From 8216bb901c9fbdcde427cc42ca7e82ec3ee2e8e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 9 Jun 2015 00:37:10 +0300 Subject: eth: clean up pending announce download map, polish logs --- eth/sync.go | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) (limited to 'eth/sync.go') diff --git a/eth/sync.go b/eth/sync.go index f761f3cd1..dd7414da8 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -43,7 +43,7 @@ func (pm *ProtocolManager) fetcher() { select { case notifications := <-pm.newHashCh: // A batch of hashes the notified, schedule them for retrieval - glog.V(logger.Detail).Infof("Scheduling %d hash announces from %s", len(notifications), notifications[0].peer.id) + glog.V(logger.Debug).Infof("Scheduling %d hash announcements from %s", len(notifications), notifications[0].peer.id) for _, announce := range notifications { announces[announce.hash] = announce } @@ -70,13 +70,13 @@ func (pm *ProtocolManager) fetcher() { } // Send out all block requests for peer, hashes := range request { - glog.V(logger.Detail).Infof("Fetching specific %d blocks from %s", len(hashes), peer.id) + glog.V(logger.Debug).Infof("Explicitly fetching %d blocks from %s", len(hashes), peer.id) peer.requestBlocks(hashes) } request = make(map[*peer][]common.Hash) case filter := <-pm.newBlockCh: - // Blocks arrived, extract any explicit requests, return all else + // Blocks arrived, extract any explicit fetches, return all else var blocks types.Blocks select { case blocks = <-filter: @@ -84,26 +84,38 @@ func (pm *ProtocolManager) fetcher() { return } - fetch, sync := []*types.Block{}, []*types.Block{} + explicit, download := []*types.Block{}, []*types.Block{} for _, block := range blocks { hash := block.Hash() + + // Filter explicitly requested blocks from hash announcements if _, ok := pending[hash]; ok { - fetch = append(fetch, block) + // Discard if already imported by other means + if !pm.chainman.HasBlock(hash) { + explicit = append(explicit, block) + } else { + delete(pending, hash) + } } else { - sync = append(sync, block) + download = append(download, block) } } select { - case filter <- sync: + case filter <- download: case <-pm.quitSync: return } // If any explicit fetches were replied to, import them - if len(fetch) > 0 { + if count := len(explicit); count > 0 { + glog.V(logger.Debug).Infof("Importing %d explicitly fetched blocks", count) go func() { - for _, block := range fetch { - if announce := pending[block.Hash()]; announce != nil { + for _, block := range explicit { + hash := block.Hash() + + // Make sure there's still something pending to import + if announce := pending[hash]; announce != nil { + delete(pending, hash) if err := pm.importBlock(announce.peer, block, nil); err != nil { glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err) return @@ -207,15 +219,15 @@ func (pm *ProtocolManager) synchronise(peer *peer) { return } // Get the hashes from the peer (synchronously) - glog.V(logger.Debug).Infof("Attempting synchronisation: %v, 0x%x", peer.id, peer.recentHash) + glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", peer.id, peer.recentHash) err := pm.downloader.Synchronise(peer.id, peer.recentHash) switch err { case nil: - glog.V(logger.Debug).Infof("Synchronisation completed") + glog.V(logger.Detail).Infof("Synchronisation completed") case downloader.ErrBusy: - glog.V(logger.Debug).Infof("Synchronisation already in progress") + glog.V(logger.Detail).Infof("Synchronisation already in progress") case downloader.ErrTimeout, downloader.ErrBadPeer, downloader.ErrEmptyHashSet, downloader.ErrInvalidChain, downloader.ErrCrossCheckFailed: glog.V(logger.Debug).Infof("Removing peer %v: %v", peer.id, err) -- cgit v1.2.3 From 44147d057dd91d8b35dd6f4ed025bdb4baf225eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 9 Jun 2015 14:27:44 +0300 Subject: eth: fix data race accessing peer.recentHash --- eth/sync.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'eth/sync.go') diff --git a/eth/sync.go b/eth/sync.go index dd7414da8..b3184364f 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -214,14 +214,15 @@ func (pm *ProtocolManager) synchronise(peer *peer) { // FIXME if we have the hash in our chain and the TD of the peer is // much higher than ours, something is wrong with us or the peer. // Check if the hash is on our own chain - if pm.chainman.HasBlock(peer.recentHash) { + head := peer.Head() + if pm.chainman.HasBlock(head) { glog.V(logger.Debug).Infoln("Synchronisation canceled: head already known") return } // Get the hashes from the peer (synchronously) - glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", peer.id, peer.recentHash) + glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", peer.id, head) - err := pm.downloader.Synchronise(peer.id, peer.recentHash) + err := pm.downloader.Synchronise(peer.id, head) switch err { case nil: glog.V(logger.Detail).Infof("Synchronisation completed") -- cgit v1.2.3 From f86707713c42da02b70a3a389684e19e902d8759 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 9 Jun 2015 14:56:27 +0300 Subject: eth: fix data race accessing peer.td --- eth/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'eth/sync.go') diff --git a/eth/sync.go b/eth/sync.go index b3184364f..3a33fe149 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -208,7 +208,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) { return } // Make sure the peer's TD is higher than our own. If not drop. - if peer.td.Cmp(pm.chainman.Td()) <= 0 { + if peer.Td().Cmp(pm.chainman.Td()) <= 0 { return } // FIXME if we have the hash in our chain and the TD of the peer is -- cgit v1.2.3 From d09ead546cbdf8e4659e65581f23715101f5b686 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 9 Jun 2015 15:09:15 +0300 Subject: eth: fix a data race in the hash announcement processing --- eth/sync.go | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) (limited to 'eth/sync.go') diff --git a/eth/sync.go b/eth/sync.go index 3a33fe149..8e4e3cfbe 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -109,17 +109,25 @@ func (pm *ProtocolManager) fetcher() { // If any explicit fetches were replied to, import them if count := len(explicit); count > 0 { glog.V(logger.Debug).Infof("Importing %d explicitly fetched blocks", count) + + // Create a closure with the retrieved blocks and origin peers + peers := make([]*peer, 0, count) + blocks := make([]*types.Block, 0, count) + for _, block := range explicit { + hash := block.Hash() + if announce := pending[hash]; announce != nil { + peers = append(peers, announce.peer) + blocks = append(blocks, block) + + delete(pending, hash) + } + } + // Run the importer on a new thread go func() { - for _, block := range explicit { - hash := block.Hash() - - // Make sure there's still something pending to import - if announce := pending[hash]; announce != nil { - delete(pending, hash) - if err := pm.importBlock(announce.peer, block, nil); err != nil { - glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err) - return - } + for i := 0; i < len(blocks); i++ { + if err := pm.importBlock(peers[i], blocks[i], nil); err != nil { + glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err) + return } } }() -- cgit v1.2.3 From 6c73a5980640581903d8f56b3912b22641d5195c Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 9 Jun 2015 12:03:14 +0200 Subject: eth: limit number of sent transactions based on message size Nodes that are out of sync will queue many transactions, which causes the initial transactions message to grow very large. Larger transactions messages can make communication impossible if the message is too big to send. Big transactions messages also exhaust egress bandwidth, which degrades other peer connections. The new approach to combat these issues is to send transactions in smaller batches. This commit introduces a new goroutine that handles delivery of all initial transaction transfers. Size-limited packs of transactions are sent to one peer at a time, conserving precious egress bandwidth. --- eth/sync.go | 94 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) (limited to 'eth/sync.go') diff --git a/eth/sync.go b/eth/sync.go index 8e4e3cfbe..a25d4d4fd 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -2,6 +2,7 @@ package eth import ( "math" + "math/rand" "sync/atomic" "time" @@ -10,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/p2p/discover" ) const ( @@ -20,6 +22,10 @@ const ( notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block minDesiredPeerCount = 5 // Amount of peers desired to start syncing blockProcAmount = 256 + + // This is the target size for the packs of transactions sent by txsyncLoop. + // A pack can get larger than this if a single transactions exceeds this size. + txsyncPackSize = 100 * 1024 ) // blockAnnounce is the hash notification of the availability of a new block in @@ -30,6 +36,94 @@ type blockAnnounce struct { time time.Time } +type txsync struct { + p *peer + txs []*types.Transaction +} + +// syncTransactions starts sending all currently pending transactions to the given peer. +func (pm *ProtocolManager) syncTransactions(p *peer) { + txs := pm.txpool.GetTransactions() + if len(txs) == 0 { + return + } + select { + case pm.txsyncCh <- &txsync{p, txs}: + case <-pm.quitSync: + } +} + +// txsyncLoop takes care of the initial transaction sync for each new +// connection. When a new peer appears, we relay all currently pending +// transactions. In order to minimise egress bandwidth usage, we send +// the transactions in small packs to one peer at a time. +func (pm *ProtocolManager) txsyncLoop() { + var ( + pending = make(map[discover.NodeID]*txsync) + sending = false // whether a send is active + pack = new(txsync) // the pack that is being sent + done = make(chan error, 1) // result of the send + ) + + // send starts a sending a pack of transactions from the sync. + send := func(s *txsync) { + // Fill pack with transactions up to the target size. + size := common.StorageSize(0) + pack.p = s.p + pack.txs = pack.txs[:0] + for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ { + pack.txs = append(pack.txs, s.txs[i]) + size += s.txs[i].Size() + } + // Remove the transactions that will be sent. + s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])] + if len(s.txs) == 0 { + delete(pending, s.p.ID()) + } + // Send the pack in the background. + glog.V(logger.Detail).Infof("%v: sending %d transactions (%v)", s.p.Peer, len(pack.txs), size) + sending = true + go func() { done <- pack.p.sendTransactions(pack.txs) }() + } + + // pick chooses the next pending sync. + pick := func() *txsync { + if len(pending) == 0 { + return nil + } + n := rand.Intn(len(pending)) + 1 + for _, s := range pending { + if n--; n == 0 { + return s + } + } + return nil + } + + for { + select { + case s := <-pm.txsyncCh: + pending[s.p.ID()] = s + if !sending { + send(s) + } + case err := <-done: + sending = false + // Stop tracking peers that cause send failures. + if err != nil { + glog.V(logger.Debug).Infof("%v: tx send failed: %v", pack.p.Peer, err) + delete(pending, pack.p.ID()) + } + // Schedule the next send. + if s := pick(); s != nil { + send(s) + } + case <-pm.quitSync: + return + } + } +} + // fetcher is responsible for collecting hash notifications, and periodically // checking all unknown ones and individually fetching them. func (pm *ProtocolManager) fetcher() { -- cgit v1.2.3