aboutsummaryrefslogtreecommitdiffstats
path: root/eth/protocol.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/protocol.go')
-rw-r--r--eth/protocol.go103
1 files changed, 67 insertions, 36 deletions
diff --git a/eth/protocol.go b/eth/protocol.go
index 1a19307db..a85d15a0c 100644
--- a/eth/protocol.go
+++ b/eth/protocol.go
@@ -7,6 +7,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/errs"
+ "github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/p2p"
@@ -18,8 +19,8 @@ const (
NetworkId = 0
ProtocolLength = uint64(8)
ProtocolMaxMsgSize = 10 * 1024 * 1024
- maxHashes = 256
- maxBlocks = 64
+ maxHashes = 512
+ maxBlocks = 128
)
// eth protocol message codes
@@ -64,6 +65,7 @@ type ethProtocol struct {
txPool txPool
chainManager chainManager
blockPool blockPool
+ downloader *downloader.Downloader
peer *p2p.Peer
id string
rw p2p.MsgReadWriter
@@ -114,25 +116,26 @@ type statusMsgData struct {
// main entrypoint, wrappers starting a server running the eth protocol
// use this constructor to attach the protocol ("class") to server caps
// the Dev p2p layer then runs the protocol instance on each peer
-func EthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool) p2p.Protocol {
+func EthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool, downloader *downloader.Downloader) p2p.Protocol {
return p2p.Protocol{
Name: "eth",
Version: uint(protocolVersion),
Length: ProtocolLength,
Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
- return runEthProtocol(protocolVersion, networkId, txPool, chainManager, blockPool, peer, rw)
+ return runEthProtocol(protocolVersion, networkId, txPool, chainManager, blockPool, downloader, peer, rw)
},
}
}
// the main loop that handles incoming messages
// note RemovePeer in the post-disconnect hook
-func runEthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) {
+func runEthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool, downloader *downloader.Downloader, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) {
id := peer.ID()
self := &ethProtocol{
txPool: txPool,
chainManager: chainManager,
blockPool: blockPool,
+ downloader: downloader,
rw: rw,
peer: peer,
protocolVersion: protocolVersion,
@@ -211,24 +214,33 @@ func (self *ethProtocol) handle() error {
case BlockHashesMsg:
msgStream := rlp.NewStream(msg.Payload)
- if _, err := msgStream.List(); err != nil {
- return err
+
+ var hashes []common.Hash
+ if err := msgStream.Decode(&hashes); err != nil {
+ break
}
+ self.downloader.HashCh <- hashes
- var i int
- iter := func() (hash common.Hash, ok bool) {
- err := msgStream.Decode(&hash)
- if err == rlp.EOL {
- return common.Hash{}, false
- } else if err != nil {
- self.protoError(ErrDecode, "msg %v: after %v hashes : %v", msg, i, err)
- return common.Hash{}, false
+ /*
+ if _, err := msgStream.List(); err != nil {
+ return err
}
- i++
- return hash, true
- }
- self.blockPool.AddBlockHashes(iter, self.id)
+ var i int
+ iter := func() (hash common.Hash, err error) {
+ err = msgStream.Decode(&hash)
+ if err == rlp.EOL {
+ return common.Hash{}, err
+ } else if err != nil {
+ return common.Hash{}, fmt.Errorf("Fetching hashes err (%d): %v", i, err)
+ }
+
+ i++
+ return hash, nil
+ }
+ self.downloader.HashCh <- iter
+ //self.blockPool.AddBlockHashes(iter, self.id)
+ */
case GetBlocksMsg:
msgStream := rlp.NewStream(msg.Payload)
@@ -260,23 +272,34 @@ func (self *ethProtocol) handle() error {
case BlocksMsg:
msgStream := rlp.NewStream(msg.Payload)
- if _, err := msgStream.List(); err != nil {
- return err
+
+ var blocks []*types.Block
+ if err := msgStream.Decode(&blocks); err != nil {
+ glog.V(logger.Detail).Infoln("Decode error", err)
+ fmt.Println("decode error", err)
+ blocks = nil
}
- for {
- var block types.Block
- if err := msgStream.Decode(&block); err != nil {
- if err == rlp.EOL {
- break
- } else {
- return self.protoError(ErrDecode, "msg %v: %v", msg, err)
- }
+ self.downloader.DeliverChunk(self.id, blocks)
+ /*
+ msgStream := rlp.NewStream(msg.Payload)
+ if _, err := msgStream.List(); err != nil {
+ return err
}
- if err := block.ValidateFields(); err != nil {
- return self.protoError(ErrDecode, "block validation %v: %v", msg, err)
+ for {
+ var block types.Block
+ if err := msgStream.Decode(&block); err != nil {
+ if err == rlp.EOL {
+ break
+ } else {
+ return self.protoError(ErrDecode, "msg %v: %v", msg, err)
+ }
+ }
+ if err := block.ValidateFields(); err != nil {
+ return self.protoError(ErrDecode, "block validation %v: %v", msg, err)
+ }
+ self.blockPool.AddBlock(&block, self.id)
}
- self.blockPool.AddBlock(&block, self.id)
- }
+ */
case NewBlockMsg:
var request newBlockMsgData
@@ -296,6 +319,8 @@ func (self *ethProtocol) handle() error {
BlockPrevHash: request.Block.ParentHash().Hex(),
RemoteId: self.peer.ID().String(),
})
+
+ self.downloader.AddBlock(self.id, request.Block, request.TD)
// to simplify backend interface adding a new block
// uses AddPeer followed by AddBlock only if peer is the best peer
// (or selected as new best peer)
@@ -345,10 +370,16 @@ func (self *ethProtocol) handleStatus() error {
return self.protoError(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, self.protocolVersion)
}
- _, suspended := self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect)
- if suspended {
- return self.protoError(ErrSuspendedPeer, "")
+ err = self.downloader.RegisterPeer(self.id, status.TD, status.CurrentBlock, self.requestBlockHashes, self.requestBlocks)
+ if err != nil {
+ return self.protoError(ErrSuspendedPeer, "something")
}
+ /*
+ _, suspended := self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect)
+ if suspended {
+ return self.protoError(ErrSuspendedPeer, "")
+ }
+ */
self.peer.Debugf("Peer is [eth] capable (%d/%d). TD=%v H=%x\n", status.ProtocolVersion, status.NetworkId, status.TD, status.CurrentBlock[:4])