aboutsummaryrefslogtreecommitdiffstats
path: root/eth
diff options
context:
space:
mode:
authorobscuren <geffobscura@gmail.com>2015-01-02 17:30:27 +0800
committerobscuren <geffobscura@gmail.com>2015-01-02 17:30:27 +0800
commit0fb1bcd32192b8bf05a328b955a08da4cefe0180 (patch)
tree67460b927eb41b2876e8e6b7eb9dece494dbd088 /eth
parent8da07e91e40c1d1bb43763b7e959ae92e5770af2 (diff)
parenta4dc12f12c7a06f5e28d5b1e760249875ef7a8c5 (diff)
downloadgo-tangerine-0fb1bcd32192b8bf05a328b955a08da4cefe0180.tar
go-tangerine-0fb1bcd32192b8bf05a328b955a08da4cefe0180.tar.gz
go-tangerine-0fb1bcd32192b8bf05a328b955a08da4cefe0180.tar.bz2
go-tangerine-0fb1bcd32192b8bf05a328b955a08da4cefe0180.tar.lz
go-tangerine-0fb1bcd32192b8bf05a328b955a08da4cefe0180.tar.xz
go-tangerine-0fb1bcd32192b8bf05a328b955a08da4cefe0180.tar.zst
go-tangerine-0fb1bcd32192b8bf05a328b955a08da4cefe0180.zip
Merge branch 'poc8' into docbranch
Diffstat (limited to 'eth')
-rw-r--r--eth/backend.go249
-rw-r--r--eth/block_pool.go1015
-rw-r--r--eth/block_pool_test.go198
-rw-r--r--eth/error.go71
-rw-r--r--eth/peer_util.go23
-rw-r--r--eth/protocol.go319
-rw-r--r--eth/protocol_test.go232
7 files changed, 2107 insertions, 0 deletions
diff --git a/eth/backend.go b/eth/backend.go
new file mode 100644
index 000000000..383cda46f
--- /dev/null
+++ b/eth/backend.go
@@ -0,0 +1,249 @@
+package eth
+
+import (
+ "net"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/ethutil"
+ "github.com/ethereum/go-ethereum/event"
+ ethlogger "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/pow/ezp"
+ "github.com/ethereum/go-ethereum/rpc"
+ "github.com/ethereum/go-ethereum/whisper"
+)
+
+const (
+ seedNodeAddress = "poc-7.ethdev.com:30300"
+)
+
+var logger = ethlogger.NewLogger("SERV")
+
+type Ethereum struct {
+ // Channel for shutting down the ethereum
+ shutdownChan chan bool
+ quit chan bool
+
+ // DB interface
+ db ethutil.Database
+ blacklist p2p.Blacklist
+
+ //*** SERVICES ***
+ // State manager for processing new blocks and managing the over all states
+ blockManager *core.BlockManager
+ txPool *core.TxPool
+ chainManager *core.ChainManager
+ blockPool *BlockPool
+ whisper *whisper.Whisper
+
+ server *p2p.Server
+ eventMux *event.TypeMux
+ txSub event.Subscription
+ blockSub event.Subscription
+
+ RpcServer *rpc.JsonRpcServer
+ keyManager *crypto.KeyManager
+
+ clientIdentity p2p.ClientIdentity
+
+ synclock sync.Mutex
+ syncGroup sync.WaitGroup
+
+ Mining bool
+}
+
+func New(db ethutil.Database, identity p2p.ClientIdentity, keyManager *crypto.KeyManager, nat p2p.NAT, port string, maxPeers int) (*Ethereum, error) {
+
+ saveProtocolVersion(db)
+ ethutil.Config.Db = db
+
+ eth := &Ethereum{
+ shutdownChan: make(chan bool),
+ quit: make(chan bool),
+ db: db,
+ keyManager: keyManager,
+ clientIdentity: identity,
+ blacklist: p2p.NewBlacklist(),
+ eventMux: &event.TypeMux{},
+ }
+
+ eth.chainManager = core.NewChainManager(eth.EventMux())
+ eth.txPool = core.NewTxPool(eth.chainManager, eth.EventMux())
+ eth.blockManager = core.NewBlockManager(eth.txPool, eth.chainManager, eth.EventMux())
+ eth.chainManager.SetProcessor(eth.blockManager)
+ eth.whisper = whisper.New()
+
+ hasBlock := eth.chainManager.HasBlock
+ insertChain := eth.chainManager.InsertChain
+ eth.blockPool = NewBlockPool(hasBlock, insertChain, ezp.Verify)
+
+ // Start services
+ eth.txPool.Start()
+
+ ethProto := EthProtocol(eth.txPool, eth.chainManager, eth.blockPool)
+ protocols := []p2p.Protocol{ethProto, eth.whisper.Protocol()}
+
+ server := &p2p.Server{
+ Identity: identity,
+ MaxPeers: maxPeers,
+ Protocols: protocols,
+ ListenAddr: ":" + port,
+ Blacklist: eth.blacklist,
+ NAT: nat,
+ }
+
+ eth.server = server
+
+ return eth, nil
+}
+
+func (s *Ethereum) KeyManager() *crypto.KeyManager {
+ return s.keyManager
+}
+
+func (s *Ethereum) ClientIdentity() p2p.ClientIdentity {
+ return s.clientIdentity
+}
+
+func (s *Ethereum) ChainManager() *core.ChainManager {
+ return s.chainManager
+}
+
+func (s *Ethereum) BlockManager() *core.BlockManager {
+ return s.blockManager
+}
+
+func (s *Ethereum) TxPool() *core.TxPool {
+ return s.txPool
+}
+
+func (s *Ethereum) BlockPool() *BlockPool {
+ return s.blockPool
+}
+
+func (s *Ethereum) Whisper() *whisper.Whisper {
+ return s.whisper
+}
+
+func (s *Ethereum) EventMux() *event.TypeMux {
+ return s.eventMux
+}
+func (self *Ethereum) Db() ethutil.Database {
+ return self.db
+}
+
+func (s *Ethereum) IsMining() bool {
+ return s.Mining
+}
+
+func (s *Ethereum) IsListening() bool {
+ // XXX TODO
+ return false
+}
+
+func (s *Ethereum) PeerCount() int {
+ return s.server.PeerCount()
+}
+
+func (s *Ethereum) Peers() []*p2p.Peer {
+ return s.server.Peers()
+}
+
+func (s *Ethereum) MaxPeers() int {
+ return s.server.MaxPeers
+}
+
+// Start the ethereum
+func (s *Ethereum) Start(seed bool) error {
+ err := s.server.Start()
+ if err != nil {
+ return err
+ }
+ s.blockPool.Start()
+ s.whisper.Start()
+
+ // broadcast transactions
+ s.txSub = s.eventMux.Subscribe(core.TxPreEvent{})
+ go s.txBroadcastLoop()
+
+ // broadcast mined blocks
+ s.blockSub = s.eventMux.Subscribe(core.NewMinedBlockEvent{})
+ go s.blockBroadcastLoop()
+
+ // TODO: read peers here
+ if seed {
+ logger.Infof("Connect to seed node %v", seedNodeAddress)
+ if err := s.SuggestPeer(seedNodeAddress); err != nil {
+ return err
+ }
+ }
+
+ logger.Infoln("Server started")
+ return nil
+}
+
+func (self *Ethereum) SuggestPeer(addr string) error {
+ netaddr, err := net.ResolveTCPAddr("tcp", addr)
+ if err != nil {
+ logger.Errorf("couldn't resolve %s:", addr, err)
+ return err
+ }
+
+ self.server.SuggestPeer(netaddr.IP, netaddr.Port, nil)
+ return nil
+}
+
+func (s *Ethereum) Stop() {
+ // Close the database
+ defer s.db.Close()
+
+ close(s.quit)
+
+ s.txSub.Unsubscribe() // quits txBroadcastLoop
+ s.blockSub.Unsubscribe() // quits blockBroadcastLoop
+
+ if s.RpcServer != nil {
+ s.RpcServer.Stop()
+ }
+ s.txPool.Stop()
+ s.eventMux.Stop()
+ s.blockPool.Stop()
+ s.whisper.Stop()
+
+ logger.Infoln("Server stopped")
+ close(s.shutdownChan)
+}
+
+// This function will wait for a shutdown and resumes main thread execution
+func (s *Ethereum) WaitForShutdown() {
+ <-s.shutdownChan
+}
+
+// now tx broadcasting is taken out of txPool
+// handled here via subscription, efficiency?
+func (self *Ethereum) txBroadcastLoop() {
+ // automatically stops if unsubscribe
+ for obj := range self.txSub.Chan() {
+ event := obj.(core.TxPreEvent)
+ self.server.Broadcast("eth", TxMsg, []interface{}{event.Tx.RlpData()})
+ }
+}
+
+func (self *Ethereum) blockBroadcastLoop() {
+ // automatically stops if unsubscribe
+ for obj := range self.txSub.Chan() {
+ event := obj.(core.NewMinedBlockEvent)
+ self.server.Broadcast("eth", NewBlockMsg, event.Block.RlpData())
+ }
+}
+
+func saveProtocolVersion(db ethutil.Database) {
+ d, _ := db.Get([]byte("ProtocolVersion"))
+ protocolVersion := ethutil.NewValue(d).Uint()
+
+ if protocolVersion == 0 {
+ db.Put([]byte("ProtocolVersion"), ethutil.NewValue(ProtocolVersion).Bytes())
+ }
+}
diff --git a/eth/block_pool.go b/eth/block_pool.go
new file mode 100644
index 000000000..7cfbc63f8
--- /dev/null
+++ b/eth/block_pool.go
@@ -0,0 +1,1015 @@
+package eth
+
+import (
+ "math"
+ "math/big"
+ "math/rand"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethutil"
+ ethlogger "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/pow"
+)
+
+var poolLogger = ethlogger.NewLogger("Blockpool")
+
+const (
+ blockHashesBatchSize = 256
+ blockBatchSize = 64
+ blocksRequestInterval = 10 // seconds
+ blocksRequestRepetition = 1
+ blockHashesRequestInterval = 10 // seconds
+ blocksRequestMaxIdleRounds = 10
+ cacheTimeout = 3 // minutes
+ blockTimeout = 5 // minutes
+)
+
+type poolNode struct {
+ lock sync.RWMutex
+ hash []byte
+ block *types.Block
+ child *poolNode
+ parent *poolNode
+ section *section
+ knownParent bool
+ peer string
+ source string
+ complete bool
+}
+
+type BlockPool struct {
+ lock sync.RWMutex
+ pool map[string]*poolNode
+
+ peersLock sync.RWMutex
+ peers map[string]*peerInfo
+ peer *peerInfo
+
+ quit chan bool
+ wg sync.WaitGroup
+ running bool
+
+ // the minimal interface with blockchain
+ hasBlock func(hash []byte) bool
+ insertChain func(types.Blocks) error
+ verifyPoW func(pow.Block) bool
+}
+
+type peerInfo struct {
+ lock sync.RWMutex
+
+ td *big.Int
+ currentBlock []byte
+ id string
+
+ requestBlockHashes func([]byte) error
+ requestBlocks func([][]byte) error
+ peerError func(int, string, ...interface{})
+
+ sections map[string]*section
+ roots []*poolNode
+ quitC chan bool
+}
+
+func NewBlockPool(hasBlock func(hash []byte) bool, insertChain func(types.Blocks) error, verifyPoW func(pow.Block) bool,
+) *BlockPool {
+ return &BlockPool{
+ hasBlock: hasBlock,
+ insertChain: insertChain,
+ verifyPoW: verifyPoW,
+ }
+}
+
+// allows restart
+func (self *BlockPool) Start() {
+ self.lock.Lock()
+ if self.running {
+ self.lock.Unlock()
+ return
+ }
+ self.running = true
+ self.quit = make(chan bool)
+ self.pool = make(map[string]*poolNode)
+ self.lock.Unlock()
+
+ self.peersLock.Lock()
+ self.peers = make(map[string]*peerInfo)
+ self.peersLock.Unlock()
+
+ poolLogger.Infoln("Started")
+
+}
+
+func (self *BlockPool) Stop() {
+ self.lock.Lock()
+ if !self.running {
+ self.lock.Unlock()
+ return
+ }
+ self.running = false
+ self.lock.Unlock()
+
+ poolLogger.Infoln("Stopping")
+
+ close(self.quit)
+ self.lock.Lock()
+ self.peersLock.Lock()
+ self.peers = nil
+ self.pool = nil
+ self.peer = nil
+ self.wg.Wait()
+ self.lock.Unlock()
+ self.peersLock.Unlock()
+ poolLogger.Infoln("Stopped")
+
+}
+
+// AddPeer is called by the eth protocol instance running on the peer after
+// the status message has been received with total difficulty and current block hash
+// AddPeer can only be used once, RemovePeer needs to be called when the peer disconnects
+func (self *BlockPool) AddPeer(td *big.Int, currentBlock []byte, peerId string, requestBlockHashes func([]byte) error, requestBlocks func([][]byte) error, peerError func(int, string, ...interface{})) bool {
+ self.peersLock.Lock()
+ defer self.peersLock.Unlock()
+ if self.peers[peerId] != nil {
+ panic("peer already added")
+ }
+ peer := &peerInfo{
+ td: td,
+ currentBlock: currentBlock,
+ id: peerId, //peer.Identity().Pubkey()
+ requestBlockHashes: requestBlockHashes,
+ requestBlocks: requestBlocks,
+ peerError: peerError,
+ }
+ self.peers[peerId] = peer
+ poolLogger.Debugf("add new peer %v with td %v", peerId, td)
+ currentTD := ethutil.Big0
+ if self.peer != nil {
+ currentTD = self.peer.td
+ }
+ if td.Cmp(currentTD) > 0 {
+ self.peer.stop(peer)
+ peer.start(self.peer)
+ poolLogger.Debugf("peer %v promoted to best peer", peerId)
+ self.peer = peer
+ return true
+ }
+ return false
+}
+
+// RemovePeer is called by the eth protocol when the peer disconnects
+func (self *BlockPool) RemovePeer(peerId string) {
+ self.peersLock.Lock()
+ defer self.peersLock.Unlock()
+ peer := self.peers[peerId]
+ if peer == nil {
+ return
+ }
+ self.peers[peerId] = nil
+ poolLogger.Debugf("remove peer %v", peerId[0:4])
+
+ // if current best peer is removed, need find a better one
+ if self.peer != nil && peerId == self.peer.id {
+ var newPeer *peerInfo
+ max := ethutil.Big0
+ // peer with the highest self-acclaimed TD is chosen
+ for _, info := range self.peers {
+ if info.td.Cmp(max) > 0 {
+ max = info.td
+ newPeer = info
+ }
+ }
+ self.peer.stop(peer)
+ peer.start(self.peer)
+ if newPeer != nil {
+ poolLogger.Debugf("peer %v with td %v promoted to best peer", newPeer.id[0:4], newPeer.td)
+ } else {
+ poolLogger.Warnln("no peers left")
+ }
+ }
+}
+
+// Entry point for eth protocol to add block hashes received via BlockHashesMsg
+// only hashes from the best peer is handled
+// this method is always responsible to initiate further hash requests until
+// a known parent is reached unless cancelled by a peerChange event
+// this process also launches all request processes on each chain section
+// this function needs to run asynchronously for one peer since the message is discarded???
+func (self *BlockPool) AddBlockHashes(next func() ([]byte, bool), peerId string) {
+
+ // check if this peer is the best
+ peer, best := self.getPeer(peerId)
+ if !best {
+ return
+ }
+ // peer is still the best
+
+ var child *poolNode
+ var depth int
+
+ // iterate using next (rlp stream lazy decoder) feeding hashesC
+ self.wg.Add(1)
+ go func() {
+ for {
+ select {
+ case <-self.quit:
+ return
+ case <-peer.quitC:
+ // if the peer is demoted, no more hashes taken
+ break
+ default:
+ hash, ok := next()
+ if !ok {
+ // message consumed chain skeleton built
+ break
+ }
+ // check if known block connecting the downloaded chain to our blockchain
+ if self.hasBlock(hash) {
+ poolLogger.Infof("known block (%x...)\n", hash[0:4])
+ if child != nil {
+ child.Lock()
+ // mark child as absolute pool root with parent known to blockchain
+ child.knownParent = true
+ child.Unlock()
+ }
+ break
+ }
+ //
+ var parent *poolNode
+ // look up node in pool
+ parent = self.get(hash)
+ if parent != nil {
+ // reached a known chain in the pool
+ // request blocks on the newly added part of the chain
+ if child != nil {
+ self.link(parent, child)
+
+ // activate the current chain
+ self.activateChain(parent, peer, true)
+ poolLogger.Debugf("potential chain of %v blocks added, reached blockpool, activate chain", depth)
+ break
+ }
+ // if this is the first hash, we expect to find it
+ parent.RLock()
+ grandParent := parent.parent
+ parent.RUnlock()
+ if grandParent != nil {
+ // activate the current chain
+ self.activateChain(parent, peer, true)
+ poolLogger.Debugf("block hash found, activate chain")
+ break
+ }
+ // the first node is the root of a chain in the pool, rejoice and continue
+ }
+ // if node does not exist, create it and index in the pool
+ section := &section{}
+ if child == nil {
+ section.top = parent
+ }
+ parent = &poolNode{
+ hash: hash,
+ child: child,
+ section: section,
+ peer: peerId,
+ }
+ self.set(hash, parent)
+ poolLogger.Debugf("create potential block for %x...", hash[0:4])
+
+ depth++
+ child = parent
+ }
+ }
+ if child != nil {
+ poolLogger.Debugf("chain of %v hashes added", depth)
+ // start a processSection on the last node, but switch off asking
+ // hashes and blocks until next peer confirms this chain
+ section := self.processSection(child)
+ peer.addSection(child.hash, section)
+ section.start()
+ }
+ }()
+}
+
+// AddBlock is the entry point for the eth protocol when blockmsg is received upon requests
+// It has a strict interpretation of the protocol in that if the block received has not been requested, it results in an error (which can be ignored)
+// block is checked for PoW
+// only the first PoW-valid block for a hash is considered legit
+func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
+ hash := block.Hash()
+ node := self.get(hash)
+ node.RLock()
+ b := node.block
+ node.RUnlock()
+ if b != nil {
+ return
+ }
+ if node == nil && !self.hasBlock(hash) {
+ self.peerError(peerId, ErrUnrequestedBlock, "%x", hash)
+ return
+ }
+ // validate block for PoW
+ if !self.verifyPoW(block) {
+ self.peerError(peerId, ErrInvalidPoW, "%x", hash)
+ }
+ node.Lock()
+ node.block = block
+ node.source = peerId
+ node.Unlock()
+}
+
+// iterates down a known poolchain and activates fetching processes
+// on each chain section for the peer
+// stops if the peer is demoted
+// registers last section root as root for the peer (in case peer is promoted a second time, to remember)
+func (self *BlockPool) activateChain(node *poolNode, peer *peerInfo, on bool) {
+ self.wg.Add(1)
+ go func() {
+ for {
+ node.sectionRLock()
+ bottom := node.section.bottom
+ if bottom == nil { // the chain section is being created or killed
+ break
+ }
+ // register this section with the peer
+ if peer != nil {
+ peer.addSection(bottom.hash, bottom.section)
+ if on {
+ bottom.section.start()
+ } else {
+ bottom.section.start()
+ }
+ }
+ if bottom.parent == nil {
+ node = bottom
+ break
+ }
+ // if peer demoted stop activation
+ select {
+ case <-peer.quitC:
+ break
+ default:
+ }
+
+ node = bottom.parent
+ bottom.sectionRUnlock()
+ }
+ // remember root for this peer
+ peer.addRoot(node)
+ self.wg.Done()
+ }()
+}
+
+// main worker thread on each section in the poolchain
+// - kills the section if there are blocks missing after an absolute time
+// - kills the section if there are maxIdleRounds of idle rounds of block requests with no response
+// - periodically polls the chain section for missing blocks which are then requested from peers
+// - registers the process controller on the peer so that if the peer is promoted as best peer the second time (after a disconnect of a better one), all active processes are switched back on unless they expire and killed ()
+// - when turned off (if peer disconnects and new peer connects with alternative chain), no blockrequests are made but absolute expiry timer is ticking
+// - when turned back on it recursively calls itself on the root of the next chain section
+// - when exits, signals to
+func (self *BlockPool) processSection(node *poolNode) *section {
+ // absolute time after which sub-chain is killed if not complete (some blocks are missing)
+ suicideTimer := time.After(blockTimeout * time.Minute)
+ var blocksRequestTimer, blockHashesRequestTimer <-chan time.Time
+ var nodeC, missingC, processC chan *poolNode
+ controlC := make(chan bool)
+ resetC := make(chan bool)
+ var hashes [][]byte
+ var i, total, missing, lastMissing, depth int
+ var blockHashesRequests, blocksRequests int
+ var idle int
+ var init, alarm, done, same, running, once bool
+ orignode := node
+ hash := node.hash
+
+ node.sectionLock()
+ defer node.sectionUnlock()
+ section := &section{controlC: controlC, resetC: resetC}
+ node.section = section
+
+ go func() {
+ self.wg.Add(1)
+ for {
+ node.sectionRLock()
+ controlC = node.section.controlC
+ node.sectionRUnlock()
+
+ if init {
+ // missing blocks read from nodeC
+ // initialized section
+ if depth == 0 {
+ break
+ }
+ // enable select case to read missing block when ready
+ processC = missingC
+ missingC = make(chan *poolNode, lastMissing)
+ nodeC = nil
+ // only do once
+ init = false
+ } else {
+ if !once {
+ missingC = nil
+ processC = nil
+ i = 0
+ total = 0
+ lastMissing = 0
+ }
+ }
+
+ // went through all blocks in section
+ if i != 0 && i == lastMissing {
+ if len(hashes) > 0 {
+ // send block requests to peers
+ self.requestBlocks(blocksRequests, hashes)
+ }
+ blocksRequests++
+ poolLogger.Debugf("[%x] block request attempt %v: missing %v/%v/%v", hash[0:4], blocksRequests, missing, total, depth)
+ if missing == lastMissing {
+ // idle round
+ if same {
+ // more than once
+ idle++
+ // too many idle rounds
+ if idle > blocksRequestMaxIdleRounds {
+ poolLogger.Debugf("[%x] block requests had %v idle rounds (%v total attempts): missing %v/%v/%v\ngiving up...", hash[0:4], idle, blocksRequests, missing, total, depth)
+ self.killChain(node, nil)
+ break
+ }
+ } else {
+ idle = 0
+ }
+ same = true
+ } else {
+ if missing == 0 {
+ // no missing nodes
+ poolLogger.Debugf("block request process complete on section %x... (%v total blocksRequests): missing %v/%v/%v", hash[0:4], blockHashesRequests, blocksRequests, missing, total, depth)
+ node.Lock()
+ orignode.complete = true
+ node.Unlock()
+ blocksRequestTimer = nil
+ if blockHashesRequestTimer == nil {
+ // not waiting for hashes any more
+ poolLogger.Debugf("hash request on root %x... successful (%v total attempts)\nquitting...", hash[0:4], blockHashesRequests)
+ break
+ } // otherwise suicide if no hashes coming
+ }
+ same = false
+ }
+ lastMissing = missing
+ i = 0
+ missing = 0
+ // ready for next round
+ done = true
+ }
+ if done && alarm {
+ poolLogger.Debugf("start checking if new blocks arrived (attempt %v): missing %v/%v/%v", blocksRequests, missing, total, depth)
+ blocksRequestTimer = time.After(blocksRequestInterval * time.Second)
+ alarm = false
+ done = false
+ // processC supposed to be empty and never closed so just swap, no need to allocate
+ tempC := processC
+ processC = missingC
+ missingC = tempC
+ }
+ select {
+ case <-self.quit:
+ break
+ case <-suicideTimer:
+ self.killChain(node, nil)
+ poolLogger.Warnf("[%x] timeout. (%v total attempts): missing %v/%v/%v", hash[0:4], blocksRequests, missing, total, depth)
+ break
+ case <-blocksRequestTimer:
+ alarm = true
+ case <-blockHashesRequestTimer:
+ orignode.RLock()
+ parent := orignode.parent
+ orignode.RUnlock()
+ if parent != nil {
+ // if not root of chain, switch off
+ poolLogger.Debugf("[%x] parent found, hash requests deactivated (after %v total attempts)\n", hash[0:4], blockHashesRequests)
+ blockHashesRequestTimer = nil
+ } else {
+ blockHashesRequests++
+ poolLogger.Debugf("[%x] hash request on root (%v total attempts)\n", hash[0:4], blockHashesRequests)
+ self.requestBlockHashes(parent.hash)
+ blockHashesRequestTimer = time.After(blockHashesRequestInterval * time.Second)
+ }
+ case r, ok := <-controlC:
+ if !ok {
+ break
+ }
+ if running && !r {
+ poolLogger.Debugf("process on section %x... (%v total attempts): missing %v/%v/%v", hash[0:4], blocksRequests, missing, total, depth)
+
+ alarm = false
+ blocksRequestTimer = nil
+ blockHashesRequestTimer = nil
+ processC = nil
+ }
+ if !running && r {
+ poolLogger.Debugf("[%x] on", hash[0:4])
+
+ orignode.RLock()
+ parent := orignode.parent
+ complete := orignode.complete
+ knownParent := orignode.knownParent
+ orignode.RUnlock()
+ if !complete {
+ poolLogger.Debugf("[%x] activate block requests", hash[0:4])
+ blocksRequestTimer = time.After(0)
+ }
+ if parent == nil && !knownParent {
+ // if no parent but not connected to blockchain
+ poolLogger.Debugf("[%x] activate block hashes requests", hash[0:4])
+ blockHashesRequestTimer = time.After(0)
+ } else {
+ blockHashesRequestTimer = nil
+ }
+ alarm = true
+ processC = missingC
+ if !once {
+ // if not run at least once fully, launch iterator
+ processC = make(chan *poolNode)
+ missingC = make(chan *poolNode)
+ self.foldUp(orignode, processC)
+ once = true
+ }
+ }
+ total = lastMissing
+ case <-resetC:
+ once = false
+ init = false
+ done = false
+ case node, ok := <-processC:
+ if !ok {
+ // channel closed, first iteration finished
+ init = true
+ once = true
+ continue
+ }
+ i++
+ // if node has no block
+ node.RLock()
+ block := node.block
+ nhash := node.hash
+ knownParent := node.knownParent
+ node.RUnlock()
+ if !init {
+ depth++
+ }
+ if block == nil {
+ missing++
+ if !init {
+ total++
+ }
+ hashes = append(hashes, nhash)
+ if len(hashes) == blockBatchSize {
+ self.requestBlocks(blocksRequests, hashes)
+ hashes = nil
+ }
+ missingC <- node
+ } else {
+ // block is found
+ if knownParent {
+ // connected to the blockchain, insert the longest chain of blocks
+ var blocks types.Blocks
+ child := node
+ parent := node
+ node.sectionRLock()
+ for child != nil && child.block != nil {
+ parent = child
+ blocks = append(blocks, parent.block)
+ child = parent.child
+ }
+ node.sectionRUnlock()
+ poolLogger.Debugf("[%x] insert %v blocks into blockchain", hash[0:4], len(blocks))
+ if err := self.insertChain(blocks); err != nil {
+ // TODO: not clear which peer we need to address
+ // peerError should dispatch to peer if still connected and disconnect
+ self.peerError(node.source, ErrInvalidBlock, "%v", err)
+ poolLogger.Debugf("invalid block %v", node.hash)
+ poolLogger.Debugf("penalise peers %v (hash), %v (block)", node.peer, node.source)
+ // penalise peer in node.source
+ self.killChain(node, nil)
+ // self.disconnect()
+ break
+ }
+ // if suceeded mark the next one (no block yet) as connected to blockchain
+ if child != nil {
+ child.Lock()
+ child.knownParent = true
+ child.Unlock()
+ }
+ // reset starting node to first node with missing block
+ orignode = child
+ // pop the inserted ancestors off the channel
+ for i := 1; i < len(blocks); i++ {
+ <-processC
+ }
+ // delink inserted chain section
+ self.killChain(node, parent)
+ }
+ }
+ }
+ }
+ poolLogger.Debugf("[%x] quit after\n%v block hashes requests\n%v block requests: missing %v/%v/%v", hash[0:4], blockHashesRequests, blocksRequests, missing, total, depth)
+
+ self.wg.Done()
+ node.sectionLock()
+ node.section.controlC = nil
+ node.sectionUnlock()
+ // this signals that controller not available
+ }()
+ return section
+
+}
+
+func (self *BlockPool) peerError(peerId string, code int, format string, params ...interface{}) {
+ self.peersLock.RLock()
+ defer self.peersLock.RUnlock()
+ peer, ok := self.peers[peerId]
+ if ok {
+ peer.peerError(code, format, params...)
+ }
+}
+
+func (self *BlockPool) requestBlockHashes(hash []byte) {
+ self.peersLock.Lock()
+ defer self.peersLock.Unlock()
+ if self.peer != nil {
+ self.peer.requestBlockHashes(hash)
+ }
+}
+
+func (self *BlockPool) requestBlocks(attempts int, hashes [][]byte) {
+ // distribute block request among known peers
+ self.peersLock.Lock()
+ defer self.peersLock.Unlock()
+ peerCount := len(self.peers)
+ // on first attempt use the best peer
+ if attempts == 0 {
+ self.peer.requestBlocks(hashes)
+ return
+ }
+ repetitions := int(math.Min(float64(peerCount), float64(blocksRequestRepetition)))
+ poolLogger.Debugf("request %v missing blocks from %v/%v peers", len(hashes), repetitions, peerCount)
+ i := 0
+ indexes := rand.Perm(peerCount)[0:(repetitions - 1)]
+ sort.Ints(indexes)
+ for _, peer := range self.peers {
+ if i == indexes[0] {
+ peer.requestBlocks(hashes)
+ indexes = indexes[1:]
+ if len(indexes) == 0 {
+ break
+ }
+ }
+ i++
+ }
+}
+
+func (self *BlockPool) getPeer(peerId string) (*peerInfo, bool) {
+ self.peersLock.RLock()
+ defer self.peersLock.RUnlock()
+ if self.peer != nil && self.peer.id == peerId {
+ return self.peer, true
+ }
+ info, ok := self.peers[peerId]
+ if !ok {
+ panic("unknown peer")
+ }
+ return info, false
+}
+
+func (self *peerInfo) addSection(hash []byte, section *section) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ self.sections[string(hash)] = section
+}
+
+func (self *peerInfo) addRoot(node *poolNode) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ self.roots = append(self.roots, node)
+}
+
+// (re)starts processes registered for this peer (self)
+func (self *peerInfo) start(peer *peerInfo) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ self.quitC = make(chan bool)
+ for _, root := range self.roots {
+ root.sectionRLock()
+ if root.section.bottom != nil {
+ if root.parent == nil {
+ self.requestBlockHashes(root.hash)
+ }
+ }
+ root.sectionRUnlock()
+ }
+ self.roots = nil
+ self.controlSections(peer, true)
+}
+
+// (re)starts process without requests, only suicide timer
+func (self *peerInfo) stop(peer *peerInfo) {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ close(self.quitC)
+ self.controlSections(peer, false)
+}
+
+func (self *peerInfo) controlSections(peer *peerInfo, on bool) {
+ if peer != nil {
+ peer.lock.RLock()
+ defer peer.lock.RUnlock()
+ }
+ for hash, section := range peer.sections {
+ if section.done() {
+ delete(self.sections, hash)
+ }
+ _, exists := peer.sections[hash]
+ if on || peer == nil || exists {
+ if on {
+ // self is best peer
+ section.start()
+ } else {
+ // (re)starts process without requests, only suicide timer
+ section.stop()
+ }
+ }
+ }
+}
+
+// called when parent is found in pool
+// parent and child are guaranteed to be on different sections
+func (self *BlockPool) link(parent, child *poolNode) {
+ var top bool
+ parent.sectionLock()
+ if child != nil {
+ child.sectionLock()
+ }
+ if parent == parent.section.top && parent.section.top != nil {
+ top = true
+ }
+ var bottom bool
+
+ if child == child.section.bottom {
+ bottom = true
+ }
+ if parent.child != child {
+ orphan := parent.child
+ if orphan != nil {
+ // got a fork in the chain
+ if top {
+ orphan.lock.Lock()
+ // make old child orphan
+ orphan.parent = nil
+ orphan.lock.Unlock()
+ } else { // we are under section lock
+ // make old child orphan
+ orphan.parent = nil
+ // reset section objects above the fork
+ nchild := orphan.child
+ node := orphan
+ section := &section{bottom: orphan}
+ for node.section == nchild.section {
+ node = nchild
+ node.section = section
+ nchild = node.child
+ }
+ section.top = node
+ // set up a suicide
+ self.processSection(orphan).stop()
+ }
+ } else {
+ // child is on top of a chain need to close section
+ child.section.bottom = child
+ }
+ // adopt new child
+ parent.child = child
+ if !top {
+ parent.section.top = parent
+ // restart section process so that shorter section is scanned for blocks
+ parent.section.reset()
+ }
+ }
+
+ if child != nil {
+ if child.parent != parent {
+ stepParent := child.parent
+ if stepParent != nil {
+ if bottom {
+ stepParent.Lock()
+ stepParent.child = nil
+ stepParent.Unlock()
+ } else {
+ // we are on the same section
+ // if it is a aberrant reverse fork,
+ stepParent.child = nil
+ node := stepParent
+ nparent := stepParent.child
+ section := &section{top: stepParent}
+ for node.section == nparent.section {
+ node = nparent
+ node.section = section
+ node = node.parent
+ }
+ }
+ } else {
+ // linking to a root node, ie. parent is under the root of a chain
+ parent.section.top = parent
+ }
+ }
+ child.parent = parent
+ child.section.bottom = child
+ }
+ // this needed if someone lied about the parent before
+ child.knownParent = false
+
+ parent.sectionUnlock()
+ if child != nil {
+ child.sectionUnlock()
+ }
+}
+
+// this immediately kills the chain from node to end (inclusive) section by section
+func (self *BlockPool) killChain(node *poolNode, end *poolNode) {
+ poolLogger.Debugf("kill chain section with root node %v", node)
+
+ node.sectionLock()
+ node.section.abort()
+ self.set(node.hash, nil)
+ child := node.child
+ top := node.section.top
+ i := 1
+ self.wg.Add(1)
+ go func() {
+ var quit bool
+ for node != top && node != end && child != nil {
+ node = child
+ select {
+ case <-self.quit:
+ quit = true
+ break
+ default:
+ }
+ self.set(node.hash, nil)
+ child = node.child
+ }
+ poolLogger.Debugf("killed chain section of %v blocks with root node %v", i, node)
+ if !quit {
+ if node == top {
+ if node != end && child != nil && end != nil {
+ //
+ self.killChain(child, end)
+ }
+ } else {
+ if child != nil {
+ // delink rest of this section if ended midsection
+ child.section.bottom = child
+ child.parent = nil
+ }
+ }
+ }
+ node.section.bottom = nil
+ node.sectionUnlock()
+ self.wg.Done()
+ }()
+}
+
+// structure to store long range links on chain to skip along
+type section struct {
+ lock sync.RWMutex
+ bottom *poolNode
+ top *poolNode
+ controlC chan bool
+ resetC chan bool
+}
+
+func (self *section) start() {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ if self.controlC != nil {
+ self.controlC <- true
+ }
+}
+
+func (self *section) stop() {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ if self.controlC != nil {
+ self.controlC <- false
+ }
+}
+
+func (self *section) reset() {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ if self.controlC != nil {
+ self.resetC <- true
+ self.controlC <- false
+ }
+}
+
+func (self *section) abort() {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ if self.controlC != nil {
+ close(self.controlC)
+ self.controlC = nil
+ }
+}
+
+func (self *section) done() bool {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ if self.controlC != nil {
+ return true
+ }
+ return false
+}
+
+func (self *BlockPool) get(hash []byte) (node *poolNode) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ return self.pool[string(hash)]
+}
+
+func (self *BlockPool) set(hash []byte, node *poolNode) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ self.pool[string(hash)] = node
+}
+
+// first time for block request, this iteration retrieves nodes of the chain
+// from node up to top (all the way if nil) via child links
+// copies the controller
+// and feeds nodeC channel
+// this is performed under section readlock to prevent top from going away
+// when
+func (self *BlockPool) foldUp(node *poolNode, nodeC chan *poolNode) {
+ self.wg.Add(1)
+ go func() {
+ node.sectionRLock()
+ defer node.sectionRUnlock()
+ for node != nil {
+ select {
+ case <-self.quit:
+ break
+ case nodeC <- node:
+ if node == node.section.top {
+ break
+ }
+ node = node.child
+ }
+ }
+ close(nodeC)
+ self.wg.Done()
+ }()
+}
+
+func (self *poolNode) Lock() {
+ self.sectionLock()
+ self.lock.Lock()
+}
+
+func (self *poolNode) Unlock() {
+ self.lock.Unlock()
+ self.sectionUnlock()
+}
+
+func (self *poolNode) RLock() {
+ self.lock.RLock()
+}
+
+func (self *poolNode) RUnlock() {
+ self.lock.RUnlock()
+}
+
+func (self *poolNode) sectionLock() {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ self.section.lock.Lock()
+}
+
+func (self *poolNode) sectionUnlock() {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ self.section.lock.Unlock()
+}
+
+func (self *poolNode) sectionRLock() {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ self.section.lock.RLock()
+}
+
+func (self *poolNode) sectionRUnlock() {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ self.section.lock.RUnlock()
+}
diff --git a/eth/block_pool_test.go b/eth/block_pool_test.go
new file mode 100644
index 000000000..315cc748d
--- /dev/null
+++ b/eth/block_pool_test.go
@@ -0,0 +1,198 @@
+package eth
+
+import (
+ "bytes"
+ "fmt"
+ "log"
+ "os"
+ "sync"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/ethutil"
+ ethlogger "github.com/ethereum/go-ethereum/logger"
+)
+
+var sys = ethlogger.NewStdLogSystem(os.Stdout, log.LstdFlags, ethlogger.LogLevel(ethlogger.DebugDetailLevel))
+
+type testChainManager struct {
+ knownBlock func(hash []byte) bool
+ addBlock func(*types.Block) error
+ checkPoW func(*types.Block) bool
+}
+
+func (self *testChainManager) KnownBlock(hash []byte) bool {
+ if self.knownBlock != nil {
+ return self.knownBlock(hash)
+ }
+ return false
+}
+
+func (self *testChainManager) AddBlock(block *types.Block) error {
+ if self.addBlock != nil {
+ return self.addBlock(block)
+ }
+ return nil
+}
+
+func (self *testChainManager) CheckPoW(block *types.Block) bool {
+ if self.checkPoW != nil {
+ return self.checkPoW(block)
+ }
+ return false
+}
+
+func knownBlock(hashes ...[]byte) (f func([]byte) bool) {
+ f = func(block []byte) bool {
+ for _, hash := range hashes {
+ if bytes.Compare(block, hash) == 0 {
+ return true
+ }
+ }
+ return false
+ }
+ return
+}
+
+func addBlock(hashes ...[]byte) (f func(*types.Block) error) {
+ f = func(block *types.Block) error {
+ for _, hash := range hashes {
+ if bytes.Compare(block.Hash(), hash) == 0 {
+ return fmt.Errorf("invalid by test")
+ }
+ }
+ return nil
+ }
+ return
+}
+
+func checkPoW(hashes ...[]byte) (f func(*types.Block) bool) {
+ f = func(block *types.Block) bool {
+ for _, hash := range hashes {
+ if bytes.Compare(block.Hash(), hash) == 0 {
+ return false
+ }
+ }
+ return true
+ }
+ return
+}
+
+func newTestChainManager(knownBlocks [][]byte, invalidBlocks [][]byte, invalidPoW [][]byte) *testChainManager {
+ return &testChainManager{
+ knownBlock: knownBlock(knownBlocks...),
+ addBlock: addBlock(invalidBlocks...),
+ checkPoW: checkPoW(invalidPoW...),
+ }
+}
+
+type intToHash map[int][]byte
+
+type hashToInt map[string]int
+
+type testHashPool struct {
+ intToHash
+ hashToInt
+}
+
+func newHash(i int) []byte {
+ return crypto.Sha3([]byte(string(i)))
+}
+
+func newTestBlockPool(knownBlockIndexes []int, invalidBlockIndexes []int, invalidPoWIndexes []int) (hashPool *testHashPool, blockPool *BlockPool) {
+ hashPool = &testHashPool{make(intToHash), make(hashToInt)}
+ knownBlocks := hashPool.indexesToHashes(knownBlockIndexes)
+ invalidBlocks := hashPool.indexesToHashes(invalidBlockIndexes)
+ invalidPoW := hashPool.indexesToHashes(invalidPoWIndexes)
+ blockPool = NewBlockPool(newTestChainManager(knownBlocks, invalidBlocks, invalidPoW))
+ return
+}
+
+func (self *testHashPool) indexesToHashes(indexes []int) (hashes [][]byte) {
+ for _, i := range indexes {
+ hash, found := self.intToHash[i]
+ if !found {
+ hash = newHash(i)
+ self.intToHash[i] = hash
+ self.hashToInt[string(hash)] = i
+ }
+ hashes = append(hashes, hash)
+ }
+ return
+}
+
+func (self *testHashPool) hashesToIndexes(hashes [][]byte) (indexes []int) {
+ for _, hash := range hashes {
+ i, found := self.hashToInt[string(hash)]
+ if !found {
+ i = -1
+ }
+ indexes = append(indexes, i)
+ }
+ return
+}
+
+type protocolChecker struct {
+ blockHashesRequests []int
+ blocksRequests [][]int
+ invalidBlocks []error
+ hashPool *testHashPool
+ lock sync.Mutex
+}
+
+// -1 is special: not found (a hash never seen)
+func (self *protocolChecker) requestBlockHashesCallBack() (requestBlockHashesCallBack func([]byte) error) {
+ requestBlockHashesCallBack = func(hash []byte) error {
+ indexes := self.hashPool.hashesToIndexes([][]byte{hash})
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ self.blockHashesRequests = append(self.blockHashesRequests, indexes[0])
+ return nil
+ }
+ return
+}
+
+func (self *protocolChecker) requestBlocksCallBack() (requestBlocksCallBack func([][]byte) error) {
+ requestBlocksCallBack = func(hashes [][]byte) error {
+ indexes := self.hashPool.hashesToIndexes(hashes)
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ self.blocksRequests = append(self.blocksRequests, indexes)
+ return nil
+ }
+ return
+}
+
+func (self *protocolChecker) invalidBlockCallBack() (invalidBlockCallBack func(error)) {
+ invalidBlockCallBack = func(err error) {
+ self.invalidBlocks = append(self.invalidBlocks, err)
+ }
+ return
+}
+
+func TestAddPeer(t *testing.T) {
+ ethlogger.AddLogSystem(sys)
+ knownBlockIndexes := []int{0, 1}
+ invalidBlockIndexes := []int{2, 3}
+ invalidPoWIndexes := []int{4, 5}
+ hashPool, blockPool := newTestBlockPool(knownBlockIndexes, invalidBlockIndexes, invalidPoWIndexes)
+ // TODO:
+ // hashPool, blockPool, blockChainChecker = newTestBlockPool(knownBlockIndexes, invalidBlockIndexes, invalidPoWIndexes)
+ peer0 := &protocolChecker{
+ // blockHashesRequests: make([]int),
+ // blocksRequests: make([][]int),
+ // invalidBlocks: make([]error),
+ hashPool: hashPool,
+ }
+ best := blockPool.AddPeer(ethutil.Big1, newHash(100), "0",
+ peer0.requestBlockHashesCallBack(),
+ peer0.requestBlocksCallBack(),
+ peer0.invalidBlockCallBack(),
+ )
+ if !best {
+ t.Errorf("peer not accepted as best")
+ }
+ blockPool.Stop()
+
+}
diff --git a/eth/error.go b/eth/error.go
new file mode 100644
index 000000000..d1daad575
--- /dev/null
+++ b/eth/error.go
@@ -0,0 +1,71 @@
+package eth
+
+import (
+ "fmt"
+)
+
+const (
+ ErrMsgTooLarge = iota
+ ErrDecode
+ ErrInvalidMsgCode
+ ErrProtocolVersionMismatch
+ ErrNetworkIdMismatch
+ ErrGenesisBlockMismatch
+ ErrNoStatusMsg
+ ErrExtraStatusMsg
+ ErrInvalidBlock
+ ErrInvalidPoW
+ ErrUnrequestedBlock
+)
+
+var errorToString = map[int]string{
+ ErrMsgTooLarge: "Message too long",
+ ErrDecode: "Invalid message",
+ ErrInvalidMsgCode: "Invalid message code",
+ ErrProtocolVersionMismatch: "Protocol version mismatch",
+ ErrNetworkIdMismatch: "NetworkId mismatch",
+ ErrGenesisBlockMismatch: "Genesis block mismatch",
+ ErrNoStatusMsg: "No status message",
+ ErrExtraStatusMsg: "Extra status message",
+ ErrInvalidBlock: "Invalid block",
+ ErrInvalidPoW: "Invalid PoW",
+ ErrUnrequestedBlock: "Unrequested block",
+}
+
+type protocolError struct {
+ Code int
+ fatal bool
+ message string
+ format string
+ params []interface{}
+ // size int
+}
+
+func newProtocolError(code int, format string, params ...interface{}) *protocolError {
+ return &protocolError{Code: code, format: format, params: params}
+}
+
+func ProtocolError(code int, format string, params ...interface{}) (err *protocolError) {
+ err = newProtocolError(code, format, params...)
+ // report(err)
+ return
+}
+
+func (self protocolError) Error() (message string) {
+ message = self.message
+ if message == "" {
+ message, ok := errorToString[self.Code]
+ if !ok {
+ panic("invalid error code")
+ }
+ if self.format != "" {
+ message += ": " + fmt.Sprintf(self.format, self.params...)
+ }
+ self.message = message
+ }
+ return
+}
+
+func (self *protocolError) Fatal() bool {
+ return self.fatal
+}
diff --git a/eth/peer_util.go b/eth/peer_util.go
new file mode 100644
index 000000000..6cf80cde2
--- /dev/null
+++ b/eth/peer_util.go
@@ -0,0 +1,23 @@
+package eth
+
+import (
+ "encoding/json"
+
+ "github.com/ethereum/go-ethereum/ethutil"
+)
+
+func WritePeers(path string, addresses []string) {
+ if len(addresses) > 0 {
+ data, _ := json.MarshalIndent(addresses, "", " ")
+ ethutil.WriteFile(path, data)
+ }
+}
+
+func ReadPeers(path string) (ips []string, err error) {
+ var data string
+ data, err = ethutil.ReadAllFile(path)
+ if err != nil {
+ json.Unmarshal([]byte(data), &ips)
+ }
+ return
+}
diff --git a/eth/protocol.go b/eth/protocol.go
new file mode 100644
index 000000000..963d41794
--- /dev/null
+++ b/eth/protocol.go
@@ -0,0 +1,319 @@
+package eth
+
+import (
+ "bytes"
+ "fmt"
+ "math"
+ "math/big"
+
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethutil"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+const (
+ ProtocolVersion = 49
+ NetworkId = 0
+ ProtocolLength = uint64(8)
+ ProtocolMaxMsgSize = 10 * 1024 * 1024
+)
+
+// eth protocol message codes
+const (
+ StatusMsg = iota
+ GetTxMsg // unused
+ TxMsg
+ GetBlockHashesMsg
+ BlockHashesMsg
+ GetBlocksMsg
+ BlocksMsg
+ NewBlockMsg
+)
+
+// ethProtocol represents the ethereum wire protocol
+// instance is running on each peer
+type ethProtocol struct {
+ txPool txPool
+ chainManager chainManager
+ blockPool blockPool
+ peer *p2p.Peer
+ id string
+ rw p2p.MsgReadWriter
+}
+
+// backend is the interface the ethereum protocol backend should implement
+// used as an argument to EthProtocol
+type txPool interface {
+ AddTransactions([]*types.Transaction)
+}
+
+type chainManager interface {
+ GetBlockHashesFromHash(hash []byte, amount uint64) (hashes [][]byte)
+ GetBlock(hash []byte) (block *types.Block)
+ Status() (td *big.Int, currentBlock []byte, genesisBlock []byte)
+}
+
+type blockPool interface {
+ AddBlockHashes(next func() ([]byte, bool), peerId string)
+ AddBlock(block *types.Block, peerId string)
+ AddPeer(td *big.Int, currentBlock []byte, peerId string, requestHashes func([]byte) error, requestBlocks func([][]byte) error, peerError func(int, string, ...interface{})) (best bool)
+ RemovePeer(peerId string)
+}
+
+// message structs used for rlp decoding
+type newBlockMsgData struct {
+ Block *types.Block
+ TD *big.Int
+}
+
+type getBlockHashesMsgData struct {
+ Hash []byte
+ Amount uint64
+}
+
+// main entrypoint, wrappers starting a server running the eth protocol
+// use this constructor to attach the protocol ("class") to server caps
+// the Dev p2p layer then runs the protocol instance on each peer
+func EthProtocol(txPool txPool, chainManager chainManager, blockPool blockPool) p2p.Protocol {
+ return p2p.Protocol{
+ Name: "eth",
+ Version: ProtocolVersion,
+ Length: ProtocolLength,
+ Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
+ return runEthProtocol(txPool, chainManager, blockPool, peer, rw)
+ },
+ }
+}
+
+// the main loop that handles incoming messages
+// note RemovePeer in the post-disconnect hook
+func runEthProtocol(txPool txPool, chainManager chainManager, blockPool blockPool, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) {
+ self := &ethProtocol{
+ txPool: txPool,
+ chainManager: chainManager,
+ blockPool: blockPool,
+ rw: rw,
+ peer: peer,
+ id: (string)(peer.Identity().Pubkey()),
+ }
+ err = self.handleStatus()
+ if err == nil {
+ for {
+ err = self.handle()
+ if err != nil {
+ fmt.Println(err)
+ self.blockPool.RemovePeer(self.id)
+ break
+ }
+ }
+ }
+ return
+}
+
+func (self *ethProtocol) handle() error {
+ msg, err := self.rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+ if msg.Size > ProtocolMaxMsgSize {
+ return ProtocolError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
+ }
+ // make sure that the payload has been fully consumed
+ defer msg.Discard()
+
+ switch msg.Code {
+
+ case StatusMsg:
+ return ProtocolError(ErrExtraStatusMsg, "")
+
+ case TxMsg:
+ // TODO: rework using lazy RLP stream
+ var txs []*types.Transaction
+ if err := msg.Decode(&txs); err != nil {
+ return ProtocolError(ErrDecode, "%v", err)
+ }
+ self.txPool.AddTransactions(txs)
+
+ case GetBlockHashesMsg:
+ var request getBlockHashesMsgData
+ if err := msg.Decode(&request); err != nil {
+ return ProtocolError(ErrDecode, "%v", err)
+ }
+ hashes := self.chainManager.GetBlockHashesFromHash(request.Hash, request.Amount)
+ return self.rw.EncodeMsg(BlockHashesMsg, ethutil.ByteSliceToInterface(hashes)...)
+
+ case BlockHashesMsg:
+ // TODO: redo using lazy decode , this way very inefficient on known chains
+ msgStream := rlp.NewListStream(msg.Payload, uint64(msg.Size))
+ var err error
+ iter := func() (hash []byte, ok bool) {
+ hash, err = msgStream.Bytes()
+ if err == nil {
+ ok = true
+ }
+ return
+ }
+ self.blockPool.AddBlockHashes(iter, self.id)
+ if err != nil && err != rlp.EOL {
+ return ProtocolError(ErrDecode, "%v", err)
+ }
+
+ case GetBlocksMsg:
+ var blockHashes [][]byte
+ if err := msg.Decode(&blockHashes); err != nil {
+ return ProtocolError(ErrDecode, "%v", err)
+ }
+ max := int(math.Min(float64(len(blockHashes)), blockHashesBatchSize))
+ var blocks []interface{}
+ for i, hash := range blockHashes {
+ if i >= max {
+ break
+ }
+ block := self.chainManager.GetBlock(hash)
+ if block != nil {
+ blocks = append(blocks, block.RlpData())
+ }
+ }
+ return self.rw.EncodeMsg(BlocksMsg, blocks...)
+
+ case BlocksMsg:
+ msgStream := rlp.NewListStream(msg.Payload, uint64(msg.Size))
+ for {
+ var block *types.Block
+ if err := msgStream.Decode(&block); err != nil {
+ if err == rlp.EOL {
+ break
+ } else {
+ return ProtocolError(ErrDecode, "%v", err)
+ }
+ }
+ self.blockPool.AddBlock(block, self.id)
+ }
+
+ case NewBlockMsg:
+ var request newBlockMsgData
+ if err := msg.Decode(&request); err != nil {
+ return ProtocolError(ErrDecode, "%v", err)
+ }
+ hash := request.Block.Hash()
+ // to simplify backend interface adding a new block
+ // uses AddPeer followed by AddHashes, AddBlock only if peer is the best peer
+ // (or selected as new best peer)
+ if self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) {
+ called := true
+ iter := func() (hash []byte, ok bool) {
+ if called {
+ called = false
+ return hash, true
+ } else {
+ return
+ }
+ }
+ self.blockPool.AddBlockHashes(iter, self.id)
+ self.blockPool.AddBlock(request.Block, self.id)
+ }
+
+ default:
+ return ProtocolError(ErrInvalidMsgCode, "%v", msg.Code)
+ }
+ return nil
+}
+
+type statusMsgData struct {
+ ProtocolVersion uint
+ NetworkId uint
+ TD *big.Int
+ CurrentBlock []byte
+ GenesisBlock []byte
+}
+
+func (self *ethProtocol) statusMsg() p2p.Msg {
+ td, currentBlock, genesisBlock := self.chainManager.Status()
+
+ return p2p.NewMsg(StatusMsg,
+ uint32(ProtocolVersion),
+ uint32(NetworkId),
+ td,
+ currentBlock,
+ genesisBlock,
+ )
+}
+
+func (self *ethProtocol) handleStatus() error {
+ // send precanned status message
+ if err := self.rw.WriteMsg(self.statusMsg()); err != nil {
+ return err
+ }
+
+ // read and handle remote status
+ msg, err := self.rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+
+ if msg.Code != StatusMsg {
+ return ProtocolError(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg)
+ }
+
+ if msg.Size > ProtocolMaxMsgSize {
+ return ProtocolError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
+ }
+
+ var status statusMsgData
+ if err := msg.Decode(&status); err != nil {
+ return ProtocolError(ErrDecode, "%v", err)
+ }
+
+ _, _, genesisBlock := self.chainManager.Status()
+
+ if bytes.Compare(status.GenesisBlock, genesisBlock) != 0 {
+ return ProtocolError(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock, genesisBlock)
+ }
+
+ if status.NetworkId != NetworkId {
+ return ProtocolError(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, NetworkId)
+ }
+
+ if ProtocolVersion != status.ProtocolVersion {
+ return ProtocolError(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, ProtocolVersion)
+ }
+
+ self.peer.Infof("Peer is [eth] capable (%d/%d). TD=%v H=%x\n", status.ProtocolVersion, status.NetworkId, status.TD, status.CurrentBlock[:4])
+
+ //self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect)
+ self.peer.Infoln("AddPeer(IGNORED)")
+
+ return nil
+}
+
+func (self *ethProtocol) requestBlockHashes(from []byte) error {
+ self.peer.Debugf("fetching hashes (%d) %x...\n", blockHashesBatchSize, from[0:4])
+ return self.rw.EncodeMsg(GetBlockHashesMsg, from, blockHashesBatchSize)
+}
+
+func (self *ethProtocol) requestBlocks(hashes [][]byte) error {
+ self.peer.Debugf("fetching %v blocks", len(hashes))
+ return self.rw.EncodeMsg(GetBlocksMsg, ethutil.ByteSliceToInterface(hashes))
+}
+
+func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *protocolError) {
+ err = ProtocolError(code, format, params...)
+ if err.Fatal() {
+ self.peer.Errorln(err)
+ } else {
+ self.peer.Debugln(err)
+ }
+ return
+}
+
+func (self *ethProtocol) protoErrorDisconnect(code int, format string, params ...interface{}) {
+ err := ProtocolError(code, format, params...)
+ if err.Fatal() {
+ self.peer.Errorln(err)
+ // disconnect
+ } else {
+ self.peer.Debugln(err)
+ }
+
+}
diff --git a/eth/protocol_test.go b/eth/protocol_test.go
new file mode 100644
index 000000000..322aec7b7
--- /dev/null
+++ b/eth/protocol_test.go
@@ -0,0 +1,232 @@
+package eth
+
+import (
+ "io"
+ "math/big"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/p2p"
+)
+
+type testMsgReadWriter struct {
+ in chan p2p.Msg
+ out chan p2p.Msg
+}
+
+func (self *testMsgReadWriter) In(msg p2p.Msg) {
+ self.in <- msg
+}
+
+func (self *testMsgReadWriter) Out(msg p2p.Msg) {
+ self.in <- msg
+}
+
+func (self *testMsgReadWriter) WriteMsg(msg p2p.Msg) error {
+ self.out <- msg
+ return nil
+}
+
+func (self *testMsgReadWriter) EncodeMsg(code uint64, data ...interface{}) error {
+ return self.WriteMsg(p2p.NewMsg(code, data))
+}
+
+func (self *testMsgReadWriter) ReadMsg() (p2p.Msg, error) {
+ msg, ok := <-self.in
+ if !ok {
+ return msg, io.EOF
+ }
+ return msg, nil
+}
+
+func errorCheck(t *testing.T, expCode int, err error) {
+ perr, ok := err.(*protocolError)
+ if ok && perr != nil {
+ if code := perr.Code; code != expCode {
+ ok = false
+ }
+ }
+ if !ok {
+ t.Errorf("expected error code %v, got %v", ErrNoStatusMsg, err)
+ }
+}
+
+type TestBackend struct {
+ getTransactions func() []*types.Transaction
+ addTransactions func(txs []*types.Transaction)
+ getBlockHashes func(hash []byte, amount uint32) (hashes [][]byte)
+ addBlockHashes func(next func() ([]byte, bool), peerId string)
+ getBlock func(hash []byte) *types.Block
+ addBlock func(block *types.Block, peerId string) (err error)
+ addPeer func(td *big.Int, currentBlock []byte, peerId string, requestHashes func([]byte) error, requestBlocks func([][]byte) error, invalidBlock func(error)) (best bool)
+ removePeer func(peerId string)
+ status func() (td *big.Int, currentBlock []byte, genesisBlock []byte)
+}
+
+func (self *TestBackend) GetTransactions() (txs []*types.Transaction) {
+ if self.getTransactions != nil {
+ txs = self.getTransactions()
+ }
+ return
+}
+
+func (self *TestBackend) AddTransactions(txs []*types.Transaction) {
+ if self.addTransactions != nil {
+ self.addTransactions(txs)
+ }
+}
+
+func (self *TestBackend) GetBlockHashes(hash []byte, amount uint32) (hashes [][]byte) {
+ if self.getBlockHashes != nil {
+ hashes = self.getBlockHashes(hash, amount)
+ }
+ return
+}
+
+<<<<<<< HEAD
+<<<<<<< HEAD
+func (self *TestBackend) AddBlockHashes(next func() ([]byte, bool), peerId string) {
+ if self.addBlockHashes != nil {
+ self.addBlockHashes(next, peerId)
+ }
+}
+
+=======
+func (self *TestBackend) AddHash(hash []byte, peer *p2p.Peer) (more bool) {
+ if self.addHash != nil {
+ more = self.addHash(hash, peer)
+=======
+func (self *TestBackend) AddBlockHashes(next func() ([]byte, bool), peerId string) {
+ if self.addBlockHashes != nil {
+ self.addBlockHashes(next, peerId)
+>>>>>>> eth protocol changes
+ }
+}
+<<<<<<< HEAD
+>>>>>>> initial commit for eth-p2p integration
+=======
+
+>>>>>>> eth protocol changes
+func (self *TestBackend) GetBlock(hash []byte) (block *types.Block) {
+ if self.getBlock != nil {
+ block = self.getBlock(hash)
+ }
+ return
+}
+
+<<<<<<< HEAD
+<<<<<<< HEAD
+func (self *TestBackend) AddBlock(block *types.Block, peerId string) (err error) {
+ if self.addBlock != nil {
+ err = self.addBlock(block, peerId)
+=======
+func (self *TestBackend) AddBlock(td *big.Int, block *types.Block, peer *p2p.Peer) (fetchHashes bool, err error) {
+ if self.addBlock != nil {
+ fetchHashes, err = self.addBlock(td, block, peer)
+>>>>>>> initial commit for eth-p2p integration
+=======
+func (self *TestBackend) AddBlock(block *types.Block, peerId string) (err error) {
+ if self.addBlock != nil {
+ err = self.addBlock(block, peerId)
+>>>>>>> eth protocol changes
+ }
+ return
+}
+
+<<<<<<< HEAD
+<<<<<<< HEAD
+func (self *TestBackend) AddPeer(td *big.Int, currentBlock []byte, peerId string, requestBlockHashes func([]byte) error, requestBlocks func([][]byte) error, invalidBlock func(error)) (best bool) {
+ if self.addPeer != nil {
+ best = self.addPeer(td, currentBlock, peerId, requestBlockHashes, requestBlocks, invalidBlock)
+=======
+func (self *TestBackend) AddPeer(td *big.Int, currentBlock []byte, peer *p2p.Peer) (fetchHashes bool) {
+ if self.addPeer != nil {
+ fetchHashes = self.addPeer(td, currentBlock, peer)
+>>>>>>> initial commit for eth-p2p integration
+=======
+func (self *TestBackend) AddPeer(td *big.Int, currentBlock []byte, peerId string, requestBlockHashes func([]byte) error, requestBlocks func([][]byte) error, invalidBlock func(error)) (best bool) {
+ if self.addPeer != nil {
+ best = self.addPeer(td, currentBlock, peerId, requestBlockHashes, requestBlocks, invalidBlock)
+>>>>>>> eth protocol changes
+ }
+ return
+}
+
+<<<<<<< HEAD
+<<<<<<< HEAD
+=======
+>>>>>>> eth protocol changes
+func (self *TestBackend) RemovePeer(peerId string) {
+ if self.removePeer != nil {
+ self.removePeer(peerId)
+ }
+}
+
+<<<<<<< HEAD
+=======
+>>>>>>> initial commit for eth-p2p integration
+=======
+>>>>>>> eth protocol changes
+func (self *TestBackend) Status() (td *big.Int, currentBlock []byte, genesisBlock []byte) {
+ if self.status != nil {
+ td, currentBlock, genesisBlock = self.status()
+ }
+ return
+}
+
+<<<<<<< HEAD
+<<<<<<< HEAD
+=======
+>>>>>>> eth protocol changes
+// TODO: refactor this into p2p/client_identity
+type peerId struct {
+ pubkey []byte
+}
+
+func (self *peerId) String() string {
+ return "test peer"
+}
+
+func (self *peerId) Pubkey() (pubkey []byte) {
+ pubkey = self.pubkey
+ if len(pubkey) == 0 {
+ pubkey = crypto.GenerateNewKeyPair().PublicKey
+ self.pubkey = pubkey
+ }
+ return
+}
+
+func testPeer() *p2p.Peer {
+ return p2p.NewPeer(&peerId{}, []p2p.Cap{})
+}
+
+func TestErrNoStatusMsg(t *testing.T) {
+<<<<<<< HEAD
+=======
+func TestEth(t *testing.T) {
+>>>>>>> initial commit for eth-p2p integration
+=======
+>>>>>>> eth protocol changes
+ quit := make(chan bool)
+ rw := &testMsgReadWriter{make(chan p2p.Msg, 10), make(chan p2p.Msg, 10)}
+ testBackend := &TestBackend{}
+ var err error
+ go func() {
+<<<<<<< HEAD
+<<<<<<< HEAD
+ err = runEthProtocol(testBackend, testPeer(), rw)
+=======
+ err = runEthProtocol(testBackend, nil, rw)
+>>>>>>> initial commit for eth-p2p integration
+=======
+ err = runEthProtocol(testBackend, testPeer(), rw)
+>>>>>>> eth protocol changes
+ close(quit)
+ }()
+ statusMsg := p2p.NewMsg(4)
+ rw.In(statusMsg)
+ <-quit
+ errorCheck(t, ErrNoStatusMsg, err)
+ // read(t, remote, []byte("hello, world"), nil)
+}