From c67424ecc8a75d7c0bc942227a4c4e5c5628d7bc Mon Sep 17 00:00:00 2001 From: obscuren Date: Sun, 17 May 2015 01:42:30 +0200 Subject: core: parallelise nonce checking when processing blocks ChainManager now uses a parallel approach to block processing where all nonces are checked seperatly from the block processing process. This speeds up the process by about 3 times on my i7 --- eth/backend.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'eth') diff --git a/eth/backend.go b/eth/backend.go index a7107f8d8..519a4c410 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -266,9 +266,9 @@ func New(config *Config) (*Ethereum, error) { MinerThreads: config.MinerThreads, } - eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux()) - eth.downloader = downloader.New(eth.EventMux(), eth.chainManager.HasBlock, eth.chainManager.GetBlock) eth.pow = ethash.New() + eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.pow, eth.EventMux()) + eth.downloader = downloader.New(eth.EventMux(), eth.chainManager.HasBlock, eth.chainManager.GetBlock) eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit) eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux()) eth.chainManager.SetProcessor(eth.blockProcessor) -- cgit v1.2.3 From 5422fe51256e45c42939d1bfbcf13e07d2660f8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 18 May 2015 21:33:37 +0300 Subject: eth: make the peer set thread safe --- eth/handler.go | 65 +++++++++++------------------ eth/peer.go | 129 ++++++++++++++++++++++++++++++++++++++++++++++++++++----- eth/sync.go | 36 ++++++++-------- 3 files changed, 162 insertions(+), 68 deletions(-) (limited to 'eth') diff --git a/eth/handler.go b/eth/handler.go index b2d741295..835097d84 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -47,9 +47,7 @@ type ProtocolManager struct { txpool txPool chainman *core.ChainManager downloader *downloader.Downloader - - pmu sync.Mutex - peers map[string]*peer + peers *peerSet SubProtocol p2p.Protocol @@ -73,7 +71,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo txpool: txpool, chainman: chainman, downloader: downloader, - peers: make(map[string]*peer), + peers: newPeerSet(), newPeerCh: make(chan *peer, 1), quitSync: make(chan struct{}), } @@ -95,10 +93,14 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo } func (pm *ProtocolManager) removePeer(peer *peer) { - pm.pmu.Lock() - defer pm.pmu.Unlock() + // Unregister the peer from the downloader pm.downloader.UnregisterPeer(peer.id) - delete(pm.peers, peer.id) + + // Remove the peer from the Ethereum peer set too + glog.V(logger.Detail).Infoln("Removing peer", peer.id) + if err := pm.peers.Unregister(peer.id); err != nil { + glog.V(logger.Error).Infoln("Removal failed:", err) + } } func (pm *ProtocolManager) Start() { @@ -136,31 +138,32 @@ func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter } func (pm *ProtocolManager) handle(p *peer) error { + // Execute the Ethereum handshake, short circuit if fails if err := p.handleStatus(); err != nil { return err } - pm.pmu.Lock() - pm.peers[p.id] = p - pm.pmu.Unlock() - - pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks) - defer func() { - pm.removePeer(p) - }() + // Register the peer locally and in the downloader too + glog.V(logger.Detail).Infoln("Adding peer", p.id) + if err := 
pm.peers.Register(p); err != nil { + glog.V(logger.Error).Infoln("Addition failed:", err) + return err + } + defer pm.removePeer(p) + if err := pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks); err != nil { + return err + } // propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. if err := p.sendTransactions(pm.txpool.GetTransactions()); err != nil { return err } - // main loop. handle incoming messages. for { if err := pm.handleMsg(p); err != nil { return err } } - return nil } @@ -346,18 +349,8 @@ func (pm *ProtocolManager) verifyTd(peer *peer, request newBlockMsgData) error { // out which peers do not contain the block in their block set and will do a // sqrt(peers) to determine the amount of peers we broadcast to. func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) { - pm.pmu.Lock() - defer pm.pmu.Unlock() - - // Find peers who don't know anything about the given hash. Peers that - // don't know about the hash will be a candidate for the broadcast loop - var peers []*peer - for _, peer := range pm.peers { - if !peer.blockHashes.Has(hash) { - peers = append(peers, peer) - } - } - // Broadcast block to peer set + // Broadcast block to a batch of peers not knowing about it + peers := pm.peers.BlockLackingPeers(hash) peers = peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range peers { peer.sendNewBlock(block) @@ -369,18 +362,8 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) // out which peers do not contain the block in their block set and will do a // sqrt(peers) to determine the amount of peers we broadcast to. func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) { - pm.pmu.Lock() - defer pm.pmu.Unlock() - - // Find peers who don't know anything about the given hash. Peers that - // don't know about the hash will be a candidate for the broadcast loop - var peers []*peer - for _, peer := range pm.peers { - if !peer.txHashes.Has(hash) { - peers = append(peers, peer) - } - } - // Broadcast block to peer set + // Broadcast transaction to a batch of peers not knowing about it + peers := pm.peers.TxLackingPeers(hash) //FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range peers { peer.sendTransaction(tx) diff --git a/eth/peer.go b/eth/peer.go index 861efaaec..369e16221 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -1,8 +1,10 @@ package eth import ( + "errors" "fmt" "math/big" + "sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -12,6 +14,11 @@ import ( "gopkg.in/fatih/set.v0" ) +var ( + errAlreadyRegistered = errors.New("peer is already registered") + errNotRegistered = errors.New("peer is not registered") +) + type statusMsgData struct { ProtocolVersion uint32 NetworkId uint32 @@ -25,16 +32,6 @@ type getBlockHashesMsgData struct { Amount uint64 } -func getBestPeer(peers map[string]*peer) *peer { - var peer *peer - for _, cp := range peers { - if peer == nil || cp.td.Cmp(peer.td) > 0 { - peer = cp - } - } - return peer -} - type peer struct { *p2p.Peer @@ -159,3 +156,115 @@ func (p *peer) handleStatus() error { return <-errc } + +// peerSet represents the collection of active peers currently participating in +// the Ethereum sub-protocol. +type peerSet struct { + peers map[string]*peer + lock sync.RWMutex +} + +// newPeerSet creates a new peer set to track the active participants. 
+func newPeerSet() *peerSet { + return &peerSet{ + peers: make(map[string]*peer), + } +} + +// Register injects a new peer into the working set, or returns an error if the +// peer is already known. +func (ps *peerSet) Register(p *peer) error { + ps.lock.Lock() + defer ps.lock.Unlock() + + if _, ok := ps.peers[p.id]; ok { + return errAlreadyRegistered + } + ps.peers[p.id] = p + return nil +} + +// Unregister removes a remote peer from the active set, disabling any further +// actions to/from that particular entity. +func (ps *peerSet) Unregister(id string) error { + ps.lock.Lock() + defer ps.lock.Unlock() + + if _, ok := ps.peers[id]; !ok { + return errNotRegistered + } + delete(ps.peers, id) + return nil +} + +// Peer retrieves the registered peer with the given id. +func (ps *peerSet) Peer(id string) *peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + return ps.peers[id] +} + +// Len returns if the current number of peers in the set. +func (ps *peerSet) Len() int { + ps.lock.RLock() + defer ps.lock.RUnlock() + + return len(ps.peers) +} + +// BlockLackingPeers retrieves a list of peers that do not have a given block +// in their set of known hashes. +func (ps *peerSet) BlockLackingPeers(hash common.Hash) []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.blockHashes.Has(hash) { + list = append(list, p) + } + } + return list +} + +// TxLackingPeers retrieves a list of peers that do not have a given transaction +// in their set of known hashes. +func (ps *peerSet) TxLackingPeers(hash common.Hash) []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.txHashes.Has(hash) { + list = append(list, p) + } + } + return list +} + +// AllPeers retrieves a flat list of all the peers within the set. +func (ps *peerSet) AllPeers() []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + list = append(list, p) + } + return list +} + +// BestPeer retrieves the known peer with the currently highest total difficulty. +func (ps *peerSet) BestPeer() *peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + var best *peer + for _, p := range ps.peers { + if best == nil || p.td.Cmp(best.td) > 0 { + best = p + } + } + return best +} diff --git a/eth/sync.go b/eth/sync.go index aa7ebc77b..62d08acb6 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -10,8 +10,8 @@ import ( "github.com/ethereum/go-ethereum/logger/glog" ) -// Sync contains all synchronisation code for the eth protocol - +// update periodically tries to synchronise with the network, both downloading +// hashes and blocks as well as retrieving cached ones. func (pm *ProtocolManager) update() { forceSync := time.Tick(forceSyncCycle) blockProc := time.Tick(blockProcCycle) @@ -20,22 +20,16 @@ func (pm *ProtocolManager) update() { for { select { case <-pm.newPeerCh: - // Meet the `minDesiredPeerCount` before we select our best peer - if len(pm.peers) < minDesiredPeerCount { + // Make sure we have peers to select from, then sync + if pm.peers.Len() < minDesiredPeerCount { break } - // Find the best peer and synchronise with it - peer := getBestPeer(pm.peers) - if peer == nil { - glog.V(logger.Debug).Infoln("Sync attempt canceled. 
No peers available") - } - go pm.synchronise(peer) + go pm.synchronise(pm.peers.BestPeer()) case <-forceSync: // Force a sync even if not enough peers are present - if peer := getBestPeer(pm.peers); peer != nil { - go pm.synchronise(peer) - } + go pm.synchronise(pm.peers.BestPeer()) + case <-blockProc: // Try to pull some blocks from the downloaded if atomic.CompareAndSwapInt32(&blockProcPend, 0, 1) { @@ -51,10 +45,9 @@ func (pm *ProtocolManager) update() { } } -// processBlocks will attempt to reconstruct a chain by checking the first item and check if it's -// a known parent. The first block in the chain may be unknown during downloading. When the -// downloader isn't downloading blocks will be dropped with an unknown parent until either it -// has depleted the list or found a known parent. +// processBlocks retrieves downloaded blocks from the download cache and tries +// to construct the local block chain with it. Note, since the block retrieval +// order matters, access to this function *must* be synchronized/serialized. func (pm *ProtocolManager) processBlocks() error { pm.wg.Add(1) defer pm.wg.Done() @@ -79,15 +72,24 @@ func (pm *ProtocolManager) processBlocks() error { return nil } +// synchronise tries to sync up our local block chain with a remote peer, both +// adding various sanity checks as well as wrapping it with various log entries. func (pm *ProtocolManager) synchronise(peer *peer) { + // Short circuit if no peers are available + if peer == nil { + glog.V(logger.Debug).Infoln("Synchronisation canceled: no peers available") + return + } // Make sure the peer's TD is higher than our own. If not drop. if peer.td.Cmp(pm.chainman.Td()) <= 0 { + glog.V(logger.Debug).Infoln("Synchronisation canceled: peer TD too small") return } // FIXME if we have the hash in our chain and the TD of the peer is // much higher than ours, something is wrong with us or the peer. // Check if the hash is on our own chain if pm.chainman.HasBlock(peer.recentHash) { + glog.V(logger.Debug).Infoln("Synchronisation canceled: head already known") return } // Get the hashes from the peer (synchronously) -- cgit v1.2.3 From 4755caeb2d07db057e152df555d58d0dd89bda03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 18 May 2015 21:35:42 +0300 Subject: eth: remote a superfluous peerSet method --- eth/peer.go | 12 ------------ 1 file changed, 12 deletions(-) (limited to 'eth') diff --git a/eth/peer.go b/eth/peer.go index 369e16221..a23449acd 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -243,18 +243,6 @@ func (ps *peerSet) TxLackingPeers(hash common.Hash) []*peer { return list } -// AllPeers retrieves a flat list of all the peers within the set. -func (ps *peerSet) AllPeers() []*peer { - ps.lock.RLock() - defer ps.lock.RUnlock() - - list := make([]*peer, 0, len(ps.peers)) - for _, p := range ps.peers { - list = append(list, p) - } - return list -} - // BestPeer retrieves the known peer with the currently highest total difficulty. 
func (ps *peerSet) BestPeer() *peer { ps.lock.RLock() -- cgit v1.2.3 From 22b694ee1e1044e68c906fbd864797ac2f8a4ab0 Mon Sep 17 00:00:00 2001 From: zelig Date: Wed, 20 May 2015 02:04:52 +0100 Subject: solc now in ethereum, fixes solc path setting; setSolc() didnt work --- eth/backend.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) (limited to 'eth') diff --git a/eth/backend.go b/eth/backend.go index 519a4c410..44ceb89e8 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -14,6 +14,7 @@ import ( "github.com/ethereum/ethash" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/compiler" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" @@ -79,6 +80,7 @@ type Config struct { GasPrice *big.Int MinerThreads int AccountManager *accounts.Manager + SolcPath string // NewDB is used to create databases. // If nil, the default is to create leveldb databases on disk. @@ -181,6 +183,8 @@ type Ethereum struct { pow *ethash.Ethash protocolManager *ProtocolManager downloader *downloader.Downloader + SolcPath string + solc *compiler.Solidity net *p2p.Server eventMux *event.TypeMux @@ -264,6 +268,7 @@ func New(config *Config) (*Ethereum, error) { netVersionId: config.NetworkId, NatSpec: config.NatSpec, MinerThreads: config.MinerThreads, + SolcPath: config.SolcPath, } eth.pow = ethash.New() @@ -571,3 +576,18 @@ func saveBlockchainVersion(db common.Database, bcVersion int) { db.Put([]byte("BlockchainVersion"), common.NewValue(bcVersion).Bytes()) } } + +func (self *Ethereum) Solc() (*compiler.Solidity, error) { + var err error + if self.solc == nil { + self.solc, err = compiler.New(self.SolcPath) + } + return self.solc, err +} + +// set in js console via admin interface or wrapper from cli flags +func (self *Ethereum) SetSolc(solcPath string) (*compiler.Solidity, error) { + self.SolcPath = solcPath + self.solc = nil + return self.Solc() +} -- cgit v1.2.3 From 6f54eb6d9a2375f4ee78df77068919ec0847fb1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 20 May 2015 10:15:42 +0300 Subject: eth/downloader: fix test to it doesn't time out on a slow machine --- eth/downloader/downloader_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'eth') diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 19d64ac67..98fdef696 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -382,7 +382,7 @@ func TestRepeatingHashAttack(t *testing.T) { // Make sure that syncing returns and does so with a failure select { - case <-time.After(100 * time.Millisecond): + case <-time.After(time.Second): t.Fatalf("synchronisation blocked") case err := <-errc: if err == nil { -- cgit v1.2.3 From 3c8227b935fdc9eda7a6cfacc2e0d0d189e7bb36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 20 May 2015 10:34:45 +0300 Subject: eth: fix odd method names in peer set --- eth/handler.go | 4 ++-- eth/peer.go | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) (limited to 'eth') diff --git a/eth/handler.go b/eth/handler.go index 835097d84..8dd254b1a 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -350,7 +350,7 @@ func (pm *ProtocolManager) verifyTd(peer *peer, request newBlockMsgData) error { // sqrt(peers) to determine the amount of peers we broadcast to. 
func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) { // Broadcast block to a batch of peers not knowing about it - peers := pm.peers.BlockLackingPeers(hash) + peers := pm.peers.PeersWithoutBlock(hash) peers = peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range peers { peer.sendNewBlock(block) @@ -363,7 +363,7 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) // sqrt(peers) to determine the amount of peers we broadcast to. func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) { // Broadcast transaction to a batch of peers not knowing about it - peers := pm.peers.TxLackingPeers(hash) + peers := pm.peers.PeersWithoutTx(hash) //FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range peers { peer.sendTransaction(tx) diff --git a/eth/peer.go b/eth/peer.go index a23449acd..fdd815293 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -213,9 +213,9 @@ func (ps *peerSet) Len() int { return len(ps.peers) } -// BlockLackingPeers retrieves a list of peers that do not have a given block -// in their set of known hashes. -func (ps *peerSet) BlockLackingPeers(hash common.Hash) []*peer { +// PeersWithoutBlock retrieves a list of peers that do not have a given block in +// their set of known hashes. +func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() @@ -228,9 +228,9 @@ func (ps *peerSet) BlockLackingPeers(hash common.Hash) []*peer { return list } -// TxLackingPeers retrieves a list of peers that do not have a given transaction +// PeersWithoutTx retrieves a list of peers that do not have a given transaction // in their set of known hashes. -func (ps *peerSet) TxLackingPeers(hash common.Hash) []*peer { +func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() -- cgit v1.2.3 From e8b22b9253da400cc350dbc673d07789f93f57bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 21 May 2015 08:07:58 +0300 Subject: eth/downloader: prevent a peer from dripping bad hashes --- eth/downloader/downloader.go | 20 ++++++++++++-------- eth/downloader/downloader_test.go | 25 ++++++++++++++++++++++++- 2 files changed, 36 insertions(+), 9 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index d817b223c..f3a866441 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -28,10 +28,11 @@ var ( ) var ( - errLowTd = errors.New("peer's TD is too low") + errLowTd = errors.New("peers TD is too low") ErrBusy = errors.New("busy") - errUnknownPeer = errors.New("peer's unknown or unhealthy") + errUnknownPeer = errors.New("peer is unknown or unhealthy") ErrBadPeer = errors.New("action from bad peer ignored") + ErrStallingPeer = errors.New("peer is stalling") errNoPeers = errors.New("no peers to keep download active") ErrPendingQueue = errors.New("pending items in queue") ErrTimeout = errors.New("timeout") @@ -283,15 +284,18 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { return ErrBadPeer } if !done { + // Check that the peer is not stalling the sync + if len(inserts) < maxHashFetch { + return ErrStallingPeer + } // Try and fetch a random block to verify the hash batch // Skip the last hash as the cross check races with the next hash fetch - if len(inserts) > 1 { - cross := inserts[rand.Intn(len(inserts)-1)] - glog.V(logger.Detail).Infof("Cross checking (%s) with %x", active.id, 
cross) + cross := inserts[rand.Intn(len(inserts)-1)] + glog.V(logger.Detail).Infof("Cross checking (%s) with %x", active.id, cross) + + d.checks[cross] = time.Now().Add(blockTTL) + active.getBlocks([]common.Hash{cross}) - d.checks[cross] = time.Now().Add(blockTTL) - active.getBlocks([]common.Hash{cross}) - } // Also fetch a fresh active.getHashes(head) continue diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 19d64ac67..8ed3289c6 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -53,6 +53,8 @@ type downloadTester struct { blocks map[common.Hash]*types.Block // Blocks associated with the hashes chain []common.Hash // Block-chain being constructed + maxHashFetch int // Overrides the maximum number of retrieved hashes + t *testing.T pcount int done chan bool @@ -133,8 +135,12 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { // getHashes retrieves a batch of hashes for reconstructing the chain. func (dl *downloadTester) getHashes(head common.Hash) error { + limit := maxHashFetch + if dl.maxHashFetch > 0 { + limit = dl.maxHashFetch + } // Gather the next batch of hashes - hashes := make([]common.Hash, 0, maxHashFetch) + hashes := make([]common.Hash, 0, limit) for i, hash := range dl.hashes { if hash == head { i++ @@ -469,6 +475,23 @@ func TestMadeupHashChainAttack(t *testing.T) { } } +// Tests that if a malicious peer makes up a random hash chain, and tries to push +// indefinitely, one hash at a time, it actually gets caught with it. The reason +// this is separate from the classical made up chain attack is that sending hashes +// one by one prevents reliable block/parent verification. +func TestMadeupHashChainDrippingAttack(t *testing.T) { + // Create a random chain of hashes to drip + hashes := createHashes(0, 16*blockCacheLimit) + tester := newTester(t, hashes, nil) + + // Try and sync with the attacker, one hash at a time + tester.maxHashFetch = 1 + tester.newPeer("attack", big.NewInt(10000), hashes[0]) + if _, err := tester.syncTake("attack", hashes[0]); err != ErrStallingPeer { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrStallingPeer) + } +} + // Tests that if a malicious peer makes up a random block chain, and tried to // push indefinitely, it actually gets caught with it. 
func TestMadeupBlockChainAttack(t *testing.T) { -- cgit v1.2.3 From 52db6d8be577669bd5ba659ac223acf61956b05a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 21 May 2015 08:37:27 +0300 Subject: eth/downloader: circumvent a forged block chain with known parent attack --- eth/downloader/downloader.go | 33 +++++++++++++++++++++------------ eth/downloader/downloader_test.go | 36 +++++++++++++++++++++++++++++++++++- 2 files changed, 56 insertions(+), 13 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index f3a866441..f0629a551 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -61,13 +61,18 @@ type hashPack struct { hashes []common.Hash } +type crossCheck struct { + expire time.Time + parent common.Hash +} + type Downloader struct { mux *event.TypeMux mu sync.RWMutex - queue *queue // Scheduler for selecting the hashes to download - peers *peerSet // Set of active peers from which download can proceed - checks map[common.Hash]time.Time // Pending cross checks to verify a hash chain + queue *queue // Scheduler for selecting the hashes to download + peers *peerSet // Set of active peers from which download can proceed + checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain // Callbacks hasBlock hashCheckFn @@ -158,7 +163,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { // Reset the queue and peer set to clean any internal leftover state d.queue.Reset() d.peers.Reset() - d.checks = make(map[common.Hash]time.Time) + d.checks = make(map[common.Hash]*crossCheck) // Retrieve the origin peer and initiate the downloading process p := d.peers.Peer(id) @@ -290,11 +295,15 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { } // Try and fetch a random block to verify the hash batch // Skip the last hash as the cross check races with the next hash fetch - cross := inserts[rand.Intn(len(inserts)-1)] - glog.V(logger.Detail).Infof("Cross checking (%s) with %x", active.id, cross) + cross := rand.Intn(len(inserts) - 1) + origin, parent := inserts[cross], inserts[cross+1] + glog.V(logger.Detail).Infof("Cross checking (%s) with %x/%x", active.id, origin, parent) - d.checks[cross] = time.Now().Add(blockTTL) - active.getBlocks([]common.Hash{cross}) + d.checks[origin] = &crossCheck{ + expire: time.Now().Add(blockTTL), + parent: parent, + } + active.getBlocks([]common.Hash{origin}) // Also fetch a fresh active.getHashes(head) @@ -314,8 +323,8 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { continue } block := blockPack.blocks[0] - if _, ok := d.checks[block.Hash()]; ok { - if !d.queue.Has(block.ParentHash()) { + if check, ok := d.checks[block.Hash()]; ok { + if block.ParentHash() != check.parent { return ErrCrossCheckFailed } delete(d.checks, block.Hash()) @@ -323,8 +332,8 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { case <-crossTicker.C: // Iterate over all the cross checks and fail the hash chain if they're not verified - for hash, deadline := range d.checks { - if time.Now().After(deadline) { + for hash, check := range d.checks { + if time.Now().After(check.expire) { glog.V(logger.Debug).Infof("Cross check timeout for %x", hash) return ErrCrossCheckFailed } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 8ed3289c6..d623a7c76 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -502,7 +502,7 @@ func 
TestMadeupBlockChainAttack(t *testing.T) { crossCheckCycle = 25 * time.Millisecond // Create a long chain of blocks and simulate an invalid chain by dropping every second - hashes := createHashes(0, 32*blockCacheLimit) + hashes := createHashes(0, 16*blockCacheLimit) blocks := createBlocksFromHashes(hashes) gapped := make([]common.Hash, len(hashes)/2) @@ -525,3 +525,37 @@ func TestMadeupBlockChainAttack(t *testing.T) { t.Fatalf("failed to synchronise blocks: %v", err) } } + +// Advanced form of the above forged blockchain attack, where not only does the +// attacker make up a valid hashes for random blocks, but also forges the block +// parents to point to existing hashes. +func TestMadeupParentBlockChainAttack(t *testing.T) { + defaultBlockTTL := blockTTL + defaultCrossCheckCycle := crossCheckCycle + + blockTTL = 100 * time.Millisecond + crossCheckCycle = 25 * time.Millisecond + + // Create a long chain of blocks and simulate an invalid chain by dropping every second + hashes := createHashes(0, 16*blockCacheLimit) + blocks := createBlocksFromHashes(hashes) + forges := createBlocksFromHashes(hashes) + for hash, block := range forges { + block.ParentHeaderHash = hash // Simulate pointing to already known hash + } + // Try and sync with the malicious node and check that it fails + tester := newTester(t, hashes, forges) + tester.newPeer("attack", big.NewInt(10000), hashes[0]) + if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed) + } + // Ensure that a valid chain can still pass sync + blockTTL = defaultBlockTTL + crossCheckCycle = defaultCrossCheckCycle + + tester.blocks = blocks + tester.newPeer("valid", big.NewInt(20000), hashes[0]) + if _, err := tester.syncTake("valid", hashes[0]); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } +} -- cgit v1.2.3 From 207bd5575161fa2bc61a7ccf659c11878575dd32 Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 21 May 2015 11:45:35 +0200 Subject: eth: reduced max open files for LevelDB --- eth/backend.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'eth') diff --git a/eth/backend.go b/eth/backend.go index 44ceb89e8..69504fd94 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -213,7 +213,7 @@ func New(config *Config) (*Ethereum, error) { // Let the database take 3/4 of the max open files (TODO figure out a way to get the actual limit of the open files) const dbCount = 3 - ethdb.OpenFileLimit = 256 / (dbCount + 1) + ethdb.OpenFileLimit = 128 / (dbCount + 1) newdb := config.NewDB if newdb == nil { -- cgit v1.2.3 From bed80133e0573ebeefa201a15b20188198adf0ac Mon Sep 17 00:00:00 2001 From: zelig Date: Wed, 20 May 2015 16:56:17 +0100 Subject: automatic DAG pregeneration for smooth epoch transitions - backend: AutoDAG bool flag passed from cli/eth.Config to ethereum, autoDAG loop started if true - backend: autoDAG loop start/stop, remove previous DAG - cli: AutoDAG bool flag, off by default, but automatically ON if mining - admin jsre: add startAutoDAG stopAutoDAG and makeDAG in miner section - switch on/off DAG autogeneration when miner started/stopped on console --- eth/backend.go | 88 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) (limited to 'eth') diff --git a/eth/backend.go b/eth/backend.go index 69504fd94..938071fc7 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -31,6 +31,14 @@ import ( "github.com/ethereum/go-ethereum/whisper" ) +const ( + epochLength 
= 30000 + ethashRevision = 23 + + autoDAGcheckInterval = 10 * time.Hour + autoDAGepochHeight = epochLength / 2 +) + var ( jsonlogger = logger.NewJsonLogger() @@ -60,6 +68,7 @@ type Config struct { LogJSON string VmDebug bool NatSpec bool + AutoDAG bool MaxPeers int MaxPendingPeers int @@ -197,6 +206,8 @@ type Ethereum struct { MinerThreads int NatSpec bool DataDir string + AutoDAG bool + autodagquit chan bool etherbase common.Address clientVersion string ethVersionId int @@ -269,6 +280,7 @@ func New(config *Config) (*Ethereum, error) { NatSpec: config.NatSpec, MinerThreads: config.MinerThreads, SolcPath: config.SolcPath, + AutoDAG: config.AutoDAG, } eth.pow = ethash.New() @@ -448,6 +460,10 @@ func (s *Ethereum) Start() error { // periodically flush databases go s.syncDatabases() + if s.AutoDAG { + s.StartAutoDAG() + } + // Start services go s.txPool.Start() s.protocolManager.Start() @@ -526,6 +542,7 @@ func (s *Ethereum) Stop() { if s.whisper != nil { s.whisper.Stop() } + s.StopAutoDAG() glog.V(logger.Info).Infoln("Server stopped") close(s.shutdownChan) @@ -559,6 +576,77 @@ func (self *Ethereum) syncAccounts(tx *types.Transaction) { } } +// StartAutoDAG() spawns a go routine that checks the DAG every autoDAGcheckInterval +// by default that is 10 times per epoch +// in epoch n, if we past autoDAGepochHeight within-epoch blocks, +// it calls ethash.MakeDAG to pregenerate the DAG for the next epoch n+1 +// if it does not exist yet as well as remove the DAG for epoch n-1 +// the loop quits if autodagquit channel is closed, it can safely restart and +// stop any number of times. +// For any more sophisticated pattern of DAG generation, use CLI subcommand +// makedag +func (self *Ethereum) StartAutoDAG() { + if self.autodagquit != nil { + return // already started + } + go func() { + glog.V(logger.Info).Infof("Automatic pregeneration of ethash DAG ON (ethash dir: %s)", ethash.DefaultDir) + var nextEpoch uint64 + timer := time.After(0) + self.autodagquit = make(chan bool) + for { + select { + case <-timer: + glog.V(logger.Info).Infof("checking DAG (ethash dir: %s)", ethash.DefaultDir) + currentBlock := self.ChainManager().CurrentBlock().NumberU64() + thisEpoch := currentBlock / epochLength + if nextEpoch <= thisEpoch { + if currentBlock%epochLength > autoDAGepochHeight { + if thisEpoch > 0 { + previousDag, previousDagFull := dagFiles(thisEpoch - 1) + os.Remove(filepath.Join(ethash.DefaultDir, previousDag)) + os.Remove(filepath.Join(ethash.DefaultDir, previousDagFull)) + glog.V(logger.Info).Infof("removed DAG for epoch %d (%s)", thisEpoch-1, previousDag) + } + nextEpoch = thisEpoch + 1 + dag, _ := dagFiles(nextEpoch) + if _, err := os.Stat(dag); os.IsNotExist(err) { + glog.V(logger.Info).Infof("Pregenerating DAG for epoch %d (%s)", nextEpoch, dag) + err := ethash.MakeDAG(nextEpoch*epochLength, "") // "" -> ethash.DefaultDir + if err != nil { + glog.V(logger.Error).Infof("Error generating DAG for epoch %d (%s)", nextEpoch, dag) + return + } + } else { + glog.V(logger.Error).Infof("DAG for epoch %d (%s)", nextEpoch, dag) + } + } + } + timer = time.After(autoDAGcheckInterval) + case <-self.autodagquit: + return + } + } + }() +} + +// dagFiles(epoch) returns the two alternative DAG filenames (not a path) +// 1) - 2) full-R- +func dagFiles(epoch uint64) (string, string) { + seedHash, _ := ethash.GetSeedHash(epoch * epochLength) + dag := fmt.Sprintf("full-R%d-%x", ethashRevision, seedHash[:8]) + return dag, "full-R" + dag +} + +// stopAutoDAG stops automatic DAG pregeneration by quitting the loop +func 
(self *Ethereum) StopAutoDAG() { + if self.autodagquit != nil { + close(self.autodagquit) + self.autodagquit = nil + } + glog.V(logger.Info).Infof("Automatic pregeneration of ethash DAG OFF (ethash dir: %s)", ethash.DefaultDir) +} + func saveProtocolVersion(db common.Database, protov int) { d, _ := db.Get([]byte("ProtocolVersion")) protocolVersion := common.NewValue(d).Uint() -- cgit v1.2.3 From 06a041589f3c2d4b3e66a1ce51e3e03e209fdbff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 21 May 2015 18:16:04 +0300 Subject: eth, eth/downloader: remove duplicate consts, bump hash fetch to 2K --- eth/downloader/downloader.go | 10 ++++++---- eth/downloader/downloader_test.go | 2 +- eth/downloader/queue.go | 2 +- eth/handler.go | 6 +++--- eth/peer.go | 5 +++-- eth/protocol.go | 2 -- 6 files changed, 14 insertions(+), 13 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index f0629a551..fd588d2f3 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -15,8 +15,10 @@ import ( ) const ( - maxHashFetch = 512 // Amount of hashes to be fetched per chunk - maxBlockFetch = 128 // Amount of blocks to be fetched per chunk + MinHashFetch = 512 // Minimum amount of hashes to not consider a peer stalling + MaxHashFetch = 2048 // Amount of hashes to be fetched per retrieval request + MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request + peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount hashTTL = 5 * time.Second // Time it takes for a hash request to time out ) @@ -290,7 +292,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { } if !done { // Check that the peer is not stalling the sync - if len(inserts) < maxHashFetch { + if len(inserts) < MinHashFetch { return ErrStallingPeer } // Try and fetch a random block to verify the hash batch @@ -451,7 +453,7 @@ out: } // Get a possible chunk. If nil is returned no chunk // could be returned due to no hashes available. - request := d.queue.Reserve(peer, maxBlockFetch) + request := d.queue.Reserve(peer, MaxBlockFetch) if request == nil { continue } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index d623a7c76..6299ea162 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -135,7 +135,7 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { // getHashes retrieves a batch of hashes for reconstructing the chain. func (dl *downloadTester) getHashes(head common.Hash) error { - limit := maxHashFetch + limit := MaxHashFetch if dl.maxHashFetch > 0 { limit = dl.maxHashFetch } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 13ec9a520..591a37773 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -17,7 +17,7 @@ import ( ) const ( - blockCacheLimit = 1024 // Maximum number of blocks to cache before throttling the download + blockCacheLimit = 8 * MaxBlockFetch // Maximum number of blocks to cache before throttling the download ) // fetchRequest is a currently running block retrieval operation. 
diff --git a/eth/handler.go b/eth/handler.go index 835097d84..63b1d50f6 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -206,8 +206,8 @@ func (self *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "->msg %v: %v", msg, err) } - if request.Amount > maxHashes { - request.Amount = maxHashes + if request.Amount > downloader.MaxHashFetch { + request.Amount = downloader.MaxHashFetch } hashes := self.chainman.GetBlockHashesFromHash(request.Hash, request.Amount) @@ -254,7 +254,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { if block != nil { blocks = append(blocks, block) } - if i == maxBlocks { + if i == downloader.MaxBlockFetch { break } } diff --git a/eth/peer.go b/eth/peer.go index a23449acd..1ef032d38 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p" @@ -100,8 +101,8 @@ func (p *peer) sendTransaction(tx *types.Transaction) error { } func (p *peer) requestHashes(from common.Hash) error { - glog.V(logger.Debug).Infof("[%s] fetching hashes (%d) %x...\n", p.id, maxHashes, from[:4]) - return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, maxHashes}) + glog.V(logger.Debug).Infof("[%s] fetching hashes (%d) %x...\n", p.id, downloader.MaxHashFetch, from[:4]) + return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, downloader.MaxHashFetch}) } func (p *peer) requestBlocks(hashes []common.Hash) error { diff --git a/eth/protocol.go b/eth/protocol.go index 48f37b59c..948051ed1 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -12,8 +12,6 @@ const ( NetworkId = 0 ProtocolLength = uint64(8) ProtocolMaxMsgSize = 10 * 1024 * 1024 - maxHashes = 512 - maxBlocks = 128 ) // eth protocol message codes -- cgit v1.2.3
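
Editor's illustration 1: the first commit above ("core: parallelise nonce checking when processing blocks") only shows the eth/backend.go wiring — the ethash instance being passed into core.NewChainManager — while the parallel verification itself lives in the core package, which this eth-limited view does not include. The following is a rough, hypothetical sketch of the idea the commit message describes (fanning proof-of-work nonce checks out over worker goroutines, separately from the serial block-processing step). Every name in it — nonceChecker, verifyNoncesParallel, fakePow and so on — is an illustrative assumption, not go-ethereum code.

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// block is a stand-in for the real types.Block, carrying just what the demo needs.
type block struct {
	number uint64
	nonce  uint64
}

// nonceChecker is an assumed interface; in the real code the check is backed by ethash.
type nonceChecker interface {
	Verify(b *block) bool
}

type nonceResult struct {
	index int
	valid bool
}

// verifyNoncesParallel spreads the nonce checks of a batch of blocks over all
// available CPU cores and streams each result back on a channel, so the block
// processor can consume verdicts while remaining checks are still running.
func verifyNoncesParallel(pow nonceChecker, blocks []*block) <-chan nonceResult {
	results := make(chan nonceResult, len(blocks))

	// Feed every block index into a job queue for the workers to drain.
	jobs := make(chan int, len(blocks))
	for i := range blocks {
		jobs <- i
	}
	close(jobs)

	var wg sync.WaitGroup
	for w := 0; w < runtime.NumCPU(); w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range jobs {
				results <- nonceResult{index: i, valid: pow.Verify(blocks[i])}
			}
		}()
	}
	// Close the result channel once all workers are done.
	go func() {
		wg.Wait()
		close(results)
	}()
	return results
}

// fakePow is a toy checker used only to make the sketch runnable.
type fakePow struct{}

func (fakePow) Verify(b *block) bool { return b.nonce%2 == 0 }

func main() {
	blocks := []*block{{number: 1, nonce: 42}, {number: 2, nonce: 7}, {number: 3, nonce: 100}}
	for res := range verifyNoncesParallel(fakePow{}, blocks) {
		fmt.Printf("block #%d nonce valid: %v\n", blocks[res.index].number, res.valid)
	}
}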
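
Editor's illustration 2: the two downloader hardening commits above (stalling-peer detection and the forged-parent cross check) boil down to one idea — every batch of hashes a peer returns must be long enough, and one randomly chosen entry from the batch is fetched as a full block whose parent hash has to match the next (older) entry in the batch, on a timeout. The condensed, standalone sketch below illustrates only that linking check; the names are simplified stand-ins, not the actual downloader code.

package main

import (
	"errors"
	"fmt"
	"math/rand"
)

type hash string

// blk is a minimal stand-in for types.Block: its own hash plus its parent's.
type blk struct {
	hash   hash
	parent hash
}

var errCrossCheckFailed = errors.New("block cross-check failed")

// crossCheck picks a random hash from a newest-first batch (skipping the oldest
// entry, whose parent lies outside the batch), fetches the corresponding block
// and verifies that its parent hash equals the next entry in the batch.
func crossCheck(batch []hash, fetch func(hash) blk) error {
	if len(batch) < 2 {
		return nil // nothing in the batch to verify against
	}
	i := rand.Intn(len(batch) - 1)
	origin, parent := batch[i], batch[i+1]

	if b := fetch(origin); b.parent != parent {
		return errCrossCheckFailed
	}
	return nil
}

func main() {
	// Honest peer: fetched blocks really link to the advertised predecessor.
	honest := func(h hash) blk { return blk{hash: h, parent: h + "-parent"} }
	batch := []hash{"c", "c-parent", "c-parent-parent"}
	fmt.Println("honest peer:", crossCheck(batch, honest))

	// Forging peer: block parents point somewhere else, so the chain is rejected.
	forger := func(h hash) blk { return blk{hash: h, parent: "bogus"} }
	fmt.Println("forging peer:", crossCheck(batch, forger))
}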