aboutsummaryrefslogtreecommitdiffstats
path: root/whisper/peer.go
blob: 332ddd22a43cac37e730601c600482637fb65fd2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package whisper

import (
    "fmt"
    "io/ioutil"
    "time"

    "github.com/ethereum/go-ethereum/p2p"
    "gopkg.in/fatih/set.v0"
)

const (
    // protocolVersion is the version value placed in the outgoing status
    // message (see statusMsg) and compared against the first payload byte
    // of the remote status message (see handleStatus).
    protocolVersion = 0x02
)

// peer holds the per-connection state for one remote peer.
//
// NOTE: NewPeer fills this struct with a positional literal, so the field
// order below must not change without updating NewPeer as well.
type peer struct {
    host *Whisper          // source of the envelopes relayed by update (via host.envelopes())
    peer *p2p.Peer         // underlying p2p peer; used in this file for logging
    ws   p2p.MsgReadWriter // message stream used for the status exchange and broadcasts

    // known records the hashes of envelopes that were already broadcast to
    // this peer or registered through addKnown.
    // XXX Eventually this is going to reach exceptionally large space. We need an expiry here
    known *set.Set

    // quit is closed by stop to terminate the update loop.
    quit chan struct{}
}

// NewPeer builds the peer state for a connection: it stores the given host,
// p2p peer and message stream, and starts with an empty set of known
// envelope hashes and a fresh, open quit channel.
func NewPeer(host *Whisper, p *p2p.Peer, ws p2p.MsgReadWriter) *peer {
    return &peer{
        host:  host,
        peer:  p,
        ws:    ws,
        known: set.New(),
        quit:  make(chan struct{}),
    }
}

// init performs the status exchange with the remote peer and returns the
// error reported by handleStatus, or nil when the exchange succeeded.
func (self *peer) init() error {
    return self.handleStatus()
}

// start launches the update loop in its own goroutine and logs that the
// protocol is running for this peer. The goroutine runs until stop closes
// the quit channel or a broadcast fails.
func (self *peer) start() {
    go self.update()
    self.peer.Infoln("whisper started")
}

// stop logs the shutdown and closes the quit channel, which makes the
// update loop return.
//
// NOTE(review): quit is closed unconditionally, so calling stop a second
// time would panic on the double close; callers must invoke it only once.
func (self *peer) stop() {
    self.peer.Infoln("whisper stopped")

    close(self.quit)
}

// update is the peer's relay loop. Every 300ms it asks the host for its
// current envelopes and broadcasts them to the remote peer. The loop ends
// when a broadcast fails (the error is logged) or when the quit channel is
// closed by stop.
func (self *peer) update() {
    relay := time.NewTicker(300 * time.Millisecond)
    // Release the ticker's resources on every exit path; without this the
    // ticker outlives the loop.
    defer relay.Stop()

    for {
        select {
        case <-relay.C:
            if err := self.broadcast(self.host.envelopes()); err != nil {
                self.peer.Infoln("broadcast err:", err)
                return
            }

        case <-self.quit:
            return
        }
    }
}

// broadcast sends the given envelopes to the remote peer, skipping every
// envelope whose hash is already present in the known set. Envelopes that
// pass the filter are recorded in known before the message is written.
// Nothing is written when no envelope is new. The returned error is the one
// reported by the message writer, or nil.
func (self *peer) broadcast(envelopes []*Envelope) error {
    fresh := make([]interface{}, 0, len(envelopes))
    for _, env := range envelopes {
        if self.known.Has(env.Hash()) {
            continue // already sent to or seen from this peer
        }
        fresh = append(fresh, env)
        self.known.Add(env.Hash())
    }

    count := len(fresh)
    if count == 0 {
        return nil
    }

    if err := self.ws.WriteMsg(p2p.NewMsg(envelopesMsg, fresh...)); err != nil {
        return err
    }
    self.peer.DebugDetailln("broadcasted", count, "message(s)")

    return nil
}

// addKnown records the envelope's hash in the known set so that broadcast
// will skip this envelope for this peer.
func (self *peer) addKnown(envelope *Envelope) {
    hash := envelope.Hash()
    self.known.Add(hash)
}

// handleStatus runs the status exchange: it writes the local status message,
// reads one message back and validates it. An error is returned when the
// write or read fails, when the first message received is not a status
// message, when its payload is empty, or when the first payload byte differs
// from protocolVersion.
func (self *peer) handleStatus() error {
    if err := self.ws.WriteMsg(self.statusMsg()); err != nil {
        return err
    }

    reply, err := self.ws.ReadMsg()
    if err != nil {
        return err
    }
    if reply.Code != statusMsg {
        return fmt.Errorf("peer send %x before status msg", reply.Code)
    }

    // The whole payload is read, but only its first byte is inspected.
    payload, err := ioutil.ReadAll(reply.Payload)
    switch {
    case err != nil:
        return err
    case len(payload) == 0:
        return fmt.Errorf("malformed status. data len = 0")
    case payload[0] != protocolVersion:
        return fmt.Errorf("protocol version mismatch %d != %d", payload[0], protocolVersion)
    }

    return nil
}

// statusMsg builds the outgoing status message: a message with code
// statusMsg carrying protocolVersion as its only parameter.
func (self *peer) statusMsg() p2p.Msg {
    status := p2p.NewMsg(statusMsg, protocolVersion)
    return status
}