aboutsummaryrefslogblamecommitdiffstats
path: root/eth/protocol.go
blob: 1999d9807e1a700bc2703fd58d448774a240064a (plain) (tree)
1
2
3
4
5
6
7
8
9


           
             

                  
                                                
                                                    
                                              
                                                
                                             
                                             

 
       
                               


                                             

                                




                             
                           







                         








                                  
                        










                                                                
                                                     

 


                                                    








                                         
 


                                                                          
                       
                                             
                                            


                             


                                                                                      

 
                          
                                                                      
                                                   
                                                                                                                                                                                                              


                                 
                                             





                                   
                          
                     

 







                                   
                                                                       

                                                                       
                                                                                                                              
                            
                               
                                               

                                                                       
                                                                                                                    



                  

                                               
                                                                                                                                                                      
                       
                             






                                                 




                                               
         
















                                                                             
                 
         







                                          
                                                                                               




                                                             
                                
                       
                                                             
 
                   
                                                     

                                                        
                                                                                 
                 

                                                                 
                                                          


                                                                  
                                                



                                                            
                                                                                   
                 
 


                                                  
                                                                                                
                                                                

                            
                                                       


                                                           
 
                         


                                                            
                                           
                                                           

                                                                                                       
                                                           
                         
 

                                         
                 
                                                            

                          
                                                       



                                                           
                                         


                           



                                                      
                                              
                                                                                         
                         
 
                                                                 
                                         

                                                              
                                           
                                     

                         
                                                           

                       
                                                       


                                                           
                     
                                             
                                                                        
                                                   

                                             
                                                                                                 

                                 
                                                                




                                                            
                                                                             
                 
                                            


                                                                    
                                                  
                                                                                          

                                                                        

                                                               
                                                                   
                                                                                  
                                                 
                                                                                                                                                               
                                                                       


                 
                                                                         



                  
                                               
                                                 







                                        
                                  
                                                                                                            
         
                                          
                                                                                               



                                                   
                                                                         

         
                                                        
 
                                                
                                                                                                                

         

                                                                                                            

         

                                                                                                                              

         



                                                                                                                                                               
 
                                                                                                                                                    



                  
                                                                     
                                                                              
                                                                                           

 
                                                                    
                                                           
                                                      

 


                                                                                                       

              
 










                                                                    

                                                                
                        
                                                              
         
 
package eth

import (
    "fmt"
    "math/big"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/errs"
    "github.com/ethereum/go-ethereum/logger"
    "github.com/ethereum/go-ethereum/p2p"
    "github.com/ethereum/go-ethereum/rlp"
)

const (
    ProtocolVersion    = 58
    NetworkId          = 0
    ProtocolLength     = uint64(8)
    ProtocolMaxMsgSize = 10 * 1024 * 1024
    maxHashes          = 256
    maxBlocks          = 64
)

// eth protocol message codes
const (
    StatusMsg = iota
    GetTxMsg  // unused
    TxMsg
    GetBlockHashesMsg
    BlockHashesMsg
    GetBlocksMsg
    BlocksMsg
    NewBlockMsg
)

const (
    ErrMsgTooLarge = iota
    ErrDecode
    ErrInvalidMsgCode
    ErrProtocolVersionMismatch
    ErrNetworkIdMismatch
    ErrGenesisBlockMismatch
    ErrNoStatusMsg
    ErrExtraStatusMsg
    ErrSuspendedPeer
)

var errorToString = map[int]string{
    ErrMsgTooLarge:             "Message too long",
    ErrDecode:                  "Invalid message",
    ErrInvalidMsgCode:          "Invalid message code",
    ErrProtocolVersionMismatch: "Protocol version mismatch",
    ErrNetworkIdMismatch:       "NetworkId mismatch",
    ErrGenesisBlockMismatch:    "Genesis block mismatch",
    ErrNoStatusMsg:             "No status message",
    ErrExtraStatusMsg:          "Extra status message",
    ErrSuspendedPeer:           "Suspended peer",
}

// ethProtocol represents the ethereum wire protocol
// instance is running on each peer
type ethProtocol struct {
    txPool          txPool
    chainManager    chainManager
    blockPool       blockPool
    peer            *p2p.Peer
    id              string
    rw              p2p.MsgReadWriter
    errors          *errs.Errors
    protocolVersion int
    networkId       int
}

// backend is the interface the ethereum protocol backend should implement
// used as an argument to EthProtocol
type txPool interface {
    AddTransactions([]*types.Transaction)
    GetTransactions() types.Transactions
}

