diff options
-rw-r--r-- | dex/handler.go | 164 | ||||
-rw-r--r-- | dex/helper_test.go | 24 | ||||
-rw-r--r-- | dex/protocol.go | 11 |
3 files changed, 183 insertions, 16 deletions
diff --git a/dex/handler.go b/dex/handler.go index 21609a561..96d20b02b 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -59,6 +59,11 @@ var ( // not compatible (low protocol version restrictions and high requirements). var errIncompatibleConfig = errors.New("incompatible configuration") +type newNotarySetEvent struct { + Round uint64 + Set map[string]struct{} +} + func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) } @@ -74,9 +79,10 @@ type ProtocolManager struct { chainconfig *params.ChainConfig maxPeers int - downloader *downloader.Downloader - fetcher *fetcher.Fetcher - peers *peerSet + downloader *downloader.Downloader + fetcher *fetcher.Fetcher + peers *peerSet + notarySetManager *notarySetManager SubProtocols []p2p.Protocol @@ -91,6 +97,13 @@ type ProtocolManager struct { quitSync chan struct{} noMorePeers chan struct{} + // channels for notarySetLoop + newNotarySetCh chan newNotarySetEvent + newRoundCh chan uint64 + newNotaryNodeInfoCh chan *notaryNodeInfo + + srvr p2pServer + // wait group is used for graceful shutdowns during downloading // and processing wg sync.WaitGroup @@ -101,17 +114,22 @@ type ProtocolManager struct { func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) { // Create the protocol manager with the base fields manager := &ProtocolManager{ - networkID: networkID, - eventMux: mux, - txpool: txpool, - blockchain: blockchain, - chainconfig: config, - peers: newPeerSet(), - newPeerCh: make(chan *peer), - noMorePeers: make(chan struct{}), - txsyncCh: make(chan *txsync), - quitSync: make(chan struct{}), + networkID: networkID, + eventMux: mux, + txpool: txpool, + blockchain: blockchain, + chainconfig: config, + peers: newPeerSet(), + newPeerCh: make(chan *peer), + noMorePeers: make(chan struct{}), + txsyncCh: make(chan *txsync), + quitSync: make(chan struct{}), + newNotarySetCh: make(chan newNotarySetEvent), + newRoundCh: make(chan uint64), + newNotaryNodeInfoCh: make(chan *notaryNodeInfo), } + manager.notarySetManager = newNotarySetManager(manager.newNotaryNodeInfoCh) + // Figure out whether to allow fast sync or not if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 { log.Warn("Blockchain not empty, fast sync disabled") @@ -195,8 +213,9 @@ func (pm *ProtocolManager) removePeer(id string) { } } -func (pm *ProtocolManager) Start(maxPeers int) { +func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { pm.maxPeers = maxPeers + pm.srvr = srvr // broadcast transactions pm.txsCh = make(chan core.NewTxsEvent, txChanSize) @@ -207,6 +226,9 @@ func (pm *ProtocolManager) Start(maxPeers int) { pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() + // run the notary set loop + go pm.notarySetLoop() + // start sync handlers go pm.syncer() go pm.txsyncLoop() @@ -677,6 +699,18 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.txpool.AddRemotes(txs) + case msg.Code == NotaryNodeInfoMsg: + var info notaryNodeInfo + if err := msg.Decode(&info); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + + // TODO(sonic): validate signature + p.MarkNotaryNodeInfo(info.Hash()) + // Broadcast to peers + pm.BroadcastNotaryNodeInfo(&info) + + pm.notarySetManager.TryAddInfo(&info) default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) } @@ -735,6 +769,14 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { } } +// BroadcastNotaryNodeInfo will propagate notary node info to its peers. +func (pm *ProtocolManager) BroadcastNotaryNodeInfo(info *notaryNodeInfo) { + peers := pm.peers.PeersWithoutNotaryNodeInfo(info.Hash()) + for _, peer := range peers { + peer.AsyncSendNotaryNodeInfo(info) + } +} + // Mined broadcast loop func (pm *ProtocolManager) minedBroadcastLoop() { // automatically stops if unsubscribe @@ -759,6 +801,100 @@ func (pm *ProtocolManager) txBroadcastLoop() { } } +// a loop keep building and maintaining peers in notary set. +func (pm *ProtocolManager) notarySetLoop() { + // TODO: + // * pm need to know current round, and whether we are in notary set. + // * (done) able to handle node info in the near future round. + // * check peer limit. + // * revisit channels (newNotarySetCh, newRoundCh, newNotaryNodeInfoCh) (buffered ?) + + // If we are in notary set and we are synced with network. + // events: + // * new notary set is determined, and we are in notary set. + // (TBD: subscribe the event or polling govornance) + // - advance round + // - build new notary set + // - remove old notary set, remove old notary set from static + + // * current round node info changed. + + self := pm.srvr.Self() + var round uint64 + + for { + select { + case event := <-pm.newNotarySetCh: + // new notary set is determined. + if _, ok := event.Set[self.ID.String()]; ok { + // initialize the new notary set and add it to pm.notarySetManager + s := newNotarySet(event.Round, event.Set) + pm.notarySetManager.Register(event.Round, s) + + // TODO: handle signature + pm.BroadcastNotaryNodeInfo(¬aryNodeInfo{ + ID: self.ID, + IP: self.IP, + UDP: self.UDP, + TCP: self.TCP, + Round: event.Round, + Timestamp: time.Now().Unix(), + }) + } + + case r := <-pm.newRoundCh: + // move to new round. + round = r + + // TODO: revisit this to make sure how many previous round's + // notary set we need to keep. + + // rmove notary set before current round, they are useless + for _, s := range pm.notarySetManager.Before(round) { + pm.removeNotarySetFromStatic(s) + } + + // prepare notary set connections for new round. + if pm.isInNotarySet(round + 1) { + // we assume the notary set for round + 1 is added to + // pm.notarySetManager before this step. + if n, ok := pm.notarySetManager.Round(round + 1); ok { + pm.addNotarySetToStatic(n) + } + } + + case <-pm.newNotaryNodeInfoCh: + // some node info in "current round" is updated. + // try to connect them by the new info. + if pm.isInNotarySet(round) { + if n, ok := pm.notarySetManager.Round(round); ok { + pm.addNotarySetToStatic(n) + } + } + + case <-pm.quitSync: + return + } + } +} + +func (pm *ProtocolManager) isInNotarySet(r uint64) bool { + return false +} + +func (pm *ProtocolManager) addNotarySetToStatic(n *notarySet) { + for _, node := range n.NodesToAdd() { + pm.srvr.AddNotaryPeer(node) + n.MarkAdded(node.ID.String()) + } +} + +func (pm *ProtocolManager) removeNotarySetFromStatic(n *notarySet) { + for _, node := range n.Nodes() { + pm.srvr.RemoveNotaryPeer(node) + } +} + // NodeInfo represents a short summary of the Ethereum sub-protocol metadata // known about the host peer. type NodeInfo struct { diff --git a/dex/helper_test.go b/dex/helper_test.go index 13d2bbaec..8836b31da 100644 --- a/dex/helper_test.go +++ b/dex/helper_test.go @@ -46,6 +46,28 @@ var ( testBank = crypto.PubkeyToAddress(testBankKey.PublicKey) ) +// testP2PServer is a fake, helper p2p server for testing purposes. +type testP2PServer struct { + added chan *discover.Node + removed chan *discover.Node +} + +func (s *testP2PServer) Self() *discover.Node { + return &discover.Node{} +} + +func (s *testP2PServer) AddNotaryPeer(node *discover.Node) { + if s.added != nil { + s.added <- node + } +} + +func (s *testP2PServer) RemoveNotaryPeer(node *discover.Node) { + if s.removed != nil { + s.removed <- node + } +} + // newTestProtocolManager creates a new protocol manager for testing purposes, // with the given number of blocks already known, and potential notification // channels for different events. @@ -70,7 +92,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func if err != nil { return nil, nil, err } - pm.Start(1000) + pm.Start(&testP2PServer{}, 1000) return pm, db, nil } diff --git a/dex/protocol.go b/dex/protocol.go index 6452d854a..8aa16db2f 100644 --- a/dex/protocol.go +++ b/dex/protocol.go @@ -22,12 +22,13 @@ import ( "math/big" "net" - "github.com/dexon-foundation/dexon/crypto/sha3" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto/sha3" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/rlp" ) @@ -113,6 +114,14 @@ type txPool interface { SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription } +type p2pServer interface { + Self() *enode.Node + + AddNotaryPeer(*discover.Node) + + RemoveNotaryPeer(*discover.Node) +} + // statusData is the network packet for the status message. type statusData struct { ProtocolVersion uint32 |