From 4dfce4624dcc89302ec0b1f22cf853a8382fb7c7 Mon Sep 17 00:00:00 2001 From: zelig Date: Sun, 14 Dec 2014 18:04:50 +0000 Subject: protocol - new interface explicit backend components txPool, chainManager, blockPool - added protoErrorDisconnect for blockpool callback (FIXME: handling peer disconnects) --- eth/protocol.go | 90 ++++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 83 insertions(+), 7 deletions(-) (limited to 'eth/protocol.go') diff --git a/eth/protocol.go b/eth/protocol.go index fbc4610ec..37e642fd0 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -20,7 +20,7 @@ type ethProtocol struct { peer *p2p.Peer id string rw p2p.MsgReadWriter - } +} // backend is the interface the ethereum protocol backend should implement // used as an argument to EthProtocol @@ -68,6 +68,7 @@ type newBlockMsgData struct { type getBlockHashesMsgData struct { Hash []byte +<<<<<<< HEAD <<<<<<< HEAD Amount uint64 } @@ -79,23 +80,35 @@ func EthProtocol(txPool txPool, chainManager chainManager, blockPool blockPool) return p2p.Protocol{ ======= Amount uint32 +======= + Amount uint64 +>>>>>>> protocol } // main entrypoint, wrappers starting a server running the eth protocol // use this constructor to attach the protocol ("class") to server caps // the Dev p2p layer then runs the protocol instance on each peer +<<<<<<< HEAD func EthProtocol(eth backend) *p2p.Protocol { return &p2p.Protocol{ >>>>>>> initial commit for eth-p2p integration +======= +func EthProtocol(txPool txPool, chainManager chainManager, blockPool blockPool) p2p.Protocol { + return p2p.Protocol{ +>>>>>>> protocol Name: "eth", Version: ProtocolVersion, Length: ProtocolLength, Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error { +<<<<<<< HEAD <<<<<<< HEAD return runEthProtocol(txPool, chainManager, blockPool, peer, rw) ======= return runEthProtocol(eth, peer, rw) >>>>>>> initial commit for eth-p2p integration +======= + return runEthProtocol(txPool, chainManager, blockPool, peer, rw) +>>>>>>> protocol }, } } @@ -105,6 +118,7 @@ func EthProtocol(eth backend) *p2p.Protocol { // the main loop that handles incoming messages // note RemovePeer in the post-disconnect hook func runEthProtocol(txPool txPool, chainManager chainManager, blockPool blockPool, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) { +<<<<<<< HEAD self := ðProtocol{ txPool: txPool, chainManager: chainManager, @@ -127,6 +141,15 @@ func runEthProtocol(eth backend, peer *p2p.Peer, rw p2p.MsgReadWriter) (err erro ======= id: (string)(peer.Identity().Pubkey()), >>>>>>> eth protocol changes +======= + self := ðProtocol{ + txPool: txPool, + chainManager: chainManager, + blockPool: blockPool, + rw: rw, + peer: peer, + id: (string)(peer.Identity().Pubkey()), +>>>>>>> protocol } err = self.handleStatus() if err == nil { @@ -135,6 +158,7 @@ func runEthProtocol(eth backend, peer *p2p.Peer, rw p2p.MsgReadWriter) (err erro err = self.handle() if err != nil { <<<<<<< HEAD +<<<<<<< HEAD <<<<<<< HEAD self.blockPool.RemovePeer(self.id) ======= @@ -142,6 +166,9 @@ func runEthProtocol(eth backend, peer *p2p.Peer, rw p2p.MsgReadWriter) (err erro ======= self.eth.RemovePeer(self.id) >>>>>>> eth protocol changes +======= + self.blockPool.RemovePeer(self.id) +>>>>>>> protocol break } } @@ -166,6 +193,7 @@ func (self *ethProtocol) handle() error { case StatusMsg: return ProtocolError(ErrExtraStatusMsg, "") +<<<<<<< HEAD <<<<<<< HEAD case TxMsg: // TODO: rework using lazy RLP stream @@ -179,6 +207,8 @@ func (self *ethProtocol) handle() error { } return self.rw.EncodeMsg(TxMsg, txsInterface...) +======= +>>>>>>> protocol case TxMsg: <<<<<<< HEAD >>>>>>> initial commit for eth-p2p integration @@ -189,22 +219,30 @@ func (self *ethProtocol) handle() error { if err := msg.Decode(&txs); err != nil { return ProtocolError(ErrDecode, "%v", err) } +<<<<<<< HEAD <<<<<<< HEAD self.txPool.AddTransactions(txs) ======= self.eth.AddTransactions(txs) >>>>>>> initial commit for eth-p2p integration +======= + self.txPool.AddTransactions(txs) +>>>>>>> protocol case GetBlockHashesMsg: var request getBlockHashesMsgData if err := msg.Decode(&request); err != nil { return ProtocolError(ErrDecode, "%v", err) } +<<<<<<< HEAD <<<<<<< HEAD hashes := self.chainManager.GetBlockHashesFromHash(request.Hash, request.Amount) ======= hashes := self.eth.GetBlockHashes(request.Hash, request.Amount) >>>>>>> initial commit for eth-p2p integration +======= + hashes := self.chainManager.GetBlockHashesFromHash(request.Hash, request.Amount) +>>>>>>> protocol return self.rw.EncodeMsg(BlockHashesMsg, ethutil.ByteSliceToInterface(hashes)...) case BlockHashesMsg: @@ -245,7 +283,7 @@ func (self *ethProtocol) handle() error { } return } - self.eth.AddBlockHashes(iter, self.id) + self.blockPool.AddBlockHashes(iter, self.id) if err != nil && err != rlp.EOL { return ProtocolError(ErrDecode, "%v", err) } @@ -274,11 +312,15 @@ func (self *ethProtocol) handle() error { if i >= max { break } +<<<<<<< HEAD <<<<<<< HEAD block := self.chainManager.GetBlock(hash) ======= block := self.eth.GetBlock(hash) >>>>>>> initial commit for eth-p2p integration +======= + block := self.chainManager.GetBlock(hash) +>>>>>>> protocol if block != nil { blocks = append(blocks, block.Value().Raw()) } @@ -321,10 +363,14 @@ func (self *ethProtocol) handle() error { ======= } } +<<<<<<< HEAD if err := self.eth.AddBlock(block, self.id); err != nil { return ProtocolError(ErrInvalidBlock, "%v", err) } >>>>>>> eth protocol changes +======= + self.blockPool.AddBlock(block, self.id) +>>>>>>> protocol } case NewBlockMsg: @@ -340,11 +386,15 @@ func (self *ethProtocol) handle() error { // to simplify backend interface adding a new block // uses AddPeer followed by AddHashes, AddBlock only if peer is the best peer // (or selected as new best peer) +<<<<<<< HEAD <<<<<<< HEAD if self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) { ======= if self.eth.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.invalidBlock) { >>>>>>> eth protocol changes +======= + if self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) { +>>>>>>> protocol called := true iter := func() (hash []byte, ok bool) { if called { @@ -354,6 +404,7 @@ func (self *ethProtocol) handle() error { return } } +<<<<<<< HEAD <<<<<<< HEAD self.blockPool.AddBlockHashes(iter, self.id) self.blockPool.AddBlock(request.Block, self.id) @@ -372,6 +423,10 @@ func (self *ethProtocol) handle() error { return ProtocolError(ErrInvalidBlock, "%v", err) } >>>>>>> eth protocol changes +======= + self.blockPool.AddBlockHashes(iter, self.id) + self.blockPool.AddBlock(request.Block, self.id) +>>>>>>> protocol } default: @@ -389,11 +444,15 @@ type statusMsgData struct { } func (self *ethProtocol) statusMsg() p2p.Msg { +<<<<<<< HEAD <<<<<<< HEAD td, currentBlock, genesisBlock := self.chainManager.Status() ======= td, currentBlock, genesisBlock := self.eth.Status() >>>>>>> initial commit for eth-p2p integration +======= + td, currentBlock, genesisBlock := self.chainManager.Status() +>>>>>>> protocol return p2p.NewMsg(StatusMsg, uint32(ProtocolVersion), @@ -429,11 +488,15 @@ func (self *ethProtocol) handleStatus() error { return ProtocolError(ErrDecode, "%v", err) } +<<<<<<< HEAD <<<<<<< HEAD _, _, genesisBlock := self.chainManager.Status() ======= _, _, genesisBlock := self.eth.Status() >>>>>>> initial commit for eth-p2p integration +======= + _, _, genesisBlock := self.chainManager.Status() +>>>>>>> protocol if bytes.Compare(status.GenesisBlock, genesisBlock) != 0 { return ProtocolError(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock, genesisBlock) @@ -462,8 +525,12 @@ func (self *ethProtocol) handleStatus() error { ======= self.peer.Infof("Peer is [eth] capable (%d/%d). TD = %v ~ %x", status.ProtocolVersion, status.NetworkId, status.CurrentBlock) +<<<<<<< HEAD self.eth.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.invalidBlock) >>>>>>> eth protocol changes +======= + self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) +>>>>>>> protocol return nil } @@ -517,11 +584,6 @@ func (self *ethProtocol) requestBlocks(hashes [][]byte) error { return self.rw.EncodeMsg(GetBlocksMsg, ethutil.ByteSliceToInterface(hashes)) } -func (self *ethProtocol) invalidBlock(err error) { - ProtocolError(ErrInvalidBlock, "%v", err) - self.peer.Disconnect(p2p.DiscSubprotocolError) -} - func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *protocolError) { err = ProtocolError(code, format, params...) if err.Fatal() { @@ -531,4 +593,18 @@ func (self *ethProtocol) protoError(code int, format string, params ...interface } return } +<<<<<<< HEAD >>>>>>> eth protocol changes +======= + +func (self *ethProtocol) protoErrorDisconnect(code int, format string, params ...interface{}) { + err := ProtocolError(code, format, params...) + if err.Fatal() { + self.peer.Errorln(err) + // disconnect + } else { + self.peer.Debugln(err) + } + +} +>>>>>>> protocol -- cgit v1.2.3