aboutsummaryrefslogtreecommitdiffstats
path: root/dex/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'dex/peer.go')
-rw-r--r--dex/peer.go296
1 files changed, 230 insertions, 66 deletions
diff --git a/dex/peer.go b/dex/peer.go
index f1a4335d1..8c218f1d9 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -27,6 +27,7 @@ import (
"github.com/dexon-foundation/dexon/common"
"github.com/dexon-foundation/dexon/core/types"
"github.com/dexon-foundation/dexon/p2p"
+ "github.com/dexon-foundation/dexon/p2p/discover"
"github.com/dexon-foundation/dexon/rlp"
)
@@ -34,19 +35,20 @@ var (
errClosed = errors.New("peer set is closed")
errAlreadyRegistered = errors.New("peer is already registered")
errNotRegistered = errors.New("peer is not registered")
- errInvalidRound = errors.New("invalid round")
)
const (
maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+ maxKnownMetas = 32768 // Maximum metas hashes to keep in the known list (prevent DOS)
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
- maxKnownInfos = 1024
// maxQueuedTxs is the maximum number of transaction lists to queue up before
// dropping broadcasts. This is a sensitive number as a transaction list might
// contain a single transaction, or thousands.
maxQueuedTxs = 128
+ maxQueuedMetas = 512
+
// maxQueuedProps is the maximum number of block propagations to queue up before
// dropping broadcasts. There's not much point in queueing stale blocks, so a few
// that might cover uncles should be enough.
@@ -57,9 +59,9 @@ const (
// above some healthy uncle limit, so use that.
maxQueuedAnns = 4
- maxQueuedInfos = 1024
-
handshakeTimeout = 5 * time.Second
+
+ groupNodeNum = 3
)
// PeerInfo represents a short summary of the Ethereum sub-protocol metadata known
@@ -76,13 +78,27 @@ type propEvent struct {
td *big.Int
}
+type setType uint32
+
+const (
+ dkgset = iota
+ notaryset
+)
+
+type peerLabel struct {
+ set setType
+ chainID uint32
+ round uint64
+}
+
type peer struct {
id string
*p2p.Peer
rw p2p.MsgReadWriter
- version int // Protocol version negotiated
+ version int // Protocol version negotiated
+ labels mapset.Set
forkDrop *time.Timer // Timed connection dropper if forks aren't validated in time
head common.Hash
@@ -90,12 +106,12 @@ type peer struct {
lock sync.RWMutex
knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
+ knownMetas mapset.Set // Set of node metas known to be known by this peer
knownBlocks mapset.Set // Set of block hashes known to be known by this peer
- knownInfos mapset.Set // Set of infos known to be known by this peer
queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
+ queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer
queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
queuedAnns chan *types.Block // Queue of blocks to announce to the peer
- queuedInfos chan *notaryNodeInfo // Queue of infos to broadcast to the peer
term chan struct{} // Termination channel to stop the broadcaster
}
@@ -104,20 +120,21 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
Peer: p,
rw: rw,
version: version,
- id: fmt.Sprintf("%x", p.ID().Bytes()[:8]),
+ labels: mapset.NewSet(),
+ id: p.ID().String(),
knownTxs: mapset.NewSet(),
+ knownMetas: mapset.NewSet(),
knownBlocks: mapset.NewSet(),
- knownInfos: mapset.NewSet(),
queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
+ queuedMetas: make(chan []*NodeMeta, maxQueuedMetas),
queuedProps: make(chan *propEvent, maxQueuedProps),
queuedAnns: make(chan *types.Block, maxQueuedAnns),
- queuedInfos: make(chan *notaryNodeInfo, maxQueuedInfos),
term: make(chan struct{}),
}
}
// broadcast is a write loop that multiplexes block propagations, announcements,
-// transaction and notary node infos broadcasts into the remote peer.
+// transaction and notary node metas broadcasts into the remote peer.
// The goal is to have an async writer that does not lock up node internals.
func (p *peer) broadcast() {
for {
@@ -128,6 +145,12 @@ func (p *peer) broadcast() {
}
p.Log().Trace("Broadcast transactions", "count", len(txs))
+ case metas := <-p.queuedMetas:
+ if err := p.SendNodeMetas(metas); err != nil {
+ return
+ }
+ p.Log().Trace("Broadcast node metas", "count", len(metas))
+
case prop := <-p.queuedProps:
if err := p.SendNewBlock(prop.block, prop.td); err != nil {
return
@@ -140,12 +163,6 @@ func (p *peer) broadcast() {
}
p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())
- case info := <-p.queuedInfos:
- if err := p.SendNotaryNodeInfo(info); err != nil {
- return
- }
- p.Log().Trace("Broadcast notary node info")
-
case <-p.term:
return
}
@@ -157,6 +174,14 @@ func (p *peer) close() {
close(p.term)
}
+func (p *peer) addLabel(label peerLabel) {
+ p.labels.Add(label)
+}
+
+func (p *peer) removeLabel(label peerLabel) {
+ p.labels.Remove(label)
+}
+
// Info gathers and returns a collection of metadata known about a peer.
func (p *peer) Info() *PeerInfo {
hash, td := p.Head()
@@ -207,11 +232,11 @@ func (p *peer) MarkTransaction(hash common.Hash) {
p.knownTxs.Add(hash)
}
-func (p *peer) MarkNotaryNodeInfo(hash common.Hash) {
- for p.knownInfos.Cardinality() >= maxKnownInfos {
- p.knownInfos.Pop()
+func (p *peer) MarkNodeMeta(hash common.Hash) {
+ for p.knownMetas.Cardinality() >= maxKnownMetas {
+ p.knownMetas.Pop()
}
- p.knownInfos.Add(hash)
+ p.knownMetas.Add(hash)
}
// SendTransactions sends transactions to the peer and includes the hashes
@@ -236,21 +261,26 @@ func (p *peer) AsyncSendTransactions(txs []*types.Transaction) {
}
}
-// SendNotaryNodeInfo sends the info to the peer and includes the hashes
-// in its info hash set for future reference.
-func (p *peer) SendNotaryNodeInfo(info *notaryNodeInfo) error {
- return p2p.Send(p.rw, NotaryNodeInfoMsg, info)
+// SendNodeMetas sends the metas to the peer and includes the hashes
+// in its metas hash set for future reference.
+func (p *peer) SendNodeMetas(metas []*NodeMeta) error {
+ for _, meta := range metas {
+ p.knownMetas.Add(meta.Hash())
+ }
+ return p2p.Send(p.rw, MetaMsg, metas)
}
-// AsyncSendNotaryNodeInfo queues list of notary node info propagation to a
+// AsyncSendNodeMeta queues list of notary node meta propagation to a
// remote peer. If the peer's broadcast queue is full, the event is silently
// dropped.
-func (p *peer) AsyncSendNotaryNodeInfo(info *notaryNodeInfo) {
+func (p *peer) AsyncSendNodeMetas(metas []*NodeMeta) {
select {
- case p.queuedInfos <- info:
- p.knownInfos.Add(info.Hash())
+ case p.queuedMetas <- metas:
+ for _, meta := range metas {
+ p.knownMetas.Add(meta.Hash())
+ }
default:
- p.Log().Debug("Dropping notary node info propagation")
+ p.Log().Debug("Dropping node meta propagation", "count", len(metas))
}
}
@@ -431,7 +461,7 @@ func (p *peer) readStatus(network uint64, status *statusData, genesis common.Has
// String implements fmt.Stringer.
func (p *peer) String() string {
return fmt.Sprintf("Peer %s [%s]", p.id,
- fmt.Sprintf("eth/%2d", p.version),
+ fmt.Sprintf("dex/%2d", p.version),
)
}
@@ -441,18 +471,25 @@ type peerSet struct {
peers map[string]*peer
lock sync.RWMutex
closed bool
+ tab *nodeTable
- // TODO(sonic): revist this map after dexon core SDK is finalized.
- // use types.ValidatorID? or implement Stringer for types.ValidatorID
- notaryPeers map[uint64]map[string]*peer
- round uint64
+ srvr p2pServer
+ gov governance
+ peerLabels map[string]map[peerLabel]struct{}
+ notaryHistory map[uint64]struct{}
+ dkgHistory map[uint64]struct{}
}
// newPeerSet creates a new peer set to track the active participants.
-func newPeerSet() *peerSet {
+func newPeerSet(gov governance, srvr p2pServer, tab *nodeTable) *peerSet {
return &peerSet{
- peers: make(map[string]*peer),
- notaryPeers: make(map[uint64]map[string]*peer),
+ peers: make(map[string]*peer),
+ gov: gov,
+ srvr: srvr,
+ tab: tab,
+ peerLabels: make(map[string]map[peerLabel]struct{}),
+ notaryHistory: make(map[uint64]struct{}),
+ dkgHistory: make(map[uint64]struct{}),
}
}
@@ -537,14 +574,14 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer {
return list
}
-// PeersWithoutNotaryNodeInfo retrieves a list of peers that do not have a
-// given info in their set of known hashes.
-func (ps *peerSet) PeersWithoutNotaryNodeInfo(hash common.Hash) []*peer {
+// PeersWithoutNodeMeta retrieves a list of peers that do not have a
+// given meta in their set of known hashes.
+func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
- if p.knownInfos.Contains(hash) {
+ if !p.knownMetas.Contains(hash) {
list = append(list, p)
}
}
@@ -580,44 +617,171 @@ func (ps *peerSet) Close() {
ps.closed = true
}
-// AddNotaryPeer adds a peer into notary peer of the given round.
-// Caller of this function need to make sure that the peer is acutally in
-// notary set.
-func (ps *peerSet) AddNotaryPeer(round uint64, p *peer) error {
+func (ps *peerSet) BuildNotaryConn(round uint64) {
ps.lock.Lock()
defer ps.lock.Unlock()
- // TODO(sonic): revisit this round check after dexon core SDK is finalized.
- if round < ps.round || round > ps.round+2 {
- return errInvalidRound
+ if _, ok := ps.notaryHistory[round]; ok {
+ return
}
- if _, ok := ps.peers[p.id]; !ok {
- return errNotRegistered
+ ps.notaryHistory[round] = struct{}{}
+
+ selfID := ps.srvr.Self().ID.String()
+ for chainID := uint32(0); chainID < ps.gov.GetChainNum(round); chainID++ {
+ s := ps.gov.GetNotarySet(chainID, round)
+
+ // not in notary set, add group
+ if _, ok := s[selfID]; !ok {
+ var nodes []*discover.Node
+ for id := range s {
+ nodes = append(nodes, ps.newNode(id))
+ }
+ ps.srvr.AddGroup(notarySetName(chainID, round), nodes, groupNodeNum)
+ continue
+ }
+
+ label := peerLabel{
+ set: notaryset,
+ chainID: chainID,
+ round: round,
+ }
+ delete(s, selfID)
+ for id := range s {
+ ps.addDirectPeer(id, label)
+ }
}
+}
- ps.notaryPeers[round][p.id] = p
- return nil
+func (ps *peerSet) ForgetNotaryConn(round uint64) {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ // forget all the rounds before the given round
+ for r := range ps.notaryHistory {
+ if r <= round {
+ ps.forgetNotaryConn(r)
+ delete(ps.notaryHistory, r)
+ }
+ }
}
-// NotaryPeers return peers in notary set of the given round.
-func (ps *peerSet) NotaryPeers(round uint64) []*peer {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
+func (ps *peerSet) forgetNotaryConn(round uint64) {
+ selfID := ps.srvr.Self().ID.String()
+ for chainID := uint32(0); chainID < ps.gov.GetChainNum(round); chainID++ {
+ s := ps.gov.GetNotarySet(chainID, round)
+ if _, ok := s[selfID]; !ok {
+ ps.srvr.RemoveGroup(notarySetName(chainID, round))
+ continue
+ }
- list := make([]*peer, 0, len(ps.notaryPeers[round]))
- for _, p := range ps.notaryPeers[round] {
- if _, ok := ps.peers[p.id]; ok {
- list = append(list, p)
+ label := peerLabel{
+ set: notaryset,
+ chainID: chainID,
+ round: round,
+ }
+ delete(s, selfID)
+ for id := range s {
+ ps.removeDirectPeer(id, label)
}
}
- return list
}
-// NextRound moves notary peer set to next round.
-func (ps *peerSet) NextRound() {
+func notarySetName(chainID uint32, round uint64) string {
+ return fmt.Sprintf("%d-%d-notaryset", chainID, round)
+}
+
+func (ps *peerSet) BuildDKGConn(round uint64) {
ps.lock.Lock()
defer ps.lock.Unlock()
- delete(ps.notaryPeers, ps.round)
- ps.round = ps.round + 1
+ selfID := ps.srvr.Self().ID.String()
+ s := ps.gov.GetDKGSet(round)
+ if _, ok := s[selfID]; !ok {
+ return
+ }
+ ps.dkgHistory[round] = struct{}{}
+
+ delete(s, selfID)
+ for id := range s {
+ ps.addDirectPeer(id, peerLabel{
+ set: dkgset,
+ round: round,
+ })
+ }
+}
+
+func (ps *peerSet) ForgetDKGConn(round uint64) {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ // forget all the rounds before the given round
+ for r := range ps.dkgHistory {
+ if r <= round {
+ ps.forgetDKGConn(r)
+ delete(ps.dkgHistory, r)
+ }
+ }
+}
+
+func (ps *peerSet) forgetDKGConn(round uint64) {
+ selfID := ps.srvr.Self().ID.String()
+ s := ps.gov.GetDKGSet(round)
+ if _, ok := s[selfID]; !ok {
+ return
+ }
+
+ delete(s, selfID)
+ label := peerLabel{
+ set: dkgset,
+ round: round,
+ }
+ for id := range s {
+ ps.removeDirectPeer(id, label)
+ }
+}
+
+// make sure the ps.lock is hold
+func (ps *peerSet) addDirectPeer(id string, label peerLabel) {
+ // if the peer exists add the label
+ if p, ok := ps.peers[id]; ok {
+ p.addLabel(label)
+ }
+
+ if _, ok := ps.peerLabels[id]; !ok {
+ ps.peerLabels[id] = make(map[peerLabel]struct{})
+ }
+
+ ps.peerLabels[id][label] = struct{}{}
+ ps.srvr.AddDirectPeer(ps.newNode(id))
+}
+
+// make sure the ps.lock is hold
+func (ps *peerSet) removeDirectPeer(id string, label peerLabel) {
+ if p, ok := ps.peers[id]; ok {
+ p.removeLabel(label)
+ }
+
+ delete(ps.peerLabels[id], label)
+
+ if len(ps.peerLabels[id]) == 0 {
+ ps.srvr.RemoveDirectPeer(ps.newNode(id))
+ delete(ps.peerLabels, id)
+ }
+}
+
+func (ps *peerSet) newNode(id string) *enode.Node {
+ nodeID := enode.HexID(id)
+ meta := ps.tab.Get(enode.HexID(id))
+
+ var r enr.Record
+ r.Set(enr.ID(nodeID.String()))
+ r.Set(enr.IP(meta.IP))
+ r.Set(enr.TCP(meta.TCP))
+ r.Set(enr.UDP(meta.UDP))
+
+ n, err := enode.New(enode.ValidSchemes, &r)
+ if err != nil {
+ panic(err)
+ }
+ return n
}