type chainManager interface {
    GetBlockHashesFromHash(hash common.Hash, amount uint64) (hashes []common.Hash)
    GetBlock(hash common.Hash) (block *types.Block)
    Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash)
}

type blockPool interface {
    AddBlockHashes(next func() (common.Hash, bool), peerId string)
    AddBlock(block *types.Block, peerId string)
    AddPeer(td *big.Int, currentBlock common.Hash, peerId string, requestHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool, suspended bool)
    RemovePeer(peerId string)
}

// message structs used for RLP serialization
type newBlockMsgData struct {
    Block *types.Block
    TD    *big.Int
}

type getBlockHashesMsgData struct {
    Hash   common.Hash
    Amount uint64
}

type statusMsgData struct {
    ProtocolVersion uint32
    NetworkId       uint32
    TD              *big.Int
    CurrentBlock    common.Hash
    GenesisBlock    common.Hash
}

// main entrypoint, wrappers starting a server running the eth protocol
// use this constructor to attach the protocol ("class") to server caps
// the Dev p2p layer then runs the protocol instance on each peer
func EthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool) p2p.Protocol {
    return p2p.Protocol{
        Name:    "eth",
        Version: uint(protocolVersion),
        Length:  ProtocolLength,
        Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
            return runEthProtocol(protocolVersion, networkId, txPool, chainManager, blockPool, peer, rw)
        },
    }
}

// the main loop that handles incoming messages
// note RemovePeer in the post-disconnect hook
func runEthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) {
    id := peer.ID()
    self := &ethProtocol{
        txPool:          txPool,
        chainManager:    chainManager,
        blockPool:       blockPool,
        rw:              rw,
        peer:            peer,
        protocolVersion: protocolVersion,
        networkId:       networkId,
        errors: &errs.Errors{
            Package: "ETH",
            Errors:  errorToString,
        },
        id: fmt.Sprintf("%x", id[:8]),
    }

    // handshake.
    if err := self.handleStatus(); err != nil {
        return err
    }
    defer self.blockPool.RemovePeer(self.id)

    // propagate existing transactions. new transactions appearing
    // after this will be sent via broadcasts.
    if err := p2p.Send(rw, TxMsg, txPool.GetTransactions()); err != nil {
        return err
    }

    // main loop. handle incoming messages.
    for {
        if err := self.handle(); err != nil {
            return err
        }
    }
}

func (self *ethProtocol) handle() error {
    msg, err := self.rw.ReadMsg()
    if err != nil {
        return err
    }
    if msg.Size > ProtocolMaxMsgSize {
        return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
    }
    // make sure that the payload has been fully consumed
    defer msg.Discard()

    switch msg.Code {
    case GetTxMsg: // ignore
    case StatusMsg:
        return self.protoError(ErrExtraStatusMsg, "")

    case TxMsg:
        // TODO: rework using lazy RLP stream
        var txs []*types.Transaction
        if err := msg.Decode(&txs); err != nil {
            return self.protoError(ErrDecode, "msg %v: %v", msg, err)
        }
        for _, tx := range txs {
            jsonlogger.LogJson(&logger.EthTxReceived{
                TxHash:   tx.Hash().Hex(),
                RemoteId: self.peer.ID().String(),
            })
        }
        self.txPool.AddTransactions(txs)

    case GetBlockHashesMsg:
        var request getBlockHashesMsgData
        if err := msg.Decode(&request); err != nil {
            return self.protoError(ErrDecode, "->msg %v: %v", msg, err)
        }

        if request.Amount > maxHashes {
            request.Amount = maxHashes
        }
        hashes := self.chainManager.GetBlockHashesFromHash(request.Hash, request.Amount)
        return p2p.Send(self.rw, BlockHashesMsg, hashes)

    case BlockHashesMsg:
        msgStream := rlp.NewStream(msg.Payload)
        if _, err := msgStream.List(); err != nil {
            return err
        }

        var i int
        iter := func() (hash common.Hash, ok bool) {
            var h common.Hash
            err := msgStream.Decode(&h)
            if err == rlp.EOL {
                return common.Hash{}, false
            } else if err != nil {
                self.protoError(ErrDecode, "msg %v: after %v hashes : %v", msg, i, err)
                return common.Hash{}, false
            }

            i++
            return hash, true
        }
        self.blockPool.AddBlockHashes(iter, self.id)

    case GetBlocksMsg:
        msgStream := rlp.NewStream(msg.Payload)
        if _, err := msgStream.List(); err != nil {
            return err
        }

        var blocks []*types.Block
        var i int
        for {
            i++
            var hash common.Hash
            err := msgStream.Decode(&hash)
            if err == rlp.EOL {
                break
            } else if err != nil {
                return self.protoError(ErrDecode, "msg %v: %v", msg, err)
            }

            block := self.chainManager.GetBlock(hash)
            if block != nil {
                blocks = append(blocks, block)
            }
            if i == maxBlocks {
                break
            }
        }
        return p2p.Send(self.rw, BlocksMsg, blocks)

    case BlocksMsg:
        msgStream := rlp.NewStream(msg.Payload)
        if _, err := msgStream.List(); err != nil {
            return err
        }
        for {
            var block types.Block
            if err := msgStream.Decode(&block); err != nil {
                if err == rlp.EOL {
                    break
                } else {
                    return self.protoError(ErrDecode, "msg %v: %v", msg, err)
                }
            }
            self.blockPool.AddBlock(&block, self.id)
        }

    case NewBlockMsg:
        var request newBlockMsgData
        if err := msg.Decode(&request); err != nil {
            return self.protoError(ErrDecode, "%v: %v", msg, err)
        }
        hash := request.Block.Hash()
        _, chainHead, _ := self.chainManager.Status()

        jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{
            BlockHash:     hash.Hex(),
            BlockNumber:   request.Block.Number(), // this surely must be zero
            ChainHeadHash: chainHead.Hex(),
            BlockPrevHash: request.Block.ParentHash().Hex(),
            RemoteId:      self.peer.ID().String(),
        })
        // to simplify backend interface adding a new block
        // uses AddPeer followed by AddBlock only if peer is the best peer
        // (or selected as new best peer)
        if best, _ := self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect); best {
            self.blockPool.AddBlock(request.Block, self.id)
        }

    default:
        return self.protoError(ErrInvalidMsgCode, "%v", msg.Code)
    }
    return nil
}

