aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--eth/backend.go6
-rw-r--r--eth/downloader/downloader.go101
-rw-r--r--eth/protocol.go128
3 files changed, 191 insertions, 44 deletions
diff --git a/eth/backend.go b/eth/backend.go
index cde7b167d..3d5c4ba09 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -16,6 +16,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
@@ -134,6 +135,7 @@ type Ethereum struct {
accountManager *accounts.Manager
whisper *whisper.Whisper
pow *ethash.Ethash
+ downloader *downloader.Downloader
net *p2p.Server
eventMux *event.TypeMux
@@ -208,6 +210,7 @@ func New(config *Config) (*Ethereum, error) {
}
eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux())
+ eth.downloader = downloader.New(eth.chainManager.HasBlock, eth.chainManager.InsertChain, eth.chainManager.Td)
eth.pow = ethash.New(eth.chainManager)
eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State)
eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux())
@@ -226,7 +229,7 @@ func New(config *Config) (*Ethereum, error) {
return nil, err
}
- ethProto := EthProtocol(config.ProtocolVersion, config.NetworkId, eth.txPool, eth.chainManager, eth.blockPool)
+ ethProto := EthProtocol(config.ProtocolVersion, config.NetworkId, eth.txPool, eth.chainManager, eth.blockPool, eth.downloader)
protocols := []p2p.Protocol{ethProto}
if config.Shh {
protocols = append(protocols, eth.whisper.Protocol())
@@ -363,6 +366,7 @@ func (s *Ethereum) ClientVersion() string { return s.clientVersio
func (s *Ethereum) EthVersion() int { return s.ethVersionId }
func (s *Ethereum) NetVersion() int { return s.netVersionId }
func (s *Ethereum) ShhVersion() int { return s.shhVersionId }
+func (s *Ethereum) Downloader() *downloader.Downloader { return s.downloader }
// Start the ethereum
func (s *Ethereum) Start() error {
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 4e795af6d..5f9d9ed74 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -1,6 +1,7 @@
package downloader
import (
+ "errors"
"math"
"math/big"
"sync"
@@ -20,6 +21,12 @@ const (
minDesiredPeerCount = 3 // Amount of peers desired to start syncing
)
+var (
+ errLowTd = errors.New("peer's TD is too low")
+ errBusy = errors.New("busy")
+ errUnknownPeer = errors.New("peer's unknown or unhealthy")
+)
+
type hashCheckFn func(common.Hash) bool
type chainInsertFn func(types.Blocks) error
type hashIterFn func() (common.Hash, error)
@@ -82,18 +89,19 @@ func (d *Downloader) RegisterPeer(id string, td *big.Int, hash common.Hash, getH
d.mu.Lock()
defer d.mu.Unlock()
- glog.V(logger.Detail).Infoln("Register peer", id)
+ glog.V(logger.Detail).Infoln("Register peer", id, "TD =", td)
// Create a new peer and add it to the list of known peers
peer := newPeer(id, td, hash, getHashes, getBlocks)
// add peer to our peer set
d.peers[id] = peer
// broadcast new peer
- d.newPeerCh <- peer
+ //d.newPeerCh <- peer
return nil
}
+// UnregisterPeer unregister's a peer. This will prevent any action from the specified peer.
func (d *Downloader) UnregisterPeer(id string) {
d.mu.Lock()
defer d.mu.Unlock()
@@ -103,6 +111,73 @@ func (d *Downloader) UnregisterPeer(id string) {
delete(d.peers, id)
}
+// SynchroniseWithPeer will select the peer and use it for synchronising. If an empty string is given
+// it will use the best peer possible and synchronise if it's TD is higher than our own. If any of the
+// checks fail an error will be returned. This method is synchronous
+func (d *Downloader) SynchroniseWithPeer(id string) (types.Blocks, error) {
+ // Check if we're busy
+ if d.isFetchingHashes() || d.isDownloadingBlocks() || d.isProcessing() {
+ return nil, errBusy
+ }
+
+ // Attempt to select a peer. This can either be nothing, which returns, best peer
+ // or selected peer. If no peer could be found an error will be returned
+ var p *peer
+ if len(id) == 0 {
+ p = d.peers[id]
+ if p == nil {
+ return nil, errUnknownPeer
+ }
+ } else {
+ p = d.peers.bestPeer()
+ }
+
+ // Make sure our td is lower than the peer's td
+ if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) {
+ return nil, errLowTd
+ }
+
+ // Get the hash from the peer and initiate the downloading progress.
+ err := d.getFromPeer(p, p.recentHash, false)
+ if err != nil {
+ return nil, err
+ }
+
+ return d.queue.blocks, nil
+}
+
+// Synchronise will synchronise using the best peer.
+func (d *Downloader) Synchronise() (types.Blocks, error) {
+ return d.SynchroniseWithPeer("")
+}
+
+func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) error {
+ glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id)
+ // Start the fetcher. This will block the update entirely
+ // interupts need to be send to the appropriate channels
+ // respectively.
+ if err := d.startFetchingHashes(p, hash, ignoreInitial); err != nil {
+ // handle error
+ glog.V(logger.Debug).Infoln("Error fetching hashes:", err)
+ // XXX Reset
+ return err
+ }
+
+ // Start fetching blocks in paralel. The strategy is simple
+ // take any available peers, seserve a chunk for each peer available,
+ // let the peer deliver the chunkn and periodically check if a peer
+ // has timedout. When done downloading, process blocks.
+ if err := d.startFetchingBlocks(p); err != nil {
+ glog.V(logger.Debug).Infoln("Error downloading blocks:", err)
+ // XXX reset
+ return err
+ }
+
+ glog.V(logger.Detail).Infoln("Sync completed")
+
+ return nil
+}
+
func (d *Downloader) peerHandler() {
// itimer is used to determine when to start ignoring `minDesiredPeerCount`
//itimer := time.NewTicker(5 * time.Second)
@@ -116,11 +191,18 @@ out:
if len(d.peers) < minDesiredPeerCount {
break
}
+
d.selectPeer(d.peers.bestPeer())
case <-itimer.C:
// The timer will make sure that the downloader keeps an active state
// in which it attempts to always check the network for highest td peers
- d.selectPeer(d.peers.bestPeer())
+ // Either select the peer or restart the timer if no peers could
+ // be selected.
+ if peer := d.peers.bestPeer(); peer != nil {
+ d.selectPeer(d.peers.bestPeer())
+ } else {
+ itimer.Reset(5 * time.Second)
+ }
case <-d.quit:
break out
}
@@ -142,6 +224,7 @@ func (d *Downloader) selectPeer(p *peer) {
glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td)
d.syncCh <- syncPack{p, p.recentHash, false}
}
+
}
func (d *Downloader) update() {
@@ -150,7 +233,7 @@ out:
select {
case sync := <-d.syncCh:
selectedPeer := sync.peer
- glog.V(logger.Detail).Infoln("Synchronising with network using:", selectedPeer.id)
+ glog.V(logger.Detail).Infoln("Synchronising with the network using:", selectedPeer.id)
// Start the fetcher. This will block the update entirely
// interupts need to be send to the appropriate channels
// respectively.
@@ -245,9 +328,13 @@ out:
for {
select {
case blockPack := <-d.blockCh:
- d.peers[blockPack.peerId].promote()
- d.queue.deliver(blockPack.peerId, blockPack.blocks)
- d.peers.setState(blockPack.peerId, idleState)
+ // If the peer was previously banned and failed to deliver it's pack
+ // in a reasonable time frame, ignore it's message.
+ if d.peers[blockPack.peerId] != nil {
+ d.peers[blockPack.peerId].promote()
+ d.queue.deliver(blockPack.peerId, blockPack.blocks)
+ d.peers.setState(blockPack.peerId, idleState)
+ }
case <-ticker.C:
// If there are unrequested hashes left start fetching
// from the available peers.
diff --git a/eth/protocol.go b/eth/protocol.go
index 1a19307db..66f3cbac8 100644
--- a/eth/protocol.go
+++ b/eth/protocol.go
@@ -7,6 +7,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/errs"
+ "github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/p2p"
@@ -18,8 +19,8 @@ const (
NetworkId = 0
ProtocolLength = uint64(8)
ProtocolMaxMsgSize = 10 * 1024 * 1024
- maxHashes = 256
- maxBlocks = 64
+ maxHashes = 512
+ maxBlocks = 128
)
// eth protocol message codes
@@ -64,6 +65,7 @@ type ethProtocol struct {
txPool txPool
chainManager chainManager
blockPool blockPool
+ downloader *downloader.Downloader
peer *p2p.Peer
id string
rw p2p.MsgReadWriter
@@ -114,25 +116,26 @@ type statusMsgData struct {
// main entrypoint, wrappers starting a server running the eth protocol
// use this constructor to attach the protocol ("class") to server caps
// the Dev p2p layer then runs the protocol instance on each peer
-func EthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool) p2p.Protocol {
+func EthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool, downloader *downloader.Downloader) p2p.Protocol {
return p2p.Protocol{
Name: "eth",
Version: uint(protocolVersion),
Length: ProtocolLength,
Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
- return runEthProtocol(protocolVersion, networkId, txPool, chainManager, blockPool, peer, rw)
+ return runEthProtocol(protocolVersion, networkId, txPool, chainManager, blockPool, downloader, peer, rw)
},
}
}
// the main loop that handles incoming messages
// note RemovePeer in the post-disconnect hook
-func runEthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) {
+func runEthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool, downloader *downloader.Downloader, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) {
id := peer.ID()
self := &ethProtocol{
txPool: txPool,
chainManager: chainManager,
blockPool: blockPool,
+ downloader: downloader,
rw: rw,
peer: peer,
protocolVersion: protocolVersion,
@@ -211,24 +214,33 @@ func (self *ethProtocol) handle() error {
case BlockHashesMsg:
msgStream := rlp.NewStream(msg.Payload)
- if _, err := msgStream.List(); err != nil {
- return err
+
+ var hashes []common.Hash
+ if err := msgStream.Decode(&hashes); err != nil {
+ break
}
+ self.downloader.HashCh <- hashes
- var i int
- iter := func() (hash common.Hash, ok bool) {
- err := msgStream.Decode(&hash)
- if err == rlp.EOL {
- return common.Hash{}, false
- } else if err != nil {
- self.protoError(ErrDecode, "msg %v: after %v hashes : %v", msg, i, err)
- return common.Hash{}, false
+ /*
+ if _, err := msgStream.List(); err != nil {
+ return err
}
- i++
- return hash, true
- }
- self.blockPool.AddBlockHashes(iter, self.id)
+ var i int
+ iter := func() (hash common.Hash, err error) {
+ err = msgStream.Decode(&hash)
+ if err == rlp.EOL {
+ return common.Hash{}, err
+ } else if err != nil {
+ return common.Hash{}, fmt.Errorf("Fetching hashes err (%d): %v", i, err)
+ }
+
+ i++
+ return hash, nil
+ }
+ self.downloader.HashCh <- iter
+ //self.blockPool.AddBlockHashes(iter, self.id)
+ */
case GetBlocksMsg:
msgStream := rlp.NewStream(msg.Payload)
@@ -260,23 +272,34 @@ func (self *ethProtocol) handle() error {
case BlocksMsg:
msgStream := rlp.NewStream(msg.Payload)
- if _, err := msgStream.List(); err != nil {
- return err
+
+ var blocks []*types.Block
+ if err := msgStream.Decode(&blocks); err != nil {
+ glog.V(logger.Detail).Infoln("Decode error", err)
+ fmt.Println("decode error", err)
+ blocks = nil
}
- for {
- var block types.Block
- if err := msgStream.Decode(&block); err != nil {
- if err == rlp.EOL {
- break
- } else {
- return self.protoError(ErrDecode, "msg %v: %v", msg, err)
- }
+ self.downloader.DeliverChunk(self.id, blocks)
+ /*
+ msgStream := rlp.NewStream(msg.Payload)
+ if _, err := msgStream.List(); err != nil {
+ return err
}
- if err := block.ValidateFields(); err != nil {
- return self.protoError(ErrDecode, "block validation %v: %v", msg, err)
+ for {
+ var block types.Block
+ if err := msgStream.Decode(&block); err != nil {
+ if err == rlp.EOL {
+ break
+ } else {
+ return self.protoError(ErrDecode, "msg %v: %v", msg, err)
+ }
+ }
+ if err := block.ValidateFields(); err != nil {
+ return self.protoError(ErrDecode, "block validation %v: %v", msg, err)
+ }
+ self.blockPool.AddBlock(&block, self.id)
}
- self.blockPool.AddBlock(&block, self.id)
- }
+ */
case NewBlockMsg:
var request newBlockMsgData
@@ -296,6 +319,8 @@ func (self *ethProtocol) handle() error {
BlockPrevHash: request.Block.ParentHash().Hex(),
RemoteId: self.peer.ID().String(),
})
+
+ self.downloader.AddBlock(self.id, request.Block, request.TD)
// to simplify backend interface adding a new block
// uses AddPeer followed by AddBlock only if peer is the best peer
// (or selected as new best peer)
@@ -345,10 +370,16 @@ func (self *ethProtocol) handleStatus() error {
return self.protoError(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, self.protocolVersion)
}
- _, suspended := self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect)
- if suspended {
- return self.protoError(ErrSuspendedPeer, "")
+ err = self.downloader.RegisterPeer(self.id, status.TD, status.CurrentBlock, self.requestBlockHashes, self.requestBlocks)
+ if err != nil {
+ return self.protoError(ErrSuspendedPeer, "something")
}
+ /*
+ _, suspended := self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect)
+ if suspended {
+ return self.protoError(ErrSuspendedPeer, "")
+ }
+ */
self.peer.Debugf("Peer is [eth] capable (%d/%d). TD=%v H=%x\n", status.ProtocolVersion, status.NetworkId, status.TD, status.CurrentBlock[:4])
@@ -365,6 +396,31 @@ func (self *ethProtocol) requestBlocks(hashes []common.Hash) error {
return p2p.Send(self.rw, GetBlocksMsg, hashes)
}
+/*
+func (self *ethProtocol) newRespBlockCh() BlockPack {
+ self.blockRespCh = make(chan blockResp)
+ return self.blockRespCh
+}
+
+func (self *ethProtocol) RequestBlocks(hashes *set.Set) <-chan []*types.Block {
+ out := make(chan []*types.Block)
+ go func() {
+ done:
+ for {
+ select {
+ case blockResp := <-self.newRespBlockCh():
+ if len(blockResp.blocks) {
+ }
+ case <-time.After(5 * time.Second):
+ }
+ }
+
+ close(out)
+ }()
+ return out
+}
+*/
+
func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *errs.Error) {
err = self.errors.New(code, format, params...)
//err.Log(self.peer.Logger)