Diffstat (limited to 'eth')
-rw-r--r--  eth/backend.go                      114
-rw-r--r--  eth/downloader/downloader.go         51
-rw-r--r--  eth/downloader/downloader_test.go    63
-rw-r--r--  eth/downloader/queue.go               2
-rw-r--r--  eth/handler.go                       71
-rw-r--r--  eth/peer.go                         122
-rw-r--r--  eth/protocol.go                       2
-rw-r--r--  eth/sync.go                          36
8 files changed, 361 insertions(+), 100 deletions(-)
diff --git a/eth/backend.go b/eth/backend.go
index a7107f8d8..938071fc7 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -14,6 +14,7 @@ import (
"github.com/ethereum/ethash"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/compiler"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
@@ -30,6 +31,14 @@ import (
"github.com/ethereum/go-ethereum/whisper"
)
+const (
+ epochLength = 30000
+ ethashRevision = 23
+
+ autoDAGcheckInterval = 10 * time.Hour
+ autoDAGepochHeight = epochLength / 2
+)
+
var (
jsonlogger = logger.NewJsonLogger()
@@ -59,6 +68,7 @@ type Config struct {
LogJSON string
VmDebug bool
NatSpec bool
+ AutoDAG bool
MaxPeers int
MaxPendingPeers int
@@ -79,6 +89,7 @@ type Config struct {
GasPrice *big.Int
MinerThreads int
AccountManager *accounts.Manager
+ SolcPath string
// NewDB is used to create databases.
// If nil, the default is to create leveldb databases on disk.
@@ -181,6 +192,8 @@ type Ethereum struct {
pow *ethash.Ethash
protocolManager *ProtocolManager
downloader *downloader.Downloader
+ SolcPath string
+ solc *compiler.Solidity
net *p2p.Server
eventMux *event.TypeMux
@@ -193,6 +206,8 @@ type Ethereum struct {
MinerThreads int
NatSpec bool
DataDir string
+ AutoDAG bool
+ autodagquit chan bool
etherbase common.Address
clientVersion string
ethVersionId int
@@ -209,7 +224,7 @@ func New(config *Config) (*Ethereum, error) {
// Let the database take 3/4 of the max open files (TODO figure out a way to get the actual limit of the open files)
const dbCount = 3
- ethdb.OpenFileLimit = 256 / (dbCount + 1)
+ ethdb.OpenFileLimit = 128 / (dbCount + 1)
newdb := config.NewDB
if newdb == nil {
@@ -264,11 +279,13 @@ func New(config *Config) (*Ethereum, error) {
netVersionId: config.NetworkId,
NatSpec: config.NatSpec,
MinerThreads: config.MinerThreads,
+ SolcPath: config.SolcPath,
+ AutoDAG: config.AutoDAG,
}
- eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux())
- eth.downloader = downloader.New(eth.EventMux(), eth.chainManager.HasBlock, eth.chainManager.GetBlock)
eth.pow = ethash.New()
+ eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.pow, eth.EventMux())
+ eth.downloader = downloader.New(eth.EventMux(), eth.chainManager.HasBlock, eth.chainManager.GetBlock)
eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit)
eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux())
eth.chainManager.SetProcessor(eth.blockProcessor)
@@ -443,6 +460,10 @@ func (s *Ethereum) Start() error {
// periodically flush databases
go s.syncDatabases()
+ if s.AutoDAG {
+ s.StartAutoDAG()
+ }
+
// Start services
go s.txPool.Start()
s.protocolManager.Start()
@@ -521,6 +542,7 @@ func (s *Ethereum) Stop() {
if s.whisper != nil {
s.whisper.Stop()
}
+ s.StopAutoDAG()
glog.V(logger.Info).Infoln("Server stopped")
close(s.shutdownChan)
@@ -554,6 +576,77 @@ func (self *Ethereum) syncAccounts(tx *types.Transaction) {
}
}
+// StartAutoDAG spawns a go routine that checks the DAG every autoDAGcheckInterval
+// (by default roughly 10 times per epoch).
+// In epoch n, once the chain is past autoDAGepochHeight within-epoch blocks,
+// it calls ethash.MakeDAG to pregenerate the DAG for the next epoch n+1
+// (if it does not exist yet) and removes the DAG for epoch n-1.
+// The loop quits when the autodagquit channel is closed; it can safely be
+// restarted and stopped any number of times.
+// For any more sophisticated pattern of DAG generation, use the CLI subcommand
+// makedag.
+func (self *Ethereum) StartAutoDAG() {
+ if self.autodagquit != nil {
+ return // already started
+ }
+ go func() {
+ glog.V(logger.Info).Infof("Automatic pregeneration of ethash DAG ON (ethash dir: %s)", ethash.DefaultDir)
+ var nextEpoch uint64
+ timer := time.After(0)
+ self.autodagquit = make(chan bool)
+ for {
+ select {
+ case <-timer:
+ glog.V(logger.Info).Infof("checking DAG (ethash dir: %s)", ethash.DefaultDir)
+ currentBlock := self.ChainManager().CurrentBlock().NumberU64()
+ thisEpoch := currentBlock / epochLength
+ if nextEpoch <= thisEpoch {
+ if currentBlock%epochLength > autoDAGepochHeight {
+ if thisEpoch > 0 {
+ previousDag, previousDagFull := dagFiles(thisEpoch - 1)
+ os.Remove(filepath.Join(ethash.DefaultDir, previousDag))
+ os.Remove(filepath.Join(ethash.DefaultDir, previousDagFull))
+ glog.V(logger.Info).Infof("removed DAG for epoch %d (%s)", thisEpoch-1, previousDag)
+ }
+ nextEpoch = thisEpoch + 1
+ dag, _ := dagFiles(nextEpoch)
+ if _, err := os.Stat(dag); os.IsNotExist(err) {
+ glog.V(logger.Info).Infof("Pregenerating DAG for epoch %d (%s)", nextEpoch, dag)
+ err := ethash.MakeDAG(nextEpoch*epochLength, "") // "" -> ethash.DefaultDir
+ if err != nil {
+ glog.V(logger.Error).Infof("Error generating DAG for epoch %d (%s)", nextEpoch, dag)
+ return
+ }
+ } else {
+ glog.V(logger.Error).Infof("DAG for epoch %d (%s)", nextEpoch, dag)
+ }
+ }
+ }
+ timer = time.After(autoDAGcheckInterval)
+ case <-self.autodagquit:
+ return
+ }
+ }
+ }()
+}
+
+// dagFiles(epoch) returns the two alternative DAG filenames (not paths):
+// 1) <revision>-<hex(seedhash[8])> 2) full-R<revision>-<hex(seedhash[8])>
+func dagFiles(epoch uint64) (string, string) {
+ seedHash, _ := ethash.GetSeedHash(epoch * epochLength)
+ dag := fmt.Sprintf("%d-%x", ethashRevision, seedHash[:8])
+ return dag, "full-R" + dag
+}
+
+// StopAutoDAG stops automatic DAG pregeneration by quitting the loop
+func (self *Ethereum) StopAutoDAG() {
+ if self.autodagquit != nil {
+ close(self.autodagquit)
+ self.autodagquit = nil
+ }
+ glog.V(logger.Info).Infof("Automatic pregeneration of ethash DAG OFF (ethash dir: %s)", ethash.DefaultDir)
+}
+
func saveProtocolVersion(db common.Database, protov int) {
d, _ := db.Get([]byte("ProtocolVersion"))
protocolVersion := common.NewValue(d).Uint()
@@ -571,3 +664,18 @@ func saveBlockchainVersion(db common.Database, bcVersion int) {
db.Put([]byte("BlockchainVersion"), common.NewValue(bcVersion).Bytes())
}
}
+
+func (self *Ethereum) Solc() (*compiler.Solidity, error) {
+ var err error
+ if self.solc == nil {
+ self.solc, err = compiler.New(self.SolcPath)
+ }
+ return self.solc, err
+}
+
+// SetSolc sets the solc path; called from the js console via the admin interface or from cli flag wrappers
+func (self *Ethereum) SetSolc(solcPath string) (*compiler.Solidity, error) {
+ self.SolcPath = solcPath
+ self.solc = nil
+ return self.Solc()
+}
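The AutoDAG loop above keys everything off the epoch constants declared at the top of backend.go. A minimal standalone sketch of that arithmetic, assuming the constants from the diff (shouldPregenerate is a hypothetical helper, not part of this patch):

package main

import "fmt"

const (
	epochLength        = 30000
	autoDAGepochHeight = epochLength / 2
)

// shouldPregenerate reports whether, at the given chain height, the AutoDAG
// loop would kick off generation of the DAG for the next epoch: it waits
// until the chain is more than autoDAGepochHeight blocks into the current one.
func shouldPregenerate(currentBlock uint64) (nextEpoch uint64, ok bool) {
	thisEpoch := currentBlock / epochLength
	if currentBlock%epochLength > autoDAGepochHeight {
		return thisEpoch + 1, true
	}
	return thisEpoch, false
}

func main() {
	for _, block := range []uint64{100, 15001, 29999, 45001} {
		next, ok := shouldPregenerate(block)
		fmt.Printf("block %6d -> pregenerate DAG for epoch %d: %v\n", block, next, ok)
	}
}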
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index d817b223c..fd588d2f3 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -15,8 +15,10 @@ import (
)
const (
- maxHashFetch = 512 // Amount of hashes to be fetched per chunk
- maxBlockFetch = 128 // Amount of blocks to be fetched per chunk
+ MinHashFetch = 512 // Minimum number of hashes a peer must return before being considered stalling
+ MaxHashFetch = 2048 // Number of hashes to be fetched per retrieval request
+ MaxBlockFetch = 128 // Number of blocks to be fetched per retrieval request
+
peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
hashTTL = 5 * time.Second // Time it takes for a hash request to time out
)
@@ -28,10 +30,11 @@ var (
)
var (
- errLowTd = errors.New("peer's TD is too low")
+ errLowTd = errors.New("peers TD is too low")
ErrBusy = errors.New("busy")
- errUnknownPeer = errors.New("peer's unknown or unhealthy")
+ errUnknownPeer = errors.New("peer is unknown or unhealthy")
ErrBadPeer = errors.New("action from bad peer ignored")
+ ErrStallingPeer = errors.New("peer is stalling")
errNoPeers = errors.New("no peers to keep download active")
ErrPendingQueue = errors.New("pending items in queue")
ErrTimeout = errors.New("timeout")
@@ -60,13 +63,18 @@ type hashPack struct {
hashes []common.Hash
}
+type crossCheck struct {
+ expire time.Time
+ parent common.Hash
+}
+
type Downloader struct {
mux *event.TypeMux
mu sync.RWMutex
- queue *queue // Scheduler for selecting the hashes to download
- peers *peerSet // Set of active peers from which download can proceed
- checks map[common.Hash]time.Time // Pending cross checks to verify a hash chain
+ queue *queue // Scheduler for selecting the hashes to download
+ peers *peerSet // Set of active peers from which download can proceed
+ checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain
// Callbacks
hasBlock hashCheckFn
@@ -157,7 +165,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
// Reset the queue and peer set to clean any internal leftover state
d.queue.Reset()
d.peers.Reset()
- d.checks = make(map[common.Hash]time.Time)
+ d.checks = make(map[common.Hash]*crossCheck)
// Retrieve the origin peer and initiate the downloading process
p := d.peers.Peer(id)
@@ -283,15 +291,22 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
return ErrBadPeer
}
if !done {
+ // Check that the peer is not stalling the sync
+ if len(inserts) < MinHashFetch {
+ return ErrStallingPeer
+ }
// Try and fetch a random block to verify the hash batch
// Skip the last hash as the cross check races with the next hash fetch
- if len(inserts) > 1 {
- cross := inserts[rand.Intn(len(inserts)-1)]
- glog.V(logger.Detail).Infof("Cross checking (%s) with %x", active.id, cross)
+ cross := rand.Intn(len(inserts) - 1)
+ origin, parent := inserts[cross], inserts[cross+1]
+ glog.V(logger.Detail).Infof("Cross checking (%s) with %x/%x", active.id, origin, parent)
- d.checks[cross] = time.Now().Add(blockTTL)
- active.getBlocks([]common.Hash{cross})
+ d.checks[origin] = &crossCheck{
+ expire: time.Now().Add(blockTTL),
+ parent: parent,
}
+ active.getBlocks([]common.Hash{origin})
+
+ // Also fetch a fresh batch of hashes
active.getHashes(head)
continue
@@ -310,8 +325,8 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
continue
}
block := blockPack.blocks[0]
- if _, ok := d.checks[block.Hash()]; ok {
- if !d.queue.Has(block.ParentHash()) {
+ if check, ok := d.checks[block.Hash()]; ok {
+ if block.ParentHash() != check.parent {
return ErrCrossCheckFailed
}
delete(d.checks, block.Hash())
@@ -319,8 +334,8 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
case <-crossTicker.C:
// Iterate over all the cross checks and fail the hash chain if they're not verified
- for hash, deadline := range d.checks {
- if time.Now().After(deadline) {
+ for hash, check := range d.checks {
+ if time.Now().After(check.expire) {
glog.V(logger.Debug).Infof("Cross check timeout for %x", hash)
return ErrCrossCheckFailed
}
@@ -438,7 +453,7 @@ out:
}
// Get a possible chunk. If nil is returned no chunk
// could be returned due to no hashes available.
- request := d.queue.Reserve(peer, maxBlockFetch)
+ request := d.queue.Reserve(peer, MaxBlockFetch)
if request == nil {
continue
}
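The crossCheck change above pins each cross-checked hash to an expected parent, so a forged block can no longer satisfy the check just by carrying a known hash. A reduced, self-contained sketch of that verification step (string hashes and verifyCrossCheck are illustrative stand-ins, not the downloader's real types):

package main

import (
	"errors"
	"fmt"
	"time"
)

// crossCheck mirrors the struct added in the diff: a deadline plus the hash
// the delivered block's parent must match.
type crossCheck struct {
	expire time.Time
	parent string
}

var errCrossCheckFailed = errors.New("block cross-check failed")

// verifyCrossCheck accepts a delivered block (hash/parentHash pair) only if its
// parent matches the one recorded when the check was scheduled.
func verifyCrossCheck(checks map[string]*crossCheck, hash, parentHash string) error {
	check, ok := checks[hash]
	if !ok {
		return nil // not a cross-checked block, nothing to do
	}
	if parentHash != check.parent {
		return errCrossCheckFailed
	}
	delete(checks, hash) // verified, drop the pending check
	return nil
}

func main() {
	checks := map[string]*crossCheck{
		"0xaa": {expire: time.Now().Add(time.Second), parent: "0xbb"},
	}
	fmt.Println(verifyCrossCheck(checks, "0xaa", "0xcc")) // forged parent -> error
	fmt.Println(verifyCrossCheck(checks, "0xaa", "0xbb")) // honest parent -> <nil>
}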
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 19d64ac67..8b541d8b7 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -53,6 +53,8 @@ type downloadTester struct {
blocks map[common.Hash]*types.Block // Blocks associated with the hashes
chain []common.Hash // Block-chain being constructed
+ maxHashFetch int // Overrides the maximum number of retrieved hashes
+
t *testing.T
pcount int
done chan bool
@@ -133,8 +135,12 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block {
// getHashes retrieves a batch of hashes for reconstructing the chain.
func (dl *downloadTester) getHashes(head common.Hash) error {
+ limit := MaxHashFetch
+ if dl.maxHashFetch > 0 {
+ limit = dl.maxHashFetch
+ }
// Gather the next batch of hashes
- hashes := make([]common.Hash, 0, maxHashFetch)
+ hashes := make([]common.Hash, 0, limit)
for i, hash := range dl.hashes {
if hash == head {
i++
@@ -382,7 +388,7 @@ func TestRepeatingHashAttack(t *testing.T) {
// Make sure that syncing returns and does so with a failure
select {
- case <-time.After(100 * time.Millisecond):
+ case <-time.After(time.Second):
t.Fatalf("synchronisation blocked")
case err := <-errc:
if err == nil {
@@ -469,6 +475,23 @@ func TestMadeupHashChainAttack(t *testing.T) {
}
}
+// Tests that if a malicious peer makes up a random hash chain, and tries to push
+// indefinitely, one hash at a time, it actually gets caught with it. The reason
+// this is separate from the classical made up chain attack is that sending hashes
+// one by one prevents reliable block/parent verification.
+func TestMadeupHashChainDrippingAttack(t *testing.T) {
+ // Create a random chain of hashes to drip
+ hashes := createHashes(0, 16*blockCacheLimit)
+ tester := newTester(t, hashes, nil)
+
+ // Try and sync with the attacker, one hash at a time
+ tester.maxHashFetch = 1
+ tester.newPeer("attack", big.NewInt(10000), hashes[0])
+ if _, err := tester.syncTake("attack", hashes[0]); err != ErrStallingPeer {
+ t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrStallingPeer)
+ }
+}
+
// Tests that if a malicious peer makes up a random block chain, and tried to
// push indefinitely, it actually gets caught with it.
func TestMadeupBlockChainAttack(t *testing.T) {
@@ -479,7 +502,7 @@ func TestMadeupBlockChainAttack(t *testing.T) {
crossCheckCycle = 25 * time.Millisecond
// Create a long chain of blocks and simulate an invalid chain by dropping every second
- hashes := createHashes(0, 32*blockCacheLimit)
+ hashes := createHashes(0, 16*blockCacheLimit)
blocks := createBlocksFromHashes(hashes)
gapped := make([]common.Hash, len(hashes)/2)
@@ -502,3 +525,37 @@ func TestMadeupBlockChainAttack(t *testing.T) {
t.Fatalf("failed to synchronise blocks: %v", err)
}
}
+
+// Advanced form of the above forged blockchain attack, where the attacker not
+// only makes up valid hashes for random blocks, but also forges the block
+// parents to point to existing hashes.
+func TestMadeupParentBlockChainAttack(t *testing.T) {
+ defaultBlockTTL := blockTTL
+ defaultCrossCheckCycle := crossCheckCycle
+
+ blockTTL = 100 * time.Millisecond
+ crossCheckCycle = 25 * time.Millisecond
+
+ // Create a long chain of blocks, then forge their parents to point to hashes already in the chain
+ hashes := createHashes(0, 16*blockCacheLimit)
+ blocks := createBlocksFromHashes(hashes)
+ forges := createBlocksFromHashes(hashes)
+ for hash, block := range forges {
+ block.ParentHeaderHash = hash // Simulate pointing to already known hash
+ }
+ // Try and sync with the malicious node and check that it fails
+ tester := newTester(t, hashes, forges)
+ tester.newPeer("attack", big.NewInt(10000), hashes[0])
+ if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed {
+ t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed)
+ }
+ // Ensure that a valid chain can still pass sync
+ blockTTL = defaultBlockTTL
+ crossCheckCycle = defaultCrossCheckCycle
+
+ tester.blocks = blocks
+ tester.newPeer("valid", big.NewInt(20000), hashes[0])
+ if _, err := tester.syncTake("valid", hashes[0]); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+}
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 13ec9a520..591a37773 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -17,7 +17,7 @@ import (
)
const (
- blockCacheLimit = 1024 // Maximum number of blocks to cache before throttling the download
+ blockCacheLimit = 8 * MaxBlockFetch // Maximum number of blocks to cache before throttling the download
)
// fetchRequest is a currently running block retrieval operation.
diff --git a/eth/handler.go b/eth/handler.go
index b2d741295..9117a70de 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -47,9 +47,7 @@ type ProtocolManager struct {
txpool txPool
chainman *core.ChainManager
downloader *downloader.Downloader
-
- pmu sync.Mutex
- peers map[string]*peer
+ peers *peerSet
SubProtocol p2p.Protocol
@@ -73,7 +71,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
txpool: txpool,
chainman: chainman,
downloader: downloader,
- peers: make(map[string]*peer),
+ peers: newPeerSet(),
newPeerCh: make(chan *peer, 1),
quitSync: make(chan struct{}),
}
@@ -95,10 +93,14 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
}
func (pm *ProtocolManager) removePeer(peer *peer) {
- pm.pmu.Lock()
- defer pm.pmu.Unlock()
+ // Unregister the peer from the downloader
pm.downloader.UnregisterPeer(peer.id)
- delete(pm.peers, peer.id)
+
+ // Remove the peer from the Ethereum peer set too
+ glog.V(logger.Detail).Infoln("Removing peer", peer.id)
+ if err := pm.peers.Unregister(peer.id); err != nil {
+ glog.V(logger.Error).Infoln("Removal failed:", err)
+ }
}
func (pm *ProtocolManager) Start() {
@@ -136,31 +138,32 @@ func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter
}
func (pm *ProtocolManager) handle(p *peer) error {
+ // Execute the Ethereum handshake, short circuiting if it fails
if err := p.handleStatus(); err != nil {
return err
}
- pm.pmu.Lock()
- pm.peers[p.id] = p
- pm.pmu.Unlock()
-
- pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks)
- defer func() {
- pm.removePeer(p)
- }()
+ // Register the peer locally and in the downloader too
+ glog.V(logger.Detail).Infoln("Adding peer", p.id)
+ if err := pm.peers.Register(p); err != nil {
+ glog.V(logger.Error).Infoln("Addition failed:", err)
+ return err
+ }
+ defer pm.removePeer(p)
+ if err := pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks); err != nil {
+ return err
+ }
// propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
if err := p.sendTransactions(pm.txpool.GetTransactions()); err != nil {
return err
}
-
// main loop. handle incoming messages.
for {
if err := pm.handleMsg(p); err != nil {
return err
}
}
-
return nil
}
@@ -203,8 +206,8 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "->msg %v: %v", msg, err)
}
- if request.Amount > maxHashes {
- request.Amount = maxHashes
+ if request.Amount > downloader.MaxHashFetch {
+ request.Amount = downloader.MaxHashFetch
}
hashes := self.chainman.GetBlockHashesFromHash(request.Hash, request.Amount)
@@ -251,7 +254,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
if block != nil {
blocks = append(blocks, block)
}
- if i == maxBlocks {
+ if i == downloader.MaxBlockFetch {
break
}
}
@@ -346,18 +349,8 @@ func (pm *ProtocolManager) verifyTd(peer *peer, request newBlockMsgData) error {
// out which peers do not contain the block in their block set and will do a
// sqrt(peers) to determine the amount of peers we broadcast to.
func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) {
- pm.pmu.Lock()
- defer pm.pmu.Unlock()
-
- // Find peers who don't know anything about the given hash. Peers that
- // don't know about the hash will be a candidate for the broadcast loop
- var peers []*peer
- for _, peer := range pm.peers {
- if !peer.blockHashes.Has(hash) {
- peers = append(peers, peer)
- }
- }
- // Broadcast block to peer set
+ // Broadcast block to a batch of peers that don't yet know about it
+ peers := pm.peers.PeersWithoutBlock(hash)
peers = peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range peers {
peer.sendNewBlock(block)
@@ -369,18 +362,8 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block)
// out which peers do not contain the block in their block set and will do a
// sqrt(peers) to determine the amount of peers we broadcast to.
func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) {
- pm.pmu.Lock()
- defer pm.pmu.Unlock()
-
- // Find peers who don't know anything about the given hash. Peers that
- // don't know about the hash will be a candidate for the broadcast loop
- var peers []*peer
- for _, peer := range pm.peers {
- if !peer.txHashes.Has(hash) {
- peers = append(peers, peer)
- }
- }
- // Broadcast block to peer set
+ // Broadcast transaction to a batch of peers that don't yet know about it
+ peers := pm.peers.PeersWithoutTx(hash)
//FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range peers {
peer.sendTransaction(tx)
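BroadcastBlock still trims the candidate list to roughly sqrt(peers) before sending, now using the peer set helper. A small sketch of just that fan-out rule (pickBroadcastSet is a hypothetical helper operating on plain ids, not the handler's API):

package main

import (
	"fmt"
	"math"
)

// pickBroadcastSet keeps only the first sqrt(n) candidates, the same fan-out
// rule BroadcastBlock applies to peers that do not yet know a block.
func pickBroadcastSet(candidates []string) []string {
	return candidates[:int(math.Sqrt(float64(len(candidates))))]
}

func main() {
	peers := []string{"p1", "p2", "p3", "p4", "p5", "p6", "p7", "p8", "p9"}
	fmt.Println(pickBroadcastSet(peers)) // [p1 p2 p3]
}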
diff --git a/eth/peer.go b/eth/peer.go
index 861efaaec..bb6a20349 100644
--- a/eth/peer.go
+++ b/eth/peer.go
@@ -1,17 +1,25 @@
package eth
import (
+ "errors"
"fmt"
"math/big"
+ "sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/p2p"
"gopkg.in/fatih/set.v0"
)
+var (
+ errAlreadyRegistered = errors.New("peer is already registered")
+ errNotRegistered = errors.New("peer is not registered")
+)
+
type statusMsgData struct {
ProtocolVersion uint32
NetworkId uint32
@@ -25,16 +33,6 @@ type getBlockHashesMsgData struct {
Amount uint64
}
-func getBestPeer(peers map[string]*peer) *peer {
- var peer *peer
- for _, cp := range peers {
- if peer == nil || cp.td.Cmp(peer.td) > 0 {
- peer = cp
- }
- }
- return peer
-}
-
type peer struct {
*p2p.Peer
@@ -103,8 +101,8 @@ func (p *peer) sendTransaction(tx *types.Transaction) error {
}
func (p *peer) requestHashes(from common.Hash) error {
- glog.V(logger.Debug).Infof("[%s] fetching hashes (%d) %x...\n", p.id, maxHashes, from[:4])
- return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, maxHashes})
+ glog.V(logger.Debug).Infof("[%s] fetching hashes (%d) %x...\n", p.id, downloader.MaxHashFetch, from[:4])
+ return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, downloader.MaxHashFetch})
}
func (p *peer) requestBlocks(hashes []common.Hash) error {
@@ -159,3 +157,103 @@ func (p *peer) handleStatus() error {
return <-errc
}
+
+// peerSet represents the collection of active peers currently participating in
+// the Ethereum sub-protocol.
+type peerSet struct {
+ peers map[string]*peer
+ lock sync.RWMutex
+}
+
+// newPeerSet creates a new peer set to track the active participants.
+func newPeerSet() *peerSet {
+ return &peerSet{
+ peers: make(map[string]*peer),
+ }
+}
+
+// Register injects a new peer into the working set, or returns an error if the
+// peer is already known.
+func (ps *peerSet) Register(p *peer) error {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ if _, ok := ps.peers[p.id]; ok {
+ return errAlreadyRegistered
+ }
+ ps.peers[p.id] = p
+ return nil
+}
+
+// Unregister removes a remote peer from the active set, disabling any further
+// actions to/from that particular entity.
+func (ps *peerSet) Unregister(id string) error {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ if _, ok := ps.peers[id]; !ok {
+ return errNotRegistered
+ }
+ delete(ps.peers, id)
+ return nil
+}
+
+// Peer retrieves the registered peer with the given id.
+func (ps *peerSet) Peer(id string) *peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ return ps.peers[id]
+}
+
+// Len returns the current number of peers in the set.
+func (ps *peerSet) Len() int {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ return len(ps.peers)
+}
+
+// PeersWithoutBlock retrieves a list of peers that do not have a given block in
+// their set of known hashes.
+func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ list := make([]*peer, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ if !p.blockHashes.Has(hash) {
+ list = append(list, p)
+ }
+ }
+ return list
+}
+
+// PeersWithoutTx retrieves a list of peers that do not have a given transaction
+// in their set of known hashes.
+func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ list := make([]*peer, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ if !p.txHashes.Has(hash) {
+ list = append(list, p)
+ }
+ }
+ return list
+}
+
+// BestPeer retrieves the known peer with the currently highest total difficulty.
+func (ps *peerSet) BestPeer() *peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ var best *peer
+ for _, p := range ps.peers {
+ if best == nil || p.td.Cmp(best.td) > 0 {
+ best = p
+ }
+ }
+ return best
+}
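A short usage sketch of the new peer set, with a stripped-down stand-in peer type carrying just an id and total difficulty (stubPeer is illustrative, not the real eth peer):

package main

import (
	"fmt"
	"math/big"
	"sync"
)

// stubPeer carries only the fields this example needs.
type stubPeer struct {
	id string
	td *big.Int
}

type peerSet struct {
	peers map[string]*stubPeer
	lock  sync.RWMutex
}

func newPeerSet() *peerSet { return &peerSet{peers: make(map[string]*stubPeer)} }

// Register adds a peer, refusing duplicates just like the eth peerSet.
func (ps *peerSet) Register(p *stubPeer) error {
	ps.lock.Lock()
	defer ps.lock.Unlock()
	if _, ok := ps.peers[p.id]; ok {
		return fmt.Errorf("peer %s is already registered", p.id)
	}
	ps.peers[p.id] = p
	return nil
}

// BestPeer picks the registered peer with the highest total difficulty.
func (ps *peerSet) BestPeer() *stubPeer {
	ps.lock.RLock()
	defer ps.lock.RUnlock()
	var best *stubPeer
	for _, p := range ps.peers {
		if best == nil || p.td.Cmp(best.td) > 0 {
			best = p
		}
	}
	return best
}

func main() {
	ps := newPeerSet()
	ps.Register(&stubPeer{id: "a", td: big.NewInt(100)})
	ps.Register(&stubPeer{id: "b", td: big.NewInt(250)})
	fmt.Println("best peer:", ps.BestPeer().id)                     // b
	fmt.Println(ps.Register(&stubPeer{id: "a", td: big.NewInt(1)})) // already registered
}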
diff --git a/eth/protocol.go b/eth/protocol.go
index 48f37b59c..948051ed1 100644
--- a/eth/protocol.go
+++ b/eth/protocol.go
@@ -12,8 +12,6 @@ const (
NetworkId = 0
ProtocolLength = uint64(8)
ProtocolMaxMsgSize = 10 * 1024 * 1024
- maxHashes = 512
- maxBlocks = 128
)
// eth protocol message codes
diff --git a/eth/sync.go b/eth/sync.go
index aa7ebc77b..62d08acb6 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -10,8 +10,8 @@ import (
"github.com/ethereum/go-ethereum/logger/glog"
)
-// Sync contains all synchronisation code for the eth protocol
-
+// update periodically tries to synchronise with the network, downloading
+// hashes and blocks as well as processing previously cached ones.
func (pm *ProtocolManager) update() {
forceSync := time.Tick(forceSyncCycle)
blockProc := time.Tick(blockProcCycle)
@@ -20,22 +20,16 @@ func (pm *ProtocolManager) update() {
for {
select {
case <-pm.newPeerCh:
- // Meet the `minDesiredPeerCount` before we select our best peer
- if len(pm.peers) < minDesiredPeerCount {
+ // Make sure we have peers to select from, then sync
+ if pm.peers.Len() < minDesiredPeerCount {
break
}
- // Find the best peer and synchronise with it
- peer := getBestPeer(pm.peers)
- if peer == nil {
- glog.V(logger.Debug).Infoln("Sync attempt canceled. No peers available")
- }
- go pm.synchronise(peer)
+ go pm.synchronise(pm.peers.BestPeer())
case <-forceSync:
// Force a sync even if not enough peers are present
- if peer := getBestPeer(pm.peers); peer != nil {
- go pm.synchronise(peer)
- }
+ go pm.synchronise(pm.peers.BestPeer())
+
case <-blockProc:
// Try to pull some blocks from the downloader
if atomic.CompareAndSwapInt32(&blockProcPend, 0, 1) {
@@ -51,10 +45,9 @@ func (pm *ProtocolManager) update() {
}
}
-// processBlocks will attempt to reconstruct a chain by checking the first item and check if it's
-// a known parent. The first block in the chain may be unknown during downloading. When the
-// downloader isn't downloading blocks will be dropped with an unknown parent until either it
-// has depleted the list or found a known parent.
+// processBlocks retrieves downloaded blocks from the download cache and tries
+// to construct the local block chain with it. Note, since the block retrieval
+// order matters, access to this function *must* be synchronized/serialized.
func (pm *ProtocolManager) processBlocks() error {
pm.wg.Add(1)
defer pm.wg.Done()
@@ -79,15 +72,24 @@ func (pm *ProtocolManager) processBlocks() error {
return nil
}
+// synchronise tries to sync up our local block chain with a remote peer,
+// applying various sanity checks and wrapping the process with log entries.
func (pm *ProtocolManager) synchronise(peer *peer) {
+ // Short circuit if no peers are available
+ if peer == nil {
+ glog.V(logger.Debug).Infoln("Synchronisation canceled: no peers available")
+ return
+ }
// Make sure the peer's TD is higher than our own. If not drop.
if peer.td.Cmp(pm.chainman.Td()) <= 0 {
+ glog.V(logger.Debug).Infoln("Synchronisation canceled: peer TD too small")
return
}
// FIXME if we have the hash in our chain and the TD of the peer is
// much higher than ours, something is wrong with us or the peer.
// Check if the hash is on our own chain
if pm.chainman.HasBlock(peer.recentHash) {
+ glog.V(logger.Debug).Infoln("Synchronisation canceled: head already known")
return
}
// Get the hashes from the peer (synchronously)
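synchronise now guards against three situations before kicking off a download: a missing peer, a peer whose total difficulty is not above ours, and a head block we already have. A condensed sketch of those guard clauses with placeholder types (shouldSync, stubChain and stubPeer are illustrative, not the real chain manager or peer API):

package main

import (
	"fmt"
	"math/big"
)

type stubPeer struct {
	td         *big.Int
	recentHash string
}

type stubChain struct {
	td    *big.Int
	known map[string]bool
}

// shouldSync mirrors the guard clauses added to synchronise: skip when there is
// no peer, when its TD is not above ours, or when we already know its head.
func shouldSync(chain *stubChain, p *stubPeer) (bool, string) {
	switch {
	case p == nil:
		return false, "no peers available"
	case p.td.Cmp(chain.td) <= 0:
		return false, "peer TD too small"
	case chain.known[p.recentHash]:
		return false, "head already known"
	}
	return true, ""
}

func main() {
	chain := &stubChain{td: big.NewInt(1000), known: map[string]bool{"0xdead": true}}
	peers := []*stubPeer{
		nil,
		{td: big.NewInt(500), recentHash: "0xbeef"},
		{td: big.NewInt(2000), recentHash: "0xdead"},
		{td: big.NewInt(2000), recentHash: "0xbeef"},
	}
	for _, p := range peers {
		ok, reason := shouldSync(chain, p)
		fmt.Println(ok, reason)
	}
}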