aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/messenger.go
blob: d42ba1720eba615be87df2ea21a1cad9ac35d222 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
package p2p

import (
    "fmt"
    "sync"
    "time"
)

const (
    handlerTimeout = 1000
)

type Handlers map[string](func(p *Peer) Protocol)

type Messenger struct {
    conn          *Connection
    peer          *Peer
    handlers      Handlers
    protocolLock  sync.RWMutex
    protocols     []Protocol
    offsets       []MsgCode // offsets for adaptive message idss
    protocolTable map[string]int
    quit          chan chan bool
    err           chan *PeerError
    pulse         chan bool
}

func NewMessenger(peer *Peer, conn *Connection, errchan chan *PeerError, handlers Handlers) *Messenger {
    baseProtocol := NewBaseProtocol(peer)
    return &Messenger{
        conn:          conn,
        peer:          peer,
        offsets:       []MsgCode{baseProtocol.Offset()},
        handlers:      handlers,
        protocols:     []Protocol{baseProtocol},
        protocolTable: make(map[string]int),
        err:           errchan,
        pulse:         make(chan bool, 1),
        quit:          make(chan chan bool, 1),
    }
}

func (self *Messenger) Start() {
    self.conn.Open()
    go self.messenger()
    self.protocolLock.RLock()
    defer self.protocolLock.RUnlock()
    self.protocols[0].Start()
}

func (self *Messenger) Stop() {
    // close pulse to stop ping pong monitoring
    close(self.pulse)
    self.protocolLock.RLock()
    defer self.protocolLock.RUnlock()
    for _, protocol := range self.protocols {
        protocol.Stop() // could be parallel
    }
    q := make(chan bool)
    self.quit <- q
    <-q
    self.conn.Close()
}

func (self *Messenger) messenger() {
    in := self.conn.Read()
    for {
        select {
        case payload, ok := <-in:
            //dispatches message to the protocol asynchronously
            if ok {
                go self.handle(payload)
            } else {
                return
            }
        case q := <-self.quit:
            q <- true
            return
        }
    }
}

// handles each message by dispatching to the appropriate protocol
// using adaptive message codes
// this function is started as a separate go routine for each message
// it waits for the protocol response
// then encodes and sends outgoing messages to the connection's write channel
func (self *Messenger) handle(payload []byte) {
    // send ping to heartbeat channel signalling time of last message
    // select {
    // case self.pulse <- true:
    // default:
    // }
    self.pulse <- true
    // initialise message from payload
    msg, err := NewMsgFromBytes(payload)
    if err != nil {
        self.err <- NewPeerError(MiscError, " %v", err)
        return
    }
    // retrieves protocol based on message Code
    protocol, offset, peerErr := self.getProtocol(msg.Code())
    if err != nil {
        self.err <- peerErr
        return
    }
    // reset message code based on adaptive offset
    msg.Decode(offset)
    // dispatches
    response := make(chan *Msg)
    go protocol.HandleIn(msg, response)
    // protocol reponse timeout to prevent leaks
    timer := time.After(handlerTimeout * time.Millisecond)
    for {
        select {
        case outgoing, ok := <-response:
            // we check if response channel is not closed
            if ok {
                self.conn.Write() <- outgoing.Encode(offset)
            } else {
                return
            }
        case <-timer:
            return
        }
    }
}

// negotiated protocols
// stores offsets needed for adaptive message id scheme

// based on offsets set at handshake
// get the right protocol to handle the message
func (self *Messenger) getProtocol(code MsgCode) (Protocol, MsgCode, *PeerError) {
    self.protocolLock.RLock()
    defer self.protocolLock.RUnlock()
    base := MsgCode(0)
    for index, offset := range self.offsets {
        if code < offset {
            return self.protocols[index], base, nil
        }
        base = offset
    }
    return nil, MsgCode(0), NewPeerError(InvalidMsgCode, " %v", code)
}

func (self *Messenger) PingPong(timeout time.Duration, gracePeriod time.Duration, pingCallback func(), timeoutCallback func()) {
    fmt.Printf("pingpong keepalive started at %v", time.Now())

    timer := time.After(timeout)
    pinged := false
    for {
        select {
        case _, ok := <-self.pulse:
            if ok {
                pinged = false
                timer = time.After(timeout)
            } else {
                // pulse is closed, stop monitoring
                return
            }
        case <-timer:
            if pinged {
                fmt.Printf("timeout at %v", time.Now())
                timeoutCallback()
                return
            } else {
                fmt.Printf("pinged at %v", time.Now())
                pingCallback()
                timer = time.After(gracePeriod)
                pinged = true
            }
        }
    }
}

func (self *Messenger) AddProtocols(protocols []string) {
    self.protocolLock.Lock()
    defer self.protocolLock.Unlock()
    i := len(self.offsets)
    offset := self.offsets[i-1]
    for _, name := range protocols {
        protocolFunc, ok := self.handlers[name]
        if ok {
            protocol := protocolFunc(self.peer)
            self.protocolTable[name] = i
            i++
            offset += protocol.Offset()
            fmt.Println("offset ", name, offset)

            self.offsets = append(self.offsets, offset)
            self.protocols = append(self.protocols, protocol)
            protocol.Start()
        } else {
            fmt.Println("no ", name)
            // protocol not handled
        }
    }
}

func (self *Messenger) Write(protocol string, msg *Msg) error {
    self.protocolLock.RLock()
    defer self.protocolLock.RUnlock()
    i := 0
    offset := MsgCode(0)
    if len(protocol) > 0 {
        var ok bool
        i, ok = self.protocolTable[protocol]
        if !ok {
            return fmt.Errorf("protocol %v not handled by peer", protocol)
        }
        offset = self.offsets[i-1]
    }
    handler := self.protocols[i]
    // checking if protocol status/caps allows the message to be sent out
    if handler.HandleOut(msg) {
        self.conn.Write() <- msg.Encode(offset)
    }
    return nil
}