aboutsummaryrefslogtreecommitdiffstats
path: root/lds/txrelay.go
diff options
context:
space:
mode:
Diffstat (limited to 'lds/txrelay.go')
-rw-r--r--lds/txrelay.go175
1 files changed, 175 insertions, 0 deletions
diff --git a/lds/txrelay.go b/lds/txrelay.go
new file mode 100644
index 000000000..d1fe80dc8
--- /dev/null
+++ b/lds/txrelay.go
@@ -0,0 +1,175 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package lds
+
+import (
+ "sync"
+
+ "github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/core/types"
+)
+
+type ltrInfo struct {
+ tx *types.Transaction
+ sentTo map[*peer]struct{}
+}
+
+type LesTxRelay struct {
+ txSent map[common.Hash]*ltrInfo
+ txPending map[common.Hash]struct{}
+ ps *peerSet
+ peerList []*peer
+ peerStartPos int
+ lock sync.RWMutex
+
+ reqDist *requestDistributor
+}
+
+func NewLesTxRelay(ps *peerSet, reqDist *requestDistributor) *LesTxRelay {
+ r := &LesTxRelay{
+ txSent: make(map[common.Hash]*ltrInfo),
+ txPending: make(map[common.Hash]struct{}),
+ ps: ps,
+ reqDist: reqDist,
+ }
+ ps.notify(r)
+ return r
+}
+
+func (self *LesTxRelay) registerPeer(p *peer) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+
+ self.peerList = self.ps.AllPeers()
+}
+
+func (self *LesTxRelay) unregisterPeer(p *peer) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+
+ self.peerList = self.ps.AllPeers()
+}
+
+// send sends a list of transactions to at most a given number of peers at
+// once, never resending any particular transaction to the same peer twice
+func (self *LesTxRelay) send(txs types.Transactions, count int) {
+ sendTo := make(map[*peer]types.Transactions)
+
+ self.peerStartPos++ // rotate the starting position of the peer list
+ if self.peerStartPos >= len(self.peerList) {
+ self.peerStartPos = 0
+ }
+
+ for _, tx := range txs {
+ hash := tx.Hash()
+ ltr, ok := self.txSent[hash]
+ if !ok {
+ ltr = &ltrInfo{
+ tx: tx,
+ sentTo: make(map[*peer]struct{}),
+ }
+ self.txSent[hash] = ltr
+ self.txPending[hash] = struct{}{}
+ }
+
+ if len(self.peerList) > 0 {
+ cnt := count
+ pos := self.peerStartPos
+ for {
+ peer := self.peerList[pos]
+ if _, ok := ltr.sentTo[peer]; !ok {
+ sendTo[peer] = append(sendTo[peer], tx)
+ ltr.sentTo[peer] = struct{}{}
+ cnt--
+ }
+ if cnt == 0 {
+ break // sent it to the desired number of peers
+ }
+ pos++
+ if pos == len(self.peerList) {
+ pos = 0
+ }
+ if pos == self.peerStartPos {
+ break // tried all available peers
+ }
+ }
+ }
+ }
+
+ for p, list := range sendTo {
+ pp := p
+ ll := list
+
+ reqID := genReqID()
+ rq := &distReq{
+ getCost: func(dp distPeer) uint64 {
+ peer := dp.(*peer)
+ return peer.GetRequestCost(SendTxMsg, len(ll))
+ },
+ canSend: func(dp distPeer) bool {
+ return dp.(*peer) == pp
+ },
+ request: func(dp distPeer) func() {
+ peer := dp.(*peer)
+ cost := peer.GetRequestCost(SendTxMsg, len(ll))
+ peer.fcServer.QueueRequest(reqID, cost)
+ return func() { peer.SendTxs(reqID, cost, ll) }
+ },
+ }
+ self.reqDist.queue(rq)
+ }
+}
+
+func (self *LesTxRelay) Send(txs types.Transactions) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+
+ self.send(txs, 3)
+}
+
+func (self *LesTxRelay) NewHead(head common.Hash, mined []common.Hash, rollback []common.Hash) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+
+ for _, hash := range mined {
+ delete(self.txPending, hash)
+ }
+
+ for _, hash := range rollback {
+ self.txPending[hash] = struct{}{}
+ }
+
+ if len(self.txPending) > 0 {
+ txs := make(types.Transactions, len(self.txPending))
+ i := 0
+ for hash := range self.txPending {
+ txs[i] = self.txSent[hash].tx
+ i++
+ }
+ self.send(txs, 1)
+ }
+}
+
+func (self *LesTxRelay) Discard(hashes []common.Hash) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+
+ for _, hash := range hashes {
+ delete(self.txSent, hash)
+ delete(self.txPending, hash)
+ }
+}