func (self *ethProtocol) handleStatus() error {
    if err := self.sendStatus(); err != nil {
        return err
    }

    // read and handle remote status
    msg, err := self.rw.ReadMsg()
    if err != nil {
        return err
    }
    if msg.Code != StatusMsg {
        return self.protoError(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg)
    }
    if msg.Size > ProtocolMaxMsgSize {
        return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
    }

    var status statusMsgData
    if err := msg.Decode(&status); err != nil {
        return self.protoError(ErrDecode, "msg %v: %v", msg, err)
    }

    _, _, genesisBlock := self.chainManager.Status()

    if status.GenesisBlock != genesisBlock {
        return self.protoError(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock, genesisBlock)
    }

    if int(status.NetworkId) != self.networkId {
        return self.protoError(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, self.networkId)
    }

    if int(status.ProtocolVersion) != self.protocolVersion {
        return self.protoError(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, self.protocolVersion)
    }

    _, suspended := self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect)
    if suspended {
        return self.protoError(ErrSuspendedPeer, "")
    }

    self.peer.Infof("Peer is [eth] capable (%d/%d). TD=%v H=%x\n", status.ProtocolVersion, status.NetworkId, status.TD, status.CurrentBlock[:4])

    return nil
}

func (self *ethProtocol) requestBlockHashes(from common.Hash) error {
    self.peer.Debugf("fetching hashes (%d) %x...\n", maxHashes, from[0:4])
    return p2p.Send(self.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, maxHashes})
}

func (self *ethProtocol) requestBlocks(hashes []common.Hash) error {
    self.peer.Debugf("fetching %v blocks", len(hashes))
    return p2p.Send(self.rw, GetBlocksMsg, hashes)
}

func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *errs.Error) {
    err = self.errors.New(code, format, params...)
    err.Log(self.peer.Logger)
    return
}

func (self *ethProtocol) sendStatus() error {
    td, currentBlock, genesisBlock := self.chainManager.Status()
    return p2p.Send(self.rw, StatusMsg, &statusMsgData{
        ProtocolVersion: uint32(self.protocolVersion),
        NetworkId:       uint32(self.networkId),
        TD:              td,
        CurrentBlock:    currentBlock,
        GenesisBlock:    genesisBlock,
    })
}

func (self *ethProtocol) protoErrorDisconnect(err *errs.Error) {
    err.Log(self.peer.Logger)
    if err.Fatal() {
        self.peer.Disconnect(p2p.DiscSubprotocolError)
    }
}