aboutsummaryrefslogtreecommitdiffstats
path: root/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'peer.go')
-rw-r--r--peer.go400
1 files changed, 234 insertions, 166 deletions
diff --git a/peer.go b/peer.go
index ab17466e1..67bf4e555 100644
--- a/peer.go
+++ b/peer.go
@@ -24,7 +24,11 @@ const (
// The size of the output buffer for writing messages
outputBufferSize = 50
// Current protocol version
- ProtocolVersion = 28
+ ProtocolVersion = 33
+ // Current P2P version
+ P2PVersion = 0
+ // Ethereum network version
+ NetVersion = 0
// Interval for ping/pong message
pingPongTimer = 2 * time.Second
)
@@ -70,7 +74,7 @@ func (d DiscReason) String() string {
type Caps byte
const (
- CapPeerDiscTy = 1 << iota
+ CapPeerDiscTy Caps = 1 << iota
CapTxTy
CapChainTy
@@ -122,6 +126,7 @@ type Peer struct {
// This flag is used by writeMessage to check if messages are allowed
// to be send or not. If no version is known all messages are ignored.
versionKnown bool
+ statusKnown bool
// Last received pong message
lastPong int64
@@ -179,6 +184,7 @@ func NewOutboundPeer(addr string, ethereum *Ethereum, caps Caps) *Peer {
inbound: false,
connected: 0,
disconnect: 0,
+ port: 30303,
caps: caps,
version: ethereum.ClientIdentity().String(),
}
@@ -271,9 +277,19 @@ func (p *Peer) writeMessage(msg *ethwire.Msg) {
default: // Anything but ack is allowed
return
}
+ } else {
+ /*
+ if !p.statusKnown {
+ switch msg.Type {
+ case ethwire.MsgStatusTy: // Ok
+ default: // Anything but ack is allowed
+ return
+ }
+ }
+ */
}
- peerlogger.DebugDetailf("(%v) <= %v %v\n", p.conn.RemoteAddr(), msg.Type, msg.Data)
+ peerlogger.DebugDetailf("(%v) <= %v\n", p.conn.RemoteAddr(), formatMessage(msg))
err := ethwire.WriteMessage(p.conn, msg)
if err != nil {
@@ -295,6 +311,14 @@ out:
select {
// Main message queue. All outbound messages are processed through here
case msg := <-p.outputQueue:
+ if !p.statusKnown {
+ switch msg.Type {
+ case ethwire.MsgGetTxsTy, ethwire.MsgGetBlockHashesTy, ethwire.MsgGetBlocksTy, ethwire.MsgBlockHashesTy, ethwire.MsgBlockTy:
+ peerlogger.Debugln("Blocked outgoing [eth] message to peer without the [eth] cap.")
+ break
+ }
+ }
+
p.writeMessage(msg)
p.lastSend = time.Now()
@@ -337,6 +361,29 @@ clean:
}
}
+func formatMessage(msg *ethwire.Msg) (ret string) {
+ ret = fmt.Sprintf("%v %v", msg.Type, msg.Data)
+
+ /*
+ XXX Commented out because I need the log level here to determine
+ if i should or shouldn't generate this message
+ */
+ /*
+ switch msg.Type {
+ case ethwire.MsgPeersTy:
+ ret += fmt.Sprintf("(%d entries)", msg.Data.Len())
+ case ethwire.MsgBlockTy:
+ b1, b2 := ethchain.NewBlockFromRlpValue(msg.Data.Get(0)), ethchain.NewBlockFromRlpValue(msg.Data.Get(msg.Data.Len()-1))
+ ret += fmt.Sprintf("(%d entries) %x - %x", msg.Data.Len(), b1.Hash()[0:4], b2.Hash()[0:4])
+ case ethwire.MsgBlockHashesTy:
+ h1, h2 := msg.Data.Get(0).Bytes(), msg.Data.Get(msg.Data.Len()-1).Bytes()
+ ret += fmt.Sprintf("(%d entries) %x - %x", msg.Data.Len(), h1, h2)
+ }
+ */
+
+ return
+}
+
// Inbound handler. Inbound messages are received here and passed to the appropriate methods
func (p *Peer) HandleInbound() {
for atomic.LoadInt32(&p.disconnect) == 0 {
@@ -349,16 +396,16 @@ func (p *Peer) HandleInbound() {
peerlogger.Debugln(err)
}
for _, msg := range msgs {
- peerlogger.DebugDetailf("(%v) => %v %v\n", p.conn.RemoteAddr(), msg.Type, msg.Data)
+ peerlogger.DebugDetailf("(%v) => %v\n", p.conn.RemoteAddr(), formatMessage(msg))
switch msg.Type {
case ethwire.MsgHandshakeTy:
// Version message
p.handleHandshake(msg)
- if p.caps.IsCap(CapPeerDiscTy) {
- p.QueueMessage(ethwire.NewMessage(ethwire.MsgGetPeersTy, ""))
- }
+ //if p.caps.IsCap(CapPeerDiscTy) {
+ p.QueueMessage(ethwire.NewMessage(ethwire.MsgGetPeersTy, ""))
+ //}
case ethwire.MsgDiscTy:
p.Stop()
@@ -396,95 +443,111 @@ func (p *Peer) HandleInbound() {
// Connect to the list of peers
p.ethereum.ProcessPeerList(peers)
- case ethwire.MsgGetTxsTy:
- // Get the current transactions of the pool
- txs := p.ethereum.TxPool().CurrentTransactions()
- // Get the RlpData values from the txs
- txsInterface := make([]interface{}, len(txs))
- for i, tx := range txs {
- txsInterface[i] = tx.RlpData()
- }
- // Broadcast it back to the peer
- p.QueueMessage(ethwire.NewMessage(ethwire.MsgTxTy, txsInterface))
- case ethwire.MsgGetBlockHashesTy:
- if msg.Data.Len() < 2 {
- peerlogger.Debugln("err: argument length invalid ", msg.Data.Len())
- }
+ case ethwire.MsgStatusTy:
+ // Handle peer's status msg
+ p.handleStatus(msg)
+ }
- hash := msg.Data.Get(0).Bytes()
- amount := msg.Data.Get(1).Uint()
+ // TMP
+ if p.statusKnown {
+ switch msg.Type {
+ case ethwire.MsgGetTxsTy:
+ // Get the current transactions of the pool
+ txs := p.ethereum.TxPool().CurrentTransactions()
+ // Get the RlpData values from the txs
+ txsInterface := make([]interface{}, len(txs))
+ for i, tx := range txs {
+ txsInterface[i] = tx.RlpData()
+ }
+ // Broadcast it back to the peer
+ p.QueueMessage(ethwire.NewMessage(ethwire.MsgTxTy, txsInterface))
- hashes := p.ethereum.BlockChain().GetChainHashesFromHash(hash, amount)
+ case ethwire.MsgGetBlockHashesTy:
+ if msg.Data.Len() < 2 {
+ peerlogger.Debugln("err: argument length invalid ", msg.Data.Len())
+ }
- p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockHashesTy, ethutil.ByteSliceToInterface(hashes)))
+ hash := msg.Data.Get(0).Bytes()
+ amount := msg.Data.Get(1).Uint()
- case ethwire.MsgGetBlocksTy:
- // Limit to max 300 blocks
- max := int(math.Min(float64(msg.Data.Len()), 300.0))
- var blocks []interface{}
+ hashes := p.ethereum.BlockChain().GetChainHashesFromHash(hash, amount)
- for i := 0; i < max; i++ {
- hash := msg.Data.Get(i).Bytes()
- block := p.ethereum.BlockChain().GetBlock(hash)
- if block != nil {
- blocks = append(blocks, block.Value().Raw())
- }
- }
+ p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockHashesTy, ethutil.ByteSliceToInterface(hashes)))
- p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, blocks))
+ case ethwire.MsgGetBlocksTy:
+ // Limit to max 300 blocks
+ max := int(math.Min(float64(msg.Data.Len()), 300.0))
+ var blocks []interface{}
- case ethwire.MsgBlockHashesTy:
- p.catchingUp = true
+ for i := 0; i < max; i++ {
+ hash := msg.Data.Get(i).Bytes()
+ block := p.ethereum.BlockChain().GetBlock(hash)
+ if block != nil {
+ blocks = append(blocks, block.Value().Raw())
+ }
+ }
- blockPool := p.ethereum.blockPool
+ p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, blocks))
- foundCommonHash := false
+ case ethwire.MsgBlockHashesTy:
+ p.catchingUp = true
- it := msg.Data.NewIterator()
- for it.Next() {
- hash := it.Value().Bytes()
+ blockPool := p.ethereum.blockPool
- if blockPool.HasCommonHash(hash) {
- foundCommonHash = true
+ foundCommonHash := false
- break
- }
+ it := msg.Data.NewIterator()
+ for it.Next() {
+ hash := it.Value().Bytes()
- blockPool.AddHash(hash)
+ if blockPool.HasCommonHash(hash) {
+ foundCommonHash = true
- p.lastReceivedHash = hash
+ break
+ }
- p.lastBlockReceived = time.Now()
- }
+ blockPool.AddHash(hash)
- if foundCommonHash {
- p.FetchBlocks()
- } else {
- p.FetchHashes()
- }
+ p.lastReceivedHash = hash
- case ethwire.MsgBlockTy:
- p.catchingUp = true
+ p.lastBlockReceived = time.Now()
+ }
- blockPool := p.ethereum.blockPool
+ if foundCommonHash || msg.Data.Len() == 0 {
+ p.FetchBlocks()
+ } else {
+ p.FetchHashes()
+ }
- it := msg.Data.NewIterator()
+ case ethwire.MsgBlockTy:
+ p.catchingUp = true
- for it.Next() {
- block := ethchain.NewBlockFromRlpValue(it.Value())
+ blockPool := p.ethereum.blockPool
- blockPool.SetBlock(block)
+ it := msg.Data.NewIterator()
+ for it.Next() {
+ block := ethchain.NewBlockFromRlpValue(it.Value())
+ //fmt.Printf("%v %x - %x\n", block.Number, block.Hash()[0:4], block.PrevHash[0:4])
- p.lastBlockReceived = time.Now()
- }
+ blockPool.SetBlock(block, p)
- linked := blockPool.CheckLinkAndProcess(func(block *ethchain.Block) {
- p.ethereum.StateManager().Process(block, false)
- })
+ p.lastBlockReceived = time.Now()
+ }
- if !linked {
- p.FetchBlocks()
+ var err error
+ blockPool.CheckLinkAndProcess(func(block *ethchain.Block) {
+ err = p.ethereum.StateManager().Process(block, false)
+ })
+
+ if err != nil {
+ peerlogger.Infoln(err)
+ } else {
+ // Don't trigger if there's just one block.
+ if blockPool.Len() != 0 && msg.Data.Len() > 1 {
+ p.FetchBlocks()
+ }
+ }
}
}
}
@@ -506,10 +569,10 @@ func (self *Peer) FetchHashes() {
blockPool := self.ethereum.blockPool
if self.td.Cmp(blockPool.td) >= 0 {
- peerlogger.Debugf("Requesting hashes from %x\n", self.lastReceivedHash)
+ blockPool.td = self.td
if !blockPool.HasLatestHash() {
- self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{self.lastReceivedHash, uint32(200)}))
+ self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{self.lastReceivedHash, uint32(256)}))
}
}
}
@@ -580,18 +643,6 @@ func (p *Peer) Stop() {
p.ethereum.RemovePeer(p)
}
-func (p *Peer) pushHandshake() error {
- pubkey := p.ethereum.KeyManager().PublicKey()
- msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{
- uint32(ProtocolVersion), uint32(0), []byte(p.version), byte(p.caps), p.port, pubkey[1:],
- p.ethereum.BlockChain().TD.Uint64(), p.ethereum.BlockChain().CurrentBlock.Hash(),
- })
-
- p.QueueMessage(msg)
-
- return nil
-}
-
func (p *Peer) peersMessage() *ethwire.Msg {
outPeers := make([]interface{}, len(p.ethereum.InOutPeers()))
// Serialise each peer
@@ -611,13 +662,93 @@ func (p *Peer) pushPeers() {
p.QueueMessage(p.peersMessage())
}
+func (self *Peer) pushStatus() {
+ msg := ethwire.NewMessage(ethwire.MsgStatusTy, []interface{}{
+ uint32(ProtocolVersion),
+ uint32(NetVersion),
+ self.ethereum.BlockChain().TD,
+ self.ethereum.BlockChain().CurrentBlock.Hash(),
+ self.ethereum.BlockChain().Genesis().Hash(),
+ })
+
+ self.QueueMessage(msg)
+}
+
+func (self *Peer) handleStatus(msg *ethwire.Msg) {
+ c := msg.Data
+
+ var (
+ protoVersion = c.Get(0).Uint()
+ netVersion = c.Get(1).Uint()
+ td = c.Get(2).BigInt()
+ bestHash = c.Get(3).Bytes()
+ genesis = c.Get(4).Bytes()
+ )
+
+ if bytes.Compare(self.ethereum.BlockChain().Genesis().Hash(), genesis) != 0 {
+ ethlogger.Warnf("Invalid genisis hash %x. Disabling [eth]\n", genesis)
+ return
+ }
+
+ if netVersion != NetVersion {
+ ethlogger.Warnf("Invalid network version %d. Disabling [eth]\n", netVersion)
+ return
+ }
+
+ if protoVersion != ProtocolVersion {
+ ethlogger.Warnf("Invalid protocol version %d. Disabling [eth]\n", protoVersion)
+ return
+ }
+
+ // Get the td and last hash
+ self.td = td
+ self.bestHash = bestHash
+ self.lastReceivedHash = bestHash
+
+ self.statusKnown = true
+
+ // Compare the total TD with the blockchain TD. If remote is higher
+ // fetch hashes from highest TD node.
+ if self.td.Cmp(self.ethereum.BlockChain().TD) > 0 {
+ self.ethereum.blockPool.AddHash(self.lastReceivedHash)
+ self.FetchHashes()
+ }
+
+ ethlogger.Infof("Peer is [eth] capable. (TD = %v ~ %x) %d / %d", self.td, self.bestHash, protoVersion, netVersion)
+
+}
+
+func (p *Peer) pushHandshake() error {
+ pubkey := p.ethereum.KeyManager().PublicKey()
+ msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{
+ P2PVersion, []byte(p.version), []interface{}{"eth"}, p.port, pubkey[1:],
+ })
+
+ p.QueueMessage(msg)
+
+ return nil
+}
+
func (p *Peer) handleHandshake(msg *ethwire.Msg) {
c := msg.Data
- // Set pubkey
- p.pubkey = c.Get(5).Bytes()
+ var (
+ p2pVersion = c.Get(0).Uint()
+ clientId = c.Get(1).Str()
+ caps = c.Get(2)
+ port = c.Get(3).Uint()
+ pub = c.Get(4).Bytes()
+ )
+
+ // Check correctness of p2p protocol version
+ if p2pVersion != P2PVersion {
+ peerlogger.Debugf("Invalid P2P version. Require protocol %d, received %d\n", P2PVersion, p2pVersion)
+ p.Stop()
+ return
+ }
- if p.pubkey == nil {
+ // Handle the pub key (validation, uniqueness)
+ if len(pub) == 0 {
peerlogger.Warnln("Pubkey required, not supplied in handshake.")
p.Stop()
return
@@ -625,9 +756,8 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
usedPub := 0
// This peer is already added to the peerlist so we expect to find a double pubkey at least once
-
eachPeer(p.ethereum.Peers(), func(peer *Peer, e *list.Element) {
- if bytes.Compare(p.pubkey, peer.pubkey) == 0 {
+ if bytes.Compare(pub, peer.pubkey) == 0 {
usedPub++
}
})
@@ -637,19 +767,11 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
p.Stop()
return
}
-
- if c.Get(0).Uint() != ProtocolVersion {
- peerlogger.Debugf("Invalid peer version. Require protocol: %d. Received: %d\n", ProtocolVersion, c.Get(0).Uint())
- p.Stop()
- return
- }
-
- // [PROTOCOL_VERSION, NETWORK_ID, CLIENT_ID, CAPS, PORT, PUBKEY]
- p.versionKnown = true
+ p.pubkey = pub
// If this is an inbound connection send an ack back
if p.inbound {
- p.port = uint16(c.Get(4).Uint())
+ p.port = uint16(port)
// Self connect detection
pubkey := p.ethereum.KeyManager().PublicKey()
@@ -660,40 +782,27 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
}
}
+ p.SetVersion(clientId)
- // Set the peer's caps
- p.caps = Caps(c.Get(3).Byte())
-
- // Get a reference to the peers version
- versionString := c.Get(2).Str()
- if len(versionString) > 0 {
- p.SetVersion(c.Get(2).Str())
- }
-
- // Get the td and last hash
- p.td = c.Get(6).BigInt()
- p.bestHash = c.Get(7).Bytes()
- p.lastReceivedHash = p.bestHash
+ p.versionKnown = true
p.ethereum.PushPeer(p)
p.ethereum.reactor.Post("peerList", p.ethereum.Peers())
- ethlogger.Infof("Added peer (%s) %d / %d (TD = %v ~ %x)\n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers, p.td, p.bestHash)
-
- /*
- // Catch up with the connected peer
- if !p.ethereum.IsUpToDate() {
- peerlogger.Debugln("Already syncing up with a peer; sleeping")
- time.Sleep(10 * time.Second)
+ capsIt := caps.NewIterator()
+ var capsStrs []string
+ for capsIt.Next() {
+ cap := capsIt.Value().Str()
+ switch cap {
+ case "eth":
+ p.pushStatus()
}
- */
- //p.SyncWithPeerToLastKnown()
- if p.td.Cmp(p.ethereum.BlockChain().TD) > 0 {
- p.ethereum.blockPool.AddHash(p.lastReceivedHash)
- p.FetchHashes()
+ capsStrs = append(capsStrs, cap)
}
+ ethlogger.Infof("Added peer (%s) %d / %d (%v)\n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers, capsStrs)
+
peerlogger.Debugln(p)
}
@@ -714,47 +823,6 @@ func (p *Peer) String() string {
return fmt.Sprintf("[%s] (%s) %v %s [%s]", strConnectType, strBoundType, p.conn.RemoteAddr(), p.version, p.caps)
}
-func (p *Peer) SyncWithPeerToLastKnown() {
- p.catchingUp = false
- p.CatchupWithPeer(p.ethereum.BlockChain().CurrentBlock.Hash())
-}
-
-func (p *Peer) FindCommonParentBlock() {
- if p.catchingUp {
- return
- }
-
- p.catchingUp = true
- if p.blocksRequested == 0 {
- p.blocksRequested = 20
- }
- blocks := p.ethereum.BlockChain().GetChain(p.ethereum.BlockChain().CurrentBlock.Hash(), p.blocksRequested)
-
- var hashes []interface{}
- for _, block := range blocks {
- hashes = append(hashes, block.Hash())
- }
-
- msgInfo := append(hashes, uint64(len(hashes)))
-
- peerlogger.DebugDetailf("Asking for block from %x (%d total) from %s\n", p.ethereum.BlockChain().CurrentBlock.Hash(), len(hashes), p.conn.RemoteAddr().String())
-
- msg := ethwire.NewMessage(ethwire.MsgGetChainTy, msgInfo)
- p.QueueMessage(msg)
-}
-func (p *Peer) CatchupWithPeer(blockHash []byte) {
- if !p.catchingUp {
- // Make sure nobody else is catching up when you want to do this
- p.catchingUp = true
- msg := ethwire.NewMessage(ethwire.MsgGetChainTy, []interface{}{blockHash, uint64(100)})
- p.QueueMessage(msg)
-
- peerlogger.DebugDetailf("Requesting blockchain %x... from peer %s\n", p.ethereum.BlockChain().CurrentBlock.Hash()[:4], p.conn.RemoteAddr())
-
- msg = ethwire.NewMessage(ethwire.MsgGetTxsTy, []interface{}{})
- p.QueueMessage(msg)
- }
-}
func (p *Peer) RlpData() []interface{} {
return []interface{}{p.host, p.port, p.pubkey}