aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorobscuren <geffobscura@gmail.com>2014-08-21 20:47:58 +0800
committerobscuren <geffobscura@gmail.com>2014-08-21 20:47:58 +0800
commiteaa2e8900d1036e09b002c4e20fc6e4f9cd031bb (patch)
tree9a0c4c4b1bda39560c70147c73862d72eff38dd1
parent79c64f6bca4fcfb257496be22c64f4b2faed7050 (diff)
downloaddexon-eaa2e8900d1036e09b002c4e20fc6e4f9cd031bb.tar
dexon-eaa2e8900d1036e09b002c4e20fc6e4f9cd031bb.tar.gz
dexon-eaa2e8900d1036e09b002c4e20fc6e4f9cd031bb.tar.bz2
dexon-eaa2e8900d1036e09b002c4e20fc6e4f9cd031bb.tar.lz
dexon-eaa2e8900d1036e09b002c4e20fc6e4f9cd031bb.tar.xz
dexon-eaa2e8900d1036e09b002c4e20fc6e4f9cd031bb.tar.zst
dexon-eaa2e8900d1036e09b002c4e20fc6e4f9cd031bb.zip
PoC 6 networking code.
* Added block pool for gathering blocks from the network (chunks) * Re wrote syncing
-rw-r--r--block_pool.go116
-rw-r--r--ethchain/block_chain.go20
-rw-r--r--ethchain/genesis.go6
-rw-r--r--ethereum.go3
-rw-r--r--ethutil/bytes.go8
-rw-r--r--ethwire/messaging.go52
-rw-r--r--peer.go286
7 files changed, 295 insertions, 196 deletions
diff --git a/block_pool.go b/block_pool.go
new file mode 100644
index 000000000..3225bdff2
--- /dev/null
+++ b/block_pool.go
@@ -0,0 +1,116 @@
+package eth
+
+import (
+ "math"
+ "math/big"
+ "sync"
+
+ "github.com/ethereum/eth-go/ethchain"
+ "github.com/ethereum/eth-go/ethutil"
+)
+
+type block struct {
+ peer *Peer
+ block *ethchain.Block
+}
+
+type BlockPool struct {
+ mut sync.Mutex
+
+ eth *Ethereum
+
+ hashPool [][]byte
+ pool map[string]*block
+
+ td *big.Int
+}
+
+func NewBlockPool(eth *Ethereum) *BlockPool {
+ return &BlockPool{
+ eth: eth,
+ pool: make(map[string]*block),
+ td: ethutil.Big0,
+ }
+}
+
+func (self *BlockPool) HasLatestHash() bool {
+ return self.pool[string(self.eth.BlockChain().CurrentBlock.Hash())] != nil
+}
+
+func (self *BlockPool) HasCommonHash(hash []byte) bool {
+ return self.eth.BlockChain().GetBlock(hash) != nil
+}
+
+func (self *BlockPool) AddHash(hash []byte) {
+ if self.pool[string(hash)] == nil {
+ self.pool[string(hash)] = &block{nil, nil}
+
+ self.hashPool = append([][]byte{hash}, self.hashPool...)
+ }
+}
+
+func (self *BlockPool) SetBlock(b *ethchain.Block) {
+ hash := string(b.Hash())
+
+ if self.pool[string(hash)] == nil {
+ self.pool[hash] = &block{nil, nil}
+ }
+
+ self.pool[hash].block = b
+}
+
+func (self *BlockPool) CheckLinkAndProcess(f func(block *ethchain.Block)) bool {
+ self.mut.Lock()
+ defer self.mut.Unlock()
+
+ if self.IsLinked() {
+ for i, hash := range self.hashPool {
+ block := self.pool[string(hash)].block
+ if block != nil {
+ f(block)
+
+ delete(self.pool, string(hash))
+ } else {
+ self.hashPool = self.hashPool[i:]
+
+ return false
+ }
+ }
+
+ return true
+ }
+
+ return false
+}
+
+func (self *BlockPool) IsLinked() bool {
+ if len(self.hashPool) == 0 {
+ return false
+ }
+
+ block := self.pool[string(self.hashPool[0])].block
+ if block != nil {
+ return self.eth.BlockChain().HasBlock(block.PrevHash)
+ }
+
+ return false
+}
+
+func (self *BlockPool) Take(amount int, peer *Peer) (hashes [][]byte) {
+ self.mut.Lock()
+ defer self.mut.Unlock()
+
+ num := int(math.Min(float64(amount), float64(len(self.pool))))
+ j := 0
+ for i := 0; i < len(self.hashPool) && j < num; i++ {
+ hash := string(self.hashPool[i])
+ if self.pool[hash].peer == nil || self.pool[hash].peer == peer {
+ self.pool[hash].peer = peer
+
+ hashes = append(hashes, self.hashPool[i])
+ j++
+ }
+ }
+
+ return
+}
diff --git a/ethchain/block_chain.go b/ethchain/block_chain.go
index 611735707..3445bbb87 100644
--- a/ethchain/block_chain.go
+++ b/ethchain/block_chain.go
@@ -208,6 +208,26 @@ func (bc *BlockChain) GenesisBlock() *Block {
return bc.genesisBlock
}
+func (self *BlockChain) GetChainHashesFromHash(hash []byte, max uint64) (chain [][]byte) {
+ block := self.GetBlock(hash)
+ if block == nil {
+ return
+ }
+
+ // XXX Could be optimised by using a different database which only holds hashes (i.e., linked list)
+ for i := uint64(0); i < max; i++ {
+ chain = append(chain, block.Hash())
+
+ if block.Number.Cmp(ethutil.Big0) <= 0 {
+ break
+ }
+
+ block = self.GetBlock(block.PrevHash)
+ }
+
+ return
+}
+
// Get chain return blocks from hash up to max in RLP format
func (bc *BlockChain) GetChainFromHash(hash []byte, max uint64) []interface{} {
var chain []interface{}
diff --git a/ethchain/genesis.go b/ethchain/genesis.go
index 54a3bc766..0ce53a6ee 100644
--- a/ethchain/genesis.go
+++ b/ethchain/genesis.go
@@ -1,9 +1,10 @@
package ethchain
import (
+ "math/big"
+
"github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethutil"
- "math/big"
)
/*
@@ -26,7 +27,8 @@ var GenesisHeader = []interface{}{
// tx sha
"",
// Difficulty
- ethutil.BigPow(2, 22),
+ //ethutil.BigPow(2, 22),
+ big.NewInt(4096),
// Number
ethutil.Big0,
// Block minimum gas price
diff --git a/ethereum.go b/ethereum.go
index c1c4c2f2f..1e1891589 100644
--- a/ethereum.go
+++ b/ethereum.go
@@ -54,6 +54,8 @@ type Ethereum struct {
txPool *ethchain.TxPool
// The canonical chain
blockChain *ethchain.BlockChain
+ // The block pool
+ blockPool *BlockPool
// Peers (NYI)
peers *list.List
// Nonce
@@ -116,6 +118,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
}
ethereum.reactor = ethreact.New()
+ ethereum.blockPool = NewBlockPool(ethereum)
ethereum.txPool = ethchain.NewTxPool(ethereum)
ethereum.blockChain = ethchain.NewBlockChain(ethereum)
ethereum.stateManager = ethchain.NewStateManager(ethereum)
diff --git a/ethutil/bytes.go b/ethutil/bytes.go
index 63c1606c2..e38f89454 100644
--- a/ethutil/bytes.go
+++ b/ethutil/bytes.go
@@ -208,3 +208,11 @@ func Address(slice []byte) (addr []byte) {
return
}
+
+func ByteSliceToInterface(slice [][]byte) (ret []interface{}) {
+ for _, i := range slice {
+ ret = append(ret, i)
+ }
+
+ return
+}
diff --git a/ethwire/messaging.go b/ethwire/messaging.go
index d114a1c9d..7ac0188a1 100644
--- a/ethwire/messaging.go
+++ b/ethwire/messaging.go
@@ -27,33 +27,41 @@ const (
// Values are given explicitly instead of by iota because these values are
// defined by the wire protocol spec; it is easier for humans to ensure
// correctness when values are explicit.
- MsgHandshakeTy = 0x00
- MsgDiscTy = 0x01
- MsgPingTy = 0x02
- MsgPongTy = 0x03
- MsgGetPeersTy = 0x10
- MsgPeersTy = 0x11
- MsgTxTy = 0x12
- MsgBlockTy = 0x13
- MsgGetChainTy = 0x14
- MsgNotInChainTy = 0x15
- MsgGetTxsTy = 0x16
+ MsgHandshakeTy = 0x00
+ MsgDiscTy = 0x01
+ MsgPingTy = 0x02
+ MsgPongTy = 0x03
+ MsgGetPeersTy = 0x10
+ MsgPeersTy = 0x11
+ MsgTxTy = 0x12
+ MsgGetChainTy = 0x14
+ MsgNotInChainTy = 0x15
+ MsgGetTxsTy = 0x16
+ MsgGetBlockHashesTy = 0x17
+ MsgBlockHashesTy = 0x18
+ MsgGetBlocksTy = 0x19
+ MsgBlockTy = 0x13
+
+ MsgOldBlockTy = 0xbb
MsgTalkTy = 0xff
)
var msgTypeToString = map[MsgType]string{
- MsgHandshakeTy: "Handshake",
- MsgDiscTy: "Disconnect",
- MsgPingTy: "Ping",
- MsgPongTy: "Pong",
- MsgGetPeersTy: "Get peers",
- MsgPeersTy: "Peers",
- MsgTxTy: "Transactions",
- MsgBlockTy: "Blocks",
- MsgGetChainTy: "Get chain",
- MsgGetTxsTy: "Get Txs",
- MsgNotInChainTy: "Not in chain",
+ MsgHandshakeTy: "Handshake",
+ MsgDiscTy: "Disconnect",
+ MsgPingTy: "Ping",
+ MsgPongTy: "Pong",
+ MsgGetPeersTy: "Get peers",
+ MsgPeersTy: "Peers",
+ MsgTxTy: "Transactions",
+ MsgBlockTy: "Blocks",
+ MsgGetChainTy: "Get chain",
+ MsgGetTxsTy: "Get Txs",
+ MsgNotInChainTy: "Not in chain",
+ MsgGetBlockHashesTy: "Get block hashes",
+ MsgBlockHashesTy: "Block hashes",
+ MsgGetBlocksTy: "Get blocks",
}
func (mt MsgType) String() string {
diff --git a/peer.go b/peer.go
index 4cbe8b652..4e6fc55d4 100644
--- a/peer.go
+++ b/peer.go
@@ -4,6 +4,8 @@ import (
"bytes"
"container/list"
"fmt"
+ "math"
+ "math/big"
"net"
"strconv"
"strings"
@@ -22,7 +24,7 @@ const (
// The size of the output buffer for writing messages
outputBufferSize = 50
// Current protocol version
- ProtocolVersion = 26
+ ProtocolVersion = 27
// Interval for ping/pong message
pingPongTimer = 2 * time.Second
)
@@ -125,9 +127,13 @@ type Peer struct {
lastPong int64
lastBlockReceived time.Time
- host []byte
- port uint16
- caps Caps
+ host []byte
+ port uint16
+ caps Caps
+ td *big.Int
+ bestHash []byte
+ lastReceivedHash []byte
+ requestedHashes [][]byte
// This peer's public key
pubkey []byte
@@ -345,7 +351,6 @@ func (p *Peer) HandleInbound() {
for _, msg := range msgs {
peerlogger.DebugDetailf("(%v) => %v %v\n", p.conn.RemoteAddr(), msg.Type, msg.Data)
- nextMsg:
switch msg.Type {
case ethwire.MsgHandshakeTy:
// Version message
@@ -354,6 +359,7 @@ func (p *Peer) HandleInbound() {
if p.caps.IsCap(CapPeerDiscTy) {
p.QueueMessage(ethwire.NewMessage(ethwire.MsgGetPeersTy, ""))
}
+
case ethwire.MsgDiscTy:
p.Stop()
peerlogger.Infoln("Disconnect peer: ", DiscReason(msg.Data.Get(0).Uint()))
@@ -366,117 +372,6 @@ func (p *Peer) HandleInbound() {
// active.
p.lastPong = time.Now().Unix()
p.pingTime = time.Since(p.pingStartTime)
- case ethwire.MsgBlockTy:
- // Get all blocks and process them
- //var block, lastBlock *ethchain.Block
- //var err error
-
- var (
- block, lastBlock *ethchain.Block
- blockChain = p.ethereum.BlockChain()
- err error
- )
-
- // Make sure we are actually receiving anything
- if msg.Data.Len()-1 > 1 && p.diverted {
- // We requested blocks and now we need to make sure we have a common ancestor somewhere in these blocks so we can find
- // common ground to start syncing from
- lastBlock = ethchain.NewBlockFromRlpValue(msg.Data.Get(msg.Data.Len() - 1))
- if p.lastRequestedBlock != nil && bytes.Compare(lastBlock.Hash(), p.lastRequestedBlock.Hash()) == 0 {
- p.catchingUp = false
- continue
- }
- p.lastRequestedBlock = lastBlock
- peerlogger.Infof("Last block: %x. Checking if we have it locally.\n", lastBlock.Hash())
- for i := msg.Data.Len() - 1; i >= 0; i-- {
- block = ethchain.NewBlockFromRlpValue(msg.Data.Get(i))
- // Do we have this block on our chain? If so we can continue
- if !blockChain.HasBlock(block.Hash()) {
- // We don't have this block, but we do have a block with the same prevHash, diversion time!
- if blockChain.HasBlockWithPrevHash(block.PrevHash) {
- p.diverted = false
- if !blockChain.FindCanonicalChainFromMsg(msg, block.PrevHash) {
- p.SyncWithPeerToLastKnown()
- break nextMsg
- }
- break
- }
- }
- }
- if !blockChain.HasBlock(lastBlock.Hash()) {
- // If we can't find a common ancenstor we need to request more blocks.
- // FIXME: At one point this won't scale anymore since we are not asking for an offset
- // we just keep increasing the amount of blocks.
- p.blocksRequested = p.blocksRequested * 2
-
- peerlogger.Infof("No common ancestor found, requesting %d more blocks.\n", p.blocksRequested)
- p.FindCommonParentBlock()
- break nextMsg
- }
-
- p.catchingUp = false
- }
-
- for i := msg.Data.Len() - 1; i >= 0; i-- {
- block = ethchain.NewBlockFromRlpValue(msg.Data.Get(i))
-
- err = p.ethereum.StateManager().Process(block, false)
- if err != nil {
- if ethutil.Config.Debug {
- peerlogger.Infof("Block %x failed\n", block.Hash())
- peerlogger.Infof("%v\n", err)
- peerlogger.Debugln(block)
- }
- break
- } else {
- lastBlock = block
- }
-
- p.lastBlockReceived = time.Now()
- }
-
- if msg.Data.Len() <= 1 {
- // Set catching up to false if
- // the peer has nothing left to give
- p.catchingUp = false
- }
-
- if err != nil {
- // If the parent is unknown try to catch up with this peer
- if ethchain.IsParentErr(err) {
- b := ethchain.NewBlockFromRlpValue(msg.Data.Get(0))
-
- peerlogger.Infof("Attempting to catch (%x). Parent unknown\n", b.Hash())
- p.catchingUp = false
-
- p.CatchupWithPeer(b.PrevHash)
-
- peerlogger.Infoln(b)
-
- /*
- peerlogger.Infoln("Attempting to catch. Parent known")
- p.catchingUp = false
- p.CatchupWithPeer(p.ethereum.BlockChain().CurrentBlock.Hash())
- */
- } else if ethchain.IsValidationErr(err) {
- fmt.Println("Err:", err)
- p.catchingUp = false
- }
- } else {
- // If we're catching up, try to catch up further.
- if p.catchingUp && msg.Data.Len() > 1 {
- if lastBlock != nil {
- blockInfo := lastBlock.BlockInfo()
- peerlogger.DebugDetailf("Synced chain to #%d %x %x\n", blockInfo.Number, lastBlock.Hash(), blockInfo.Hash)
- }
-
- p.catchingUp = false
-
- hash := p.ethereum.BlockChain().CurrentBlock.Hash()
- p.CatchupWithPeer(hash)
- }
- }
-
case ethwire.MsgTxTy:
// If the message was a transaction queue the transaction
// in the TxPool where it will undergo validation and
@@ -501,78 +396,114 @@ func (p *Peer) HandleInbound() {
// Connect to the list of peers
p.ethereum.ProcessPeerList(peers)
- case ethwire.MsgGetChainTy:
- var parent *ethchain.Block
- // Length minus one since the very last element in the array is a count
- l := msg.Data.Len() - 1
- // Ignore empty get chains
- if l == 0 {
- break
+ case ethwire.MsgGetTxsTy:
+ // Get the current transactions of the pool
+ txs := p.ethereum.TxPool().CurrentTransactions()
+ // Get the RlpData values from the txs
+ txsInterface := make([]interface{}, len(txs))
+ for i, tx := range txs {
+ txsInterface[i] = tx.RlpData()
+ }
+ // Broadcast it back to the peer
+ p.QueueMessage(ethwire.NewMessage(ethwire.MsgTxTy, txsInterface))
+
+ case ethwire.MsgGetBlockHashesTy:
+ if msg.Data.Len() < 2 {
+ peerlogger.Debugln("err: argument length invalid ", msg.Data.Len())
}
- // Amount of parents in the canonical chain
- //amountOfBlocks := msg.Data.Get(l).AsUint()
- amountOfBlocks := uint64(100)
+ hash := msg.Data.Get(0).Bytes()
+ amount := msg.Data.Get(1).Uint()
- // Check each SHA block hash from the message and determine whether
- // the SHA is in the database
- for i := 0; i < l; i++ {
- if data := msg.Data.Get(i).Bytes(); p.ethereum.StateManager().BlockChain().HasBlock(data) {
- parent = p.ethereum.BlockChain().GetBlock(data)
- break
+ hashes := p.ethereum.BlockChain().GetChainHashesFromHash(hash, amount)
+
+ p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockHashesTy, ethutil.ByteSliceToInterface(hashes)))
+
+ case ethwire.MsgGetBlocksTy:
+ // Limit to max 300 blocks
+ max := int(math.Min(float64(msg.Data.Len()), 300.0))
+ var blocks []interface{}
+
+ for i := 0; i < max; i++ {
+ hash := msg.Data.Get(i).Bytes()
+ block := p.ethereum.BlockChain().GetBlock(hash)
+ if block != nil {
+ blocks = append(blocks, block.Value().Raw())
}
}
- // If a parent is found send back a reply
- if parent != nil {
- peerlogger.DebugDetailf("Found canonical block, returning chain from: %x ", parent.Hash())
- chain := p.ethereum.BlockChain().GetChainFromHash(parent.Hash(), amountOfBlocks)
- if len(chain) > 0 {
- //peerlogger.Debugf("Returning %d blocks: %x ", len(chain), parent.Hash())
- p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, chain))
- } else {
- p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, []interface{}{}))
- }
+ p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, blocks))
- } else {
- //peerlogger.Debugf("Could not find a similar block")
- // If no blocks are found we send back a reply with msg not in chain
- // and the last hash from get chain
- if l > 0 {
- lastHash := msg.Data.Get(l - 1)
- //log.Printf("Sending not in chain with hash %x\n", lastHash.AsRaw())
- p.QueueMessage(ethwire.NewMessage(ethwire.MsgNotInChainTy, []interface{}{lastHash.Raw()}))
+ case ethwire.MsgBlockHashesTy:
+ blockPool := p.ethereum.blockPool
+
+ foundCommonHash := false
+
+ it := msg.Data.NewIterator()
+ for it.Next() {
+ hash := it.Value().Bytes()
+
+ if blockPool.HasCommonHash(hash) {
+ foundCommonHash = true
+
+ break
}
+
+ blockPool.AddHash(hash)
+
+ p.lastReceivedHash = hash
}
- case ethwire.MsgNotInChainTy:
- peerlogger.DebugDetailf("Not in chain: %x\n", msg.Data.Get(0).Bytes())
- if p.diverted == true {
- // If were already looking for a common parent and we get here again we need to go deeper
- p.blocksRequested = p.blocksRequested * 2
+
+ if foundCommonHash {
+ p.FetchBlocks()
+ } else {
+ p.FetchHashes()
}
- p.diverted = true
- p.catchingUp = false
- p.FindCommonParentBlock()
- case ethwire.MsgGetTxsTy:
- // Get the current transactions of the pool
- txs := p.ethereum.TxPool().CurrentTransactions()
- // Get the RlpData values from the txs
- txsInterface := make([]interface{}, len(txs))
- for i, tx := range txs {
- txsInterface[i] = tx.RlpData()
+ case ethwire.MsgBlockTy:
+ blockPool := p.ethereum.blockPool
+
+ it := msg.Data.NewIterator()
+
+ for it.Next() {
+ block := ethchain.NewBlockFromRlpValue(it.Value())
+ blockPool.SetBlock(block)
}
- // Broadcast it back to the peer
- p.QueueMessage(ethwire.NewMessage(ethwire.MsgTxTy, txsInterface))
- // Unofficial but fun nonetheless
- case ethwire.MsgTalkTy:
- peerlogger.Infoln("%v says: %s\n", p.conn.RemoteAddr(), msg.Data.Str())
+ linked := blockPool.CheckLinkAndProcess(func(block *ethchain.Block) {
+ p.ethereum.StateManager().Process(block, false)
+ })
+
+ if !linked {
+ p.FetchBlocks()
+ }
}
}
}
+
p.Stop()
}
+func (self *Peer) FetchBlocks() {
+ blockPool := self.ethereum.blockPool
+
+ hashes := blockPool.Take(100, self)
+ if len(hashes) > 0 {
+ self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlocksTy, ethutil.ByteSliceToInterface(hashes)))
+ }
+}
+
+func (self *Peer) FetchHashes() {
+ blockPool := self.ethereum.blockPool
+
+ if self.td.Cmp(blockPool.td) >= 0 {
+ peerlogger.Debugf("Requesting hashes from %x\n", self.lastReceivedHash)
+
+ if !blockPool.HasLatestHash() {
+ self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{self.lastReceivedHash, uint32(200)}))
+ }
+ }
+}
+
// General update method
func (self *Peer) update() {
serviceTimer := time.NewTicker(5 * time.Second)
@@ -643,6 +574,7 @@ func (p *Peer) pushHandshake() error {
pubkey := p.ethereum.KeyManager().PublicKey()
msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{
uint32(ProtocolVersion), uint32(0), []byte(p.version), byte(p.caps), p.port, pubkey[1:],
+ p.ethereum.BlockChain().TD.Uint64(), p.ethereum.BlockChain().CurrentBlock.Hash(),
})
p.QueueMessage(msg)
@@ -728,10 +660,15 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
p.SetVersion(c.Get(2).Str())
}
+ // Get the td and last hash
+ p.td = c.Get(6).BigInt()
+ p.bestHash = c.Get(7).Bytes()
+ p.lastReceivedHash = p.bestHash
+
p.ethereum.PushPeer(p)
p.ethereum.reactor.Post("peerList", p.ethereum.Peers())
- ethlogger.Infof("Added peer (%s) %d / %d\n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers)
+ ethlogger.Infof("Added peer (%s) %d / %d (TD = %v ~ %x)\n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers, p.td, p.bestHash)
/*
// Catch up with the connected peer
@@ -740,7 +677,12 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
time.Sleep(10 * time.Second)
}
*/
- p.SyncWithPeerToLastKnown()
+ //p.SyncWithPeerToLastKnown()
+
+ if p.td.Cmp(p.ethereum.BlockChain().TD) > 0 {
+ p.ethereum.blockPool.AddHash(p.lastReceivedHash)
+ p.FetchHashes()
+ }
peerlogger.Debugln(p)
}