From 5422fe51256e45c42939d1bfbcf13e07d2660f8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 18 May 2015 21:33:37 +0300 Subject: eth: make the peer set thread safe --- eth/peer.go | 129 +++++++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 119 insertions(+), 10 deletions(-) (limited to 'eth/peer.go') diff --git a/eth/peer.go b/eth/peer.go index 861efaaec..369e16221 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -1,8 +1,10 @@ package eth import ( + "errors" "fmt" "math/big" + "sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -12,6 +14,11 @@ import ( "gopkg.in/fatih/set.v0" ) +var ( + errAlreadyRegistered = errors.New("peer is already registered") + errNotRegistered = errors.New("peer is not registered") +) + type statusMsgData struct { ProtocolVersion uint32 NetworkId uint32 @@ -25,16 +32,6 @@ type getBlockHashesMsgData struct { Amount uint64 } -func getBestPeer(peers map[string]*peer) *peer { - var peer *peer - for _, cp := range peers { - if peer == nil || cp.td.Cmp(peer.td) > 0 { - peer = cp - } - } - return peer -} - type peer struct { *p2p.Peer @@ -159,3 +156,115 @@ func (p *peer) handleStatus() error { return <-errc } + +// peerSet represents the collection of active peers currently participating in +// the Ethereum sub-protocol. +type peerSet struct { + peers map[string]*peer + lock sync.RWMutex +} + +// newPeerSet creates a new peer set to track the active participants. +func newPeerSet() *peerSet { + return &peerSet{ + peers: make(map[string]*peer), + } +} + +// Register injects a new peer into the working set, or returns an error if the +// peer is already known. +func (ps *peerSet) Register(p *peer) error { + ps.lock.Lock() + defer ps.lock.Unlock() + + if _, ok := ps.peers[p.id]; ok { + return errAlreadyRegistered + } + ps.peers[p.id] = p + return nil +} + +// Unregister removes a remote peer from the active set, disabling any further +// actions to/from that particular entity. +func (ps *peerSet) Unregister(id string) error { + ps.lock.Lock() + defer ps.lock.Unlock() + + if _, ok := ps.peers[id]; !ok { + return errNotRegistered + } + delete(ps.peers, id) + return nil +} + +// Peer retrieves the registered peer with the given id. +func (ps *peerSet) Peer(id string) *peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + return ps.peers[id] +} + +// Len returns if the current number of peers in the set. +func (ps *peerSet) Len() int { + ps.lock.RLock() + defer ps.lock.RUnlock() + + return len(ps.peers) +} + +// BlockLackingPeers retrieves a list of peers that do not have a given block +// in their set of known hashes. +func (ps *peerSet) BlockLackingPeers(hash common.Hash) []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.blockHashes.Has(hash) { + list = append(list, p) + } + } + return list +} + +// TxLackingPeers retrieves a list of peers that do not have a given transaction +// in their set of known hashes. +func (ps *peerSet) TxLackingPeers(hash common.Hash) []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.txHashes.Has(hash) { + list = append(list, p) + } + } + return list +} + +// AllPeers retrieves a flat list of all the peers within the set. +func (ps *peerSet) AllPeers() []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + list = append(list, p) + } + return list +} + +// BestPeer retrieves the known peer with the currently highest total difficulty. +func (ps *peerSet) BestPeer() *peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + var best *peer + for _, p := range ps.peers { + if best == nil || p.td.Cmp(best.td) > 0 { + best = p + } + } + return best +} -- cgit v1.2.3 From 4755caeb2d07db057e152df555d58d0dd89bda03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 18 May 2015 21:35:42 +0300 Subject: eth: remote a superfluous peerSet method --- eth/peer.go | 12 ------------ 1 file changed, 12 deletions(-) (limited to 'eth/peer.go') diff --git a/eth/peer.go b/eth/peer.go index 369e16221..a23449acd 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -243,18 +243,6 @@ func (ps *peerSet) TxLackingPeers(hash common.Hash) []*peer { return list } -// AllPeers retrieves a flat list of all the peers within the set. -func (ps *peerSet) AllPeers() []*peer { - ps.lock.RLock() - defer ps.lock.RUnlock() - - list := make([]*peer, 0, len(ps.peers)) - for _, p := range ps.peers { - list = append(list, p) - } - return list -} - // BestPeer retrieves the known peer with the currently highest total difficulty. func (ps *peerSet) BestPeer() *peer { ps.lock.RLock() -- cgit v1.2.3 From 3c8227b935fdc9eda7a6cfacc2e0d0d189e7bb36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 20 May 2015 10:34:45 +0300 Subject: eth: fix odd method names in peer set --- eth/peer.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'eth/peer.go') diff --git a/eth/peer.go b/eth/peer.go index a23449acd..fdd815293 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -213,9 +213,9 @@ func (ps *peerSet) Len() int { return len(ps.peers) } -// BlockLackingPeers retrieves a list of peers that do not have a given block -// in their set of known hashes. -func (ps *peerSet) BlockLackingPeers(hash common.Hash) []*peer { +// PeersWithoutBlock retrieves a list of peers that do not have a given block in +// their set of known hashes. +func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() @@ -228,9 +228,9 @@ func (ps *peerSet) BlockLackingPeers(hash common.Hash) []*peer { return list } -// TxLackingPeers retrieves a list of peers that do not have a given transaction +// PeersWithoutTx retrieves a list of peers that do not have a given transaction // in their set of known hashes. -func (ps *peerSet) TxLackingPeers(hash common.Hash) []*peer { +func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() -- cgit v1.2.3 From 06a041589f3c2d4b3e66a1ce51e3e03e209fdbff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 21 May 2015 18:16:04 +0300 Subject: eth, eth/downloader: remove duplicate consts, bump hash fetch to 2K --- eth/peer.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'eth/peer.go') diff --git a/eth/peer.go b/eth/peer.go index a23449acd..1ef032d38 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p" @@ -100,8 +101,8 @@ func (p *peer) sendTransaction(tx *types.Transaction) error { } func (p *peer) requestHashes(from common.Hash) error { - glog.V(logger.Debug).Infof("[%s] fetching hashes (%d) %x...\n", p.id, maxHashes, from[:4]) - return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, maxHashes}) + glog.V(logger.Debug).Infof("[%s] fetching hashes (%d) %x...\n", p.id, downloader.MaxHashFetch, from[:4]) + return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, downloader.MaxHashFetch}) } func (p *peer) requestBlocks(hashes []common.Hash) error { -- cgit v1.2.3