aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-05-08 20:22:48 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-05-08 20:22:48 +0800
commitbd5720f4804788d91154a10ef5bb10425c502658 (patch)
treed8914c112f86404314060e6d04029fe212c50ac0
parent9d188f73b58ee1fe4bda00a9536bda4056755f2c (diff)
downloadgo-tangerine-bd5720f4804788d91154a10ef5bb10425c502658.tar
go-tangerine-bd5720f4804788d91154a10ef5bb10425c502658.tar.gz
go-tangerine-bd5720f4804788d91154a10ef5bb10425c502658.tar.bz2
go-tangerine-bd5720f4804788d91154a10ef5bb10425c502658.tar.lz
go-tangerine-bd5720f4804788d91154a10ef5bb10425c502658.tar.xz
go-tangerine-bd5720f4804788d91154a10ef5bb10425c502658.tar.zst
go-tangerine-bd5720f4804788d91154a10ef5bb10425c502658.zip
eth, eth/downloader: handle sync errors a bit more gracefully
-rw-r--r--eth/downloader/downloader.go28
-rw-r--r--eth/downloader/downloader_test.go6
-rw-r--r--eth/handler.go8
-rw-r--r--eth/sync.go54
4 files changed, 48 insertions, 48 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index ef2a193ff..a97cce1ef 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -24,12 +24,12 @@ var (
blockTtl = 20 * time.Second // The amount of time it takes for a block request to time out
errLowTd = errors.New("peer's TD is too low")
- errBusy = errors.New("busy")
+ ErrBusy = errors.New("busy")
errUnknownPeer = errors.New("peer's unknown or unhealthy")
- ErrBadPeer = errors.New("action from bad peer ignored")
+ errBadPeer = errors.New("action from bad peer ignored")
errNoPeers = errors.New("no peers to keep download active")
errPendingQueue = errors.New("pending items in queue")
- errTimeout = errors.New("timeout")
+ ErrTimeout = errors.New("timeout")
errEmptyHashSet = errors.New("empty hash set by peer")
errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
errAlreadyInPool = errors.New("hash already in pool")
@@ -68,7 +68,7 @@ type Downloader struct {
getBlock getBlockFn
// Status
- synchronizing int32
+ synchronising int32
// Channels
newPeerCh chan *peer
@@ -119,15 +119,15 @@ func (d *Downloader) UnregisterPeer(id string) {
delete(d.peers, id)
}
-// Synchronize will select the peer and use it for synchronizing. If an empty string is given
+// Synchronise will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
// checks fail an error will be returned. This method is synchronous
-func (d *Downloader) Synchronize(id string, hash common.Hash) error {
+func (d *Downloader) Synchronise(id string, hash common.Hash) error {
// Make sure only one goroutine is ever allowed past this point at once
- if !atomic.CompareAndSwapInt32(&d.synchronizing, 0, 1) {
- return nil
+ if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
+ return ErrBusy
}
- defer atomic.StoreInt32(&d.synchronizing, 0)
+ defer atomic.StoreInt32(&d.synchronising, 0)
// Abort if the queue still contains some leftover data
if _, cached := d.queue.Size(); cached > 0 {
@@ -272,7 +272,7 @@ out:
// the zero hash.
if p == nil || (hash == common.Hash{}) {
d.queue.Reset()
- return errTimeout
+ return ErrTimeout
}
// set p to the active peer. this will invalidate any hashes that may be returned
@@ -282,7 +282,7 @@ out:
glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id)
}
}
- glog.V(logger.Detail).Infof("Downloaded hashes (%d) in %v\n", d.queue.Pending(), time.Since(start))
+ glog.V(logger.Debug).Infof("Downloaded hashes (%d) in %v\n", d.queue.Pending(), time.Since(start))
return nil
}
@@ -384,7 +384,6 @@ out:
}
}
}
-
glog.V(logger.Detail).Infoln("Downloaded block(s) in", time.Since(start))
return nil
@@ -404,11 +403,10 @@ func (d *Downloader) AddHashes(id string, hashes []common.Hash) error {
return fmt.Errorf("received hashes from %s while active peer is %s", id, d.activePeer)
}
- if glog.V(logger.Detail) && len(hashes) != 0 {
+ if glog.V(logger.Debug) && len(hashes) != 0 {
from, to := hashes[0], hashes[len(hashes)-1]
- glog.Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.Pending(), from[:4], to[:4], id)
+ glog.V(logger.Debug).Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.Pending(), from[:4], to[:4], id)
}
-
d.hashCh <- hashPack{id, hashes}
return nil
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index f3402794b..8ccc4d1a5 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -61,7 +61,7 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types
func (dl *downloadTester) sync(peerId string, hash common.Hash) error {
dl.activePeerId = peerId
- return dl.downloader.Synchronize(peerId, hash)
+ return dl.downloader.Synchronise(peerId, hash)
}
func (dl *downloadTester) hasBlock(hash common.Hash) bool {
@@ -217,13 +217,13 @@ func TestThrottling(t *testing.T) {
}
}()
- // Synchronize the two threads and verify
+ // Synchronise the two threads and verify
err := <-errc
done <- struct{}{}
<-done
if err != nil {
- t.Fatalf("failed to synchronize blocks: %v", err)
+ t.Fatalf("failed to synchronise blocks: %v", err)
}
if len(took) != targetBlocks {
t.Fatalf("downloaded block mismatch: have %v, want %v", len(took), targetBlocks)
diff --git a/eth/handler.go b/eth/handler.go
index b2018f336..41b6728d9 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -19,9 +19,9 @@ import (
)
const (
- peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
- blockProcTimer = 500 * time.Millisecond
- minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+ forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
+ blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process
+ minDesiredPeerCount = 5 // Amount of peers desired to start syncing
blockProcAmount = 256
)
@@ -324,7 +324,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
}
self.BroadcastBlock(hash, request.Block)
} else {
- go self.synchronize(p)
+ go self.synchronise(p)
}
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
diff --git a/eth/sync.go b/eth/sync.go
index b259c1d47..c49f5209d 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -12,10 +12,8 @@ import (
// Sync contains all synchronisation code for the eth protocol
func (pm *ProtocolManager) update() {
- // itimer is used to determine when to start ignoring `minDesiredPeerCount`
- itimer := time.NewTimer(peerCountTimeout)
- // btimer is used for picking of blocks from the downloader
- btimer := time.Tick(blockProcTimer)
+ forceSync := time.Tick(forceSyncCycle)
+ blockProc := time.Tick(blockProcCycle)
for {
select {
@@ -24,27 +22,22 @@ func (pm *ProtocolManager) update() {
if len(pm.peers) < minDesiredPeerCount {
break
}
-
- // Find the best peer
+ // Find the best peer and synchronise with it
peer := getBestPeer(pm.peers)
if peer == nil {
- glog.V(logger.Debug).Infoln("Sync attempt cancelled. No peers available")
+ glog.V(logger.Debug).Infoln("Sync attempt canceled. No peers available")
}
+ go pm.synchronise(peer)
- itimer.Stop()
- go pm.synchronize(peer)
- case <-itimer.C:
- // The timer will make sure that the downloader keeps an active state
- // in which it attempts to always check the network for highest td peers
- // Either select the peer or restart the timer if no peers could
- // be selected.
+ case <-forceSync:
+ // Force a sync even if not enough peers are present
if peer := getBestPeer(pm.peers); peer != nil {
- go pm.synchronize(peer)
- } else {
- itimer.Reset(5 * time.Second)
+ go pm.synchronise(peer)
}
- case <-btimer:
+ case <-blockProc:
+ // Try to pull some blocks from the downloaded
go pm.processBlocks()
+
case <-pm.quitSync:
return
}
@@ -59,11 +52,11 @@ func (pm *ProtocolManager) processBlocks() error {
pm.wg.Add(1)
defer pm.wg.Done()
+ // Take a batch of blocks (will return nil if a previous batch has not reached the chain yet)
blocks := pm.downloader.TakeBlocks()
if len(blocks) == 0 {
return nil
}
-
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number())
for len(blocks) != 0 && !pm.quit {
@@ -77,7 +70,7 @@ func (pm *ProtocolManager) processBlocks() error {
return nil
}
-func (pm *ProtocolManager) synchronize(peer *peer) {
+func (pm *ProtocolManager) synchronise(peer *peer) {
// Make sure the peer's TD is higher than our own. If not drop.
if peer.td.Cmp(pm.chainman.Td()) <= 0 {
return
@@ -89,12 +82,21 @@ func (pm *ProtocolManager) synchronize(peer *peer) {
return
}
// Get the hashes from the peer (synchronously)
- err := pm.downloader.Synchronize(peer.id, peer.recentHash)
- if err != nil && err == downloader.ErrBadPeer {
- glog.V(logger.Debug).Infoln("removed peer from peer set due to bad action")
+ glog.V(logger.Debug).Infof("Attempting synchronisation: %v, 0x%x", peer.id, peer.recentHash)
+
+ err := pm.downloader.Synchronise(peer.id, peer.recentHash)
+ switch err {
+ case nil:
+ glog.V(logger.Debug).Infof("Synchronisation completed")
+
+ case downloader.ErrBusy:
+ glog.V(logger.Debug).Infof("Synchronisation already in progress")
+
+ case downloader.ErrTimeout:
+ glog.V(logger.Debug).Infof("Removing peer %v due to sync timeout", peer.id)
pm.removePeer(peer)
- } else if err != nil {
- // handle error
- glog.V(logger.Detail).Infoln("error downloading:", err)
+
+ default:
+ glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
}
}