aboutsummaryrefslogtreecommitdiffstats
path: root/les/txrelay.go
diff options
context:
space:
mode:
Diffstat (limited to 'les/txrelay.go')
-rw-r--r--les/txrelay.go156
1 files changed, 156 insertions, 0 deletions
diff --git a/les/txrelay.go b/les/txrelay.go
new file mode 100644
index 000000000..2df2fa0a9
--- /dev/null
+++ b/les/txrelay.go
@@ -0,0 +1,156 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+package les
+
+import (
+ "sync"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
+)
+
+type ltrInfo struct {
+ tx *types.Transaction
+ sentTo map[*peer]struct{}
+}
+
+type LesTxRelay struct {
+ txSent map[common.Hash]*ltrInfo
+ txPending map[common.Hash]struct{}
+ ps *peerSet
+ peerList []*peer
+ peerStartPos int
+ lock sync.RWMutex
+}
+
+func NewLesTxRelay() *LesTxRelay {
+ return &LesTxRelay{
+ txSent: make(map[common.Hash]*ltrInfo),
+ txPending: make(map[common.Hash]struct{}),
+ ps: newPeerSet(),
+ }
+}
+
+func (self *LesTxRelay) addPeer(p *peer) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+
+ self.ps.Register(p)
+ self.peerList = self.ps.AllPeers()
+}
+
+func (self *LesTxRelay) removePeer(id string) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+
+ self.ps.Unregister(id)
+ self.peerList = self.ps.AllPeers()
+}
+
+// send sends a list of transactions to at most a given number of peers at
+// once, never resending any particular transaction to the same peer twice
+func (self *LesTxRelay) send(txs types.Transactions, count int) {
+ sendTo := make(map[*peer]types.Transactions)
+
+ self.peerStartPos++ // rotate the starting position of the peer list
+ if self.peerStartPos >= len(self.peerList) {
+ self.peerStartPos = 0
+ }
+
+ for _, tx := range txs {
+ hash := tx.Hash()
+ ltr, ok := self.txSent[hash]
+ if !ok {
+ ltr = &ltrInfo{
+ tx: tx,
+ sentTo: make(map[*peer]struct{}),
+ }
+ self.txSent[hash] = ltr
+ self.txPending[hash] = struct{}{}
+ }
+
+ if len(self.peerList) > 0 {
+ cnt := count
+ pos := self.peerStartPos
+ for {
+ peer := self.peerList[pos]
+ if _, ok := ltr.sentTo[peer]; !ok {
+ sendTo[peer] = append(sendTo[peer], tx)
+ ltr.sentTo[peer] = struct{}{}
+ cnt--
+ }
+ if cnt == 0 {
+ break // sent it to the desired number of peers
+ }
+ pos++
+ if pos == len(self.peerList) {
+ pos = 0
+ }
+ if pos == self.peerStartPos {
+ break // tried all available peers
+ }
+ }
+ }
+ }
+
+ for p, list := range sendTo {
+ cost := p.GetRequestCost(SendTxMsg, len(list))
+ go func(p *peer, list types.Transactions, cost uint64) {
+ p.fcServer.SendRequest(0, cost)
+ p.SendTxs(cost, list)
+ }(p, list, cost)
+ }
+}
+
+func (self *LesTxRelay) Send(txs types.Transactions) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+
+ self.send(txs, 3)
+}
+
+func (self *LesTxRelay) NewHead(head common.Hash, mined []common.Hash, rollback []common.Hash) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+
+ for _, hash := range mined {
+ delete(self.txPending, hash)
+ }
+
+ for _, hash := range rollback {
+ self.txPending[hash] = struct{}{}
+ }
+
+ if len(self.txPending) > 0 {
+ txs := make(types.Transactions, len(self.txPending))
+ i := 0
+ for hash, _ := range self.txPending {
+ txs[i] = self.txSent[hash].tx
+ i++
+ }
+ self.send(txs, 1)
+ }
+}
+
+func (self *LesTxRelay) Discard(hashes []common.Hash) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+
+ for _, hash := range hashes {
+ delete(self.txSent, hash)
+ delete(self.txPending, hash)
+ }
+}