From 97d2954e227049a089652d91e6fb0ea1c8115cc6 Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 13 Apr 2015 17:22:32 +0200 Subject: eth: added downloader for syncing up the chain --- eth/protocol.go | 103 ++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 67 insertions(+), 36 deletions(-) (limited to 'eth/protocol.go') diff --git a/eth/protocol.go b/eth/protocol.go index 878038f74..b15868898 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/errs" + "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p" @@ -18,8 +19,8 @@ const ( NetworkId = 0 ProtocolLength = uint64(8) ProtocolMaxMsgSize = 10 * 1024 * 1024 - maxHashes = 256 - maxBlocks = 64 + maxHashes = 512 + maxBlocks = 128 ) // eth protocol message codes @@ -64,6 +65,7 @@ type ethProtocol struct { txPool txPool chainManager chainManager blockPool blockPool + downloader *downloader.Downloader peer *p2p.Peer id string rw p2p.MsgReadWriter @@ -114,25 +116,26 @@ type statusMsgData struct { // main entrypoint, wrappers starting a server running the eth protocol // use this constructor to attach the protocol ("class") to server caps // the Dev p2p layer then runs the protocol instance on each peer -func EthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool) p2p.Protocol { +func EthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool, downloader *downloader.Downloader) p2p.Protocol { return p2p.Protocol{ Name: "eth", Version: uint(protocolVersion), Length: ProtocolLength, Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error { - return runEthProtocol(protocolVersion, networkId, txPool, chainManager, blockPool, peer, rw) + return runEthProtocol(protocolVersion, networkId, txPool, chainManager, blockPool, downloader, peer, rw) }, } } // the main loop that handles incoming messages // note RemovePeer in the post-disconnect hook -func runEthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) { +func runEthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool, downloader *downloader.Downloader, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) { id := peer.ID() self := ðProtocol{ txPool: txPool, chainManager: chainManager, blockPool: blockPool, + downloader: downloader, rw: rw, peer: peer, protocolVersion: protocolVersion, @@ -211,24 +214,33 @@ func (self *ethProtocol) handle() error { case BlockHashesMsg: msgStream := rlp.NewStream(msg.Payload) - if _, err := msgStream.List(); err != nil { - return err + + var hashes []common.Hash + if err := msgStream.Decode(&hashes); err != nil { + break } + self.downloader.HashCh <- hashes - var i int - iter := func() (hash common.Hash, ok bool) { - err := msgStream.Decode(&hash) - if err == rlp.EOL { - return common.Hash{}, false - } else if err != nil { - self.protoError(ErrDecode, "msg %v: after %v hashes : %v", msg, i, err) - return common.Hash{}, false + /* + if _, err := msgStream.List(); err != nil { + return err } - i++ - return hash, true - } - self.blockPool.AddBlockHashes(iter, self.id) + var i int + iter := func() (hash common.Hash, err error) { + err = msgStream.Decode(&hash) + if err == rlp.EOL { + return common.Hash{}, err + } else if err != nil { + return common.Hash{}, fmt.Errorf("Fetching hashes err (%d): %v", i, err) + } + + i++ + return hash, nil + } + self.downloader.HashCh <- iter + //self.blockPool.AddBlockHashes(iter, self.id) + */ case GetBlocksMsg: msgStream := rlp.NewStream(msg.Payload) @@ -260,23 +272,34 @@ func (self *ethProtocol) handle() error { case BlocksMsg: msgStream := rlp.NewStream(msg.Payload) - if _, err := msgStream.List(); err != nil { - return err + + var blocks []*types.Block + if err := msgStream.Decode(&blocks); err != nil { + glog.V(logger.Detail).Infoln("Decode error", err) + fmt.Println("decode error", err) + blocks = nil } - for { - var block types.Block - if err := msgStream.Decode(&block); err != nil { - if err == rlp.EOL { - break - } else { - return self.protoError(ErrDecode, "msg %v: %v", msg, err) - } + self.downloader.DeliverChunk(self.id, blocks) + /* + msgStream := rlp.NewStream(msg.Payload) + if _, err := msgStream.List(); err != nil { + return err } - if err := block.ValidateFields(); err != nil { - return self.protoError(ErrDecode, "block validation %v: %v", msg, err) + for { + var block types.Block + if err := msgStream.Decode(&block); err != nil { + if err == rlp.EOL { + break + } else { + return self.protoError(ErrDecode, "msg %v: %v", msg, err) + } + } + if err := block.ValidateFields(); err != nil { + return self.protoError(ErrDecode, "block validation %v: %v", msg, err) + } + self.blockPool.AddBlock(&block, self.id) } - self.blockPool.AddBlock(&block, self.id) - } + */ case NewBlockMsg: var request newBlockMsgData @@ -296,6 +319,8 @@ func (self *ethProtocol) handle() error { BlockPrevHash: request.Block.ParentHash().Hex(), RemoteId: self.peer.ID().String(), }) + + self.downloader.AddBlock(self.id, request.Block, request.TD) // to simplify backend interface adding a new block // uses AddPeer followed by AddBlock only if peer is the best peer // (or selected as new best peer) @@ -345,10 +370,16 @@ func (self *ethProtocol) handleStatus() error { return self.protoError(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, self.protocolVersion) } - _, suspended := self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) - if suspended { - return self.protoError(ErrSuspendedPeer, "") + err = self.downloader.RegisterPeer(self.id, status.TD, status.CurrentBlock, self.requestBlockHashes, self.requestBlocks) + if err != nil { + return self.protoError(ErrSuspendedPeer, "something") } + /* + _, suspended := self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) + if suspended { + return self.protoError(ErrSuspendedPeer, "") + } + */ self.peer.Debugf("Peer is [eth] capable (%d/%d). TD=%v H=%x\n", status.ProtocolVersion, status.NetworkId, status.TD, status.CurrentBlock[:4]) -- cgit v1.2.3