From c44e02589877722af7ef61738ff7ec05a7a14279 Mon Sep 17 00:00:00 2001 From: zelig Date: Sun, 14 Dec 2014 18:04:50 +0000 Subject: protocol - new interface explicit backend components txPool, chainManager, blockPool - added protoErrorDisconnect for blockpool callback (FIXME: handling peer disconnects) --- eth/protocol.go | 102 +++++++++++++++++++++++++++++--------------------------- 1 file changed, 52 insertions(+), 50 deletions(-) (limited to 'eth') diff --git a/eth/protocol.go b/eth/protocol.go index 380bcc8d2..3b5b49696 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -14,26 +14,33 @@ import ( // ethProtocol represents the ethereum wire protocol // instance is running on each peer type ethProtocol struct { - eth backend - peer *p2p.Peer - id string - rw p2p.MsgReadWriter + txPool txPool + chainManager chainManager + blockPool blockPool + peer *p2p.Peer + id string + rw p2p.MsgReadWriter } // backend is the interface the ethereum protocol backend should implement // used as an argument to EthProtocol -type backend interface { - GetTransactions() (txs []*types.Transaction) +type txPool interface { AddTransactions([]*types.Transaction) - GetBlockHashes(hash []byte, amount uint32) (hashes [][]byte) - AddBlockHashes(next func() ([]byte, bool), peerId string) +} + +type chainManager interface { + GetBlockHashesFromHash(hash []byte, amount uint64) (hashes [][]byte) GetBlock(hash []byte) (block *types.Block) - AddBlock(block *types.Block, peerId string) (err error) - AddPeer(td *big.Int, currentBlock []byte, peerId string, requestHashes func([]byte) error, requestBlocks func([][]byte) error, invalidBlock func(error)) (best bool) - RemovePeer(peerId string) Status() (td *big.Int, currentBlock []byte, genesisBlock []byte) } +type blockPool interface { + AddBlockHashes(next func() ([]byte, bool), peerId string) + AddBlock(block *types.Block, peerId string) + AddPeer(td *big.Int, currentBlock []byte, peerId string, requestHashes func([]byte) error, requestBlocks func([][]byte) error, peerError func(int, string, ...interface{})) (best bool) + RemovePeer(peerId string) +} + const ( ProtocolVersion = 43 NetworkId = 0 @@ -61,31 +68,33 @@ type newBlockMsgData struct { type getBlockHashesMsgData struct { Hash []byte - Amount uint32 + Amount uint64 } // main entrypoint, wrappers starting a server running the eth protocol // use this constructor to attach the protocol ("class") to server caps // the Dev p2p layer then runs the protocol instance on each peer -func EthProtocol(eth backend) *p2p.Protocol { - return &p2p.Protocol{ +func EthProtocol(txPool txPool, chainManager chainManager, blockPool blockPool) p2p.Protocol { + return p2p.Protocol{ Name: "eth", Version: ProtocolVersion, Length: ProtocolLength, Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error { - return runEthProtocol(eth, peer, rw) + return runEthProtocol(txPool, chainManager, blockPool, peer, rw) }, } } // the main loop that handles incoming messages // note RemovePeer in the post-disconnect hook -func runEthProtocol(eth backend, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) { +func runEthProtocol(txPool txPool, chainManager chainManager, blockPool blockPool, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) { self := ðProtocol{ - eth: eth, - rw: rw, - peer: peer, - id: (string)(peer.Identity().Pubkey()), + txPool: txPool, + chainManager: chainManager, + blockPool: blockPool, + rw: rw, + peer: peer, + id: (string)(peer.Identity().Pubkey()), } err = self.handleStatus() if err == nil { @@ -93,7 +102,7 @@ func runEthProtocol(eth backend, peer *p2p.Peer, rw p2p.MsgReadWriter) (err erro for { err = self.handle() if err != nil { - self.eth.RemovePeer(self.id) + self.blockPool.RemovePeer(self.id) break } } @@ -118,29 +127,20 @@ func (self *ethProtocol) handle() error { case StatusMsg: return ProtocolError(ErrExtraStatusMsg, "") - case GetTxMsg: - txs := self.eth.GetTransactions() - // TODO: rewrite using rlp flat - txsInterface := make([]interface{}, len(txs)) - for i, tx := range txs { - txsInterface[i] = tx.RlpData() - } - return self.rw.EncodeMsg(TxMsg, txsInterface...) - case TxMsg: // TODO: rework using lazy RLP stream var txs []*types.Transaction if err := msg.Decode(&txs); err != nil { return ProtocolError(ErrDecode, "%v", err) } - self.eth.AddTransactions(txs) + self.txPool.AddTransactions(txs) case GetBlockHashesMsg: var request getBlockHashesMsgData if err := msg.Decode(&request); err != nil { return ProtocolError(ErrDecode, "%v", err) } - hashes := self.eth.GetBlockHashes(request.Hash, request.Amount) + hashes := self.chainManager.GetBlockHashesFromHash(request.Hash, request.Amount) return self.rw.EncodeMsg(BlockHashesMsg, ethutil.ByteSliceToInterface(hashes)...) case BlockHashesMsg: @@ -154,7 +154,7 @@ func (self *ethProtocol) handle() error { } return } - self.eth.AddBlockHashes(iter, self.id) + self.blockPool.AddBlockHashes(iter, self.id) if err != nil && err != rlp.EOL { return ProtocolError(ErrDecode, "%v", err) } @@ -170,7 +170,7 @@ func (self *ethProtocol) handle() error { if i >= max { break } - block := self.eth.GetBlock(hash) + block := self.chainManager.GetBlock(hash) if block != nil { blocks = append(blocks, block.Value().Raw()) } @@ -188,9 +188,7 @@ func (self *ethProtocol) handle() error { return ProtocolError(ErrDecode, "%v", err) } } - if err := self.eth.AddBlock(block, self.id); err != nil { - return ProtocolError(ErrInvalidBlock, "%v", err) - } + self.blockPool.AddBlock(block, self.id) } case NewBlockMsg: @@ -202,7 +200,7 @@ func (self *ethProtocol) handle() error { // to simplify backend interface adding a new block // uses AddPeer followed by AddHashes, AddBlock only if peer is the best peer // (or selected as new best peer) - if self.eth.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.invalidBlock) { + if self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) { called := true iter := func() (hash []byte, ok bool) { if called { @@ -212,10 +210,8 @@ func (self *ethProtocol) handle() error { return } } - self.eth.AddBlockHashes(iter, self.id) - if err := self.eth.AddBlock(request.Block, self.id); err != nil { - return ProtocolError(ErrInvalidBlock, "%v", err) - } + self.blockPool.AddBlockHashes(iter, self.id) + self.blockPool.AddBlock(request.Block, self.id) } default: @@ -233,7 +229,7 @@ type statusMsgData struct { } func (self *ethProtocol) statusMsg() p2p.Msg { - td, currentBlock, genesisBlock := self.eth.Status() + td, currentBlock, genesisBlock := self.chainManager.Status() return p2p.NewMsg(StatusMsg, uint32(ProtocolVersion), @@ -269,7 +265,7 @@ func (self *ethProtocol) handleStatus() error { return ProtocolError(ErrDecode, "%v", err) } - _, _, genesisBlock := self.eth.Status() + _, _, genesisBlock := self.chainManager.Status() if bytes.Compare(status.GenesisBlock, genesisBlock) != 0 { return ProtocolError(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock, genesisBlock) @@ -285,7 +281,7 @@ func (self *ethProtocol) handleStatus() error { self.peer.Infof("Peer is [eth] capable (%d/%d). TD = %v ~ %x", status.ProtocolVersion, status.NetworkId, status.CurrentBlock) - self.eth.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.invalidBlock) + self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) return nil } @@ -300,11 +296,6 @@ func (self *ethProtocol) requestBlocks(hashes [][]byte) error { return self.rw.EncodeMsg(GetBlocksMsg, ethutil.ByteSliceToInterface(hashes)) } -func (self *ethProtocol) invalidBlock(err error) { - ProtocolError(ErrInvalidBlock, "%v", err) - self.peer.Disconnect(p2p.DiscSubprotocolError) -} - func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *protocolError) { err = ProtocolError(code, format, params...) if err.Fatal() { @@ -314,3 +305,14 @@ func (self *ethProtocol) protoError(code int, format string, params ...interface } return } + +func (self *ethProtocol) protoErrorDisconnect(code int, format string, params ...interface{}) { + err := ProtocolError(code, format, params...) + if err.Fatal() { + self.peer.Errorln(err) + // disconnect + } else { + self.peer.Debugln(err) + } + +} -- cgit v1.2.3