aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader')
-rw-r--r--eth/downloader/downloader.go132
-rw-r--r--eth/downloader/downloader_test.go42
-rw-r--r--eth/downloader/peer.go15
-rw-r--r--eth/downloader/queue.go3
-rw-r--r--eth/downloader/synchronous.go79
5 files changed, 70 insertions, 201 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index cfc494b2f..60d908758 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -39,7 +39,6 @@ var (
type hashCheckFn func(common.Hash) bool
type chainInsertFn func(types.Blocks) error
type hashIterFn func() (common.Hash, error)
-type currentTdFn func() *big.Int
type blockPack struct {
peerId string
@@ -61,7 +60,6 @@ type Downloader struct {
// Callbacks
hasBlock hashCheckFn
insertChain chainInsertFn
- currentTd currentTdFn
// Status
fetchingHashes int32
@@ -70,27 +68,20 @@ type Downloader struct {
// Channels
newPeerCh chan *peer
- syncCh chan syncPack
hashCh chan []common.Hash
blockCh chan blockPack
- quit chan struct{}
}
-func New(hasBlock hashCheckFn, insertChain chainInsertFn, currentTd currentTdFn) *Downloader {
+func New(hasBlock hashCheckFn, insertChain chainInsertFn) *Downloader {
downloader := &Downloader{
queue: newqueue(),
peers: make(peers),
hasBlock: hasBlock,
insertChain: insertChain,
- currentTd: currentTd,
newPeerCh: make(chan *peer, 1),
- syncCh: make(chan syncPack, 1),
hashCh: make(chan []common.Hash, 1),
blockCh: make(chan blockPack, 1),
- quit: make(chan struct{}),
}
- go downloader.peerHandler()
- go downloader.update()
return downloader
}
@@ -99,18 +90,17 @@ func (d *Downloader) Stats() (current int, max int) {
return d.queue.blockHashes.Size(), d.queue.fetchPool.Size() + d.queue.hashPool.Size()
}
-func (d *Downloader) RegisterPeer(id string, td *big.Int, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error {
+func (d *Downloader) RegisterPeer(id string, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error {
d.mu.Lock()
defer d.mu.Unlock()
- glog.V(logger.Detail).Infoln("Register peer", id, "TD =", td)
+ glog.V(logger.Detail).Infoln("Register peer", id)
// Create a new peer and add it to the list of known peers
- peer := newPeer(id, td, hash, getHashes, getBlocks)
+ peer := newPeer(id, hash, getHashes, getBlocks)
// add peer to our peer set
d.peers[id] = peer
// broadcast new peer
- d.newPeerCh <- peer
return nil
}
@@ -125,72 +115,59 @@ func (d *Downloader) UnregisterPeer(id string) {
delete(d.peers, id)
}
-func (d *Downloader) peerHandler() {
- // itimer is used to determine when to start ignoring `minDesiredPeerCount`
- itimer := time.NewTimer(peerCountTimeout)
-out:
- for {
- select {
- case <-d.newPeerCh:
- // Meet the `minDesiredPeerCount` before we select our best peer
- if len(d.peers) < minDesiredPeerCount {
- break
- }
- itimer.Stop()
-
- d.selectPeer(d.peers.bestPeer())
- case <-itimer.C:
- // The timer will make sure that the downloader keeps an active state
- // in which it attempts to always check the network for highest td peers
- // Either select the peer or restart the timer if no peers could
- // be selected.
- if peer := d.peers.bestPeer(); peer != nil {
- d.selectPeer(d.peers.bestPeer())
- } else {
- itimer.Reset(5 * time.Second)
- }
- case <-d.quit:
- break out
- }
- }
-}
-
-func (d *Downloader) selectPeer(p *peer) {
+// SynchroniseWithPeer will select the peer and use it for synchronising. If an empty string is given
+// it will use the best peer possible and synchronise if it's TD is higher than our own. If any of the
+// checks fail an error will be returned. This method is synchronous
+func (d *Downloader) Synchronise(id string, hash common.Hash) error {
// Make sure it's doing neither. Once done we can restart the
// downloading process if the TD is higher. For now just get on
// with whatever is going on. This prevents unecessary switching.
if d.isBusy() {
- return
+ return errBusy
}
- // selected peer must be better than our own
- // XXX we also check the peer's recent hash to make sure we
- // don't have it. Some peers report (i think) incorrect TD.
- if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) {
- return
+
+ // Fetch the peer using the id or throw an error if the peer couldn't be found
+ p := d.peers[id]
+ if p == nil {
+ return errUnknownPeer
}
- glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td)
- d.syncCh <- syncPack{p, p.recentHash, false}
+ // Get the hash from the peer and initiate the downloading progress.
+ err := d.getFromPeer(p, hash, false)
+ if err != nil {
+ return err
+ }
+ return d.process(p)
}
-func (d *Downloader) update() {
-out:
- for {
- select {
- case sync := <-d.syncCh:
- var peer *peer = sync.peer
- err := d.getFromPeer(peer, sync.hash, sync.ignoreInitial)
- if err != nil {
- glog.V(logger.Detail).Infoln(err)
- break
- }
+func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) error {
+ d.activePeer = p.id
+
+ glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id)
+ // Start the fetcher. This will block the update entirely
+ // interupts need to be send to the appropriate channels
+ // respectively.
+ if err := d.startFetchingHashes(p, hash, ignoreInitial); err != nil {
+ // handle error
+ glog.V(logger.Debug).Infoln("Error fetching hashes:", err)
+ // XXX Reset
+ return err
+ }
- d.process()
- case <-d.quit:
- break out
- }
+ // Start fetching blocks in paralel. The strategy is simple
+ // take any available peers, seserve a chunk for each peer available,
+ // let the peer deliver the chunkn and periodically check if a peer
+ // has timedout. When done downloading, process blocks.
+ if err := d.startFetchingBlocks(p); err != nil {
+ glog.V(logger.Debug).Infoln("Error downloading blocks:", err)
+ // XXX reset
+ return err
}
+
+ glog.V(logger.Detail).Infoln("Sync completed")
+
+ return nil
}
// XXX Make synchronous
@@ -403,13 +380,12 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) error
}
peer.mu.Lock()
- peer.td = td
peer.recentHash = block.Hash()
peer.mu.Unlock()
peer.promote()
glog.V(logger.Detail).Infoln("Inserting new block from:", id)
- d.queue.addBlock(id, block, td)
+ d.queue.addBlock(id, block)
// if neither go ahead to process
if d.isBusy() {
@@ -429,10 +405,10 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) error
}
}
- return d.process()
+ return d.process(peer)
}
-func (d *Downloader) process() error {
+func (d *Downloader) process(peer *peer) error {
atomic.StoreInt32(&d.processingBlocks, 1)
defer atomic.StoreInt32(&d.processingBlocks, 0)
@@ -458,18 +434,8 @@ func (d *Downloader) process() error {
// grandparents can be requested and queued.
err = d.insertChain(blocks[:max])
if err != nil && core.IsParentErr(err) {
- glog.V(logger.Debug).Infoln("Aborting process due to missing parent. Fetching hashes")
-
- // TODO change this. This shite
- for i, block := range blocks[:max] {
- if !d.hasBlock(block.ParentHash()) {
- d.syncCh <- syncPack{d.peers.bestPeer(), block.Hash(), true}
- // remove processed blocks
- blocks = blocks[i:]
+ glog.V(logger.Debug).Infoln("Aborting process due to missing parent.")
- break
- }
- }
break
} else if err != nil {
// immediatly unregister the false peer but do not disconnect
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 1d449cfba..8843ca0c7 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -49,7 +49,7 @@ type downloadTester struct {
func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types.Block) *downloadTester {
tester := &downloadTester{t: t, hashes: hashes, blocks: blocks, done: make(chan bool)}
- downloader := New(tester.hasBlock, tester.insertChain, func() *big.Int { return new(big.Int) })
+ downloader := New(tester.hasBlock, tester.insertChain)
tester.downloader = downloader
return tester
@@ -65,10 +65,6 @@ func (dl *downloadTester) hasBlock(hash common.Hash) bool {
func (dl *downloadTester) insertChain(blocks types.Blocks) error {
dl.insertedBlocks += len(blocks)
- if len(dl.blocks)-1 <= dl.insertedBlocks {
- dl.done <- true
- }
-
return nil
}
@@ -93,14 +89,14 @@ func (dl *downloadTester) getBlocks(id string) func([]common.Hash) error {
func (dl *downloadTester) newPeer(id string, td *big.Int, hash common.Hash) {
dl.pcount++
- dl.downloader.RegisterPeer(id, td, hash, dl.getHashes, dl.getBlocks(id))
+ dl.downloader.RegisterPeer(id, hash, dl.getHashes, dl.getBlocks(id))
}
func (dl *downloadTester) badBlocksPeer(id string, td *big.Int, hash common.Hash) {
dl.pcount++
// This bad peer never returns any blocks
- dl.downloader.RegisterPeer(id, td, hash, dl.getHashes, func([]common.Hash) error {
+ dl.downloader.RegisterPeer(id, hash, dl.getHashes, func([]common.Hash) error {
return nil
})
}
@@ -112,7 +108,8 @@ func TestDownload(t *testing.T) {
minDesiredPeerCount = 4
blockTtl = 1 * time.Second
- hashes := createHashes(0, 1000)
+ targetBlocks := 1000
+ hashes := createHashes(0, targetBlocks)
blocks := createBlocksFromHashes(hashes)
tester := newTester(t, hashes, blocks)
@@ -121,21 +118,21 @@ func TestDownload(t *testing.T) {
tester.badBlocksPeer("peer3", big.NewInt(0), common.Hash{})
tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{})
-success:
- select {
- case <-tester.done:
- break success
- case <-time.After(10 * time.Second): // XXX this could actually fail on a slow computer
- t.Error("timeout")
+ err := tester.downloader.Synchronise("peer1", hashes[0])
+ if err != nil {
+ t.Error("download error", err)
+ }
+
+ if tester.insertedBlocks != targetBlocks {
+ t.Error("expected", targetBlocks, "have", tester.insertedBlocks)
}
}
func TestMissing(t *testing.T) {
- t.Skip()
-
glog.SetV(logger.Detail)
glog.SetToStderr(true)
+ targetBlocks := 1000
hashes := createHashes(0, 1000)
extraHashes := createHashes(1001, 1003)
blocks := createBlocksFromHashes(append(extraHashes, hashes...))
@@ -146,13 +143,12 @@ func TestMissing(t *testing.T) {
hashes = append(extraHashes, hashes[:len(hashes)-1]...)
tester.newPeer("peer2", big.NewInt(0), common.Hash{})
-success1:
- select {
- case <-tester.done:
- break success1
- case <-time.After(10 * time.Second): // XXX this could actually fail on a slow computer
- t.Error("timout")
+ err := tester.downloader.Synchronise("peer1", hashes[0])
+ if err != nil {
+ t.Error("download error", err)
}
- tester.downloader.AddBlock("peer2", blocks[hashes[len(hashes)-1]], big.NewInt(10001))
+ if tester.insertedBlocks != targetBlocks {
+ t.Error("expected", targetBlocks, "have", tester.insertedBlocks)
+ }
}
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index bcb8ad43a..91977f592 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -2,7 +2,6 @@ package downloader
import (
"errors"
- "math/big"
"sync"
"github.com/ethereum/go-ethereum/common"
@@ -51,16 +50,6 @@ func (p peers) getPeer(id string) *peer {
return p[id]
}
-func (p peers) bestPeer() *peer {
- var peer *peer
- for _, cp := range p {
- if peer == nil || cp.td.Cmp(peer.td) > 0 {
- peer = cp
- }
- }
- return peer
-}
-
// peer represents an active peer
type peer struct {
state int // Peer state (working, idle)
@@ -68,7 +57,6 @@ type peer struct {
mu sync.RWMutex
id string
- td *big.Int
recentHash common.Hash
ignored *set.Set
@@ -78,10 +66,9 @@ type peer struct {
}
// create a new peer
-func newPeer(id string, td *big.Int, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer {
+func newPeer(id string, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer {
return &peer{
id: id,
- td: td,
recentHash: hash,
getHashes: getHashes,
getBlocks: getBlocks,
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index adbc2a0d0..a21a44706 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -2,7 +2,6 @@ package downloader
import (
"math"
- "math/big"
"sync"
"time"
@@ -93,7 +92,7 @@ func (c *queue) has(hash common.Hash) bool {
return c.hashPool.Has(hash) || c.fetchPool.Has(hash)
}
-func (c *queue) addBlock(id string, block *types.Block, td *big.Int) {
+func (c *queue) addBlock(id string, block *types.Block) {
c.mu.Lock()
defer c.mu.Unlock()
diff --git a/eth/downloader/synchronous.go b/eth/downloader/synchronous.go
deleted file mode 100644
index 7bb49d24e..000000000
--- a/eth/downloader/synchronous.go
+++ /dev/null
@@ -1,79 +0,0 @@
-package downloader
-
-import (
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/logger/glog"
-)
-
-// THIS IS PENDING AND TO DO CHANGES FOR MAKING THE DOWNLOADER SYNCHRONOUS
-
-// SynchroniseWithPeer will select the peer and use it for synchronising. If an empty string is given
-// it will use the best peer possible and synchronise if it's TD is higher than our own. If any of the
-// checks fail an error will be returned. This method is synchronous
-func (d *Downloader) SynchroniseWithPeer(id string) (types.Blocks, error) {
- // Check if we're busy
- if d.isBusy() {
- return nil, errBusy
- }
-
- // Attempt to select a peer. This can either be nothing, which returns, best peer
- // or selected peer. If no peer could be found an error will be returned
- var p *peer
- if len(id) == 0 {
- p = d.peers[id]
- if p == nil {
- return nil, errUnknownPeer
- }
- } else {
- p = d.peers.bestPeer()
- }
-
- // Make sure our td is lower than the peer's td
- if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) {
- return nil, errLowTd
- }
-
- // Get the hash from the peer and initiate the downloading progress.
- err := d.getFromPeer(p, p.recentHash, false)
- if err != nil {
- return nil, err
- }
-
- return d.queue.blocks, nil
-}
-
-// Synchronise will synchronise using the best peer.
-func (d *Downloader) Synchronise() (types.Blocks, error) {
- return d.SynchroniseWithPeer("")
-}
-
-func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) error {
- d.activePeer = p.id
-
- glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id)
- // Start the fetcher. This will block the update entirely
- // interupts need to be send to the appropriate channels
- // respectively.
- if err := d.startFetchingHashes(p, hash, ignoreInitial); err != nil {
- // handle error
- glog.V(logger.Debug).Infoln("Error fetching hashes:", err)
- // XXX Reset
- return err
- }
-
- // Start fetching blocks in paralel. The strategy is simple
- // take any available peers, seserve a chunk for each peer available,
- // let the peer deliver the chunkn and periodically check if a peer
- // has timedout. When done downloading, process blocks.
- if err := d.startFetchingBlocks(p); err != nil {
- glog.V(logger.Debug).Infoln("Error downloading blocks:", err)
- // XXX reset
- return err
- }
-
- glog.V(logger.Detail).Infoln("Sync completed")
-
- return nil
-}