diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-04-14 19:28:59 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-04-14 19:28:59 +0800 |
commit | e2b7498c9dca34a1ebe29ba2eb6d5e1e2b48df5a (patch) | |
tree | 5698e5ef338da044d4781f4aba0401b24586cb0c /whisper/peer.go | |
parent | 59bff465053312013253f8c4288b6fb0c1e3e4e1 (diff) | |
download | go-tangerine-e2b7498c9dca34a1ebe29ba2eb6d5e1e2b48df5a.tar go-tangerine-e2b7498c9dca34a1ebe29ba2eb6d5e1e2b48df5a.tar.gz go-tangerine-e2b7498c9dca34a1ebe29ba2eb6d5e1e2b48df5a.tar.bz2 go-tangerine-e2b7498c9dca34a1ebe29ba2eb6d5e1e2b48df5a.tar.lz go-tangerine-e2b7498c9dca34a1ebe29ba2eb6d5e1e2b48df5a.tar.xz go-tangerine-e2b7498c9dca34a1ebe29ba2eb6d5e1e2b48df5a.tar.zst go-tangerine-e2b7498c9dca34a1ebe29ba2eb6d5e1e2b48df5a.zip |
whisper: add known message expiration to peers, cleanup
Diffstat (limited to 'whisper/peer.go')
-rw-r--r-- | whisper/peer.go | 163 |
1 files changed, 106 insertions, 57 deletions
diff --git a/whisper/peer.go b/whisper/peer.go index e50c9ec37..f077dbe70 100644 --- a/whisper/peer.go +++ b/whisper/peer.go @@ -4,106 +4,155 @@ import ( "fmt" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" "gopkg.in/fatih/set.v0" ) +// peer represents a whisper protocol peer connection. type peer struct { host *Whisper peer *p2p.Peer ws p2p.MsgReadWriter - // XXX Eventually this is going to reach exceptional large space. We need an expiry here - known *set.Set + known *set.Set // Messages already known by the peer to avoid wasting bandwidth quit chan struct{} } -func NewPeer(host *Whisper, p *p2p.Peer, ws p2p.MsgReadWriter) *peer { - return &peer{host, p, ws, set.New(), make(chan struct{})} -} - -func (self *peer) init() error { - if err := self.handleStatus(); err != nil { - return err +// newPeer creates and initializes a new whisper peer connection, returning either +// the newly constructed link or a failure reason. +func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) (*peer, error) { + p := &peer{ + host: host, + peer: remote, + ws: rw, + known: set.New(), + quit: make(chan struct{}), } - - return nil + if err := p.handshake(); err != nil { + return nil, err + } + return p, nil } +// start initiates the peer updater, periodically broadcasting the whisper packets +// into the network. func (self *peer) start() { go self.update() self.peer.Debugln("whisper started") } +// stop terminates the peer updater, stopping message forwarding to it. func (self *peer) stop() { + close(self.quit) self.peer.Debugln("whisper stopped") +} - close(self.quit) +// handshake sends the protocol initiation status message to the remote peer and +// verifies the remote status too. +func (self *peer) handshake() error { + // Send own status message, fetch remote one + if err := p2p.SendItems(self.ws, statusCode, protocolVersion); err != nil { + return err + } + packet, err := self.ws.ReadMsg() + if err != nil { + return err + } + if packet.Code != statusCode { + return fmt.Errorf("peer sent %x before status packet", packet.Code) + } + // Decode the rest of the status packet and verify protocol match + s := rlp.NewStream(packet.Payload) + if _, err := s.List(); err != nil { + return fmt.Errorf("bad status message: %v", err) + } + peerVersion, err := s.Uint() + if err != nil { + return fmt.Errorf("bad status message: %v", err) + } + if peerVersion != protocolVersion { + return fmt.Errorf("protocol version mismatch %d != %d", peerVersion, protocolVersion) + } + return packet.Discard() // ignore anything after protocol version } +// update executes periodic operations on the peer, including message transmission +// and expiration. func (self *peer) update() { - relay := time.NewTicker(300 * time.Millisecond) -out: + // Start the tickers for the updates + expire := time.NewTicker(expirationTicks) + transmit := time.NewTicker(transmissionTicks) + + // Loop and transmit until termination is requested for { select { - case <-relay.C: - err := self.broadcast(self.host.envelopes()) - if err != nil { - self.peer.Infoln("broadcast err:", err) - break out + case <-expire.C: + self.expire() + + case <-transmit.C: + if err := self.broadcast(); err != nil { + self.peer.Infoln("broadcast failed:", err) + return } case <-self.quit: - break out + return } } } -func (self *peer) broadcast(envelopes []*Envelope) error { - envs := make([]*Envelope, 0, len(envelopes)) - for _, env := range envelopes { - if !self.known.Has(env.Hash()) { - envs = append(envs, env) - self.known.Add(env.Hash()) - } - } - if len(envs) > 0 { - if err := p2p.Send(self.ws, envelopesMsg, envs); err != nil { - return err - } - self.peer.DebugDetailln("broadcasted", len(envs), "message(s)") - } - return nil +// mark marks an envelope known to the peer so that it won't be sent back. +func (self *peer) mark(envelope *Envelope) { + self.known.Add(envelope.Hash()) } -func (self *peer) addKnown(envelope *Envelope) { - self.known.Add(envelope.Hash()) +// marked checks if an envelope is already known to the remote peer. +func (self *peer) marked(envelope *Envelope) bool { + return self.known.Has(envelope.Hash()) } -func (self *peer) handleStatus() error { - ws := self.ws - if err := p2p.SendItems(ws, statusMsg, protocolVersion); err != nil { - return err - } - msg, err := ws.ReadMsg() - if err != nil { - return err - } - if msg.Code != statusMsg { - return fmt.Errorf("peer send %x before status msg", msg.Code) +// expire iterates over all the known envelopes in the host and removes all +// expired (unknown) ones from the known list. +func (self *peer) expire() { + // Assemble the list of available envelopes + available := set.NewNonTS() + for _, envelope := range self.host.envelopes() { + available.Add(envelope.Hash()) } - s := rlp.NewStream(msg.Payload) - if _, err := s.List(); err != nil { - return fmt.Errorf("bad status message: %v", err) + // Cross reference availability with known status + unmark := make(map[common.Hash]struct{}) + self.known.Each(func(v interface{}) bool { + if !available.Has(v.(common.Hash)) { + unmark[v.(common.Hash)] = struct{}{} + } + return true + }) + // Dump all known but unavailable + for hash, _ := range unmark { + self.known.Remove(hash) } - pv, err := s.Uint() - if err != nil { - return fmt.Errorf("bad status message: %v", err) +} + +// broadcast iterates over the collection of envelopes and transmits yet unknown +// ones over the network. +func (self *peer) broadcast() error { + // Fetch the envelopes and collect the unknown ones + envelopes := self.host.envelopes() + transmit := make([]*Envelope, 0, len(envelopes)) + for _, envelope := range envelopes { + if !self.marked(envelope) { + transmit = append(transmit, envelope) + self.mark(envelope) + } } - if pv != protocolVersion { - return fmt.Errorf("protocol version mismatch %d != %d", pv, protocolVersion) + // Transmit the unknown batch (potentially empty) + if err := p2p.Send(self.ws, messagesCode, transmit); err != nil { + return err } - return msg.Discard() // ignore anything after protocol version + self.peer.DebugDetailln("broadcasted", len(transmit), "message(s)") + + return nil } |