aboutsummaryrefslogtreecommitdiffstats
path: root/dex/handler.go
diff options
context:
space:
mode:
authorSonic <sonic@cobinhood.com>2018-10-12 15:02:33 +0800
committerWei-Ning Huang <w@dexon.org>2018-12-19 20:54:27 +0800
commita468fd43993733d7ad89c14d038d91e88e12f611 (patch)
tree8aef4eb4e6c587edcdff15e85094c15f9e4322bd /dex/handler.go
parent0f649b2cb78a3f35b870b2db94a92ec9297886b6 (diff)
downloaddexon-a468fd43993733d7ad89c14d038d91e88e12f611.tar
dexon-a468fd43993733d7ad89c14d038d91e88e12f611.tar.gz
dexon-a468fd43993733d7ad89c14d038d91e88e12f611.tar.bz2
dexon-a468fd43993733d7ad89c14d038d91e88e12f611.tar.lz
dexon-a468fd43993733d7ad89c14d038d91e88e12f611.tar.xz
dexon-a468fd43993733d7ad89c14d038d91e88e12f611.tar.zst
dexon-a468fd43993733d7ad89c14d038d91e88e12f611.zip
dex: network: implement the network interface
Diffstat (limited to 'dex/handler.go')
-rw-r--r--dex/handler.go116
1 files changed, 116 insertions, 0 deletions
diff --git a/dex/handler.go b/dex/handler.go
index 67cbe8a63..e013b9722 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -26,6 +26,9 @@ import (
"sync/atomic"
"time"
+ "github.com/dexon-foundation/dexon-consensus-core/core/crypto"
+ coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types"
+
"github.com/dexon-foundation/dexon/common"
"github.com/dexon-foundation/dexon/consensus"
"github.com/dexon-foundation/dexon/core"
@@ -97,6 +100,9 @@ type ProtocolManager struct {
crsCh chan core.NewCRSEvent
crsSub event.Subscription
+ // channels for dexon consensus core
+ receiveCh chan interface{}
+
srvr p2pServer
// wait group is used for graceful shutdowns during downloading
@@ -125,6 +131,7 @@ func NewProtocolManager(
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
+ receiveCh: make(chan interface{}, 1024),
}
// Figure out whether to allow fast sync or not
@@ -267,6 +274,10 @@ func (pm *ProtocolManager) Stop() {
log.Info("Ethereum protocol stopped")
}
+func (pm *ProtocolManager) ReceiveChan() <-chan interface{} {
+ return pm.receiveCh
+}
+
func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
return newPeer(pv, p, newMeteredMsgWriter(rw))
}
@@ -666,6 +677,47 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.MarkNodeMeta(meta.Hash())
}
pm.nodeTable.Add(metas)
+ case msg.Code == LatticeBlockMsg:
+ var rb rlpLatticeBlock
+ if err := msg.Decode(&rb); err != nil {
+ fmt.Println("decode lattice block error", err)
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ pm.receiveCh <- fromRLPLatticeBlock(&rb)
+ case msg.Code == VoteMsg:
+ var vote coreTypes.Vote
+ if err := msg.Decode(&vote); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ pm.receiveCh <- &vote
+ case msg.Code == AgreementMsg:
+ // DKG set is receiver
+ var agreement coreTypes.AgreementResult
+ if err := msg.Decode(&agreement); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ pm.receiveCh <- &agreement
+ case msg.Code == RandomnessMsg:
+ // Broadcast this to all peer
+ var randomness coreTypes.BlockRandomnessResult
+ if err := msg.Decode(&randomness); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ pm.receiveCh <- &randomness
+ case msg.Code == DKGPrivateShareMsg:
+ // Do not relay this msg
+ var rps rlpDKGPrivateShare
+ if err := msg.Decode(&rps); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ pm.receiveCh <- fromRLPDKGPrivateShare(&rps)
+ case msg.Code == DKGPartialSignatureMsg:
+ // broadcast in DKG set
+ var psig coreTypes.DKGPartialSignature
+ if err := msg.Decode(&psig); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ pm.receiveCh <- &psig
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
}
@@ -741,6 +793,68 @@ func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) {
}
}
+func (pm *ProtocolManager) BroadcastVote(vote *coreTypes.Vote) {
+ label := peerLabel{
+ set: notaryset,
+ chainID: vote.Position.ChainID,
+ round: vote.Position.Round,
+ }
+
+ for _, peer := range pm.peers.PeersWithoutVote(rlpHash(vote), label) {
+ peer.AsyncSendVote(vote)
+ }
+}
+
+// TODO(sonic): try to reduce traffic
+func (pm *ProtocolManager) BroadcastLatticeBlock(block *coreTypes.Block) {
+ hash := rlpHash(toRLPLatticeBlock(block))
+ for _, peer := range pm.peers.PeersWithoutLatticeBlock(hash) {
+ peer.AsyncSendLatticeBlock(block)
+ }
+}
+
+// TODO(sonic): try to reduce traffic
+func (pm *ProtocolManager) SendDKGPrivateShare(
+ pub crypto.PublicKey, privateShare *coreTypes.DKGPrivateShare) {
+ id := discover.MustBytesID(pub.Bytes()[1:])
+ if p := pm.peers.Peer(id.String()); p != nil {
+ p.AsyncSendDKGPrivateShare(privateShare)
+ }
+}
+
+// TODO(sonic): try to reduce traffic
+func (pm *ProtocolManager) BroadcastDKGPrivateShare(
+ privateShare *coreTypes.DKGPrivateShare) {
+ for _, peer := range pm.peers.allPeers() {
+ peer.AsyncSendDKGPrivateShare(privateShare)
+ }
+}
+
+// TODO(sonic): try to reduce traffic
+func (pm *ProtocolManager) BroadcastAgreementResult(
+ agreement *coreTypes.AgreementResult) {
+ for _, peer := range pm.peers.PeersWithoutAgreement(rlpHash(agreement)) {
+ peer.AsyncSendAgreement(agreement)
+ }
+}
+
+// TODO(sonic): try to reduce traffic
+func (pm *ProtocolManager) BroadcastRandomnessResult(
+ randomness *coreTypes.BlockRandomnessResult) {
+ // random pick n peers
+ for _, peer := range pm.peers.PeersWithoutRandomness(rlpHash(randomness)) {
+ peer.AsyncSendRandomness(randomness)
+ }
+}
+
+// TODO(sonic): try to reduce traffic
+func (pm *ProtocolManager) BroadcastDKGPartialSignature(
+ psig *coreTypes.DKGPartialSignature) {
+ for _, peer := range pm.peers.PeersWithoutDKGPartialSignature(rlpHash(psig)) {
+ peer.AsyncSendDKGPartialSignature(psig)
+ }
+}
+
// Mined broadcast loop
func (pm *ProtocolManager) minedBroadcastLoop() {
// automatically stops if unsubscribe
@@ -781,6 +895,8 @@ func (pm *ProtocolManager) metaBroadcastLoop() {
// a loop keep building and maintaining peers in notary set.
// TODO: finish this
func (pm *ProtocolManager) peerSetLoop() {
+
+ log.Debug("start peer set loop")
for {
select {
case event := <-pm.crsCh: