aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--block_pool.go82
-rw-r--r--ethereum.go6
-rw-r--r--peer.go61
3 files changed, 121 insertions, 28 deletions
diff --git a/block_pool.go b/block_pool.go
index f5c53b9f7..e31babfd6 100644
--- a/block_pool.go
+++ b/block_pool.go
@@ -11,6 +11,7 @@ import (
"github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethutil"
+ "github.com/ethereum/eth-go/ethwire"
)
var poollogger = ethlog.NewLogger("BPOOL")
@@ -38,6 +39,8 @@ type BlockPool struct {
downloadStartedAt time.Time
ChainLength, BlocksProcessed int
+
+ peer *Peer
}
func NewBlockPool(eth *Ethereum) *BlockPool {
@@ -74,6 +77,27 @@ func (self *BlockPool) Blocks() (blocks ethchain.Blocks) {
return
}
+func (self *BlockPool) FetchHashes(peer *Peer) {
+ highestTd := self.eth.HighestTDPeer()
+
+ if (self.peer == nil && peer.td.Cmp(highestTd) >= 0) || (self.peer != nil && peer.td.Cmp(self.peer.td) >= 0) || self.peer == peer {
+ if self.peer != peer {
+ poollogger.Debugf("Found better suitable peer (%v vs %v)\n", self.td, peer.td)
+ }
+
+ self.peer = peer
+ self.td = peer.td
+
+ if !self.HasLatestHash() {
+ peer.doneFetchingHashes = false
+
+ const amount = 256
+ peerlogger.Debugf("Fetching hashes (%d)\n", amount)
+ peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{peer.lastReceivedHash, uint32(amount)}))
+ }
+ }
+}
+
func (self *BlockPool) AddHash(hash []byte, peer *Peer) {
self.mut.Lock()
defer self.mut.Unlock()
@@ -99,7 +123,7 @@ func (self *BlockPool) Add(b *ethchain.Block, peer *Peer) {
if !self.eth.BlockChain().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes {
poollogger.Infof("Unknown block, requesting parent (%x...)\n", b.PrevHash[0:4])
- //peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)}))
+ peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)}))
}
} else if self.pool[hash] != nil {
self.pool[hash].block = b
@@ -227,7 +251,7 @@ out:
}
func (self *BlockPool) chainThread() {
- procTimer := time.NewTicker(500 * time.Millisecond)
+ procTimer := time.NewTicker(1000 * time.Millisecond)
out:
for {
select {
@@ -237,6 +261,14 @@ out:
blocks := self.Blocks()
ethchain.BlockBy(ethchain.Number).Sort(blocks)
+ // Find common block
+ for i, block := range blocks {
+ if self.eth.BlockChain().HasBlock(block.PrevHash) {
+ blocks = blocks[i:]
+ break
+ }
+ }
+
if len(blocks) > 0 {
if self.eth.BlockChain().HasBlock(blocks[0].PrevHash) {
for i, block := range blocks[1:] {
@@ -253,13 +285,51 @@ out:
}
}
- // Handle in batches of 4k
- max := int(math.Min(4000, float64(len(blocks))))
- for _, block := range blocks[:max] {
- self.eth.Eventer().Post("block", block)
+ if len(blocks) > 0 {
+ self.eth.Eventer().Post("blocks", blocks)
+
+ }
+
+ var err error
+ for i, block := range blocks {
+ //self.eth.Eventer().Post("block", block)
+ err = self.eth.StateManager().Process(block, false)
+ if err != nil {
+ poollogger.Infoln(err)
+ poollogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4])
+ poollogger.Debugln(block)
+
+ blocks = blocks[i:]
+
+ break
+ }
self.Remove(block.Hash())
}
+
+ if err != nil {
+ // Remove this bad chain
+ for _, block := range blocks {
+ self.Remove(block.Hash())
+ }
+
+ poollogger.Debugf("Punishing peer for supplying bad chain (%v)\n", self.peer.conn.RemoteAddr())
+ // This peer gave us bad hashes and made us fetch a bad chain, therefor he shall be punished.
+ self.eth.BlacklistPeer(self.peer)
+ self.peer.StopWithReason(DiscBadPeer)
+ self.td = ethutil.Big0
+ self.peer = nil
+ }
+
+ /*
+ // Handle in batches of 4k
+ //max := int(math.Min(4000, float64(len(blocks))))
+ for _, block := range blocks {
+ self.eth.Eventer().Post("block", block)
+
+ self.Remove(block.Hash())
+ }
+ */
}
}
}
diff --git a/ethereum.go b/ethereum.go
index 5fb3f2909..013214726 100644
--- a/ethereum.go
+++ b/ethereum.go
@@ -69,6 +69,8 @@ type Ethereum struct {
Addr net.Addr
Port string
+ blacklist [][]byte
+
peerMut sync.Mutex
// Capabilities for outgoing peers
@@ -211,6 +213,10 @@ func (s *Ethereum) HighestTDPeer() (td *big.Int) {
return
}
+func (self *Ethereum) BlacklistPeer(peer *Peer) {
+ self.blacklist = append(self.blacklist, peer.pubkey)
+}
+
func (s *Ethereum) AddPeer(conn net.Conn) {
peer := NewPeer(conn, s, true)
diff --git a/peer.go b/peer.go
index b8f850b5a..2806e8a11 100644
--- a/peer.go
+++ b/peer.go
@@ -39,15 +39,15 @@ const (
// Values are given explicitly instead of by iota because these values are
// defined by the wire protocol spec; it is easier for humans to ensure
// correctness when values are explicit.
- DiscReRequested = 0x00
- DiscReTcpSysErr = 0x01
- DiscBadProto = 0x02
- DiscBadPeer = 0x03
- DiscTooManyPeers = 0x04
- DiscConnDup = 0x05
- DiscGenesisErr = 0x06
- DiscProtoErr = 0x07
- DiscQuitting = 0x08
+ DiscRequested DiscReason = iota
+ DiscReTcpSysErr
+ DiscBadProto
+ DiscBadPeer
+ DiscTooManyPeers
+ DiscConnDup
+ DiscGenesisErr
+ DiscProtoErr
+ DiscQuitting
)
var discReasonToString = []string{
@@ -554,19 +554,22 @@ func (self *Peer) FetchBlocks(hashes [][]byte) {
}
func (self *Peer) FetchHashes() {
- self.doneFetchingHashes = false
-
blockPool := self.ethereum.blockPool
+ blockPool.FetchHashes(self)
+
+ /*
+ if self.td.Cmp(self.ethereum.HighestTDPeer()) >= 0 {
+ blockPool.td = self.td
- if self.td.Cmp(self.ethereum.HighestTDPeer()) >= 0 {
- blockPool.td = self.td
+ if !blockPool.HasLatestHash() {
+ self.doneFetchingHashes = false
- if !blockPool.HasLatestHash() {
- const amount = 256
- peerlogger.Debugf("Fetching hashes (%d)\n", amount)
- self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{self.lastReceivedHash, uint32(amount)}))
+ const amount = 256
+ peerlogger.Debugf("Fetching hashes (%d)\n", amount)
+ self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{self.lastReceivedHash, uint32(amount)}))
+ }
}
- }
+ */
}
func (self *Peer) FetchingHashes() bool {
@@ -631,18 +634,22 @@ func (p *Peer) Start() {
}
func (p *Peer) Stop() {
+ p.StopWithReason(DiscRequested)
+}
+
+func (p *Peer) StopWithReason(reason DiscReason) {
if atomic.AddInt32(&p.disconnect, 1) != 1 {
return
}
+ // Pre-emptively remove the peer; don't wait for reaping. We already know it's dead if we are here
+ p.ethereum.RemovePeer(p)
+
close(p.quit)
if atomic.LoadInt32(&p.connected) != 0 {
- p.writeMessage(ethwire.NewMessage(ethwire.MsgDiscTy, ""))
+ p.writeMessage(ethwire.NewMessage(ethwire.MsgDiscTy, reason))
p.conn.Close()
}
-
- // Pre-emptively remove the peer; don't wait for reaping. We already know it's dead if we are here
- p.ethereum.RemovePeer(p)
}
func (p *Peer) peersMessage() *ethwire.Msg {
@@ -764,6 +771,16 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
return
}
+ // Check for blacklisting
+ for _, pk := range p.ethereum.blacklist {
+ if bytes.Compare(pk, pub) == 0 {
+ peerlogger.Debugf("Blacklisted peer tried to connect (%x...)\n", pubkey[0:4])
+ p.StopWithReason(DiscBadPeer)
+
+ return
+ }
+ }
+
usedPub := 0
// This peer is already added to the peerlist so we expect to find a double pubkey at least once
eachPeer(p.ethereum.Peers(), func(peer *Peer, e *list.Element) {