From 94bb940818d9b5c6d654da14f13918e65cf84623 Mon Sep 17 00:00:00 2001
From: Sonic <sonic@cobinhood.com>
Date: Tue, 25 Sep 2018 16:53:56 +0800
Subject: dex: implement notary node info propagation and management mechanism

---
 dex/handler.go     | 164 ++++++++++++++++++++++++++++++++++++++++++++++++-----
 dex/helper_test.go |  24 +++++++-
 dex/protocol.go    |  11 +++-
 3 files changed, 183 insertions(+), 16 deletions(-)

(limited to 'dex')

diff --git a/dex/handler.go b/dex/handler.go
index 21609a561..96d20b02b 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -59,6 +59,11 @@ var (
 // not compatible (low protocol version restrictions and high requirements).
 var errIncompatibleConfig = errors.New("incompatible configuration")
 
+type newNotarySetEvent struct {
+	Round uint64
+	Set   map[string]struct{}
+}
+
 func errResp(code errCode, format string, v ...interface{}) error {
 	return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
 }
@@ -74,9 +79,10 @@ type ProtocolManager struct {
 	chainconfig *params.ChainConfig
 	maxPeers    int
 
-	downloader *downloader.Downloader
-	fetcher    *fetcher.Fetcher
-	peers      *peerSet
+	downloader       *downloader.Downloader
+	fetcher          *fetcher.Fetcher
+	peers            *peerSet
+	notarySetManager *notarySetManager
 
 	SubProtocols []p2p.Protocol
 
@@ -91,6 +97,13 @@ type ProtocolManager struct {
 	quitSync    chan struct{}
 	noMorePeers chan struct{}
 
+	// channels for notarySetLoop
+	newNotarySetCh      chan newNotarySetEvent
+	newRoundCh          chan uint64
+	newNotaryNodeInfoCh chan *notaryNodeInfo
+
+	srvr p2pServer
+
 	// wait group is used for graceful shutdowns during downloading
 	// and processing
 	wg sync.WaitGroup
@@ -101,17 +114,22 @@ type ProtocolManager struct {
 func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
 	// Create the protocol manager with the base fields
 	manager := &ProtocolManager{
-		networkID:   networkID,
-		eventMux:    mux,
-		txpool:      txpool,
-		blockchain:  blockchain,
-		chainconfig: config,
-		peers:       newPeerSet(),
-		newPeerCh:   make(chan *peer),
-		noMorePeers: make(chan struct{}),
-		txsyncCh:    make(chan *txsync),
-		quitSync:    make(chan struct{}),
+		networkID:           networkID,
+		eventMux:            mux,
+		txpool:              txpool,
+		blockchain:          blockchain,
+		chainconfig:         config,
+		peers:               newPeerSet(),
+		newPeerCh:           make(chan *peer),
+		noMorePeers:         make(chan struct{}),
+		txsyncCh:            make(chan *txsync),
+		quitSync:            make(chan struct{}),
+		newNotarySetCh:      make(chan newNotarySetEvent),
+		newRoundCh:          make(chan uint64),
+		newNotaryNodeInfoCh: make(chan *notaryNodeInfo),
 	}
+	manager.notarySetManager = newNotarySetManager(manager.newNotaryNodeInfoCh)
+
 	// Figure out whether to allow fast sync or not
 	if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
 		log.Warn("Blockchain not empty, fast sync disabled")
@@ -195,8 +213,9 @@ func (pm *ProtocolManager) removePeer(id string) {
 	}
 }
 
-func (pm *ProtocolManager) Start(maxPeers int) {
+func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
 	pm.maxPeers = maxPeers
+	pm.srvr = srvr
 
 	// broadcast transactions
 	pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
@@ -207,6 +226,9 @@ func (pm *ProtocolManager) Start(maxPeers int) {
 	pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
 	go pm.minedBroadcastLoop()
 
+	// run the notary set loop
+	go pm.notarySetLoop()
+
 	// start sync handlers
 	go pm.syncer()
 	go pm.txsyncLoop()
@@ -677,6 +699,18 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		}
 		pm.txpool.AddRemotes(txs)
 
+	case msg.Code == NotaryNodeInfoMsg:
+		var info notaryNodeInfo
+		if err := msg.Decode(&info); err != nil {
+			return errResp(ErrDecode, "msg %v: %v", msg, err)
+		}
+
+		// TODO(sonic): validate signature
+		p.MarkNotaryNodeInfo(info.Hash())
+		// Broadcast to peers
+		pm.BroadcastNotaryNodeInfo(&info)
+
+		pm.notarySetManager.TryAddInfo(&info)
 	default:
 		return errResp(ErrInvalidMsgCode, "%v", msg.Code)
 	}
@@ -735,6 +769,14 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
 	}
 }
 
+// BroadcastNotaryNodeInfo will propagate notary node info to its peers.
+func (pm *ProtocolManager) BroadcastNotaryNodeInfo(info *notaryNodeInfo) {
+	peers := pm.peers.PeersWithoutNotaryNodeInfo(info.Hash())
+	for _, peer := range peers {
+		peer.AsyncSendNotaryNodeInfo(info)
+	}
+}
+
 // Mined broadcast loop
 func (pm *ProtocolManager) minedBroadcastLoop() {
 	// automatically stops if unsubscribe
@@ -759,6 +801,100 @@ func (pm *ProtocolManager) txBroadcastLoop() {
 	}
 }
 
+// a loop keep building and maintaining peers in notary set.
+func (pm *ProtocolManager) notarySetLoop() {
+	// TODO:
+	//   * pm need to know current round, and whether we are in notary set.
+	//   * (done) able to handle node info in the near future round.
+	//   * check peer limit.
+	//   * revisit channels (newNotarySetCh, newRoundCh, newNotaryNodeInfoCh) (buffered ?)
+
+	// If we are in notary set and we are synced with network.
+	// events:
+	//   * new notary set is determined, and we are in notary set.
+	//     (TBD: subscribe the event or polling govornance)
+	//     - advance round
+	//     - build new notary set
+	//     - remove old notary set, remove old notary set from static
+
+	//   * current round node info changed.
+
+	self := pm.srvr.Self()
+	var round uint64
+
+	for {
+		select {
+		case event := <-pm.newNotarySetCh:
+			// new notary set is determined.
+			if _, ok := event.Set[self.ID.String()]; ok {
+				// initialize the new notary set and add it to pm.notarySetManager
+				s := newNotarySet(event.Round, event.Set)
+				pm.notarySetManager.Register(event.Round, s)
+
+				// TODO: handle signature
+				pm.BroadcastNotaryNodeInfo(&notaryNodeInfo{
+					ID:        self.ID,
+					IP:        self.IP,
+					UDP:       self.UDP,
+					TCP:       self.TCP,
+					Round:     event.Round,
+					Timestamp: time.Now().Unix(),
+				})
+			}
+
+		case r := <-pm.newRoundCh:
+			// move to new round.
+			round = r
+
+			// TODO: revisit this to make sure how many previous round's
+			// notary set we need to keep.
+
+			// rmove notary set before current round, they are useless
+			for _, s := range pm.notarySetManager.Before(round) {
+				pm.removeNotarySetFromStatic(s)
+			}
+
+			// prepare notary set connections for new round.
+			if pm.isInNotarySet(round + 1) {
+				// we assume the notary set for round + 1 is added to
+				// pm.notarySetManager before this step.
+				if n, ok := pm.notarySetManager.Round(round + 1); ok {
+					pm.addNotarySetToStatic(n)
+				}
+			}
+
+		case <-pm.newNotaryNodeInfoCh:
+			// some node info in "current round" is updated.
+			// try to connect them by the new info.
+			if pm.isInNotarySet(round) {
+				if n, ok := pm.notarySetManager.Round(round); ok {
+					pm.addNotarySetToStatic(n)
+				}
+			}
+
+		case <-pm.quitSync:
+			return
+		}
+	}
+}
+
+func (pm *ProtocolManager) isInNotarySet(r uint64) bool {
+	return false
+}
+
+func (pm *ProtocolManager) addNotarySetToStatic(n *notarySet) {
+	for _, node := range n.NodesToAdd() {
+		pm.srvr.AddNotaryPeer(node)
+		n.MarkAdded(node.ID.String())
+	}
+}
+
+func (pm *ProtocolManager) removeNotarySetFromStatic(n *notarySet) {
+	for _, node := range n.Nodes() {
+		pm.srvr.RemoveNotaryPeer(node)
+	}
+}
+
 // NodeInfo represents a short summary of the Ethereum sub-protocol metadata
 // known about the host peer.
 type NodeInfo struct {
diff --git a/dex/helper_test.go b/dex/helper_test.go
index 13d2bbaec..8836b31da 100644
--- a/dex/helper_test.go
+++ b/dex/helper_test.go
@@ -46,6 +46,28 @@ var (
 	testBank       = crypto.PubkeyToAddress(testBankKey.PublicKey)
 )
 
+// testP2PServer is a fake, helper p2p server for testing purposes.
+type testP2PServer struct {
+	added   chan *discover.Node
+	removed chan *discover.Node
+}
+
+func (s *testP2PServer) Self() *discover.Node {
+	return &discover.Node{}
+}
+
+func (s *testP2PServer) AddNotaryPeer(node *discover.Node) {
+	if s.added != nil {
+		s.added <- node
+	}
+}
+
+func (s *testP2PServer) RemoveNotaryPeer(node *discover.Node) {
+	if s.removed != nil {
+		s.removed <- node
+	}
+}
+
 // newTestProtocolManager creates a new protocol manager for testing purposes,
 // with the given number of blocks already known, and potential notification
 // channels for different events.
@@ -70,7 +92,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
 	if err != nil {
 		return nil, nil, err
 	}
-	pm.Start(1000)
+	pm.Start(&testP2PServer{}, 1000)
 	return pm, db, nil
 }
 
diff --git a/dex/protocol.go b/dex/protocol.go
index 6452d854a..8aa16db2f 100644
--- a/dex/protocol.go
+++ b/dex/protocol.go
@@ -22,12 +22,13 @@ import (
 	"math/big"
 	"net"
 
-	"github.com/dexon-foundation/dexon/crypto/sha3"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core"
 	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/crypto/sha3"
 	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/p2p/discover"
+	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/rlp"
 )
 
@@ -113,6 +114,14 @@ type txPool interface {
 	SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
 }
 
+type p2pServer interface {
+	Self() *enode.Node
+
+	AddNotaryPeer(*discover.Node)
+
+	RemoveNotaryPeer(*discover.Node)
+}
+
 // statusData is the network packet for the status message.
 type statusData struct {
 	ProtocolVersion uint32
-- 
cgit v1.2.3