diff options
-rw-r--r-- | eth/backend.go | 6 | ||||
-rw-r--r-- | eth/downloader/downloader.go | 101 | ||||
-rw-r--r-- | eth/protocol.go | 128 |
3 files changed, 191 insertions, 44 deletions
diff --git a/eth/backend.go b/eth/backend.go index cde7b167d..3d5c4ba09 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -16,6 +16,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" @@ -134,6 +135,7 @@ type Ethereum struct { accountManager *accounts.Manager whisper *whisper.Whisper pow *ethash.Ethash + downloader *downloader.Downloader net *p2p.Server eventMux *event.TypeMux @@ -208,6 +210,7 @@ func New(config *Config) (*Ethereum, error) { } eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux()) + eth.downloader = downloader.New(eth.chainManager.HasBlock, eth.chainManager.InsertChain, eth.chainManager.Td) eth.pow = ethash.New(eth.chainManager) eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State) eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux()) @@ -226,7 +229,7 @@ func New(config *Config) (*Ethereum, error) { return nil, err } - ethProto := EthProtocol(config.ProtocolVersion, config.NetworkId, eth.txPool, eth.chainManager, eth.blockPool) + ethProto := EthProtocol(config.ProtocolVersion, config.NetworkId, eth.txPool, eth.chainManager, eth.blockPool, eth.downloader) protocols := []p2p.Protocol{ethProto} if config.Shh { protocols = append(protocols, eth.whisper.Protocol()) @@ -363,6 +366,7 @@ func (s *Ethereum) ClientVersion() string { return s.clientVersio func (s *Ethereum) EthVersion() int { return s.ethVersionId } func (s *Ethereum) NetVersion() int { return s.netVersionId } func (s *Ethereum) ShhVersion() int { return s.shhVersionId } +func (s *Ethereum) Downloader() *downloader.Downloader { return s.downloader } // Start the ethereum func (s *Ethereum) Start() error { diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 4e795af6d..5f9d9ed74 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1,6 +1,7 @@ package downloader import ( + "errors" "math" "math/big" "sync" @@ -20,6 +21,12 @@ const ( minDesiredPeerCount = 3 // Amount of peers desired to start syncing ) +var ( + errLowTd = errors.New("peer's TD is too low") + errBusy = errors.New("busy") + errUnknownPeer = errors.New("peer's unknown or unhealthy") +) + type hashCheckFn func(common.Hash) bool type chainInsertFn func(types.Blocks) error type hashIterFn func() (common.Hash, error) @@ -82,18 +89,19 @@ func (d *Downloader) RegisterPeer(id string, td *big.Int, hash common.Hash, getH d.mu.Lock() defer d.mu.Unlock() - glog.V(logger.Detail).Infoln("Register peer", id) + glog.V(logger.Detail).Infoln("Register peer", id, "TD =", td) // Create a new peer and add it to the list of known peers peer := newPeer(id, td, hash, getHashes, getBlocks) // add peer to our peer set d.peers[id] = peer // broadcast new peer - d.newPeerCh <- peer + //d.newPeerCh <- peer return nil } +// UnregisterPeer unregister's a peer. This will prevent any action from the specified peer. func (d *Downloader) UnregisterPeer(id string) { d.mu.Lock() defer d.mu.Unlock() @@ -103,6 +111,73 @@ func (d *Downloader) UnregisterPeer(id string) { delete(d.peers, id) } +// SynchroniseWithPeer will select the peer and use it for synchronising. If an empty string is given +// it will use the best peer possible and synchronise if it's TD is higher than our own. If any of the +// checks fail an error will be returned. This method is synchronous +func (d *Downloader) SynchroniseWithPeer(id string) (types.Blocks, error) { + // Check if we're busy + if d.isFetchingHashes() || d.isDownloadingBlocks() || d.isProcessing() { + return nil, errBusy + } + + // Attempt to select a peer. This can either be nothing, which returns, best peer + // or selected peer. If no peer could be found an error will be returned + var p *peer + if len(id) == 0 { + p = d.peers[id] + if p == nil { + return nil, errUnknownPeer + } + } else { + p = d.peers.bestPeer() + } + + // Make sure our td is lower than the peer's td + if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) { + return nil, errLowTd + } + + // Get the hash from the peer and initiate the downloading progress. + err := d.getFromPeer(p, p.recentHash, false) + if err != nil { + return nil, err + } + + return d.queue.blocks, nil +} + +// Synchronise will synchronise using the best peer. +func (d *Downloader) Synchronise() (types.Blocks, error) { + return d.SynchroniseWithPeer("") +} + +func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) error { + glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id) + // Start the fetcher. This will block the update entirely + // interupts need to be send to the appropriate channels + // respectively. + if err := d.startFetchingHashes(p, hash, ignoreInitial); err != nil { + // handle error + glog.V(logger.Debug).Infoln("Error fetching hashes:", err) + // XXX Reset + return err + } + + // Start fetching blocks in paralel. The strategy is simple + // take any available peers, seserve a chunk for each peer available, + // let the peer deliver the chunkn and periodically check if a peer + // has timedout. When done downloading, process blocks. + if err := d.startFetchingBlocks(p); err != nil { + glog.V(logger.Debug).Infoln("Error downloading blocks:", err) + // XXX reset + return err + } + + glog.V(logger.Detail).Infoln("Sync completed") + + return nil +} + func (d *Downloader) peerHandler() { // itimer is used to determine when to start ignoring `minDesiredPeerCount` //itimer := time.NewTicker(5 * time.Second) @@ -116,11 +191,18 @@ out: if len(d.peers) < minDesiredPeerCount { break } + d.selectPeer(d.peers.bestPeer()) case <-itimer.C: // The timer will make sure that the downloader keeps an active state // in which it attempts to always check the network for highest td peers - d.selectPeer(d.peers.bestPeer()) + // Either select the peer or restart the timer if no peers could + // be selected. + if peer := d.peers.bestPeer(); peer != nil { + d.selectPeer(d.peers.bestPeer()) + } else { + itimer.Reset(5 * time.Second) + } case <-d.quit: break out } @@ -142,6 +224,7 @@ func (d *Downloader) selectPeer(p *peer) { glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td) d.syncCh <- syncPack{p, p.recentHash, false} } + } func (d *Downloader) update() { @@ -150,7 +233,7 @@ out: select { case sync := <-d.syncCh: selectedPeer := sync.peer - glog.V(logger.Detail).Infoln("Synchronising with network using:", selectedPeer.id) + glog.V(logger.Detail).Infoln("Synchronising with the network using:", selectedPeer.id) // Start the fetcher. This will block the update entirely // interupts need to be send to the appropriate channels // respectively. @@ -245,9 +328,13 @@ out: for { select { case blockPack := <-d.blockCh: - d.peers[blockPack.peerId].promote() - d.queue.deliver(blockPack.peerId, blockPack.blocks) - d.peers.setState(blockPack.peerId, idleState) + // If the peer was previously banned and failed to deliver it's pack + // in a reasonable time frame, ignore it's message. + if d.peers[blockPack.peerId] != nil { + d.peers[blockPack.peerId].promote() + d.queue.deliver(blockPack.peerId, blockPack.blocks) + d.peers.setState(blockPack.peerId, idleState) + } case <-ticker.C: // If there are unrequested hashes left start fetching // from the available peers. diff --git a/eth/protocol.go b/eth/protocol.go index 1a19307db..66f3cbac8 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/errs" + "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p" @@ -18,8 +19,8 @@ const ( NetworkId = 0 ProtocolLength = uint64(8) ProtocolMaxMsgSize = 10 * 1024 * 1024 - maxHashes = 256 - maxBlocks = 64 + maxHashes = 512 + maxBlocks = 128 ) // eth protocol message codes @@ -64,6 +65,7 @@ type ethProtocol struct { txPool txPool chainManager chainManager blockPool blockPool + downloader *downloader.Downloader peer *p2p.Peer id string rw p2p.MsgReadWriter @@ -114,25 +116,26 @@ type statusMsgData struct { // main entrypoint, wrappers starting a server running the eth protocol // use this constructor to attach the protocol ("class") to server caps // the Dev p2p layer then runs the protocol instance on each peer -func EthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool) p2p.Protocol { +func EthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool, downloader *downloader.Downloader) p2p.Protocol { return p2p.Protocol{ Name: "eth", Version: uint(protocolVersion), Length: ProtocolLength, Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error { - return runEthProtocol(protocolVersion, networkId, txPool, chainManager, blockPool, peer, rw) + return runEthProtocol(protocolVersion, networkId, txPool, chainManager, blockPool, downloader, peer, rw) }, } } // the main loop that handles incoming messages // note RemovePeer in the post-disconnect hook -func runEthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) { +func runEthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool, downloader *downloader.Downloader, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) { id := peer.ID() self := ðProtocol{ txPool: txPool, chainManager: chainManager, blockPool: blockPool, + downloader: downloader, rw: rw, peer: peer, protocolVersion: protocolVersion, @@ -211,24 +214,33 @@ func (self *ethProtocol) handle() error { case BlockHashesMsg: msgStream := rlp.NewStream(msg.Payload) - if _, err := msgStream.List(); err != nil { - return err + + var hashes []common.Hash + if err := msgStream.Decode(&hashes); err != nil { + break } + self.downloader.HashCh <- hashes - var i int - iter := func() (hash common.Hash, ok bool) { - err := msgStream.Decode(&hash) - if err == rlp.EOL { - return common.Hash{}, false - } else if err != nil { - self.protoError(ErrDecode, "msg %v: after %v hashes : %v", msg, i, err) - return common.Hash{}, false + /* + if _, err := msgStream.List(); err != nil { + return err } - i++ - return hash, true - } - self.blockPool.AddBlockHashes(iter, self.id) + var i int + iter := func() (hash common.Hash, err error) { + err = msgStream.Decode(&hash) + if err == rlp.EOL { + return common.Hash{}, err + } else if err != nil { + return common.Hash{}, fmt.Errorf("Fetching hashes err (%d): %v", i, err) + } + + i++ + return hash, nil + } + self.downloader.HashCh <- iter + //self.blockPool.AddBlockHashes(iter, self.id) + */ case GetBlocksMsg: msgStream := rlp.NewStream(msg.Payload) @@ -260,23 +272,34 @@ func (self *ethProtocol) handle() error { case BlocksMsg: msgStream := rlp.NewStream(msg.Payload) - if _, err := msgStream.List(); err != nil { - return err + + var blocks []*types.Block + if err := msgStream.Decode(&blocks); err != nil { + glog.V(logger.Detail).Infoln("Decode error", err) + fmt.Println("decode error", err) + blocks = nil } - for { - var block types.Block - if err := msgStream.Decode(&block); err != nil { - if err == rlp.EOL { - break - } else { - return self.protoError(ErrDecode, "msg %v: %v", msg, err) - } + self.downloader.DeliverChunk(self.id, blocks) + /* + msgStream := rlp.NewStream(msg.Payload) + if _, err := msgStream.List(); err != nil { + return err } - if err := block.ValidateFields(); err != nil { - return self.protoError(ErrDecode, "block validation %v: %v", msg, err) + for { + var block types.Block + if err := msgStream.Decode(&block); err != nil { + if err == rlp.EOL { + break + } else { + return self.protoError(ErrDecode, "msg %v: %v", msg, err) + } + } + if err := block.ValidateFields(); err != nil { + return self.protoError(ErrDecode, "block validation %v: %v", msg, err) + } + self.blockPool.AddBlock(&block, self.id) } - self.blockPool.AddBlock(&block, self.id) - } + */ case NewBlockMsg: var request newBlockMsgData @@ -296,6 +319,8 @@ func (self *ethProtocol) handle() error { BlockPrevHash: request.Block.ParentHash().Hex(), RemoteId: self.peer.ID().String(), }) + + self.downloader.AddBlock(self.id, request.Block, request.TD) // to simplify backend interface adding a new block // uses AddPeer followed by AddBlock only if peer is the best peer // (or selected as new best peer) @@ -345,10 +370,16 @@ func (self *ethProtocol) handleStatus() error { return self.protoError(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, self.protocolVersion) } - _, suspended := self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) - if suspended { - return self.protoError(ErrSuspendedPeer, "") + err = self.downloader.RegisterPeer(self.id, status.TD, status.CurrentBlock, self.requestBlockHashes, self.requestBlocks) + if err != nil { + return self.protoError(ErrSuspendedPeer, "something") } + /* + _, suspended := self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) + if suspended { + return self.protoError(ErrSuspendedPeer, "") + } + */ self.peer.Debugf("Peer is [eth] capable (%d/%d). TD=%v H=%x\n", status.ProtocolVersion, status.NetworkId, status.TD, status.CurrentBlock[:4]) @@ -365,6 +396,31 @@ func (self *ethProtocol) requestBlocks(hashes []common.Hash) error { return p2p.Send(self.rw, GetBlocksMsg, hashes) } +/* +func (self *ethProtocol) newRespBlockCh() BlockPack { + self.blockRespCh = make(chan blockResp) + return self.blockRespCh +} + +func (self *ethProtocol) RequestBlocks(hashes *set.Set) <-chan []*types.Block { + out := make(chan []*types.Block) + go func() { + done: + for { + select { + case blockResp := <-self.newRespBlockCh(): + if len(blockResp.blocks) { + } + case <-time.After(5 * time.Second): + } + } + + close(out) + }() + return out +} +*/ + func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *errs.Error) { err = self.errors.New(code, format, params...) //err.Log(self.peer.Logger) |