aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/peer.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-05-11 19:26:20 +0800
committerobscuren <geffobscura@gmail.com>2015-05-11 23:21:22 +0800
commit70c65835f4747d991fe8d79e7138828cd97c6ac7 (patch)
tree6ba6612d7e4f8b6fe718d0c7db55b6313d045b40 /eth/downloader/peer.go
parentfe7e284709f079fb9ddee2d0d2667ba968db0558 (diff)
downloadgo-tangerine-70c65835f4747d991fe8d79e7138828cd97c6ac7.tar
go-tangerine-70c65835f4747d991fe8d79e7138828cd97c6ac7.tar.gz
go-tangerine-70c65835f4747d991fe8d79e7138828cd97c6ac7.tar.bz2
go-tangerine-70c65835f4747d991fe8d79e7138828cd97c6ac7.tar.lz
go-tangerine-70c65835f4747d991fe8d79e7138828cd97c6ac7.tar.xz
go-tangerine-70c65835f4747d991fe8d79e7138828cd97c6ac7.tar.zst
go-tangerine-70c65835f4747d991fe8d79e7138828cd97c6ac7.zip
eth/downloader: fix #910, thread safe peers & polishes
Diffstat (limited to 'eth/downloader/peer.go')
-rw-r--r--eth/downloader/peer.go219
1 files changed, 143 insertions, 76 deletions
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 45ec1cbfd..e2dec5571 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -1,125 +1,192 @@
+// Contains the active peer-set of the downloader, maintaining both failures
+// as well as reputation metrics to prioritize the block retrievals.
+
package downloader
import (
"errors"
"sync"
+ "sync/atomic"
"github.com/ethereum/go-ethereum/common"
"gopkg.in/fatih/set.v0"
)
-const (
- workingState = 2
- idleState = 4
-)
-
type hashFetcherFn func(common.Hash) error
type blockFetcherFn func([]common.Hash) error
-// XXX make threadsafe!!!!
-type peers map[string]*peer
+var (
+ errAlreadyFetching = errors.New("already fetching blocks from peer")
+ errAlreadyRegistered = errors.New("peer is already registered")
+ errNotRegistered = errors.New("peer is not registered")
+)
-func (p peers) reset() {
- for _, peer := range p {
- peer.reset()
- }
+// peer represents an active peer from which hashes and blocks are retrieved.
+type peer struct {
+ id string // Unique identifier of the peer
+ head common.Hash // Hash of the peers latest known block
+
+ idle int32 // Current activity state of the peer (idle = 0, active = 1)
+ rep int32 // Simple peer reputation (not used currently)
+
+ mu sync.RWMutex
+
+ ignored *set.Set
+
+ getHashes hashFetcherFn
+ getBlocks blockFetcherFn
}
-func (p peers) get(state int) []*peer {
- var peers []*peer
- for _, peer := range p {
- peer.mu.RLock()
- if peer.state == state {
- peers = append(peers, peer)
- }
- peer.mu.RUnlock()
+// newPeer create a new downloader peer, with specific hash and block retrieval
+// mechanisms.
+func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer {
+ return &peer{
+ id: id,
+ head: head,
+ getHashes: getHashes,
+ getBlocks: getBlocks,
+ ignored: set.New(),
}
+}
- return peers
+// Reset clears the internal state of a peer entity.
+func (p *peer) Reset() {
+ atomic.StoreInt32(&p.idle, 0)
+ p.ignored.Clear()
}
-func (p peers) setState(id string, state int) {
- if peer, exist := p[id]; exist {
- peer.mu.Lock()
- defer peer.mu.Unlock()
- peer.state = state
+// Fetch sends a block retrieval request to the remote peer.
+func (p *peer) Fetch(request *fetchRequest) error {
+ // Short circuit if the peer is already fetching
+ if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) {
+ return errAlreadyFetching
}
-}
+ // Convert the hash set to a retrievable slice
+ hashes := make([]common.Hash, 0, len(request.Hashes))
+ for hash, _ := range request.Hashes {
+ hashes = append(hashes, hash)
+ }
+ p.getBlocks(hashes)
-func (p peers) getPeer(id string) *peer {
- return p[id]
+ return nil
}
-// peer represents an active peer
-type peer struct {
- state int // Peer state (working, idle)
- rep int // TODO peer reputation
+// SetIdle sets the peer to idle, allowing it to execute new retrieval requests.
+func (p *peer) SetIdle() {
+ atomic.StoreInt32(&p.idle, 0)
+}
- mu sync.RWMutex
- id string
- recentHash common.Hash
+// Promote increases the peer's reputation.
+func (p *peer) Promote() {
+ atomic.AddInt32(&p.rep, 1)
+}
- ignored *set.Set
+// Demote decreases the peer's reputation or leaves it at 0.
+func (p *peer) Demote() {
+ for {
+ // Calculate the new reputation value
+ prev := atomic.LoadInt32(&p.rep)
+ next := prev - 2
+ if next < 0 {
+ next = 0
+ }
+ // Try to update the old value
+ if atomic.CompareAndSwapInt32(&p.rep, prev, next) {
+ return
+ }
+ }
+}
- getHashes hashFetcherFn
- getBlocks blockFetcherFn
+// peerSet represents the collection of active peer participating in the block
+// download procedure.
+type peerSet struct {
+ peers map[string]*peer
+ lock sync.RWMutex
}
-// create a new peer
-func newPeer(id string, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer {
- return &peer{
- id: id,
- recentHash: hash,
- getHashes: getHashes,
- getBlocks: getBlocks,
- state: idleState,
- ignored: set.New(),
+// newPeerSet creates a new peer set top track the active download sources.
+func newPeerSet() *peerSet {
+ return &peerSet{
+ peers: make(map[string]*peer),
}
}
-// fetch a chunk using the peer
-func (p *peer) fetch(request *fetchRequest) error {
- p.mu.Lock()
- defer p.mu.Unlock()
+// Reset iterates over the current peer set, and resets each of the known peers
+// to prepare for a next batch of block retrieval.
+func (ps *peerSet) Reset() {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
- if p.state == workingState {
- return errors.New("peer already fetching chunk")
+ for _, peer := range ps.peers {
+ peer.Reset()
}
+}
- // set working state
- p.state = workingState
+// Register injects a new peer into the working set, or returns an error if the
+// peer is already known.
+func (ps *peerSet) Register(p *peer) error {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
- // Convert the hash set to a fetchable slice
- hashes := make([]common.Hash, 0, len(request.Hashes))
- for hash, _ := range request.Hashes {
- hashes = append(hashes, hash)
+ if _, ok := ps.peers[p.id]; ok {
+ return errAlreadyRegistered
}
- p.getBlocks(hashes)
+ ps.peers[p.id] = p
+ return nil
+}
+// Unregister removes a remote peer from the active set, disabling any further
+// actions to/from that particular entity.
+func (ps *peerSet) Unregister(id string) error {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ if _, ok := ps.peers[id]; !ok {
+ return errNotRegistered
+ }
+ delete(ps.peers, id)
return nil
}
-// promote increases the peer's reputation
-func (p *peer) promote() {
- p.mu.Lock()
- defer p.mu.Unlock()
+// Peer retrieves the registered peer with the given id.
+func (ps *peerSet) Peer(id string) *peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ return ps.peers[id]
+}
+
+// Peers returns if the current number of peers in the set.
+func (ps *peerSet) Peers() int {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
- p.rep++
+ return len(ps.peers)
}
-// demote decreases the peer's reputation or leaves it at 0
-func (p *peer) demote() {
- p.mu.Lock()
- defer p.mu.Unlock()
+// AllPeers retrieves a flat list of all the peers within the set.
+func (ps *peerSet) AllPeers() []*peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
- if p.rep > 1 {
- p.rep -= 2
- } else {
- p.rep = 0
+ list := make([]*peer, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ list = append(list, p)
}
+ return list
}
-func (p *peer) reset() {
- p.state = idleState
- p.ignored.Clear()
+// IdlePeers retrieves a flat list of all the currently idle peers within the
+// active peer set.
+func (ps *peerSet) IdlePeers() []*peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ list := make([]*peer, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ if atomic.LoadInt32(&p.idle) == 0 {
+ list = append(list, p)
+ }
+ }
+ return list
}