From 16ecda951b767800b4e09ad8e86e0866b05136be Mon Sep 17 00:00:00 2001 From: zelig Date: Wed, 25 Feb 2015 20:06:59 +0700 Subject: integrate blockpool into eth - remove blockpool code - remove blockpool integration test (kinda embarrassing) - remove errors.go --- eth/protocol.go | 69 +++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 43 insertions(+), 26 deletions(-) (limited to 'eth/protocol.go') diff --git a/eth/protocol.go b/eth/protocol.go index 8221c1b29..ee2316836 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -7,6 +7,7 @@ import ( "math/big" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/errs" "github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" @@ -17,6 +18,8 @@ const ( NetworkId = 0 ProtocolLength = uint64(8) ProtocolMaxMsgSize = 10 * 1024 * 1024 + maxHashes = 256 + maxBlocks = 64 ) // eth protocol message codes @@ -31,6 +34,28 @@ const ( NewBlockMsg ) +const ( + ErrMsgTooLarge = iota + ErrDecode + ErrInvalidMsgCode + ErrProtocolVersionMismatch + ErrNetworkIdMismatch + ErrGenesisBlockMismatch + ErrNoStatusMsg + ErrExtraStatusMsg +) + +var errorToString = map[int]string{ + ErrMsgTooLarge: "Message too long", + ErrDecode: "Invalid message", + ErrInvalidMsgCode: "Invalid message code", + ErrProtocolVersionMismatch: "Protocol version mismatch", + ErrNetworkIdMismatch: "NetworkId mismatch", + ErrGenesisBlockMismatch: "Genesis block mismatch", + ErrNoStatusMsg: "No status message", + ErrExtraStatusMsg: "Extra status message", +} + // ethProtocol represents the ethereum wire protocol // instance is running on each peer type ethProtocol struct { @@ -40,6 +65,7 @@ type ethProtocol struct { peer *p2p.Peer id string rw p2p.MsgReadWriter + errors *errs.Errors } // backend is the interface the ethereum protocol backend should implement @@ -58,7 +84,7 @@ type chainManager interface { type blockPool interface { AddBlockHashes(next func() ([]byte, bool), peerId string) AddBlock(block *types.Block, peerId string) - AddPeer(td *big.Int, currentBlock []byte, peerId string, requestHashes func([]byte) error, requestBlocks func([][]byte) error, peerError func(int, string, ...interface{})) (best bool) + AddPeer(td *big.Int, currentBlock []byte, peerId string, requestHashes func([]byte) error, requestBlocks func([][]byte) error, peerError func(*errs.Error)) (best bool) RemovePeer(peerId string) } @@ -68,8 +94,6 @@ type newBlockMsgData struct { TD *big.Int } -const maxHashes = 255 - type getBlockHashesMsgData struct { Hash []byte Amount uint64 @@ -99,7 +123,11 @@ func runEthProtocol(txPool txPool, chainManager chainManager, blockPool blockPoo blockPool: blockPool, rw: rw, peer: peer, - id: fmt.Sprintf("%x", id[:8]), + errors: &errs.Errors{ + Package: "ETH", + Errors: errorToString, + }, + id: fmt.Sprintf("%x", id[:8]), } err = self.handleStatus() if err == nil { @@ -145,7 +173,6 @@ func (self *ethProtocol) handle() error { return self.protoError(ErrDecode, "->msg %v: %v", msg, err) } - //request.Amount = uint64(math.Min(float64(maxHashes), float64(request.Amount))) if request.Amount > maxHashes { request.Amount = maxHashes } @@ -153,7 +180,6 @@ func (self *ethProtocol) handle() error { return p2p.EncodeMsg(self.rw, BlockHashesMsg, ethutil.ByteSliceToInterface(hashes)...) case BlockHashesMsg: - // TODO: redo using lazy decode , this way very inefficient on known chains msgStream := rlp.NewStream(msg.Payload) var err error var i int @@ -191,7 +217,7 @@ func (self *ethProtocol) handle() error { if block != nil { blocks = append(blocks, block) } - if i == blockHashesBatchSize { + if i == maxBlocks { break } } @@ -218,7 +244,7 @@ func (self *ethProtocol) handle() error { } hash := request.Block.Hash() // to simplify backend interface adding a new block - // uses AddPeer followed by AddHashes, AddBlock only if peer is the best peer + // uses AddPeer followed by AddBlock only if peer is the best peer // (or selected as new best peer) if self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) { self.blockPool.AddBlock(request.Block, self.id) @@ -277,7 +303,7 @@ func (self *ethProtocol) handleStatus() error { _, _, genesisBlock := self.chainManager.Status() - if bytes.Compare(status.GenesisBlock, genesisBlock) != 0 { + if !bytes.Equal(status.GenesisBlock, genesisBlock) { return self.protoError(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock, genesisBlock) } @@ -297,8 +323,8 @@ func (self *ethProtocol) handleStatus() error { } func (self *ethProtocol) requestBlockHashes(from []byte) error { - self.peer.Debugf("fetching hashes (%d) %x...\n", blockHashesBatchSize, from[0:4]) - return p2p.EncodeMsg(self.rw, GetBlockHashesMsg, interface{}(from), uint64(blockHashesBatchSize)) + self.peer.Debugf("fetching hashes (%d) %x...\n", maxHashes, from[0:4]) + return p2p.EncodeMsg(self.rw, GetBlockHashesMsg, interface{}(from), uint64(maxHashes)) } func (self *ethProtocol) requestBlocks(hashes [][]byte) error { @@ -306,26 +332,17 @@ func (self *ethProtocol) requestBlocks(hashes [][]byte) error { return p2p.EncodeMsg(self.rw, GetBlocksMsg, ethutil.ByteSliceToInterface(hashes)...) } -func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *protocolError) { - err = ProtocolError(code, format, params...) - if err.Fatal() { - self.peer.Errorln("err %v", err) - // disconnect - } else { - self.peer.Debugf("fyi %v", err) - } +func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *errs.Error) { + err = self.errors.New(code, format, params...) + err.Log(self.peer.Logger) return } -func (self *ethProtocol) protoErrorDisconnect(code int, format string, params ...interface{}) { - err := ProtocolError(code, format, params...) +func (self *ethProtocol) protoErrorDisconnect(err *errs.Error) { + err.Log(self.peer.Logger) if err.Fatal() { - self.peer.Errorln("err %v", err) - // disconnect - } else { - self.peer.Debugf("fyi %v", err) + self.peer.Disconnect(p2p.DiscSubprotocolError) } - } func (self *ethProtocol) propagateTxs() { -- cgit v1.2.3 From e31ec57f8875147766b2bf8e6f129b9a0c1b5e69 Mon Sep 17 00:00:00 2001 From: Taylor Gerring Date: Mon, 2 Mar 2015 08:17:09 -0600 Subject: Add event eth.tx.received --- eth/protocol.go | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'eth/protocol.go') diff --git a/eth/protocol.go b/eth/protocol.go index 8221c1b29..a5cc8ee1a 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethutil" + "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" ) @@ -137,6 +138,12 @@ func (self *ethProtocol) handle() error { if err := msg.Decode(&txs); err != nil { return self.protoError(ErrDecode, "msg %v: %v", msg, err) } + for _, tx := range txs { + jsonlogger.LogJson(&logger.EthTxReceived{ + TxHash: ethutil.Bytes2Hex(tx.Hash()), + RemoteId: self.peer.ID().String(), + }) + } self.txPool.AddTransactions(txs) case GetBlockHashesMsg: -- cgit v1.2.3 From fc47f0f27b4b672c84e8de230a5a3c5e5519f3aa Mon Sep 17 00:00:00 2001 From: zelig Date: Tue, 3 Mar 2015 15:43:05 +0700 Subject: add eth.chain.received.new_block log to eth protocol --- eth/protocol.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) (limited to 'eth/protocol.go') diff --git a/eth/protocol.go b/eth/protocol.go index a5cc8ee1a..5a7af0e33 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -152,7 +152,6 @@ func (self *ethProtocol) handle() error { return self.protoError(ErrDecode, "->msg %v: %v", msg, err) } - //request.Amount = uint64(math.Min(float64(maxHashes), float64(request.Amount))) if request.Amount > maxHashes { request.Amount = maxHashes } @@ -224,6 +223,15 @@ func (self *ethProtocol) handle() error { return self.protoError(ErrDecode, "msg %v: %v", msg, err) } hash := request.Block.Hash() + _, chainHead, _ := self.chainManager.Status() + + jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{ + BlockHash: ethutil.Bytes2Hex(hash), + BlockNumber: request.Block.Number(), // this surely must be zero + ChainHeadHash: ethutil.Bytes2Hex(chainHead), + BlockPrevHash: ethutil.Bytes2Hex(request.Block.ParentHash()), + RemoteId: self.peer.ID().String(), + }) // to simplify backend interface adding a new block // uses AddPeer followed by AddHashes, AddBlock only if peer is the best peer // (or selected as new best peer) -- cgit v1.2.3 From 313fe3861b3c338b3b6304adac46c86af7e2d52e Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 3 Mar 2015 17:55:23 +0100 Subject: fixed pow stuff --- eth/protocol.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'eth/protocol.go') diff --git a/eth/protocol.go b/eth/protocol.go index ee2316836..67ed8f9e5 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -14,7 +14,7 @@ import ( ) const ( - ProtocolVersion = 54 + ProtocolVersion = 55 NetworkId = 0 ProtocolLength = uint64(8) ProtocolMaxMsgSize = 10 * 1024 * 1024 -- cgit v1.2.3 From 15f491e5007d1507f20d0edce36cc9c0bd5cbd37 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 4 Mar 2015 12:18:26 +0100 Subject: Clean up REPL --- eth/protocol.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'eth/protocol.go') diff --git a/eth/protocol.go b/eth/protocol.go index d394ba739..663af43fe 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -15,7 +15,7 @@ import ( ) const ( - ProtocolVersion = 55 + ProtocolVersion = 56 NetworkId = 0 ProtocolLength = uint64(8) ProtocolMaxMsgSize = 10 * 1024 * 1024 @@ -250,6 +250,7 @@ func (self *ethProtocol) handle() error { return self.protoError(ErrDecode, "msg %v: %v", msg, err) } hash := request.Block.Hash() + fmt.Println("received block: %x", hash) _, chainHead, _ := self.chainManager.Status() jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{ -- cgit v1.2.3 From 6e7e5d5fd56a9a6f73e51239ed6648d76db9650d Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 4 Mar 2015 13:12:50 +0100 Subject: eth, whisper: fix msg.Payload reads --- eth/protocol.go | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) (limited to 'eth/protocol.go') diff --git a/eth/protocol.go b/eth/protocol.go index 663af43fe..b86f33614 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -3,7 +3,6 @@ package eth import ( "bytes" "fmt" - "io" "math/big" "github.com/ethereum/go-ethereum/core/types" @@ -188,33 +187,37 @@ func (self *ethProtocol) handle() error { case BlockHashesMsg: msgStream := rlp.NewStream(msg.Payload) - var err error - var i int + if _, err := msgStream.List(); err != nil { + return err + } + var i int iter := func() (hash []byte, ok bool) { - hash, err = msgStream.Bytes() - if err == nil { - i++ - ok = true - } else { - if err != io.EOF { - self.protoError(ErrDecode, "msg %v: after %v hashes : %v", msg, i, err) - } + hash, err := msgStream.Bytes() + if err == rlp.EOL { + return nil, false + } else if err != nil { + self.protoError(ErrDecode, "msg %v: after %v hashes : %v", msg, i, err) + return nil, false } - return + i++ + return hash, true } - self.blockPool.AddBlockHashes(iter, self.id) case GetBlocksMsg: msgStream := rlp.NewStream(msg.Payload) + if _, err := msgStream.List(); err != nil { + return err + } + var blocks []interface{} var i int for { i++ var hash []byte if err := msgStream.Decode(&hash); err != nil { - if err == io.EOF { + if err == rlp.EOL { break } else { return self.protoError(ErrDecode, "msg %v: %v", msg, err) @@ -232,10 +235,13 @@ func (self *ethProtocol) handle() error { case BlocksMsg: msgStream := rlp.NewStream(msg.Payload) + if _, err := msgStream.List(); err != nil { + return err + } for { var block types.Block if err := msgStream.Decode(&block); err != nil { - if err == io.EOF { + if err == rlp.EOL { break } else { return self.protoError(ErrDecode, "msg %v: %v", msg, err) -- cgit v1.2.3 From c47866d25174bd783ee6bcd5b400d81d7bf598bb Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 5 Mar 2015 09:14:58 +0100 Subject: Miner fixes and updates (including miner) --- eth/protocol.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'eth/protocol.go') diff --git a/eth/protocol.go b/eth/protocol.go index 663af43fe..c887af129 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -250,7 +250,7 @@ func (self *ethProtocol) handle() error { return self.protoError(ErrDecode, "msg %v: %v", msg, err) } hash := request.Block.Hash() - fmt.Println("received block: %x", hash) + fmt.Printf("received block: %x\n", hash) _, chainHead, _ := self.chainManager.Status() jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{ -- cgit v1.2.3 From 23ad2f02c0992c212d7d179991560eb44f1b1f78 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 6 Mar 2015 10:22:40 +0100 Subject: debug comments & pow handling --- eth/protocol.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'eth/protocol.go') diff --git a/eth/protocol.go b/eth/protocol.go index b52c7db42..b9b485292 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -253,10 +253,9 @@ func (self *ethProtocol) handle() error { case NewBlockMsg: var request newBlockMsgData if err := msg.Decode(&request); err != nil { - return self.protoError(ErrDecode, "msg %v: %v", msg, err) + return self.protoError(ErrDecode, "%v: %v", msg, err) } hash := request.Block.Hash() - fmt.Printf("received block: %x\n", hash) _, chainHead, _ := self.chainManager.Status() jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{ -- cgit v1.2.3