aboutsummaryrefslogtreecommitdiffstats
path: root/dex/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'dex/handler.go')
-rw-r--r--dex/handler.go216
1 files changed, 87 insertions, 129 deletions
diff --git a/dex/handler.go b/dex/handler.go
index bc932fb28..5348b06f2 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -49,6 +49,8 @@ const (
// txChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool.
txChanSize = 4096
+
+ metaChanSize = 10240
)
var (
@@ -59,11 +61,6 @@ var (
// not compatible (low protocol version restrictions and high requirements).
var errIncompatibleConfig = errors.New("incompatible configuration")
-type newNotarySetEvent struct {
- Round uint64
- Set map[string]struct{}
-}
-
func errResp(code errCode, format string, v ...interface{}) error {
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
}
@@ -75,32 +72,35 @@ type ProtocolManager struct {
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
txpool txPool
+ nodeTable *nodeTable
+ gov governance
blockchain *core.BlockChain
chainconfig *params.ChainConfig
maxPeers int
- downloader *downloader.Downloader
- fetcher *fetcher.Fetcher
- peers *peerSet
- notarySetManager *notarySetManager
+ downloader *downloader.Downloader
+ fetcher *fetcher.Fetcher
+ peers *peerSet
SubProtocols []p2p.Protocol
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
+ metasCh chan newMetasEvent
+ metasSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer
txsyncCh chan *txsync
+ metasyncCh chan *metasync
quitSync chan struct{}
noMorePeers chan struct{}
- // channels for notarySetLoop
- newNotarySetCh chan newNotarySetEvent
- newRoundCh chan uint64
- newNotaryNodeInfoCh chan *notaryNodeInfo
+ // channels for peerSetLoop
+ crsCh chan core.NewCRSEvent
+ crsSub event.Subscription
srvr p2pServer
@@ -111,24 +111,26 @@ type ProtocolManager struct {
// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the Ethereum network.
-func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
+func NewProtocolManager(
+ config *params.ChainConfig, mode downloader.SyncMode, networkID uint64,
+ mux *event.TypeMux, txpool txPool, engine consensus.Engine,
+ blockchain *core.BlockChain, chaindb ethdb.Database,
+ gov governance) (*ProtocolManager, error) {
+ tab := newNodeTable()
// Create the protocol manager with the base fields
manager := &ProtocolManager{
- networkID: networkID,
- eventMux: mux,
- txpool: txpool,
- blockchain: blockchain,
- chainconfig: config,
- peers: newPeerSet(),
- newPeerCh: make(chan *peer),
- noMorePeers: make(chan struct{}),
- txsyncCh: make(chan *txsync),
- quitSync: make(chan struct{}),
- newNotarySetCh: make(chan newNotarySetEvent),
- newRoundCh: make(chan uint64),
- newNotaryNodeInfoCh: make(chan *notaryNodeInfo),
- }
- manager.notarySetManager = newNotarySetManager(manager.newNotaryNodeInfoCh)
+ networkID: networkID,
+ eventMux: mux,
+ txpool: txpool,
+ nodeTable: tab,
+ gov: gov,
+ blockchain: blockchain,
+ chainconfig: config,
+ newPeerCh: make(chan *peer),
+ noMorePeers: make(chan struct{}),
+ txsyncCh: make(chan *txsync),
+ quitSync: make(chan struct{}),
+ }
// Figure out whether to allow fast sync or not
if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
@@ -216,22 +218,33 @@ func (pm *ProtocolManager) removePeer(id string) {
func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
pm.maxPeers = maxPeers
pm.srvr = srvr
+ pm.peers = newPeerSet(pm.gov, pm.srvr, pm.nodeTable)
+
+ // if our self in node set build the node info
// broadcast transactions
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()
+ // broadcast node metas
+ pm.metasCh = make(chan newMetasEvent, metaChanSize)
+ pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh)
+ go pm.metaBroadcastLoop()
+
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
- // run the notary set loop
- go pm.notarySetLoop()
+ // run the peer set loop
+ pm.crsCh = make(chan core.NewCRSEvent)
+ pm.crsSub = pm.gov.SubscribeNewCRSEvent(pm.crsCh)
+ go pm.peerSetLoop()
// start sync handlers
go pm.syncer()
go pm.txsyncLoop()
+ go pm.metasyncLoop()
}
func (pm *ProtocolManager) Stop() {
@@ -301,6 +314,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
pm.syncTransactions(p)
+ pm.syncNodeMetas(p)
// If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
@@ -699,18 +713,18 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.txpool.AddRemotes(txs)
- case msg.Code == NotaryNodeInfoMsg:
- var info notaryNodeInfo
- if err := msg.Decode(&info); err != nil {
+ case msg.Code == MetaMsg:
+ var metas []*NodeMeta
+ if err := msg.Decode(&metas); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
-
- // TODO(sonic): validate signature
- p.MarkNotaryNodeInfo(info.Hash())
- // Broadcast to peers
- pm.BroadcastNotaryNodeInfo(&info)
-
- pm.notarySetManager.TryAddInfo(&info)
+ for i, meta := range metas {
+ if meta == nil {
+ return errResp(ErrDecode, "node meta %d is nil", i)
+ }
+ p.MarkNodeMeta(meta.Hash())
+ }
+ pm.nodeTable.Add(metas)
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
}
@@ -769,11 +783,20 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
}
}
-// BroadcastNotaryNodeInfo will propagate notary node info to its peers.
-func (pm *ProtocolManager) BroadcastNotaryNodeInfo(info *notaryNodeInfo) {
- peers := pm.peers.PeersWithoutNotaryNodeInfo(info.Hash())
- for _, peer := range peers {
- peer.AsyncSendNotaryNodeInfo(info)
+// BroadcastMetas will propagate node metas to its peers.
+func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) {
+ var metaset = make(map[*peer][]*NodeMeta)
+
+ for _, meta := range metas {
+ peers := pm.peers.PeersWithoutNodeMeta(meta.Hash())
+ for _, peer := range peers {
+ metaset[peer] = append(metaset[peer], meta)
+ }
+ log.Trace("Broadcast meta", "ID", meta.ID, "recipients", len(peers))
+ }
+
+ for peer, metas := range metaset {
+ peer.AsyncSendNodeMetas(metas)
}
}
@@ -801,97 +824,32 @@ func (pm *ProtocolManager) txBroadcastLoop() {
}
}
-// a loop keep building and maintaining peers in notary set.
-func (pm *ProtocolManager) notarySetLoop() {
- // TODO:
- // * pm need to know current round, and whether we are in notary set.
- // * (done) able to handle node info in the near future round.
- // * check peer limit.
- // * revisit channels (newNotarySetCh, newRoundCh, newNotaryNodeInfoCh) (buffered ?)
-
- // If we are in notary set and we are synced with network.
- // events:
- // * new notary set is determined, and we are in notary set.
- // (TBD: subscribe the event or polling govornance)
- // - advance round
- // - build new notary set
- // - remove old notary set, remove old notary set from static
-
- // * current round node info changed.
-
- self := pm.srvr.Self()
- var round uint64
-
+func (pm *ProtocolManager) metaBroadcastLoop() {
for {
select {
- case event := <-pm.newNotarySetCh:
- // new notary set is determined.
- if _, ok := event.Set[self.ID.String()]; ok {
- // initialize the new notary set and add it to pm.notarySetManager
- s := newNotarySet(event.Round, event.Set)
- pm.notarySetManager.Register(event.Round, s)
-
- // TODO: handle signature
- pm.BroadcastNotaryNodeInfo(&notaryNodeInfo{
- ID: self.ID,
- IP: self.IP,
- UDP: self.UDP,
- TCP: self.TCP,
- Round: event.Round,
- Timestamp: time.Now().Unix(),
- })
- }
-
- case r := <-pm.newRoundCh:
- // move to new round.
- round = r
+ case event := <-pm.metasCh:
+ pm.BroadcastMetas(event.Metas)
- // TODO: revisit this to make sure how many previous round's
- // notary set we need to keep.
-
- // rmove notary set before current round, they are useless
- for _, s := range pm.notarySetManager.Before(round) {
- pm.removeNotarySetFromStatic(s)
- }
-
- // prepare notary set connections for new round.
- if pm.isInNotarySet(round + 1) {
- // we assume the notary set for round + 1 is added to
- // pm.notarySetManager before this step.
- if n, ok := pm.notarySetManager.Round(round + 1); ok {
- pm.addNotarySetToStatic(n)
- }
- }
-
- case <-pm.newNotaryNodeInfoCh:
- // some node info in "current round" is updated.
- // try to connect them by the new info.
- if pm.isInNotarySet(round) {
- if n, ok := pm.notarySetManager.Round(round); ok {
- pm.addNotarySetToStatic(n)
- }
- }
-
- case <-pm.quitSync:
+ // Err() channel will be closed when unsubscribing.
+ case <-pm.metasSub.Err():
return
}
}
}
-func (pm *ProtocolManager) isInNotarySet(r uint64) bool {
- return false
-}
-
-func (pm *ProtocolManager) addNotarySetToStatic(n *notarySet) {
- for _, node := range n.NodesToAdd() {
- pm.srvr.AddNotaryPeer(node)
- n.MarkAdded(node.ID.String())
- }
-}
-
-func (pm *ProtocolManager) removeNotarySetFromStatic(n *notarySet) {
- for _, node := range n.Nodes() {
- pm.srvr.RemoveNotaryPeer(node)
+// a loop keep building and maintaining peers in notary set.
+// TODO: finish this
+func (pm *ProtocolManager) peerSetLoop() {
+ for {
+ select {
+ case event := <-pm.crsCh:
+ pm.peers.BuildNotaryConn(event.Round)
+ pm.peers.BuildDKGConn(event.Round)
+ pm.peers.ForgetNotaryConn(event.Round - 1)
+ pm.peers.ForgetDKGConn(event.Round - 1)
+ case <-pm.quitSync:
+ return
+ }
}
}