From eaa2e8900d1036e09b002c4e20fc6e4f9cd031bb Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 21 Aug 2014 14:47:58 +0200 Subject: PoC 6 networking code. * Added block pool for gathering blocks from the network (chunks) * Re wrote syncing --- block_pool.go | 116 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 block_pool.go (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go new file mode 100644 index 000000000..3225bdff2 --- /dev/null +++ b/block_pool.go @@ -0,0 +1,116 @@ +package eth + +import ( + "math" + "math/big" + "sync" + + "github.com/ethereum/eth-go/ethchain" + "github.com/ethereum/eth-go/ethutil" +) + +type block struct { + peer *Peer + block *ethchain.Block +} + +type BlockPool struct { + mut sync.Mutex + + eth *Ethereum + + hashPool [][]byte + pool map[string]*block + + td *big.Int +} + +func NewBlockPool(eth *Ethereum) *BlockPool { + return &BlockPool{ + eth: eth, + pool: make(map[string]*block), + td: ethutil.Big0, + } +} + +func (self *BlockPool) HasLatestHash() bool { + return self.pool[string(self.eth.BlockChain().CurrentBlock.Hash())] != nil +} + +func (self *BlockPool) HasCommonHash(hash []byte) bool { + return self.eth.BlockChain().GetBlock(hash) != nil +} + +func (self *BlockPool) AddHash(hash []byte) { + if self.pool[string(hash)] == nil { + self.pool[string(hash)] = &block{nil, nil} + + self.hashPool = append([][]byte{hash}, self.hashPool...) + } +} + +func (self *BlockPool) SetBlock(b *ethchain.Block) { + hash := string(b.Hash()) + + if self.pool[string(hash)] == nil { + self.pool[hash] = &block{nil, nil} + } + + self.pool[hash].block = b +} + +func (self *BlockPool) CheckLinkAndProcess(f func(block *ethchain.Block)) bool { + self.mut.Lock() + defer self.mut.Unlock() + + if self.IsLinked() { + for i, hash := range self.hashPool { + block := self.pool[string(hash)].block + if block != nil { + f(block) + + delete(self.pool, string(hash)) + } else { + self.hashPool = self.hashPool[i:] + + return false + } + } + + return true + } + + return false +} + +func (self *BlockPool) IsLinked() bool { + if len(self.hashPool) == 0 { + return false + } + + block := self.pool[string(self.hashPool[0])].block + if block != nil { + return self.eth.BlockChain().HasBlock(block.PrevHash) + } + + return false +} + +func (self *BlockPool) Take(amount int, peer *Peer) (hashes [][]byte) { + self.mut.Lock() + defer self.mut.Unlock() + + num := int(math.Min(float64(amount), float64(len(self.pool)))) + j := 0 + for i := 0; i < len(self.hashPool) && j < num; i++ { + hash := string(self.hashPool[i]) + if self.pool[hash].peer == nil || self.pool[hash].peer == peer { + self.pool[hash].peer = peer + + hashes = append(hashes, self.hashPool[i]) + j++ + } + } + + return +} -- cgit v1.2.3 From a9f9a594160405737657083476535f3e48df1558 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 22 Aug 2014 10:58:57 +0200 Subject: Extra checks --- block_pool.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 3225bdff2..2be2bc787 100644 --- a/block_pool.go +++ b/block_pool.go @@ -49,11 +49,11 @@ func (self *BlockPool) AddHash(hash []byte) { } } -func (self *BlockPool) SetBlock(b *ethchain.Block) { +func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) { hash := string(b.Hash()) - if self.pool[string(hash)] == nil { - self.pool[hash] = &block{nil, nil} + if self.pool[hash] == nil { + self.pool[hash] = &block{peer, nil} } self.pool[hash].block = b @@ -65,6 +65,10 @@ func (self *BlockPool) CheckLinkAndProcess(f func(block *ethchain.Block)) bool { if self.IsLinked() { for i, hash := range self.hashPool { + if self.pool[string(hash)] == nil { + continue + } + block := self.pool[string(hash)].block if block != nil { f(block) @@ -84,7 +88,7 @@ func (self *BlockPool) CheckLinkAndProcess(f func(block *ethchain.Block)) bool { } func (self *BlockPool) IsLinked() bool { - if len(self.hashPool) == 0 { + if len(self.hashPool) == 0 || self.pool[string(self.hashPool[0])] == nil { return false } @@ -104,7 +108,7 @@ func (self *BlockPool) Take(amount int, peer *Peer) (hashes [][]byte) { j := 0 for i := 0; i < len(self.hashPool) && j < num; i++ { hash := string(self.hashPool[i]) - if self.pool[hash].peer == nil || self.pool[hash].peer == peer { + if self.pool[hash] != nil && (self.pool[hash].peer == nil || self.pool[hash].peer == peer) && self.pool[hash].block == nil { self.pool[hash].peer = peer hashes = append(hashes, self.hashPool[i]) -- cgit v1.2.3 From be9bfb5536c7410bdd9cb3fbd13fb622bfc00a57 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 22 Aug 2014 14:52:20 +0200 Subject: Minor improvement catching up * When catching up check linked up the chain of hashes --- block_pool.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 2be2bc787..e3f0f6ff0 100644 --- a/block_pool.go +++ b/block_pool.go @@ -1,6 +1,7 @@ package eth import ( + "fmt" "math" "math/big" "sync" @@ -51,6 +52,7 @@ func (self *BlockPool) AddHash(hash []byte) { func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) { hash := string(b.Hash()) + fmt.Printf("::SetBlock %x\n", hash) if self.pool[hash] == nil { self.pool[hash] = &block{peer, nil} @@ -88,13 +90,19 @@ func (self *BlockPool) CheckLinkAndProcess(f func(block *ethchain.Block)) bool { } func (self *BlockPool) IsLinked() bool { - if len(self.hashPool) == 0 || self.pool[string(self.hashPool[0])] == nil { + if len(self.hashPool) == 0 { return false } - block := self.pool[string(self.hashPool[0])].block - if block != nil { - return self.eth.BlockChain().HasBlock(block.PrevHash) + for i := 0; i < len(self.hashPool); i++ { + item := self.pool[string(self.hashPool[i])] + if item != nil && item.block != nil { + if self.eth.BlockChain().HasBlock(item.block.PrevHash) { + self.hashPool = self.hashPool[i:] + + return true + } + } } return false -- cgit v1.2.3 From 56103f07517fe32522e510213c02ea982dfcef42 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 22 Aug 2014 17:10:18 +0200 Subject: Log --- block_pool.go | 2 -- 1 file changed, 2 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index e3f0f6ff0..25627eb5c 100644 --- a/block_pool.go +++ b/block_pool.go @@ -1,7 +1,6 @@ package eth import ( - "fmt" "math" "math/big" "sync" @@ -52,7 +51,6 @@ func (self *BlockPool) AddHash(hash []byte) { func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) { hash := string(b.Hash()) - fmt.Printf("::SetBlock %x\n", hash) if self.pool[hash] == nil { self.pool[hash] = &block{peer, nil} -- cgit v1.2.3 From 74ef22d8247c08b6b827f5e9f1001f8bcce9d0e0 Mon Sep 17 00:00:00 2001 From: obscuren Date: Sun, 14 Sep 2014 14:30:33 +0200 Subject: add it to the list --- block_pool.go | 1 + 1 file changed, 1 insertion(+) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 25627eb5c..38827242d 100644 --- a/block_pool.go +++ b/block_pool.go @@ -53,6 +53,7 @@ func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) { hash := string(b.Hash()) if self.pool[hash] == nil { + self.hashPool = append(self.hashPool, b.Hash()) self.pool[hash] = &block{peer, nil} } -- cgit v1.2.3 From 33a0dec8a157b9687ca6038f4deb011f3f1f7bdc Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 15 Sep 2014 15:42:12 +0200 Subject: Improved catching up and refactored --- block_pool.go | 59 +++++++++++++++++------------------------------------------ 1 file changed, 17 insertions(+), 42 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 38827242d..0a668e111 100644 --- a/block_pool.go +++ b/block_pool.go @@ -52,59 +52,34 @@ func (self *BlockPool) AddHash(hash []byte) { func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) { hash := string(b.Hash()) - if self.pool[hash] == nil { + if self.pool[hash] == nil && !self.eth.BlockChain().HasBlock(b.Hash()) { self.hashPool = append(self.hashPool, b.Hash()) - self.pool[hash] = &block{peer, nil} + self.pool[hash] = &block{peer, b} + } else if self.pool[hash] != nil { + self.pool[hash].block = b } - - self.pool[hash].block = b } -func (self *BlockPool) CheckLinkAndProcess(f func(block *ethchain.Block)) bool { - self.mut.Lock() - defer self.mut.Unlock() - - if self.IsLinked() { - for i, hash := range self.hashPool { - if self.pool[string(hash)] == nil { - continue - } +func (self *BlockPool) CheckLinkAndProcess(f func(block *ethchain.Block)) { - block := self.pool[string(hash)].block - if block != nil { - f(block) - - delete(self.pool, string(hash)) - } else { - self.hashPool = self.hashPool[i:] - - return false - } + var blocks ethchain.Blocks + for _, item := range self.pool { + if item.block != nil { + blocks = append(blocks, item.block) } - - return true } - return false -} - -func (self *BlockPool) IsLinked() bool { - if len(self.hashPool) == 0 { - return false - } + ethchain.BlockBy(ethchain.Number).Sort(blocks) + for _, block := range blocks { + if self.eth.BlockChain().HasBlock(block.PrevHash) { + f(block) - for i := 0; i < len(self.hashPool); i++ { - item := self.pool[string(self.hashPool[i])] - if item != nil && item.block != nil { - if self.eth.BlockChain().HasBlock(item.block.PrevHash) { - self.hashPool = self.hashPool[i:] - - return true - } + hash := block.Hash() + self.hashPool = ethutil.DeleteFromByteSlice(self.hashPool, hash) + delete(self.pool, string(hash)) } - } - return false + } } func (self *BlockPool) Take(amount int, peer *Peer) (hashes [][]byte) { -- cgit v1.2.3 From a26c479182375a076833068aa6125724fda647fe Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 17 Sep 2014 15:58:02 +0200 Subject: Added len --- block_pool.go | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 0a668e111..6753308b6 100644 --- a/block_pool.go +++ b/block_pool.go @@ -33,6 +33,10 @@ func NewBlockPool(eth *Ethereum) *BlockPool { } } +func (self *BlockPool) Len() int { + return len(self.hashPool) +} + func (self *BlockPool) HasLatestHash() bool { return self.pool[string(self.eth.BlockChain().CurrentBlock.Hash())] != nil } -- cgit v1.2.3 From ba43364f36db690528cc62196969414ef5e98833 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 24 Sep 2014 11:41:57 +0200 Subject: tmp --- block_pool.go | 101 ++++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 95 insertions(+), 6 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 6753308b6..f3a863429 100644 --- a/block_pool.go +++ b/block_pool.go @@ -1,17 +1,23 @@ package eth import ( + "bytes" + "container/list" + "fmt" "math" "math/big" "sync" + "time" "github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethutil" ) type block struct { - peer *Peer - block *ethchain.Block + peer *Peer + block *ethchain.Block + reqAt time.Time + requested int } type BlockPool struct { @@ -22,7 +28,8 @@ type BlockPool struct { hashPool [][]byte pool map[string]*block - td *big.Int + td *big.Int + quit chan bool } func NewBlockPool(eth *Ethereum) *BlockPool { @@ -30,6 +37,7 @@ func NewBlockPool(eth *Ethereum) *BlockPool { eth: eth, pool: make(map[string]*block), td: ethutil.Big0, + quit: make(chan bool), } } @@ -47,7 +55,7 @@ func (self *BlockPool) HasCommonHash(hash []byte) bool { func (self *BlockPool) AddHash(hash []byte) { if self.pool[string(hash)] == nil { - self.pool[string(hash)] = &block{nil, nil} + self.pool[string(hash)] = &block{nil, nil, time.Now(), 0} self.hashPool = append([][]byte{hash}, self.hashPool...) } @@ -58,12 +66,34 @@ func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) { if self.pool[hash] == nil && !self.eth.BlockChain().HasBlock(b.Hash()) { self.hashPool = append(self.hashPool, b.Hash()) - self.pool[hash] = &block{peer, b} + self.pool[hash] = &block{peer, b, time.Now(), 0} } else if self.pool[hash] != nil { self.pool[hash].block = b } } +func (self *BlockPool) getParent(block *ethchain.Block) *ethchain.Block { + for _, item := range self.pool { + if item.block != nil { + if bytes.Compare(item.block.Hash(), block.PrevHash) == 0 { + return item.block + } + } + } + + return nil +} + +func (self *BlockPool) GetChainFromBlock(block *ethchain.Block) ethchain.Blocks { + var blocks ethchain.Blocks + + for b := block; b != nil; b = self.getParent(b) { + blocks = append(ethchain.Blocks{b}, blocks...) + } + + return blocks +} + func (self *BlockPool) CheckLinkAndProcess(f func(block *ethchain.Block)) { var blocks ethchain.Blocks @@ -94,8 +124,14 @@ func (self *BlockPool) Take(amount int, peer *Peer) (hashes [][]byte) { j := 0 for i := 0; i < len(self.hashPool) && j < num; i++ { hash := string(self.hashPool[i]) - if self.pool[hash] != nil && (self.pool[hash].peer == nil || self.pool[hash].peer == peer) && self.pool[hash].block == nil { + item := self.pool[hash] + if item != nil && item.block == nil && + (item.peer == nil || + ((time.Since(item.reqAt) > 5*time.Second && item.peer != peer) && self.eth.peers.Len() > 1) || // multiple peers + (time.Since(item.reqAt) > 5*time.Second && self.eth.peers.Len() == 1) /* single peer*/) { self.pool[hash].peer = peer + self.pool[hash].reqAt = time.Now() + self.pool[hash].requested++ hashes = append(hashes, self.hashPool[i]) j++ @@ -104,3 +140,56 @@ func (self *BlockPool) Take(amount int, peer *Peer) (hashes [][]byte) { return } + +func (self *BlockPool) Start() { + go self.update() +} + +func (self *BlockPool) Stop() { + close(self.quit) +} + +func (self *BlockPool) update() { + serviceTimer := time.NewTicker(100 * time.Millisecond) + procTimer := time.NewTicker(500 * time.Millisecond) +out: + for { + select { + case <-self.quit: + break out + case <-serviceTimer.C: + // Clean up hashes that can't be fetched + done := true + eachPeer(self.eth.peers, func(p *Peer, v *list.Element) { + if p.statusKnown && p.FetchingHashes() { + done = false + } + }) + + if done { + eachPeer(self.eth.peers, func(p *Peer, v *list.Element) { + if p.statusKnown { + hashes := self.Take(100, p) + if len(hashes) > 0 { + p.FetchBlocks(hashes) + if len(hashes) == 1 { + fmt.Printf("last hash = %x\n", hashes[0]) + } else { + fmt.Println("Requesting", len(hashes), "of", p) + } + } + } + }) + } + case <-procTimer.C: + var err error + self.CheckLinkAndProcess(func(block *ethchain.Block) { + err = self.eth.StateManager().Process(block, false) + }) + + if err != nil { + peerlogger.Infoln(err) + } + } + } +} -- cgit v1.2.3 From 84690bfbbe99b0c8b8d3a377d39b0900990b300c Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 24 Sep 2014 19:54:30 +0200 Subject: Changed the block fetching code and hash distribution --- block_pool.go | 130 ++++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 85 insertions(+), 45 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index f3a863429..88d1c3739 100644 --- a/block_pool.go +++ b/block_pool.go @@ -3,17 +3,21 @@ package eth import ( "bytes" "container/list" - "fmt" "math" "math/big" "sync" "time" "github.com/ethereum/eth-go/ethchain" + "github.com/ethereum/eth-go/ethlog" "github.com/ethereum/eth-go/ethutil" + "github.com/ethereum/eth-go/ethwire" ) +var poollogger = ethlog.NewLogger("[BPOOL]") + type block struct { + from *Peer peer *Peer block *ethchain.Block reqAt time.Time @@ -30,6 +34,8 @@ type BlockPool struct { td *big.Int quit chan bool + + ChainLength, BlocksProcessed int } func NewBlockPool(eth *Ethereum) *BlockPool { @@ -53,9 +59,9 @@ func (self *BlockPool) HasCommonHash(hash []byte) bool { return self.eth.BlockChain().GetBlock(hash) != nil } -func (self *BlockPool) AddHash(hash []byte) { +func (self *BlockPool) AddHash(hash []byte, peer *Peer) { if self.pool[string(hash)] == nil { - self.pool[string(hash)] = &block{nil, nil, time.Now(), 0} + self.pool[string(hash)] = &block{peer, nil, nil, time.Now(), 0} self.hashPool = append([][]byte{hash}, self.hashPool...) } @@ -66,10 +72,12 @@ func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) { if self.pool[hash] == nil && !self.eth.BlockChain().HasBlock(b.Hash()) { self.hashPool = append(self.hashPool, b.Hash()) - self.pool[hash] = &block{peer, b, time.Now(), 0} + self.pool[hash] = &block{peer, peer, b, time.Now(), 0} } else if self.pool[hash] != nil { self.pool[hash].block = b } + + self.BlocksProcessed++ } func (self *BlockPool) getParent(block *ethchain.Block) *ethchain.Block { @@ -94,18 +102,24 @@ func (self *BlockPool) GetChainFromBlock(block *ethchain.Block) ethchain.Blocks return blocks } -func (self *BlockPool) CheckLinkAndProcess(f func(block *ethchain.Block)) { - - var blocks ethchain.Blocks +func (self *BlockPool) Blocks() (blocks ethchain.Blocks) { for _, item := range self.pool { if item.block != nil { blocks = append(blocks, item.block) } } + return +} + +func (self *BlockPool) ProcessCanonical(f func(block *ethchain.Block)) (procAmount int) { + blocks := self.Blocks() + ethchain.BlockBy(ethchain.Number).Sort(blocks) for _, block := range blocks { if self.eth.BlockChain().HasBlock(block.PrevHash) { + procAmount++ + f(block) hash := block.Hash() @@ -114,31 +128,58 @@ func (self *BlockPool) CheckLinkAndProcess(f func(block *ethchain.Block)) { } } + + return } -func (self *BlockPool) Take(amount int, peer *Peer) (hashes [][]byte) { - self.mut.Lock() - defer self.mut.Unlock() +func (self *BlockPool) DistributeHashes() { + var ( + peerLen = self.eth.peers.Len() + amount = 200 * peerLen + dist = make(map[*Peer][][]byte) + ) num := int(math.Min(float64(amount), float64(len(self.pool)))) - j := 0 - for i := 0; i < len(self.hashPool) && j < num; i++ { - hash := string(self.hashPool[i]) - item := self.pool[hash] - if item != nil && item.block == nil && - (item.peer == nil || - ((time.Since(item.reqAt) > 5*time.Second && item.peer != peer) && self.eth.peers.Len() > 1) || // multiple peers - (time.Since(item.reqAt) > 5*time.Second && self.eth.peers.Len() == 1) /* single peer*/) { - self.pool[hash].peer = peer - self.pool[hash].reqAt = time.Now() - self.pool[hash].requested++ - - hashes = append(hashes, self.hashPool[i]) - j++ + for i, j := 0, 0; i < len(self.hashPool) && j < num; i++ { + hash := self.hashPool[i] + item := self.pool[string(hash)] + + if item != nil && item.block == nil { + var peer *Peer + lastFetchFailed := time.Since(item.reqAt) > 5*time.Second + + // Handle failed requests + if lastFetchFailed && item.requested > 0 && item.peer != nil { + if item.requested < 100 { + // Select peer the hash was retrieved off + peer = item.from + } else { + // Remove it + self.hashPool = ethutil.DeleteFromByteSlice(self.hashPool, hash) + delete(self.pool, string(hash)) + } + } else if lastFetchFailed || item.peer == nil { + // Find a suitable, available peer + eachPeer(self.eth.peers, func(p *Peer, v *list.Element) { + if peer == nil && len(dist[p]) < amount/peerLen { + peer = p + } + }) + } + + if peer != nil { + item.reqAt = time.Now() + item.peer = peer + item.requested++ + + dist[peer] = append(dist[peer], hash) + } } } - return + for peer, hashes := range dist { + peer.FetchBlocks(hashes) + } } func (self *BlockPool) Start() { @@ -158,7 +199,8 @@ out: case <-self.quit: break out case <-serviceTimer.C: - // Clean up hashes that can't be fetched + // Check if we're catching up. If not distribute the hashes to + // the peers and download the blockchain done := true eachPeer(self.eth.peers, func(p *Peer, v *list.Element) { if p.statusKnown && p.FetchingHashes() { @@ -166,29 +208,27 @@ out: } }) - if done { - eachPeer(self.eth.peers, func(p *Peer, v *list.Element) { - if p.statusKnown { - hashes := self.Take(100, p) - if len(hashes) > 0 { - p.FetchBlocks(hashes) - if len(hashes) == 1 { - fmt.Printf("last hash = %x\n", hashes[0]) - } else { - fmt.Println("Requesting", len(hashes), "of", p) - } - } - } - }) + if done && len(self.hashPool) > 0 { + self.DistributeHashes() + } + + if self.ChainLength < len(self.hashPool) { + self.ChainLength = len(self.hashPool) } case <-procTimer.C: - var err error - self.CheckLinkAndProcess(func(block *ethchain.Block) { - err = self.eth.StateManager().Process(block, false) + // XXX We can optimize this lifting this on to a new goroutine. + // We'd need to make sure that the pools are properly protected by a mutex + amount := self.ProcessCanonical(func(block *ethchain.Block) { + err := self.eth.StateManager().Process(block, false) + if err != nil { + poollogger.Infoln(err) + } }) - if err != nil { - peerlogger.Infoln(err) + // Do not propagate to the network on catchups + if amount == 1 { + block := self.eth.BlockChain().CurrentBlock + self.eth.Broadcast(ethwire.MsgBlockTy, []interface{}{block.Value().Val}) } } } -- cgit v1.2.3 From 9ed8dc7384deb932be624699d9f628d3d00ba31e Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 25 Sep 2014 16:57:49 +0200 Subject: Attempt to catch up from unknown block --- block_pool.go | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 88d1c3739..f768f0f60 100644 --- a/block_pool.go +++ b/block_pool.go @@ -73,6 +73,10 @@ func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) { if self.pool[hash] == nil && !self.eth.BlockChain().HasBlock(b.Hash()) { self.hashPool = append(self.hashPool, b.Hash()) self.pool[hash] = &block{peer, peer, b, time.Now(), 0} + + if !self.eth.BlockChain().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil { + peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.PrevHash, uint32(256)})) + } } else if self.pool[hash] != nil { self.pool[hash].block = b } @@ -218,6 +222,7 @@ out: case <-procTimer.C: // XXX We can optimize this lifting this on to a new goroutine. // We'd need to make sure that the pools are properly protected by a mutex + // XXX This should moved in The Great Refactor(TM) amount := self.ProcessCanonical(func(block *ethchain.Block) { err := self.eth.StateManager().Process(block, false) if err != nil { -- cgit v1.2.3 From e20b11305366c8d05ea626eda0bb46fba5b373ec Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 26 Sep 2014 20:19:11 +0200 Subject: Logging messages --- block_pool.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index f768f0f60..73507a185 100644 --- a/block_pool.go +++ b/block_pool.go @@ -14,7 +14,7 @@ import ( "github.com/ethereum/eth-go/ethwire" ) -var poollogger = ethlog.NewLogger("[BPOOL]") +var poollogger = ethlog.NewLogger("BPOOL") type block struct { from *Peer @@ -71,6 +71,8 @@ func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) { hash := string(b.Hash()) if self.pool[hash] == nil && !self.eth.BlockChain().HasBlock(b.Hash()) { + poollogger.Infof("Got unrequested block (%x...)\n", hash[0:4]) + self.hashPool = append(self.hashPool, b.Hash()) self.pool[hash] = &block{peer, peer, b, time.Now(), 0} -- cgit v1.2.3 From 44d50bc8d2e8bf4a87d01d6ded24d79eb50ee666 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 26 Sep 2014 20:51:31 +0200 Subject: Have you seen my parents, sir? --- block_pool.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 73507a185..26ef51992 100644 --- a/block_pool.go +++ b/block_pool.go @@ -76,7 +76,8 @@ func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) { self.hashPool = append(self.hashPool, b.Hash()) self.pool[hash] = &block{peer, peer, b, time.Now(), 0} - if !self.eth.BlockChain().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil { + if !self.eth.BlockChain().HasBlock(b.PrevHash) { + poollogger.Infof("Unknown block, requesting parent (%x...)\n", b.PrevHash[0:4]) peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.PrevHash, uint32(256)})) } } else if self.pool[hash] != nil { -- cgit v1.2.3 From ea0357bf02b61db94bd0ad8806ba7337a55a4f79 Mon Sep 17 00:00:00 2001 From: obscuren Date: Sun, 28 Sep 2014 14:52:58 +0200 Subject: Block pool is thread safe --- block_pool.go | 101 +++++++++++++++++++++++++++++++++------------------------- 1 file changed, 58 insertions(+), 43 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 26ef51992..4ac096bda 100644 --- a/block_pool.go +++ b/block_pool.go @@ -1,7 +1,6 @@ package eth import ( - "bytes" "container/list" "math" "math/big" @@ -35,6 +34,9 @@ type BlockPool struct { td *big.Int quit chan bool + fetchingHashes bool + downloadStartedAt time.Time + ChainLength, BlocksProcessed int } @@ -52,6 +54,9 @@ func (self *BlockPool) Len() int { } func (self *BlockPool) HasLatestHash() bool { + self.mut.Lock() + defer self.mut.Unlock() + return self.pool[string(self.eth.BlockChain().CurrentBlock.Hash())] != nil } @@ -59,7 +64,20 @@ func (self *BlockPool) HasCommonHash(hash []byte) bool { return self.eth.BlockChain().GetBlock(hash) != nil } +func (self *BlockPool) Blocks() (blocks ethchain.Blocks) { + for _, item := range self.pool { + if item.block != nil { + blocks = append(blocks, item.block) + } + } + + return +} + func (self *BlockPool) AddHash(hash []byte, peer *Peer) { + self.mut.Lock() + defer self.mut.Unlock() + if self.pool[string(hash)] == nil { self.pool[string(hash)] = &block{peer, nil, nil, time.Now(), 0} @@ -67,7 +85,10 @@ func (self *BlockPool) AddHash(hash []byte, peer *Peer) { } } -func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) { +func (self *BlockPool) Add(b *ethchain.Block, peer *Peer) { + self.mut.Lock() + defer self.mut.Unlock() + hash := string(b.Hash()) if self.pool[hash] == nil && !self.eth.BlockChain().HasBlock(b.Hash()) { @@ -76,7 +97,7 @@ func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) { self.hashPool = append(self.hashPool, b.Hash()) self.pool[hash] = &block{peer, peer, b, time.Now(), 0} - if !self.eth.BlockChain().HasBlock(b.PrevHash) { + if !self.eth.BlockChain().HasBlock(b.PrevHash) && !self.fetchingHashes { poollogger.Infof("Unknown block, requesting parent (%x...)\n", b.PrevHash[0:4]) peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.PrevHash, uint32(256)})) } @@ -87,36 +108,12 @@ func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) { self.BlocksProcessed++ } -func (self *BlockPool) getParent(block *ethchain.Block) *ethchain.Block { - for _, item := range self.pool { - if item.block != nil { - if bytes.Compare(item.block.Hash(), block.PrevHash) == 0 { - return item.block - } - } - } - - return nil -} - -func (self *BlockPool) GetChainFromBlock(block *ethchain.Block) ethchain.Blocks { - var blocks ethchain.Blocks - - for b := block; b != nil; b = self.getParent(b) { - blocks = append(ethchain.Blocks{b}, blocks...) - } - - return blocks -} - -func (self *BlockPool) Blocks() (blocks ethchain.Blocks) { - for _, item := range self.pool { - if item.block != nil { - blocks = append(blocks, item.block) - } - } +func (self *BlockPool) Remove(hash []byte) { + self.mut.Lock() + defer self.mut.Unlock() - return + self.hashPool = ethutil.DeleteFromByteSlice(self.hashPool, hash) + delete(self.pool, string(hash)) } func (self *BlockPool) ProcessCanonical(f func(block *ethchain.Block)) (procAmount int) { @@ -129,9 +126,7 @@ func (self *BlockPool) ProcessCanonical(f func(block *ethchain.Block)) (procAmou f(block) - hash := block.Hash() - self.hashPool = ethutil.DeleteFromByteSlice(self.hashPool, hash) - delete(self.pool, string(hash)) + self.Remove(block.Hash()) } } @@ -140,9 +135,12 @@ func (self *BlockPool) ProcessCanonical(f func(block *ethchain.Block)) (procAmou } func (self *BlockPool) DistributeHashes() { + self.mut.Lock() + defer self.mut.Unlock() + var ( peerLen = self.eth.peers.Len() - amount = 200 * peerLen + amount = 256 * peerLen dist = make(map[*Peer][][]byte) ) @@ -156,7 +154,7 @@ func (self *BlockPool) DistributeHashes() { lastFetchFailed := time.Since(item.reqAt) > 5*time.Second // Handle failed requests - if lastFetchFailed && item.requested > 0 && item.peer != nil { + if lastFetchFailed && item.requested > 5 && item.peer != nil { if item.requested < 100 { // Select peer the hash was retrieved off peer = item.from @@ -187,19 +185,23 @@ func (self *BlockPool) DistributeHashes() { for peer, hashes := range dist { peer.FetchBlocks(hashes) } + + if len(dist) > 0 { + self.downloadStartedAt = time.Now() + } } func (self *BlockPool) Start() { - go self.update() + go self.downloadThread() + go self.chainThread() } func (self *BlockPool) Stop() { close(self.quit) } -func (self *BlockPool) update() { +func (self *BlockPool) downloadThread() { serviceTimer := time.NewTicker(100 * time.Millisecond) - procTimer := time.NewTicker(500 * time.Millisecond) out: for { select { @@ -208,20 +210,31 @@ out: case <-serviceTimer.C: // Check if we're catching up. If not distribute the hashes to // the peers and download the blockchain - done := true + self.fetchingHashes = false eachPeer(self.eth.peers, func(p *Peer, v *list.Element) { if p.statusKnown && p.FetchingHashes() { - done = false + self.fetchingHashes = true } }) - if done && len(self.hashPool) > 0 { + if !self.fetchingHashes && len(self.hashPool) > 0 { self.DistributeHashes() } if self.ChainLength < len(self.hashPool) { self.ChainLength = len(self.hashPool) } + } + } +} + +func (self *BlockPool) chainThread() { + procTimer := time.NewTicker(500 * time.Millisecond) +out: + for { + select { + case <-self.quit: + break out case <-procTimer.C: // XXX We can optimize this lifting this on to a new goroutine. // We'd need to make sure that the pools are properly protected by a mutex @@ -230,6 +243,8 @@ out: err := self.eth.StateManager().Process(block, false) if err != nil { poollogger.Infoln(err) + poollogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) + poollogger.Debugln(block) } }) -- cgit v1.2.3 From ab6ede51d7fedb9270cab08ee732a834be34dab2 Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 29 Sep 2014 12:57:51 +0200 Subject: Working on new (blocking) event machine. The new event machine will be used for loose coupling and handle the communications between the services: 1) Block pool finds blocks which "links" with our current canonical chain 2) Posts the blocks on to the event machine 3) State manager receives blocks & processes them 4) Broadcasts new post block event --- block_pool.go | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 4ac096bda..957b7601b 100644 --- a/block_pool.go +++ b/block_pool.go @@ -1,6 +1,7 @@ package eth import ( + "bytes" "container/list" "math" "math/big" @@ -236,22 +237,31 @@ out: case <-self.quit: break out case <-procTimer.C: - // XXX We can optimize this lifting this on to a new goroutine. - // We'd need to make sure that the pools are properly protected by a mutex - // XXX This should moved in The Great Refactor(TM) - amount := self.ProcessCanonical(func(block *ethchain.Block) { - err := self.eth.StateManager().Process(block, false) - if err != nil { - poollogger.Infoln(err) - poollogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) - poollogger.Debugln(block) + blocks := self.Blocks() + ethchain.BlockBy(ethchain.Number).Sort(blocks) + + if len(blocks) > 0 { + if self.eth.BlockChain().HasBlock(blocks[0].PrevHash) { + for i, block := range blocks[1:] { + // NOTE: The Ith element in this loop refers to the previous block in + // outer "blocks" + if bytes.Compare(block.PrevHash, blocks[i].Hash()) != 0 { + blocks = blocks[:i] + + break + } + } + } else { + blocks = nil } - }) + } + + // Handle in batches of 4k + max := int(math.Min(4000, float64(len(blocks)))) + for _, block := range blocks[:max] { + self.eth.Eventer().Post("block", block) - // Do not propagate to the network on catchups - if amount == 1 { - block := self.eth.BlockChain().CurrentBlock - self.eth.Broadcast(ethwire.MsgBlockTy, []interface{}{block.Value().Val}) + self.Remove(block.Hash()) } } } -- cgit v1.2.3 From 3af211dd65d6690afce9976a9f47ab1cdddb8d58 Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 30 Sep 2014 23:26:52 +0200 Subject: Implemented WebSocket package --- block_pool.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 957b7601b..003d1db58 100644 --- a/block_pool.go +++ b/block_pool.go @@ -11,7 +11,6 @@ import ( "github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethlog" "github.com/ethereum/eth-go/ethutil" - "github.com/ethereum/eth-go/ethwire" ) var poollogger = ethlog.NewLogger("BPOOL") @@ -99,8 +98,8 @@ func (self *BlockPool) Add(b *ethchain.Block, peer *Peer) { self.pool[hash] = &block{peer, peer, b, time.Now(), 0} if !self.eth.BlockChain().HasBlock(b.PrevHash) && !self.fetchingHashes { - poollogger.Infof("Unknown block, requesting parent (%x...)\n", b.PrevHash[0:4]) - peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.PrevHash, uint32(256)})) + //poollogger.Infof("Unknown block, requesting parent (%x...)\n", b.PrevHash[0:4]) + //peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.PrevHash, uint32(256)})) } } else if self.pool[hash] != nil { self.pool[hash].block = b -- cgit v1.2.3 From 5fa0173c4147baee54ad1846a23cd4410c50420b Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 2 Oct 2014 00:03:19 +0200 Subject: msg --- block_pool.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 003d1db58..2dbe11069 100644 --- a/block_pool.go +++ b/block_pool.go @@ -97,9 +97,9 @@ func (self *BlockPool) Add(b *ethchain.Block, peer *Peer) { self.hashPool = append(self.hashPool, b.Hash()) self.pool[hash] = &block{peer, peer, b, time.Now(), 0} - if !self.eth.BlockChain().HasBlock(b.PrevHash) && !self.fetchingHashes { - //poollogger.Infof("Unknown block, requesting parent (%x...)\n", b.PrevHash[0:4]) - //peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.PrevHash, uint32(256)})) + if !self.eth.BlockChain().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes { + poollogger.Infof("Unknown block, requesting parent (%x...)\n", b.PrevHash[0:4]) + //peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)})) } } else if self.pool[hash] != nil { self.pool[hash].block = b -- cgit v1.2.3 From a34a971b508e1bc1fbeb3c2d02cbb8686d2491d8 Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 2 Oct 2014 01:36:59 +0200 Subject: improved blockchain downloading --- block_pool.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 2dbe11069..f5c53b9f7 100644 --- a/block_pool.go +++ b/block_pool.go @@ -217,9 +217,7 @@ out: } }) - if !self.fetchingHashes && len(self.hashPool) > 0 { - self.DistributeHashes() - } + self.DistributeHashes() if self.ChainLength < len(self.hashPool) { self.ChainLength = len(self.hashPool) -- cgit v1.2.3 From a75c92000fab997a41479c8f92e62f6b0d3f3434 Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 2 Oct 2014 17:03:48 +0200 Subject: Black listing of bad peers --- block_pool.go | 82 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 76 insertions(+), 6 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index f5c53b9f7..e31babfd6 100644 --- a/block_pool.go +++ b/block_pool.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethlog" "github.com/ethereum/eth-go/ethutil" + "github.com/ethereum/eth-go/ethwire" ) var poollogger = ethlog.NewLogger("BPOOL") @@ -38,6 +39,8 @@ type BlockPool struct { downloadStartedAt time.Time ChainLength, BlocksProcessed int + + peer *Peer } func NewBlockPool(eth *Ethereum) *BlockPool { @@ -74,6 +77,27 @@ func (self *BlockPool) Blocks() (blocks ethchain.Blocks) { return } +func (self *BlockPool) FetchHashes(peer *Peer) { + highestTd := self.eth.HighestTDPeer() + + if (self.peer == nil && peer.td.Cmp(highestTd) >= 0) || (self.peer != nil && peer.td.Cmp(self.peer.td) >= 0) || self.peer == peer { + if self.peer != peer { + poollogger.Debugf("Found better suitable peer (%v vs %v)\n", self.td, peer.td) + } + + self.peer = peer + self.td = peer.td + + if !self.HasLatestHash() { + peer.doneFetchingHashes = false + + const amount = 256 + peerlogger.Debugf("Fetching hashes (%d)\n", amount) + peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{peer.lastReceivedHash, uint32(amount)})) + } + } +} + func (self *BlockPool) AddHash(hash []byte, peer *Peer) { self.mut.Lock() defer self.mut.Unlock() @@ -99,7 +123,7 @@ func (self *BlockPool) Add(b *ethchain.Block, peer *Peer) { if !self.eth.BlockChain().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes { poollogger.Infof("Unknown block, requesting parent (%x...)\n", b.PrevHash[0:4]) - //peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)})) + peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)})) } } else if self.pool[hash] != nil { self.pool[hash].block = b @@ -227,7 +251,7 @@ out: } func (self *BlockPool) chainThread() { - procTimer := time.NewTicker(500 * time.Millisecond) + procTimer := time.NewTicker(1000 * time.Millisecond) out: for { select { @@ -237,6 +261,14 @@ out: blocks := self.Blocks() ethchain.BlockBy(ethchain.Number).Sort(blocks) + // Find common block + for i, block := range blocks { + if self.eth.BlockChain().HasBlock(block.PrevHash) { + blocks = blocks[i:] + break + } + } + if len(blocks) > 0 { if self.eth.BlockChain().HasBlock(blocks[0].PrevHash) { for i, block := range blocks[1:] { @@ -253,13 +285,51 @@ out: } } - // Handle in batches of 4k - max := int(math.Min(4000, float64(len(blocks)))) - for _, block := range blocks[:max] { - self.eth.Eventer().Post("block", block) + if len(blocks) > 0 { + self.eth.Eventer().Post("blocks", blocks) + + } + + var err error + for i, block := range blocks { + //self.eth.Eventer().Post("block", block) + err = self.eth.StateManager().Process(block, false) + if err != nil { + poollogger.Infoln(err) + poollogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) + poollogger.Debugln(block) + + blocks = blocks[i:] + + break + } self.Remove(block.Hash()) } + + if err != nil { + // Remove this bad chain + for _, block := range blocks { + self.Remove(block.Hash()) + } + + poollogger.Debugf("Punishing peer for supplying bad chain (%v)\n", self.peer.conn.RemoteAddr()) + // This peer gave us bad hashes and made us fetch a bad chain, therefor he shall be punished. + self.eth.BlacklistPeer(self.peer) + self.peer.StopWithReason(DiscBadPeer) + self.td = ethutil.Big0 + self.peer = nil + } + + /* + // Handle in batches of 4k + //max := int(math.Min(4000, float64(len(blocks)))) + for _, block := range blocks { + self.eth.Eventer().Post("block", block) + + self.Remove(block.Hash()) + } + */ } } } -- cgit v1.2.3 From 677836cbee1105043335c672b41dc4402e98c227 Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 2 Oct 2014 17:35:38 +0200 Subject: Kick off bad peers on bad chains and improved catch up on diverted chain --- block_pool.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index e31babfd6..a951672c3 100644 --- a/block_pool.go +++ b/block_pool.go @@ -56,6 +56,11 @@ func (self *BlockPool) Len() int { return len(self.hashPool) } +func (self *BlockPool) Reset() { + self.pool = make(map[string]*block) + self.hashPool = nil +} + func (self *BlockPool) HasLatestHash() bool { self.mut.Lock() defer self.mut.Unlock() @@ -77,7 +82,7 @@ func (self *BlockPool) Blocks() (blocks ethchain.Blocks) { return } -func (self *BlockPool) FetchHashes(peer *Peer) { +func (self *BlockPool) FetchHashes(peer *Peer) bool { highestTd := self.eth.HighestTDPeer() if (self.peer == nil && peer.td.Cmp(highestTd) >= 0) || (self.peer != nil && peer.td.Cmp(self.peer.td) >= 0) || self.peer == peer { @@ -95,7 +100,11 @@ func (self *BlockPool) FetchHashes(peer *Peer) { peerlogger.Debugf("Fetching hashes (%d)\n", amount) peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{peer.lastReceivedHash, uint32(amount)})) } + + return true } + + return false } func (self *BlockPool) AddHash(hash []byte, peer *Peer) { @@ -122,7 +131,7 @@ func (self *BlockPool) Add(b *ethchain.Block, peer *Peer) { self.pool[hash] = &block{peer, peer, b, time.Now(), 0} if !self.eth.BlockChain().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes { - poollogger.Infof("Unknown block, requesting parent (%x...)\n", b.PrevHash[0:4]) + poollogger.Infof("Unknown chain, requesting (%x...)\n", b.PrevHash[0:4]) peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)})) } } else if self.pool[hash] != nil { @@ -308,6 +317,7 @@ out: } if err != nil { + self.Reset() // Remove this bad chain for _, block := range blocks { self.Remove(block.Hash()) -- cgit v1.2.3 From 0015ce1e353f52cca818d11f566b9a656fb85f24 Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 7 Oct 2014 11:18:46 +0200 Subject: kick of bad peers --- block_pool.go | 88 ++++++++++++++++++++++++----------------------------------- 1 file changed, 36 insertions(+), 52 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index a951672c3..15d8ba3b9 100644 --- a/block_pool.go +++ b/block_pool.go @@ -3,6 +3,7 @@ package eth import ( "bytes" "container/list" + "fmt" "math" "math/big" "sync" @@ -29,8 +30,8 @@ type BlockPool struct { eth *Ethereum - hashPool [][]byte - pool map[string]*block + hashes [][]byte + pool map[string]*block td *big.Int quit chan bool @@ -53,12 +54,12 @@ func NewBlockPool(eth *Ethereum) *BlockPool { } func (self *BlockPool) Len() int { - return len(self.hashPool) + return len(self.hashes) } func (self *BlockPool) Reset() { self.pool = make(map[string]*block) - self.hashPool = nil + self.hashes = nil } func (self *BlockPool) HasLatestHash() bool { @@ -88,6 +89,10 @@ func (self *BlockPool) FetchHashes(peer *Peer) bool { if (self.peer == nil && peer.td.Cmp(highestTd) >= 0) || (self.peer != nil && peer.td.Cmp(self.peer.td) >= 0) || self.peer == peer { if self.peer != peer { poollogger.Debugf("Found better suitable peer (%v vs %v)\n", self.td, peer.td) + + if self.peer != nil { + self.peer.doneFetchingHashes = true + } } self.peer = peer @@ -114,7 +119,7 @@ func (self *BlockPool) AddHash(hash []byte, peer *Peer) { if self.pool[string(hash)] == nil { self.pool[string(hash)] = &block{peer, nil, nil, time.Now(), 0} - self.hashPool = append([][]byte{hash}, self.hashPool...) + self.hashes = append([][]byte{hash}, self.hashes...) } } @@ -127,9 +132,12 @@ func (self *BlockPool) Add(b *ethchain.Block, peer *Peer) { if self.pool[hash] == nil && !self.eth.BlockChain().HasBlock(b.Hash()) { poollogger.Infof("Got unrequested block (%x...)\n", hash[0:4]) - self.hashPool = append(self.hashPool, b.Hash()) + self.hashes = append(self.hashes, b.Hash()) self.pool[hash] = &block{peer, peer, b, time.Now(), 0} + fmt.Println("1.", !self.eth.BlockChain().HasBlock(b.PrevHash), ethutil.Bytes2Hex(b.Hash()[0:4]), ethutil.Bytes2Hex(b.PrevHash[0:4])) + fmt.Println("2.", self.pool[string(b.PrevHash)] == nil) + fmt.Println("3.", !self.fetchingHashes) if !self.eth.BlockChain().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes { poollogger.Infof("Unknown chain, requesting (%x...)\n", b.PrevHash[0:4]) peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)})) @@ -145,28 +153,10 @@ func (self *BlockPool) Remove(hash []byte) { self.mut.Lock() defer self.mut.Unlock() - self.hashPool = ethutil.DeleteFromByteSlice(self.hashPool, hash) + self.hashes = ethutil.DeleteFromByteSlice(self.hashes, hash) delete(self.pool, string(hash)) } -func (self *BlockPool) ProcessCanonical(f func(block *ethchain.Block)) (procAmount int) { - blocks := self.Blocks() - - ethchain.BlockBy(ethchain.Number).Sort(blocks) - for _, block := range blocks { - if self.eth.BlockChain().HasBlock(block.PrevHash) { - procAmount++ - - f(block) - - self.Remove(block.Hash()) - } - - } - - return -} - func (self *BlockPool) DistributeHashes() { self.mut.Lock() defer self.mut.Unlock() @@ -178,8 +168,8 @@ func (self *BlockPool) DistributeHashes() { ) num := int(math.Min(float64(amount), float64(len(self.pool)))) - for i, j := 0, 0; i < len(self.hashPool) && j < num; i++ { - hash := self.hashPool[i] + for i, j := 0, 0; i < len(self.hashes) && j < num; i++ { + hash := self.hashes[i] item := self.pool[string(hash)] if item != nil && item.block == nil { @@ -193,7 +183,7 @@ func (self *BlockPool) DistributeHashes() { peer = item.from } else { // Remove it - self.hashPool = ethutil.DeleteFromByteSlice(self.hashPool, hash) + self.hashes = ethutil.DeleteFromByteSlice(self.hashes, hash) delete(self.pool, string(hash)) } } else if lastFetchFailed || item.peer == nil { @@ -250,17 +240,31 @@ out: } }) - self.DistributeHashes() + if len(self.hashes) > 0 { + self.DistributeHashes() + } - if self.ChainLength < len(self.hashPool) { - self.ChainLength = len(self.hashPool) + if self.ChainLength < len(self.hashes) { + self.ChainLength = len(self.hashes) } + + /* + if !self.fetchingHashes { + blocks := self.Blocks() + ethchain.BlockBy(ethchain.Number).Sort(blocks) + + if len(blocks) > 0 { + if !self.eth.BlockChain().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes { + } + } + } + */ } } } func (self *BlockPool) chainThread() { - procTimer := time.NewTicker(1000 * time.Millisecond) + procTimer := time.NewTicker(500 * time.Millisecond) out: for { select { @@ -294,14 +298,8 @@ out: } } - if len(blocks) > 0 { - self.eth.Eventer().Post("blocks", blocks) - - } - var err error for i, block := range blocks { - //self.eth.Eventer().Post("block", block) err = self.eth.StateManager().Process(block, false) if err != nil { poollogger.Infoln(err) @@ -318,10 +316,6 @@ out: if err != nil { self.Reset() - // Remove this bad chain - for _, block := range blocks { - self.Remove(block.Hash()) - } poollogger.Debugf("Punishing peer for supplying bad chain (%v)\n", self.peer.conn.RemoteAddr()) // This peer gave us bad hashes and made us fetch a bad chain, therefor he shall be punished. @@ -330,16 +324,6 @@ out: self.td = ethutil.Big0 self.peer = nil } - - /* - // Handle in batches of 4k - //max := int(math.Min(4000, float64(len(blocks)))) - for _, block := range blocks { - self.eth.Eventer().Post("block", block) - - self.Remove(block.Hash()) - } - */ } } } -- cgit v1.2.3 From b417766b36f46316cbae6fa42815f1a519e5f733 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 8 Oct 2014 11:59:44 +0200 Subject: Minor tweaks for poc7 --- block_pool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 15d8ba3b9..ea1f3633a 100644 --- a/block_pool.go +++ b/block_pool.go @@ -86,7 +86,7 @@ func (self *BlockPool) Blocks() (blocks ethchain.Blocks) { func (self *BlockPool) FetchHashes(peer *Peer) bool { highestTd := self.eth.HighestTDPeer() - if (self.peer == nil && peer.td.Cmp(highestTd) >= 0) || (self.peer != nil && peer.td.Cmp(self.peer.td) >= 0) || self.peer == peer { + if (self.peer == nil && peer.td.Cmp(highestTd) >= 0) || (self.peer != nil && peer.td.Cmp(self.peer.td) > 0) || self.peer == peer { if self.peer != peer { poollogger.Debugf("Found better suitable peer (%v vs %v)\n", self.td, peer.td) @@ -102,7 +102,7 @@ func (self *BlockPool) FetchHashes(peer *Peer) bool { peer.doneFetchingHashes = false const amount = 256 - peerlogger.Debugf("Fetching hashes (%d)\n", amount) + peerlogger.Debugf("Fetching hashes (%d) %x...\n", amount, peer.lastReceivedHash[0:4]) peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{peer.lastReceivedHash, uint32(amount)})) } -- cgit v1.2.3 From 4de3ad1712ce0fdc62b1acc27a3922b192e943c6 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 8 Oct 2014 12:29:49 +0200 Subject: New block message --- block_pool.go | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index ea1f3633a..6ad2f5269 100644 --- a/block_pool.go +++ b/block_pool.go @@ -124,6 +124,14 @@ func (self *BlockPool) AddHash(hash []byte, peer *Peer) { } func (self *BlockPool) Add(b *ethchain.Block, peer *Peer) { + self.addBlock(b, peer, false) +} + +func (self *BlockPool) AddNew(b *ethchain.Block, peer *Peer) { + self.addBlock(b, peer, true) +} + +func (self *BlockPool) addBlock(b *ethchain.Block, peer *Peer, newBlock bool) { self.mut.Lock() defer self.mut.Unlock() @@ -135,12 +143,15 @@ func (self *BlockPool) Add(b *ethchain.Block, peer *Peer) { self.hashes = append(self.hashes, b.Hash()) self.pool[hash] = &block{peer, peer, b, time.Now(), 0} - fmt.Println("1.", !self.eth.BlockChain().HasBlock(b.PrevHash), ethutil.Bytes2Hex(b.Hash()[0:4]), ethutil.Bytes2Hex(b.PrevHash[0:4])) - fmt.Println("2.", self.pool[string(b.PrevHash)] == nil) - fmt.Println("3.", !self.fetchingHashes) - if !self.eth.BlockChain().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes { - poollogger.Infof("Unknown chain, requesting (%x...)\n", b.PrevHash[0:4]) - peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)})) + // The following is only performed on an unrequested new block + if newBlock { + fmt.Println("1.", !self.eth.BlockChain().HasBlock(b.PrevHash), ethutil.Bytes2Hex(b.Hash()[0:4]), ethutil.Bytes2Hex(b.PrevHash[0:4])) + fmt.Println("2.", self.pool[string(b.PrevHash)] == nil) + fmt.Println("3.", !self.fetchingHashes) + if !self.eth.BlockChain().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes { + poollogger.Infof("Unknown chain, requesting (%x...)\n", b.PrevHash[0:4]) + peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)})) + } } } else if self.pool[hash] != nil { self.pool[hash].block = b -- cgit v1.2.3 From 097ba56df59293f9225a8ecdc9e1c43a5ad891bb Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 20 Oct 2014 11:53:11 +0200 Subject: Renamed block_chain to chain_manager --- block_pool.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 6ad2f5269..b2cade6ad 100644 --- a/block_pool.go +++ b/block_pool.go @@ -66,11 +66,11 @@ func (self *BlockPool) HasLatestHash() bool { self.mut.Lock() defer self.mut.Unlock() - return self.pool[string(self.eth.BlockChain().CurrentBlock.Hash())] != nil + return self.pool[string(self.eth.ChainManager().CurrentBlock.Hash())] != nil } func (self *BlockPool) HasCommonHash(hash []byte) bool { - return self.eth.BlockChain().GetBlock(hash) != nil + return self.eth.ChainManager().GetBlock(hash) != nil } func (self *BlockPool) Blocks() (blocks ethchain.Blocks) { @@ -137,7 +137,7 @@ func (self *BlockPool) addBlock(b *ethchain.Block, peer *Peer, newBlock bool) { hash := string(b.Hash()) - if self.pool[hash] == nil && !self.eth.BlockChain().HasBlock(b.Hash()) { + if self.pool[hash] == nil && !self.eth.ChainManager().HasBlock(b.Hash()) { poollogger.Infof("Got unrequested block (%x...)\n", hash[0:4]) self.hashes = append(self.hashes, b.Hash()) @@ -145,10 +145,10 @@ func (self *BlockPool) addBlock(b *ethchain.Block, peer *Peer, newBlock bool) { // The following is only performed on an unrequested new block if newBlock { - fmt.Println("1.", !self.eth.BlockChain().HasBlock(b.PrevHash), ethutil.Bytes2Hex(b.Hash()[0:4]), ethutil.Bytes2Hex(b.PrevHash[0:4])) + fmt.Println("1.", !self.eth.ChainManager().HasBlock(b.PrevHash), ethutil.Bytes2Hex(b.Hash()[0:4]), ethutil.Bytes2Hex(b.PrevHash[0:4])) fmt.Println("2.", self.pool[string(b.PrevHash)] == nil) fmt.Println("3.", !self.fetchingHashes) - if !self.eth.BlockChain().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes { + if !self.eth.ChainManager().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes { poollogger.Infof("Unknown chain, requesting (%x...)\n", b.PrevHash[0:4]) peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)})) } @@ -265,7 +265,7 @@ out: ethchain.BlockBy(ethchain.Number).Sort(blocks) if len(blocks) > 0 { - if !self.eth.BlockChain().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes { + if !self.eth.ChainManager().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes { } } } @@ -287,14 +287,14 @@ out: // Find common block for i, block := range blocks { - if self.eth.BlockChain().HasBlock(block.PrevHash) { + if self.eth.ChainManager().HasBlock(block.PrevHash) { blocks = blocks[i:] break } } if len(blocks) > 0 { - if self.eth.BlockChain().HasBlock(blocks[0].PrevHash) { + if self.eth.ChainManager().HasBlock(blocks[0].PrevHash) { for i, block := range blocks[1:] { // NOTE: The Ith element in this loop refers to the previous block in // outer "blocks" -- cgit v1.2.3 From 9e2f071d26d5c4ed343d2a91e48fec4e7751b99d Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 29 Oct 2014 14:20:42 +0100 Subject: Removed events from the state manager --- block_pool.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index f65d9d576..334db9c1b 100644 --- a/block_pool.go +++ b/block_pool.go @@ -309,9 +309,13 @@ out: } } + // TODO figure out whether we were catching up + // If caught up and just a new block has been propagated: + // sm.eth.EventMux().Post(NewBlockEvent{block}) + // otherwise process and don't emit anything var err error for i, block := range blocks { - err = self.eth.StateManager().Process(block, false) + err = self.eth.StateManager().Process(block) if err != nil { poollogger.Infoln(err) poollogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) -- cgit v1.2.3 From 3ee0461cb5b6e4a5e2d287180afbdb681805a662 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 31 Oct 2014 10:59:17 +0100 Subject: Moved ethchain to chain --- block_pool.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 334db9c1b..49fa07eb1 100644 --- a/block_pool.go +++ b/block_pool.go @@ -9,7 +9,7 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/ethchain" + "github.com/ethereum/go-ethereum/chain" "github.com/ethereum/go-ethereum/ethlog" "github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/ethwire" @@ -20,7 +20,7 @@ var poollogger = ethlog.NewLogger("BPOOL") type block struct { from *Peer peer *Peer - block *ethchain.Block + block *chain.Block reqAt time.Time requested int } @@ -73,7 +73,7 @@ func (self *BlockPool) HasCommonHash(hash []byte) bool { return self.eth.ChainManager().GetBlock(hash) != nil } -func (self *BlockPool) Blocks() (blocks ethchain.Blocks) { +func (self *BlockPool) Blocks() (blocks chain.Blocks) { for _, item := range self.pool { if item.block != nil { blocks = append(blocks, item.block) @@ -123,15 +123,15 @@ func (self *BlockPool) AddHash(hash []byte, peer *Peer) { } } -func (self *BlockPool) Add(b *ethchain.Block, peer *Peer) { +func (self *BlockPool) Add(b *chain.Block, peer *Peer) { self.addBlock(b, peer, false) } -func (self *BlockPool) AddNew(b *ethchain.Block, peer *Peer) { +func (self *BlockPool) AddNew(b *chain.Block, peer *Peer) { self.addBlock(b, peer, true) } -func (self *BlockPool) addBlock(b *ethchain.Block, peer *Peer, newBlock bool) { +func (self *BlockPool) addBlock(b *chain.Block, peer *Peer, newBlock bool) { self.mut.Lock() defer self.mut.Unlock() @@ -262,7 +262,7 @@ out: /* if !self.fetchingHashes { blocks := self.Blocks() - ethchain.BlockBy(ethchain.Number).Sort(blocks) + chain.BlockBy(chain.Number).Sort(blocks) if len(blocks) > 0 { if !self.eth.ChainManager().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes { @@ -283,7 +283,7 @@ out: break out case <-procTimer.C: blocks := self.Blocks() - ethchain.BlockBy(ethchain.Number).Sort(blocks) + chain.BlockBy(chain.Number).Sort(blocks) // Find common block for i, block := range blocks { -- cgit v1.2.3 From b1c247231b11f313ca0eedff75ea563926d23f68 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 31 Oct 2014 12:56:05 +0100 Subject: ethlog => logger --- block_pool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 49fa07eb1..1cf3ab907 100644 --- a/block_pool.go +++ b/block_pool.go @@ -10,12 +10,12 @@ import ( "time" "github.com/ethereum/go-ethereum/chain" - "github.com/ethereum/go-ethereum/ethlog" "github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/ethwire" + "github.com/ethereum/go-ethereum/logger" ) -var poollogger = ethlog.NewLogger("BPOOL") +var poollogger = logger.NewLogger("BPOOL") type block struct { from *Peer -- cgit v1.2.3 From 4914a78c8c650d7fc74570f25a682598aaeb6973 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 31 Oct 2014 14:53:42 +0100 Subject: ethwire => wire --- block_pool.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 1cf3ab907..0e182623f 100644 --- a/block_pool.go +++ b/block_pool.go @@ -11,8 +11,8 @@ import ( "github.com/ethereum/go-ethereum/chain" "github.com/ethereum/go-ethereum/ethutil" - "github.com/ethereum/go-ethereum/ethwire" "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/wire" ) var poollogger = logger.NewLogger("BPOOL") @@ -103,7 +103,7 @@ func (self *BlockPool) FetchHashes(peer *Peer) bool { const amount = 256 peerlogger.Debugf("Fetching hashes (%d) %x...\n", amount, peer.lastReceivedHash[0:4]) - peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{peer.lastReceivedHash, uint32(amount)})) + peer.QueueMessage(wire.NewMessage(wire.MsgGetBlockHashesTy, []interface{}{peer.lastReceivedHash, uint32(amount)})) } return true @@ -150,7 +150,7 @@ func (self *BlockPool) addBlock(b *chain.Block, peer *Peer, newBlock bool) { fmt.Println("3.", !self.fetchingHashes) if !self.eth.ChainManager().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes { poollogger.Infof("Unknown chain, requesting (%x...)\n", b.PrevHash[0:4]) - peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)})) + peer.QueueMessage(wire.NewMessage(wire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)})) } } } else if self.pool[hash] != nil { -- cgit v1.2.3 From f59a3b67f69b26f969084e0de165435e80bd8e12 Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 4 Nov 2014 10:57:02 +0100 Subject: StateManager => BlockManager --- block_pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 0e182623f..9003256fd 100644 --- a/block_pool.go +++ b/block_pool.go @@ -315,7 +315,7 @@ out: // otherwise process and don't emit anything var err error for i, block := range blocks { - err = self.eth.StateManager().Process(block) + err = self.eth.BlockManager().Process(block) if err != nil { poollogger.Infoln(err) poollogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) -- cgit v1.2.3 From 699dcaf65ced99517724984f5930845417cfdfca Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 4 Nov 2014 12:46:33 +0100 Subject: Reworked chain handling process * Forks * Rename * Moved inserting of blocks & processing * Added chain testing method for validating pieces of a **a** chain. --- block_pool.go | 40 +++++++++++++++++----------------------- 1 file changed, 17 insertions(+), 23 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 9003256fd..ff0675c50 100644 --- a/block_pool.go +++ b/block_pool.go @@ -313,31 +313,25 @@ out: // If caught up and just a new block has been propagated: // sm.eth.EventMux().Post(NewBlockEvent{block}) // otherwise process and don't emit anything - var err error - for i, block := range blocks { - err = self.eth.BlockManager().Process(block) + if len(blocks) > 0 { + chainManager := self.eth.ChainManager() + chain := chain.NewChain(blocks) + _, err := chainManager.TestChain(chain) if err != nil { - poollogger.Infoln(err) - poollogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) - poollogger.Debugln(block) - - blocks = blocks[i:] - - break + self.Reset() + + poollogger.Debugf("Punishing peer for supplying bad chain (%v)\n", self.peer.conn.RemoteAddr()) + // This peer gave us bad hashes and made us fetch a bad chain, therefor he shall be punished. + self.eth.BlacklistPeer(self.peer) + self.peer.StopWithReason(DiscBadPeer) + self.td = ethutil.Big0 + self.peer = nil + } else { + chainManager.InsertChain(chain) + for _, block := range blocks { + self.Remove(block.Hash()) + } } - - self.Remove(block.Hash()) - } - - if err != nil { - self.Reset() - - poollogger.Debugf("Punishing peer for supplying bad chain (%v)\n", self.peer.conn.RemoteAddr()) - // This peer gave us bad hashes and made us fetch a bad chain, therefor he shall be punished. - self.eth.BlacklistPeer(self.peer) - self.peer.StopWithReason(DiscBadPeer) - self.td = ethutil.Big0 - self.peer = nil } } } -- cgit v1.2.3 From 429dd2a100f3b9e2b612b59bcb48f79a805cd6f9 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 7 Nov 2014 12:18:48 +0100 Subject: Implemented new miner w/ ui interface for merged mining. Closes #177 * Miner has been rewritten * Added new miner pane * Added option for local txs * Added option to read from MergeMining contract and list them for merged mining --- block_pool.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index ff0675c50..ec945fa6e 100644 --- a/block_pool.go +++ b/block_pool.go @@ -315,9 +315,12 @@ out: // otherwise process and don't emit anything if len(blocks) > 0 { chainManager := self.eth.ChainManager() + // Test and import chain := chain.NewChain(blocks) - _, err := chainManager.TestChain(chain) + _, err := chainManager.TestChain(chain, true) if err != nil { + poollogger.Debugln(err) + self.Reset() poollogger.Debugf("Punishing peer for supplying bad chain (%v)\n", self.peer.conn.RemoteAddr()) @@ -327,7 +330,7 @@ out: self.td = ethutil.Big0 self.peer = nil } else { - chainManager.InsertChain(chain) + //chainManager.InsertChain(chain) for _, block := range blocks { self.Remove(block.Hash()) } -- cgit v1.2.3 From cbeebcd47da846e1b8990313f1ff1ffe7d0bf00f Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 10 Nov 2014 01:17:31 +0100 Subject: Fixed bloom, updated mining & block processing * Reverted back to process blocks in batches method * Bloom generation and lookup fix * Minor UI changed (mainly debug) --- block_pool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index ec945fa6e..52f2f4f86 100644 --- a/block_pool.go +++ b/block_pool.go @@ -317,7 +317,7 @@ out: chainManager := self.eth.ChainManager() // Test and import chain := chain.NewChain(blocks) - _, err := chainManager.TestChain(chain, true) + _, err := chainManager.TestChain(chain) if err != nil { poollogger.Debugln(err) @@ -330,7 +330,7 @@ out: self.td = ethutil.Big0 self.peer = nil } else { - //chainManager.InsertChain(chain) + chainManager.InsertChain(chain) for _, block := range blocks { self.Remove(block.Hash()) } -- cgit v1.2.3 From 75ee3b3f089e703b728bb301cc6b2abe4c111c41 Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 11 Nov 2014 12:16:36 +0100 Subject: debugging code --- block_pool.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 52f2f4f86..f89ee71df 100644 --- a/block_pool.go +++ b/block_pool.go @@ -316,9 +316,9 @@ out: if len(blocks) > 0 { chainManager := self.eth.ChainManager() // Test and import - chain := chain.NewChain(blocks) - _, err := chainManager.TestChain(chain) - if err != nil { + bchain := chain.NewChain(blocks) + _, err := chainManager.TestChain(bchain) + if err != nil && !chain.IsTDError(err) { poollogger.Debugln(err) self.Reset() @@ -330,7 +330,7 @@ out: self.td = ethutil.Big0 self.peer = nil } else { - chainManager.InsertChain(chain) + chainManager.InsertChain(bchain) for _, block := range blocks { self.Remove(block.Hash()) } -- cgit v1.2.3 From f6e55962a8cadfb440dd03467017941b96838362 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 14 Nov 2014 13:47:12 +0100 Subject: Fixes for PV41/42 * Don't expand mem on empty value * Removed all coinbase logs for PV42 * Removed C++ bug stuff for LOG* --- block_pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index f89ee71df..a1c3fc096 100644 --- a/block_pool.go +++ b/block_pool.go @@ -200,7 +200,7 @@ func (self *BlockPool) DistributeHashes() { } else if lastFetchFailed || item.peer == nil { // Find a suitable, available peer eachPeer(self.eth.peers, func(p *Peer, v *list.Element) { - if peer == nil && len(dist[p]) < amount/peerLen { + if peer == nil && len(dist[p]) < amount/peerLen && p.statusKnown { peer = p } }) -- cgit v1.2.3 From 56aa24002de357c24a9644a49d5702c8d4663909 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 14 Nov 2014 14:17:54 +0100 Subject: Clean up --- block_pool.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index a1c3fc096..090871fd3 100644 --- a/block_pool.go +++ b/block_pool.go @@ -323,7 +323,10 @@ out: self.Reset() - poollogger.Debugf("Punishing peer for supplying bad chain (%v)\n", self.peer.conn.RemoteAddr()) + if self.peer != nil && self.peer.conn != nil { + poollogger.Debugf("Punishing peer for supplying bad chain (%v)\n", self.peer.conn.RemoteAddr()) + } + // This peer gave us bad hashes and made us fetch a bad chain, therefor he shall be punished. self.eth.BlacklistPeer(self.peer) self.peer.StopWithReason(DiscBadPeer) -- cgit v1.2.3 From ca74bcc4cdf389b5ef5520f9ab5a7aec08424f30 Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 17 Nov 2014 12:12:55 +0100 Subject: cleaning up --- block_pool.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 090871fd3..f9bd3b28e 100644 --- a/block_pool.go +++ b/block_pool.go @@ -333,9 +333,11 @@ out: self.td = ethutil.Big0 self.peer = nil } else { - chainManager.InsertChain(bchain) - for _, block := range blocks { - self.Remove(block.Hash()) + if !chain.IsTDError(err) { + chainManager.InsertChain(bchain) + for _, block := range blocks { + self.Remove(block.Hash()) + } } } } -- cgit v1.2.3 From a1b6a9ac29d0aa8d29a2c0535bafdb5fe4d4830b Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 18 Nov 2014 16:58:22 +0100 Subject: Begin of moving objects to types package * Block(s) * Transaction(s) --- block_pool.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index f9bd3b28e..dcddca58e 100644 --- a/block_pool.go +++ b/block_pool.go @@ -10,6 +10,7 @@ import ( "time" "github.com/ethereum/go-ethereum/chain" + "github.com/ethereum/go-ethereum/chain/types" "github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/wire" @@ -20,7 +21,7 @@ var poollogger = logger.NewLogger("BPOOL") type block struct { from *Peer peer *Peer - block *chain.Block + block *types.Block reqAt time.Time requested int } @@ -73,7 +74,7 @@ func (self *BlockPool) HasCommonHash(hash []byte) bool { return self.eth.ChainManager().GetBlock(hash) != nil } -func (self *BlockPool) Blocks() (blocks chain.Blocks) { +func (self *BlockPool) Blocks() (blocks types.Blocks) { for _, item := range self.pool { if item.block != nil { blocks = append(blocks, item.block) @@ -123,15 +124,15 @@ func (self *BlockPool) AddHash(hash []byte, peer *Peer) { } } -func (self *BlockPool) Add(b *chain.Block, peer *Peer) { +func (self *BlockPool) Add(b *types.Block, peer *Peer) { self.addBlock(b, peer, false) } -func (self *BlockPool) AddNew(b *chain.Block, peer *Peer) { +func (self *BlockPool) AddNew(b *types.Block, peer *Peer) { self.addBlock(b, peer, true) } -func (self *BlockPool) addBlock(b *chain.Block, peer *Peer, newBlock bool) { +func (self *BlockPool) addBlock(b *types.Block, peer *Peer, newBlock bool) { self.mut.Lock() defer self.mut.Unlock() @@ -283,7 +284,7 @@ out: break out case <-procTimer.C: blocks := self.Blocks() - chain.BlockBy(chain.Number).Sort(blocks) + types.BlockBy(types.Number).Sort(blocks) // Find common block for i, block := range blocks { -- cgit v1.2.3 From f8d0cd9906a1ec4a4a1e95868a279312363f8b49 Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 18 Nov 2014 19:44:17 +0100 Subject: Added a callback mechanism to chain adding. Not sure if this is the right approach. Why? BlockChain shouldn't need the "Ethereum" object. BlockChain shouldn't need to worry about notifying listeners or message propagation. --- block_pool.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index dcddca58e..38302a4c7 100644 --- a/block_pool.go +++ b/block_pool.go @@ -13,6 +13,7 @@ import ( "github.com/ethereum/go-ethereum/chain/types" "github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/state" "github.com/ethereum/go-ethereum/wire" ) @@ -310,10 +311,6 @@ out: } } - // TODO figure out whether we were catching up - // If caught up and just a new block has been propagated: - // sm.eth.EventMux().Post(NewBlockEvent{block}) - // otherwise process and don't emit anything if len(blocks) > 0 { chainManager := self.eth.ChainManager() // Test and import @@ -335,10 +332,13 @@ out: self.peer = nil } else { if !chain.IsTDError(err) { - chainManager.InsertChain(bchain) - for _, block := range blocks { + chainManager.InsertChain(bchain, func(block *types.Block, messages state.Messages) { + self.eth.EventMux().Post(chain.NewBlockEvent{block}) + self.eth.EventMux().Post(messages) + self.Remove(block.Hash()) - } + }) + } } } -- cgit v1.2.3 From 6dc46d3341dc5fa25bd005f9606de258874139be Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 1 Dec 2014 20:18:09 +0100 Subject: Changed the way transactions are being added to the transaction pool --- block_pool.go | 38 +++++++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 7 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 090871fd3..69c7a54de 100644 --- a/block_pool.go +++ b/block_pool.go @@ -314,11 +314,10 @@ out: // sm.eth.EventMux().Post(NewBlockEvent{block}) // otherwise process and don't emit anything if len(blocks) > 0 { - chainManager := self.eth.ChainManager() - // Test and import - bchain := chain.NewChain(blocks) - _, err := chainManager.TestChain(bchain) - if err != nil && !chain.IsTDError(err) { + chainman := self.eth.ChainManager() + + err := chainman.InsertChain(blocks) + if err != nil { poollogger.Debugln(err) self.Reset() @@ -332,12 +331,37 @@ out: self.peer.StopWithReason(DiscBadPeer) self.td = ethutil.Big0 self.peer = nil - } else { - chainManager.InsertChain(bchain) + for _, block := range blocks { self.Remove(block.Hash()) } } + + /* + // Test and import + bchain := chain.NewChain(blocks) + _, err := chainManager.TestChain(bchain) + if err != nil && !chain.IsTDError(err) { + poollogger.Debugln(err) + + self.Reset() + + if self.peer != nil && self.peer.conn != nil { + poollogger.Debugf("Punishing peer for supplying bad chain (%v)\n", self.peer.conn.RemoteAddr()) + } + + // This peer gave us bad hashes and made us fetch a bad chain, therefor he shall be punished. + self.eth.BlacklistPeer(self.peer) + self.peer.StopWithReason(DiscBadPeer) + self.td = ethutil.Big0 + self.peer = nil + } else { + chainManager.InsertChain(bchain) + for _, block := range blocks { + self.Remove(block.Hash()) + } + } + */ } } } -- cgit v1.2.3 From 9008b155d3c8d2a32c4c8945f1174243d48d4e90 Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 4 Dec 2014 10:28:02 +0100 Subject: Renamed `chain` => `core` --- block_pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index bb459cc7d..95c766e53 100644 --- a/block_pool.go +++ b/block_pool.go @@ -9,7 +9,7 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/chain/types" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/wire" -- cgit v1.2.3 From 73c4ca3a6fcf948f4bc637f5c1b55277cf64c06f Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 4 Dec 2014 15:31:48 +0100 Subject: Upped protocol version --- block_pool.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 95c766e53..d1be70187 100644 --- a/block_pool.go +++ b/block_pool.go @@ -88,7 +88,7 @@ func (self *BlockPool) FetchHashes(peer *Peer) bool { if (self.peer == nil && peer.td.Cmp(highestTd) >= 0) || (self.peer != nil && peer.td.Cmp(self.peer.td) > 0) || self.peer == peer { if self.peer != peer { - poollogger.Debugf("Found better suitable peer (%v vs %v)\n", self.td, peer.td) + poollogger.Infof("Found better suitable peer (%v vs %v)\n", self.td, peer.td) if self.peer != nil { self.peer.doneFetchingHashes = true @@ -99,8 +99,7 @@ func (self *BlockPool) FetchHashes(peer *Peer) bool { self.td = peer.td if !self.HasLatestHash() { - peer.doneFetchingHashes = false - + peer.doneFetchingHashes = fInfo const amount = 256 peerlogger.Debugf("Fetching hashes (%d) %x...\n", amount, peer.lastReceivedHash[0:4]) peer.QueueMessage(wire.NewMessage(wire.MsgGetBlockHashesTy, []interface{}{peer.lastReceivedHash, uint32(amount)})) -- cgit v1.2.3 From 565389815040d518529b973d4a6fc38c08e43e5a Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 4 Dec 2014 15:38:41 +0100 Subject: vim error :S --- block_pool.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index d1be70187..595400c79 100644 --- a/block_pool.go +++ b/block_pool.go @@ -99,7 +99,8 @@ func (self *BlockPool) FetchHashes(peer *Peer) bool { self.td = peer.td if !self.HasLatestHash() { - peer.doneFetchingHashes = fInfo + peer.doneFetchingHashes = false + const amount = 256 peerlogger.Debugf("Fetching hashes (%d) %x...\n", amount, peer.lastReceivedHash[0:4]) peer.QueueMessage(wire.NewMessage(wire.MsgGetBlockHashesTy, []interface{}{peer.lastReceivedHash, uint32(amount)})) -- cgit v1.2.3 From d80f8bda940a8ae8f6dab1502a46054c06cee5cc Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 5 Dec 2014 12:32:47 +0100 Subject: Fixed issue in VM where LOG didn't pop anything of the stack --- block_pool.go | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 595400c79..02eb65d47 100644 --- a/block_pool.go +++ b/block_pool.go @@ -99,11 +99,7 @@ func (self *BlockPool) FetchHashes(peer *Peer) bool { self.td = peer.td if !self.HasLatestHash() { - peer.doneFetchingHashes = false - - const amount = 256 - peerlogger.Debugf("Fetching hashes (%d) %x...\n", amount, peer.lastReceivedHash[0:4]) - peer.QueueMessage(wire.NewMessage(wire.MsgGetBlockHashesTy, []interface{}{peer.lastReceivedHash, uint32(amount)})) + self.fetchHashes() } return true @@ -112,6 +108,16 @@ func (self *BlockPool) FetchHashes(peer *Peer) bool { return false } +func (self *BlockPool) fetchHashes() { + peer := self.peer + + peer.doneFetchingHashes = false + + const amount = 256 + peerlogger.Debugf("Fetching hashes (%d) %x...\n", amount, peer.lastReceivedHash[0:4]) + peer.QueueMessage(wire.NewMessage(wire.MsgGetBlockHashesTy, []interface{}{peer.lastReceivedHash, uint32(amount)})) +} + func (self *BlockPool) AddHash(hash []byte, peer *Peer) { self.mut.Lock() defer self.mut.Unlock() @@ -259,6 +265,13 @@ out: self.ChainLength = len(self.hashes) } + if self.peer != nil && + !self.peer.doneFetchingHashes && + time.Since(self.peer.lastHashAt) > 10*time.Second && + time.Since(self.peer.lastHashRequestedAt) > 5*time.Second { + self.fetchHashes() + } + /* if !self.fetchingHashes { blocks := self.Blocks() -- cgit v1.2.3 From 9925916851c00323336e213fc18c83da5fceee94 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 5 Dec 2014 16:26:39 +0100 Subject: upped proto version and modified block pool --- block_pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 02eb65d47..803927f21 100644 --- a/block_pool.go +++ b/block_pool.go @@ -154,7 +154,7 @@ func (self *BlockPool) addBlock(b *types.Block, peer *Peer, newBlock bool) { fmt.Println("1.", !self.eth.ChainManager().HasBlock(b.PrevHash), ethutil.Bytes2Hex(b.Hash()[0:4]), ethutil.Bytes2Hex(b.PrevHash[0:4])) fmt.Println("2.", self.pool[string(b.PrevHash)] == nil) fmt.Println("3.", !self.fetchingHashes) - if !self.eth.ChainManager().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes { + if !self.eth.ChainManager().HasBlock(b.PrevHash) /*&& self.pool[string(b.PrevHash)] == nil*/ && !self.fetchingHashes { poollogger.Infof("Unknown chain, requesting (%x...)\n", b.PrevHash[0:4]) peer.QueueMessage(wire.NewMessage(wire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)})) } -- cgit v1.2.3 From 2d09e67713757e2a80eb614562c97f962af36cf7 Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 18 Dec 2014 13:17:24 +0100 Subject: Updated to new methods --- block_pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'block_pool.go') diff --git a/block_pool.go b/block_pool.go index 803927f21..c618f6993 100644 --- a/block_pool.go +++ b/block_pool.go @@ -66,7 +66,7 @@ func (self *BlockPool) HasLatestHash() bool { self.mut.Lock() defer self.mut.Unlock() - return self.pool[string(self.eth.ChainManager().CurrentBlock.Hash())] != nil + return self.pool[string(self.eth.ChainManager().CurrentBlock().Hash())] != nil } func (self *BlockPool) HasCommonHash(hash []byte) bool { -- cgit v1.2.3