diff options
Diffstat (limited to 'dex')
-rw-r--r-- | dex/notaryset.go | 203 |
1 files changed, 203 insertions, 0 deletions
diff --git a/dex/notaryset.go b/dex/notaryset.go new file mode 100644 index 000000000..c520aa73c --- /dev/null +++ b/dex/notaryset.go @@ -0,0 +1,203 @@ +package dex + +import ( + "fmt" + "sync" + + "github.com/ethereum/go-ethereum/p2p/discover" +) + +type nodeInfo struct { + info *notaryNodeInfo + added bool +} + +func (n *nodeInfo) NewNode() *discover.Node { + return discover.NewNode(n.info.ID, n.info.IP, n.info.UDP, n.info.TCP) +} + +type notarySet struct { + round uint64 + m map[string]*nodeInfo + lock sync.RWMutex +} + +func newNotarySet(round uint64, s map[string]struct{}) *notarySet { + m := make(map[string]*nodeInfo) + for nodeID := range s { + m[nodeID] = &nodeInfo{} + } + + return ¬arySet{ + round: round, + m: m, + } +} + +// Call this function when the notaryNodeInfoMsg is received. +func (n *notarySet) AddInfo(info *notaryNodeInfo) error { + n.lock.Lock() + defer n.lock.Unlock() + + // check round + if info.Round != n.round { + return fmt.Errorf("invalid round") + } + + nInfo, ok := n.m[info.ID.String()] + if !ok { + return fmt.Errorf("not in notary set") + } + + // if the info exists check timstamp + if nInfo.info != nil { + if nInfo.info.Timestamp > info.Timestamp { + return fmt.Errorf("old msg") + } + } + + nInfo.info = info + return nil +} + +// MarkAdded mark the notary node as added +// to prevent duplcate addition in the future. +func (n *notarySet) MarkAdded(nodeID string) { + if info, ok := n.m[nodeID]; ok { + info.added = true + } +} + +// Return all nodes +func (n *notarySet) Nodes() []*discover.Node { + n.lock.RLock() + defer n.lock.RUnlock() + + list := make([]*discover.Node, 0, len(n.m)) + for _, info := range n.m { + list = append(list, info.NewNode()) + } + return list +} + +// Return nodes that need to be added to p2p server as notary node. +func (n *notarySet) NodesToAdd() []*discover.Node { + n.lock.RLock() + defer n.lock.RUnlock() + + var list []*discover.Node + for _, info := range n.m { + // craete a new discover.Node + if !info.added { + list = append(list, info.NewNode()) + } + } + return list +} + +type notarySetManager struct { + m map[uint64]*notarySet + lock sync.RWMutex + queued map[uint64]map[string]*notaryNodeInfo + round uint64 // biggest round of managed notary sets + newNotaryNodeInfoCh chan *notaryNodeInfo +} + +func newNotarySetManager( + newNotaryNodeInfoCh chan *notaryNodeInfo) *notarySetManager { + return ¬arySetManager{ + m: make(map[uint64]*notarySet), + queued: make(map[uint64]map[string]*notaryNodeInfo), + newNotaryNodeInfoCh: newNotaryNodeInfoCh, + } +} + +// Register injects a new notary set into the manager and +// processes the queued info. +func (n *notarySetManager) Register(r uint64, s *notarySet) { + n.lock.Lock() + defer n.lock.Unlock() + if r > n.round { + n.round = r + } + n.m[r] = s + n.processQueuedInfo() +} + +// Unregister removes the notary set of the given round. +func (n *notarySetManager) Unregister(r uint64) { + n.lock.Lock() + defer n.lock.Unlock() + delete(n.m, r) +} + +// Round returns the notary set of the given round. +func (n *notarySetManager) Round(r uint64) (*notarySet, bool) { + n.lock.RLock() + defer n.lock.RUnlock() + s, ok := n.m[r] + return s, ok +} + +// Before returns all the notary sets that before the given round. +func (n *notarySetManager) Before(r uint64) []*notarySet { + n.lock.RLock() + defer n.lock.RUnlock() + var list []*notarySet + for round, s := range n.m { + if round < r { + list = append(list, s) + } + } + return list +} + +// TryAddInfo associates the given info to the notary set if the notary set is +// managed by the manager. +// If the notary node info is belong to future notary set, queue the info. +func (n *notarySetManager) TryAddInfo(info *notaryNodeInfo) { + n.lock.Lock() + defer n.lock.Unlock() + n.tryAddInfo(info) +} + +// This function is extract for calling without lock. +// Make sure the caller already accquired the lock. +func (n *notarySetManager) tryAddInfo(info *notaryNodeInfo) { + if info.Round > n.round { + if q, ok := n.queued[info.Round]; ok { + q[info.Hash().String()] = info + return + } + n.queued[info.Round] = map[string]*notaryNodeInfo{ + info.Hash().String(): info, + } + return + } + + s, ok := n.Round(info.Round) + if !ok { + return + } + s.AddInfo(info) + + // TODO(sonic): handle timeout + n.newNotaryNodeInfoCh <- info +} + +func (n *notarySetManager) processQueuedInfo() { + n.lock.Lock() + defer n.lock.Unlock() + if q, ok := n.queued[n.round]; ok { + for _, info := range q { + n.tryAddInfo(info) + } + } + + // Clear queue infos before current round. + for round := range n.queued { + if round <= n.round { + delete(n.queued, round) + } + } +} |