aboutsummaryrefslogtreecommitdiffstats
path: root/eth/handler.go
diff options
context:
space:
mode:
authorobscuren <geffobscura@gmail.com>2015-04-18 07:11:09 +0800
committerobscuren <geffobscura@gmail.com>2015-04-18 07:11:09 +0800
commitc2f410214c99ee3636cfb670e84e5f05d179a1ef (patch)
tree79e4f6a2150e62bafc3f5a2d082ae9b38e1de08e /eth/handler.go
parent2339ee9910c10fa498f4a4ebc25f6a780ccf78e9 (diff)
downloadgo-tangerine-c2f410214c99ee3636cfb670e84e5f05d179a1ef.tar
go-tangerine-c2f410214c99ee3636cfb670e84e5f05d179a1ef.tar.gz
go-tangerine-c2f410214c99ee3636cfb670e84e5f05d179a1ef.tar.bz2
go-tangerine-c2f410214c99ee3636cfb670e84e5f05d179a1ef.tar.lz
go-tangerine-c2f410214c99ee3636cfb670e84e5f05d179a1ef.tar.xz
go-tangerine-c2f410214c99ee3636cfb670e84e5f05d179a1ef.tar.zst
go-tangerine-c2f410214c99ee3636cfb670e84e5f05d179a1ef.zip
eth: began split up of peers and protocol manager
Diffstat (limited to 'eth/handler.go')
-rw-r--r--eth/handler.go224
1 files changed, 224 insertions, 0 deletions
diff --git a/eth/handler.go b/eth/handler.go
new file mode 100644
index 000000000..b3890d365
--- /dev/null
+++ b/eth/handler.go
@@ -0,0 +1,224 @@
+package eth
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/eth/downloader"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+func errResp(code errCode, format string, v ...interface{}) error {
+ return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
+}
+
+// main entrypoint, wrappers starting a server running the eth protocol
+// use this constructor to attach the protocol ("class") to server caps
+// the Dev p2p layer then runs the protocol instance on each peer
+func EthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, downloader *downloader.Downloader) p2p.Protocol {
+ protocol := newProtocolManager(txPool, chainManager, downloader)
+
+ return p2p.Protocol{
+ Name: "eth",
+ Version: uint(protocolVersion),
+ Length: ProtocolLength,
+ Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+ //return runEthProtocol(protocolVersion, networkId, txPool, chainManager, downloader, p, rw)
+ peer := protocol.newPeer(protocolVersion, networkId, p, rw)
+ err := protocol.handle(peer)
+ glog.V(logger.Detail).Infof("[%s]: %v\n", peer.id, err)
+
+ return err
+ },
+ }
+}
+
+type hashFetcherFn func(common.Hash) error
+type blockFetcherFn func([]common.Hash) error
+
+// extProt is an interface which is passed around so we can expose GetHashes and GetBlock without exposing it to the rest of the protocol
+// extProt is passed around to peers which require to GetHashes and GetBlocks
+type extProt struct {
+ getHashes hashFetcherFn
+ getBlocks blockFetcherFn
+}
+
+func (ep extProt) GetHashes(hash common.Hash) error { return ep.getHashes(hash) }
+func (ep extProt) GetBlock(hashes []common.Hash) error { return ep.getBlocks(hashes) }
+
+type EthProtocolManager struct {
+ protVer, netId int
+ txpool txPool
+ chainman chainManager
+ downloader *downloader.Downloader
+
+ pmu sync.Mutex
+ peers map[string]*peer
+}
+
+func newProtocolManager(txpool txPool, chainman chainManager, downloader *downloader.Downloader) *EthProtocolManager {
+ return &EthProtocolManager{
+ txpool: txpool,
+ chainman: chainman,
+ downloader: downloader,
+ peers: make(map[string]*peer),
+ }
+}
+
+func (pm *EthProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
+ pm.pmu.Lock()
+ defer pm.pmu.Unlock()
+
+ td, current, genesis := pm.chainman.Status()
+
+ peer := newPeer(pv, nv, genesis, current, td, p, rw)
+ pm.peers[peer.id] = peer
+
+ return peer
+}
+
+func (pm *EthProtocolManager) handle(p *peer) error {
+ if err := p.handleStatus(); err != nil {
+ return err
+ }
+
+ pm.downloader.RegisterPeer(p.id, p.td, p.currentHash, p.requestHashes, p.requestBlocks)
+ defer pm.downloader.UnregisterPeer(p.id)
+
+ // propagate existing transactions. new transactions appearing
+ // after this will be sent via broadcasts.
+ if err := p.sendTransactions(pm.txpool.GetTransactions()); err != nil {
+ return err
+ }
+
+ // main loop. handle incoming messages.
+ for {
+ if err := pm.handleMsg(p); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (self *EthProtocolManager) handleMsg(p *peer) error {
+ msg, err := p.rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+ if msg.Size > ProtocolMaxMsgSize {
+ return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
+ }
+ // make sure that the payload has been fully consumed
+ defer msg.Discard()
+
+ switch msg.Code {
+ case GetTxMsg: // ignore
+ case StatusMsg:
+ return errResp(ErrExtraStatusMsg, "uncontrolled status message")
+
+ case TxMsg:
+ // TODO: rework using lazy RLP stream
+ var txs []*types.Transaction
+ if err := msg.Decode(&txs); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ for i, tx := range txs {
+ if tx == nil {
+ return errResp(ErrDecode, "transaction %d is nil", i)
+ }
+ jsonlogger.LogJson(&logger.EthTxReceived{
+ TxHash: tx.Hash().Hex(),
+ RemoteId: p.ID().String(),
+ })
+ }
+ self.txpool.AddTransactions(txs)
+
+ case GetBlockHashesMsg:
+ var request getBlockHashesMsgData
+ if err := msg.Decode(&request); err != nil {
+ return errResp(ErrDecode, "->msg %v: %v", msg, err)
+ }
+
+ if request.Amount > maxHashes {
+ request.Amount = maxHashes
+ }
+ hashes := self.chainman.GetBlockHashesFromHash(request.Hash, request.Amount)
+ return p.sendBlockHashes(hashes)
+ case BlockHashesMsg:
+ msgStream := rlp.NewStream(msg.Payload)
+
+ var hashes []common.Hash
+ if err := msgStream.Decode(&hashes); err != nil {
+ break
+ }
+ self.downloader.HashCh <- hashes
+
+ case GetBlocksMsg:
+ msgStream := rlp.NewStream(msg.Payload)
+ if _, err := msgStream.List(); err != nil {
+ return err
+ }
+
+ var blocks []*types.Block
+ var i int
+ for {
+ i++
+ var hash common.Hash
+ err := msgStream.Decode(&hash)
+ if err == rlp.EOL {
+ break
+ } else if err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+
+ block := self.chainman.GetBlock(hash)
+ if block != nil {
+ blocks = append(blocks, block)
+ }
+ if i == maxBlocks {
+ break
+ }
+ }
+ return p.sendBlocks(blocks)
+ case BlocksMsg:
+ msgStream := rlp.NewStream(msg.Payload)
+
+ var blocks []*types.Block
+ if err := msgStream.Decode(&blocks); err != nil {
+ glog.V(logger.Detail).Infoln("Decode error", err)
+ fmt.Println("decode error", err)
+ blocks = nil
+ }
+ self.downloader.DeliverChunk(p.id, blocks)
+
+ case NewBlockMsg:
+ var request newBlockMsgData
+ if err := msg.Decode(&request); err != nil {
+ return errResp(ErrDecode, "%v: %v", msg, err)
+ }
+ if err := request.Block.ValidateFields(); err != nil {
+ return errResp(ErrDecode, "block validation %v: %v", msg, err)
+ }
+ hash := request.Block.Hash()
+ _, chainHead, _ := self.chainman.Status()
+
+ jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{
+ BlockHash: hash.Hex(),
+ BlockNumber: request.Block.Number(), // this surely must be zero
+ ChainHeadHash: chainHead.Hex(),
+ BlockPrevHash: request.Block.ParentHash().Hex(),
+ RemoteId: p.ID().String(),
+ })
+ self.downloader.AddBlock(p.id, request.Block, request.TD)
+
+ default:
+ return errResp(ErrInvalidMsgCode, "%v", msg.Code)
+ }
+ return nil
+}