aboutsummaryrefslogtreecommitdiffstats
path: root/eth/protocol.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/protocol.go')
-rw-r--r--eth/protocol.go125
1 files changed, 82 insertions, 43 deletions
diff --git a/eth/protocol.go b/eth/protocol.go
index 8221c1b29..b9b485292 100644
--- a/eth/protocol.go
+++ b/eth/protocol.go
@@ -3,20 +3,23 @@ package eth
import (
"bytes"
"fmt"
- "io"
"math/big"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/errs"
"github.com/ethereum/go-ethereum/ethutil"
+ "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
)
const (
- ProtocolVersion = 54
+ ProtocolVersion = 56
NetworkId = 0
ProtocolLength = uint64(8)
ProtocolMaxMsgSize = 10 * 1024 * 1024
+ maxHashes = 256
+ maxBlocks = 64
)
// eth protocol message codes
@@ -31,6 +34,28 @@ const (
NewBlockMsg
)
+const (
+ ErrMsgTooLarge = iota
+ ErrDecode
+ ErrInvalidMsgCode
+ ErrProtocolVersionMismatch
+ ErrNetworkIdMismatch
+ ErrGenesisBlockMismatch
+ ErrNoStatusMsg
+ ErrExtraStatusMsg
+)
+
+var errorToString = map[int]string{
+ ErrMsgTooLarge: "Message too long",
+ ErrDecode: "Invalid message",
+ ErrInvalidMsgCode: "Invalid message code",
+ ErrProtocolVersionMismatch: "Protocol version mismatch",
+ ErrNetworkIdMismatch: "NetworkId mismatch",
+ ErrGenesisBlockMismatch: "Genesis block mismatch",
+ ErrNoStatusMsg: "No status message",
+ ErrExtraStatusMsg: "Extra status message",
+}
+
// ethProtocol represents the ethereum wire protocol
// instance is running on each peer
type ethProtocol struct {
@@ -40,6 +65,7 @@ type ethProtocol struct {
peer *p2p.Peer
id string
rw p2p.MsgReadWriter
+ errors *errs.Errors
}
// backend is the interface the ethereum protocol backend should implement
@@ -58,7 +84,7 @@ type chainManager interface {
type blockPool interface {
AddBlockHashes(next func() ([]byte, bool), peerId string)
AddBlock(block *types.Block, peerId string)
- AddPeer(td *big.Int, currentBlock []byte, peerId string, requestHashes func([]byte) error, requestBlocks func([][]byte) error, peerError func(int, string, ...interface{})) (best bool)
+ AddPeer(td *big.Int, currentBlock []byte, peerId string, requestHashes func([]byte) error, requestBlocks func([][]byte) error, peerError func(*errs.Error)) (best bool)
RemovePeer(peerId string)
}
@@ -68,8 +94,6 @@ type newBlockMsgData struct {
TD *big.Int
}
-const maxHashes = 255
-
type getBlockHashesMsgData struct {
Hash []byte
Amount uint64
@@ -99,7 +123,11 @@ func runEthProtocol(txPool txPool, chainManager chainManager, blockPool blockPoo
blockPool: blockPool,
rw: rw,
peer: peer,
- id: fmt.Sprintf("%x", id[:8]),
+ errors: &errs.Errors{
+ Package: "ETH",
+ Errors: errorToString,
+ },
+ id: fmt.Sprintf("%x", id[:8]),
}
err = self.handleStatus()
if err == nil {
@@ -137,6 +165,12 @@ func (self *ethProtocol) handle() error {
if err := msg.Decode(&txs); err != nil {
return self.protoError(ErrDecode, "msg %v: %v", msg, err)
}
+ for _, tx := range txs {
+ jsonlogger.LogJson(&logger.EthTxReceived{
+ TxHash: ethutil.Bytes2Hex(tx.Hash()),
+ RemoteId: self.peer.ID().String(),
+ })
+ }
self.txPool.AddTransactions(txs)
case GetBlockHashesMsg:
@@ -145,7 +179,6 @@ func (self *ethProtocol) handle() error {
return self.protoError(ErrDecode, "->msg %v: %v", msg, err)
}
- //request.Amount = uint64(math.Min(float64(maxHashes), float64(request.Amount)))
if request.Amount > maxHashes {
request.Amount = maxHashes
}
@@ -153,35 +186,38 @@ func (self *ethProtocol) handle() error {
return p2p.EncodeMsg(self.rw, BlockHashesMsg, ethutil.ByteSliceToInterface(hashes)...)
case BlockHashesMsg:
- // TODO: redo using lazy decode , this way very inefficient on known chains
msgStream := rlp.NewStream(msg.Payload)
- var err error
- var i int
+ if _, err := msgStream.List(); err != nil {
+ return err
+ }
+ var i int
iter := func() (hash []byte, ok bool) {
- hash, err = msgStream.Bytes()
- if err == nil {
- i++
- ok = true
- } else {
- if err != io.EOF {
- self.protoError(ErrDecode, "msg %v: after %v hashes : %v", msg, i, err)
- }
+ hash, err := msgStream.Bytes()
+ if err == rlp.EOL {
+ return nil, false
+ } else if err != nil {
+ self.protoError(ErrDecode, "msg %v: after %v hashes : %v", msg, i, err)
+ return nil, false
}
- return
+ i++
+ return hash, true
}
-
self.blockPool.AddBlockHashes(iter, self.id)
case GetBlocksMsg:
msgStream := rlp.NewStream(msg.Payload)
+ if _, err := msgStream.List(); err != nil {
+ return err
+ }
+
var blocks []interface{}
var i int
for {
i++
var hash []byte
if err := msgStream.Decode(&hash); err != nil {
- if err == io.EOF {
+ if err == rlp.EOL {
break
} else {
return self.protoError(ErrDecode, "msg %v: %v", msg, err)
@@ -191,7 +227,7 @@ func (self *ethProtocol) handle() error {
if block != nil {
blocks = append(blocks, block)
}
- if i == blockHashesBatchSize {
+ if i == maxBlocks {
break
}
}
@@ -199,10 +235,13 @@ func (self *ethProtocol) handle() error {
case BlocksMsg:
msgStream := rlp.NewStream(msg.Payload)
+ if _, err := msgStream.List(); err != nil {
+ return err
+ }
for {
var block types.Block
if err := msgStream.Decode(&block); err != nil {
- if err == io.EOF {
+ if err == rlp.EOL {
break
} else {
return self.protoError(ErrDecode, "msg %v: %v", msg, err)
@@ -214,11 +253,20 @@ func (self *ethProtocol) handle() error {
case NewBlockMsg:
var request newBlockMsgData
if err := msg.Decode(&request); err != nil {
- return self.protoError(ErrDecode, "msg %v: %v", msg, err)
+ return self.protoError(ErrDecode, "%v: %v", msg, err)
}
hash := request.Block.Hash()
+ _, chainHead, _ := self.chainManager.Status()
+
+ jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{
+ BlockHash: ethutil.Bytes2Hex(hash),
+ BlockNumber: request.Block.Number(), // this surely must be zero
+ ChainHeadHash: ethutil.Bytes2Hex(chainHead),
+ BlockPrevHash: ethutil.Bytes2Hex(request.Block.ParentHash()),
+ RemoteId: self.peer.ID().String(),
+ })
// to simplify backend interface adding a new block
- // uses AddPeer followed by AddHashes, AddBlock only if peer is the best peer
+ // uses AddPeer followed by AddBlock only if peer is the best peer
// (or selected as new best peer)
if self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) {
self.blockPool.AddBlock(request.Block, self.id)
@@ -277,7 +325,7 @@ func (self *ethProtocol) handleStatus() error {
_, _, genesisBlock := self.chainManager.Status()
- if bytes.Compare(status.GenesisBlock, genesisBlock) != 0 {
+ if !bytes.Equal(status.GenesisBlock, genesisBlock) {
return self.protoError(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock, genesisBlock)
}
@@ -297,8 +345,8 @@ func (self *ethProtocol) handleStatus() error {
}
func (self *ethProtocol) requestBlockHashes(from []byte) error {
- self.peer.Debugf("fetching hashes (%d) %x...\n", blockHashesBatchSize, from[0:4])
- return p2p.EncodeMsg(self.rw, GetBlockHashesMsg, interface{}(from), uint64(blockHashesBatchSize))
+ self.peer.Debugf("fetching hashes (%d) %x...\n", maxHashes, from[0:4])
+ return p2p.EncodeMsg(self.rw, GetBlockHashesMsg, interface{}(from), uint64(maxHashes))
}
func (self *ethProtocol) requestBlocks(hashes [][]byte) error {
@@ -306,26 +354,17 @@ func (self *ethProtocol) requestBlocks(hashes [][]byte) error {
return p2p.EncodeMsg(self.rw, GetBlocksMsg, ethutil.ByteSliceToInterface(hashes)...)
}
-func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *protocolError) {
- err = ProtocolError(code, format, params...)
- if err.Fatal() {
- self.peer.Errorln("err %v", err)
- // disconnect
- } else {
- self.peer.Debugf("fyi %v", err)
- }
+func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *errs.Error) {
+ err = self.errors.New(code, format, params...)
+ err.Log(self.peer.Logger)
return
}
-func (self *ethProtocol) protoErrorDisconnect(code int, format string, params ...interface{}) {
- err := ProtocolError(code, format, params...)
+func (self *ethProtocol) protoErrorDisconnect(err *errs.Error) {
+ err.Log(self.peer.Logger)
if err.Fatal() {
- self.peer.Errorln("err %v", err)
- // disconnect
- } else {
- self.peer.Debugf("fyi %v", err)
+ self.peer.Disconnect(p2p.DiscSubprotocolError)
}
-
}
func (self *ethProtocol) propagateTxs() {