path: root/swarm/network/protocol.go
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package network

/*
bzz implements the swarm wire protocol [bzz] (sister of eth and shh):
the protocol instance is launched on each peer by the network layer if the
bzz protocol handler is registered on the p2p server (see the registration
sketch right below this comment).

The bzz protocol component
* handles the protocol handshake
* registers peers in the KΛÐΞMLIΛ table via the hive logistic manager
* dispatches to the hive for handling the DHT logic
* encodes and decodes requests for storage and retrieval
* handles sync protocol messages via the syncer
* talks the SWAP payment protocol (swap accounting is done within NetStore)
*/
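
// A minimal sketch of registering the bzz protocol handler on a devp2p server;
// bzzProtocol is assumed to be the p2p.Protocol returned by the Bzz constructor
// below, and key and MaxPeers are placeholder configuration values:
//
//	srv := &p2p.Server{Config: p2p.Config{
//		PrivateKey: key,                         // node key (placeholder)
//		MaxPeers:   10,                          // placeholder
//		Protocols:  []p2p.Protocol{bzzProtocol}, // from the Bzz constructor
//	}}
//	if err := srv.Start(); err != nil {
//		// handle server start error
//	}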

import (
    "fmt"
    "net"
    "strconv"
    "time"

    "github.com/ethereum/go-ethereum/contracts/chequebook"
    "github.com/ethereum/go-ethereum/errs"
    "github.com/ethereum/go-ethereum/log"
    "github.com/ethereum/go-ethereum/p2p"
    "github.com/ethereum/go-ethereum/p2p/discover"
    bzzswap "github.com/ethereum/go-ethereum/swarm/services/swap"
    "github.com/ethereum/go-ethereum/swarm/services/swap/swap"
    "github.com/ethereum/go-ethereum/swarm/storage"
)

const (
    Version            = 0
    ProtocolLength     = uint64(8)
    ProtocolMaxMsgSize = 10 * 1024 * 1024
    NetworkId          = 3
)

const (
    ErrMsgTooLarge = iota
    ErrDecode
    ErrInvalidMsgCode
    ErrVersionMismatch
    ErrNetworkIdMismatch
    ErrNoStatusMsg
    ErrExtraStatusMsg
    ErrSwap
    ErrSync
    ErrUnwanted
)

var errorToString = map[int]string{
    ErrMsgTooLarge:       "Message too long",
    ErrDecode:            "Invalid message",
    ErrInvalidMsgCode:    "Invalid message code",
    ErrVersionMismatch:   "Protocol version mismatch",
    ErrNetworkIdMismatch: "NetworkId mismatch",
    ErrNoStatusMsg:       "No status message",
    ErrExtraStatusMsg:    "Extra status message",
    ErrSwap:              "SWAP error",
    ErrSync:              "Sync error",
    ErrUnwanted:          "Unwanted peer",
}

// bzz represents the swarm wire protocol
// an instance is running on each peer
type bzz struct {
    selfID     discover.NodeID      // own node id, advertised to peers in the handshake
    key        storage.Key          // baseaddress as storage.Key
    storage    StorageHandler       // handles storage/retrieval related requests coming via the bzz wire protocol
    hive       *Hive                // the logistic manager, peerPool, routing service and peer handler
    dbAccess   *DbAccess            // access to db storage counter and iterator for syncing
    requestDb  *storage.LDBDatabase // db to persist backlog of deliveries to aid syncing
    remoteAddr *peerAddr            // remote peer's address
    peer       *p2p.Peer            // the p2p peer object
    rw         p2p.MsgReadWriter    // messageReadWriter to send messages to
    errors     *errs.Errors         // errors table
    backend    chequebook.Backend
    lastActive time.Time
    NetworkId  uint64

    swap        *swap.Swap          // swap instance for the peer connection
    swapParams  *bzzswap.SwapParams // swap settings both local and remote
    swapEnabled bool                // flag to enable SWAP (will be set via Caps in handshake)
    syncEnabled bool                // flag to enable SYNC (will be set via Caps in handshake)
    syncer      *syncer             // syncer instance for the peer connection
    syncParams  *SyncParams         // syncer params
    syncState   *syncState          // outgoing synchronisation state (contains reference to the remote peer's db counter)
}

// StorageHandler is the interface for handling storage/retrieval related requests
// coming in via the bzz wire protocol (a minimal no-op implementation is sketched
// below the interface)
// messages: UnsyncedKeys, DeliveryRequest, StoreRequest, RetrieveRequest
type StorageHandler interface {
    HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error
    HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) error
    HandleStoreRequestMsg(req *storeRequestMsgData, p *peer)
    HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer)
}
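
// A minimal sketch of a no-op StorageHandler, e.g. as a stand-in for tests;
// the type name nullStorage is hypothetical and not defined in this package:
//
//	type nullStorage struct{}
//
//	func (nullStorage) HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error       { return nil }
//	func (nullStorage) HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) error { return nil }
//	func (nullStorage) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer)             {}
//	func (nullStorage) HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer)       {}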

/*
Bzz is the main entrypoint: it wraps everything needed to run the bzz protocol
and is the constructor used to attach the protocol ("class") to the server capabilities.
This is done via node.Node#Register(func(node.ServiceContext) (Service, error));
the Service implements Protocols(), which returns an array of p2p.Protocol descriptors.
At node startup the protocols are initialised, and the devp2p layer then calls
Run(p *p2p.Peer, rw p2p.MsgReadWriter) error on each peer connection.
The Run function of the bzz protocol creates a bzz instance which represents
the peer for the swarm hive and all peer-aware components (see the sketch below
for how a service might expose this protocol).
*/
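// A rough sketch of how a node service might expose this protocol via
// Protocols(); the Swarm receiver type and its field names (s.depo,
// s.backend, ...) are placeholders for whatever holds the dependencies
// listed in the Bzz signature:
//
//	func (s *Swarm) Protocols() []p2p.Protocol {
//		proto, err := network.Bzz(s.depo, s.backend, s.hive, s.dbAccess, s.swapParams, s.syncParams, s.networkId)
//		if err != nil {
//			return nil
//		}
//		return []p2p.Protocol{proto}
//	}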
func Bzz(cloud StorageHandler, backend chequebook.Backend, hive *Hive, dbaccess *DbAccess, sp *bzzswap.SwapParams, sy *SyncParams, networkId uint64) (p2p.Protocol, error) {

    // a single global request db is created for all peer connections
    // this is to persist the delivery backlog and aid synchronisation
    requestDb, err := storage.NewLDBDatabase(sy.RequestDbPath)
    if err != nil {
        return p2p.Protocol{}, fmt.Errorf("error setting up request db: %v", err)
    }
    if networkId == 0 {
        networkId = NetworkId
    }
    return p2p.Protocol{
        Name:    "bzz",
        Version: Version,
        Length:  ProtocolLength,
        Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
            return run(requestDb, cloud, backend, hive, dbaccess, sp, sy, networkId, p, rw)
        },
    }, nil
}

/*
run is the main protocol loop that
 * does the handshake by exchanging statusMsg
 * registers the peer with the hive if it is valid and accepted
 * then enters a forever loop handling incoming messages
 * storage and retrieval related queries coming via bzz are dispatched to the StorageHandler
 * peer-related messages are dispatched to the hive
 * payment related messages are relayed to the SWAP service
 * on disconnect, the peer is unregistered from the hive (see removePeer in the deferred cleanup)
 * whenever a handler returns an error the loop terminates
 * whenever the loop terminates, the peer disconnects with a Subprotocol error
*/
func run(requestDb *storage.LDBDatabase, depo StorageHandler, backend chequebook.Backend, hive *Hive, dbaccess *DbAccess, sp *bzzswap.SwapParams, sy *SyncParams, networkId uint64, p *p2p.Peer, rw p2p.MsgReadWriter) (err error) {

    self := &bzz{
        storage:   depo,
        backend:   backend,
        hive:      hive,
        dbAccess:  dbaccess,
        requestDb: requestDb,
        peer:      p,
        rw:        rw,
        errors: &errs.Errors{
            Package: "BZZ",
            Errors:  errorToString,
        },
        swapParams:  sp,
        syncParams:  sy,
        swapEnabled: hive.swapEnabled,
        syncEnabled: true,
        NetworkId:   networkId,
    }

    // handle handshake
    err = self.handleStatus()
    if err != nil {
        return err
    }
    defer func() {
        // if the handler loop exits, the peer is disconnecting
        // deregister the peer in the hive
        self.hive.removePeer(&peer{bzz: self})
        if self.syncer != nil {
            self.syncer.stop() // quits request db and delivery loops, saves requests
        }
        if self.swap != nil {
            self.swap.Stop() // quits chequebook autocash etc.
        }
    }()

    // the main forever loop that handles incoming requests
    for {
        if self.hive.blockRead {
            log.Warn("Cannot read network")
            time.Sleep(100 * time.Millisecond)
            continue
        }
        err = self.handle()
        if err != nil {
            return
        }
    }
}

// TODO: may need to implement a protocol-level drop only? we don't want to kick the peer
// off entirely if it is useful for other protocols
func (self *bzz) Drop() {
    self.peer.Disconnect(p2p.DiscSubprotocolError)
}

// one cycle of the main forever loop that handles and dispatches incoming messages
func (self *bzz) handle() error {
    msg, err := self.rw.ReadMsg()
    log.Debug(fmt.Sprintf("<- %v", msg))
    if err != nil {
        return err
    }
    if msg.Size > ProtocolMaxMsgSize {
        return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
    }
    // make sure that the payload has been fully consumed
    defer msg.Discard()

    switch msg.Code {

    case statusMsg:
        // no extra status message allowed; the one needed was already handled
        // by handleStatus
        log.Debug(fmt.Sprintf("Status message: %v", msg))
        return self.protoError(ErrExtraStatusMsg, "")

    case storeRequestMsg:
        // store requests are dispatched to netStore
        var req storeRequestMsgData
        if err := msg.Decode(&req); err != nil {
            return self.protoError(ErrDecode, "<- %v: %v", msg, err)
        }
        if len(req.SData) < 9 {
            return self.protoError(ErrDecode, "<- %v: Data too short (%v)", msg)
        }
        // last Active time is set only when receiving chunks
        self.lastActive = time.Now()
        log.Trace(fmt.Sprintf("incoming store request: %s", req.String()))
        // swap accounting is done within forwarding
        self.storage.HandleStoreRequestMsg(&req, &peer{bzz: self})

    case retrieveRequestMsg:
        // retrieve Requests are dispatched to netStore
        var req retrieveRequestMsgData
        if err := msg.Decode(&req); err != nil {
            return self.protoError(ErrDecode, "<- %v: %v", msg, err)
        }
        req.from = &peer{bzz: self}
        // if the request is a lookup, respond with peers only, no delivery
        if req.isLookup() {
            log.Trace(fmt.Sprintf("self lookup for %v: responding with peers only...", req.from))
        } else if req.Key == nil {
            return self.protoError(ErrDecode, "protocol handler: req.Key == nil || req.Timeout == nil")
        } else {
            // swap accounting is done within netStore
            self.storage.HandleRetrieveRequestMsg(&req, &peer{bzz: self})
        }
        // direct response with peers, TODO: sort this out
        self.hive.peers(&req)

    case peersMsg:
        // response to lookups and immediate response to retrieve requests
        // dispatches new peer data to the hive that adds them to KADDB
        var req peersMsgData
        if err := msg.Decode(&req); err != nil {
            return self.protoError(ErrDecode, "<- %v: %v", msg, err)
        }
        req.from = &peer{bzz: self}
        log.Trace(fmt.Sprintf("<- peer addresses: %v", req))
        self.hive.HandlePeersMsg(&req, &peer{bzz: self})

    case syncRequestMsg:
        var req syncRequestMsgData
        if err := msg.Decode(&req); err != nil {
            return self.protoError(ErrDecode, "<- %v: %v", msg, err)
        }
        log.Debug(fmt.Sprintf("<- sync request: %v", req))
        self.lastActive = time.Now()
        self.sync(req.SyncState)

    case unsyncedKeysMsg:
        // unsynced keys offered by the remote (parent) node for syncing
        var req unsyncedKeysMsgData
        if err := msg.Decode(&req); err != nil {
            return self.protoError(ErrDecode, "<- %v: %v", msg, err)
        }
        log.Debug(fmt.Sprintf("<- unsynced keys : %s", req.String()))
        err := self.storage.HandleUnsyncedKeysMsg(&req, &peer{bzz: self})
        self.lastActive = time.Now()
        if err != nil {
            return self.protoError(ErrDecode, "<- %v: %v", msg, err)
        }

    case deliveryRequestMsg:
        // response to unsyncedKeysMsg: requests delivery of the offered hashes not found in the local db
        // also relays the last synced state to the source
        var req deliveryRequestMsgData
        if err := msg.Decode(&req); err != nil {
            return self.protoError(ErrDecode, "<-msg %v: %v", msg, err)
        }
        log.Debug(fmt.Sprintf("<- delivery request: %s", req.String()))
        err := self.storage.HandleDeliveryRequestMsg(&req, &peer{bzz: self})
        self.lastActive = time.Now()
        if err != nil {
            return self.protoError(ErrDecode, "<- %v: %v", msg, err)
        }

    case paymentMsg:
        // swap protocol message for payment, Units paid for, Cheque paid with
        if self.swapEnabled {
            var req paymentMsgData
            if err := msg.Decode(&req); err != nil {
                return self.protoError(ErrDecode, "<- %v: %v", msg, err)
            }
            log.Debug(fmt.Sprintf("<- payment: %s", req.String()))
            self.swap.Receive(int(req.Units), req.Promise)
        }

    default:
        // no other message is allowed
        return self.protoError(ErrInvalidMsgCode, "%v", msg.Code)
    }
    return nil
}

func (self *bzz) handleStatus() (err error) {

    handshake := &statusMsgData{
        Version:   uint64(Version),
        ID:        "honey",
        Addr:      self.selfAddr(),
        NetworkId: uint64(self.NetworkId),
        Swap: &bzzswap.SwapProfile{
            Profile:    self.swapParams.Profile,
            PayProfile: self.swapParams.PayProfile,
        },
    }

    err = p2p.Send(self.rw, statusMsg, handshake)
    if err != nil {
        return self.protoError(ErrNoStatusMsg, err.Error())
    }

    // read and handle remote status
    var msg p2p.Msg
    msg, err = self.rw.ReadMsg()
    if err != nil {
        return err
    }

    if msg.Code != statusMsg {
        self.protoError(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, statusMsg)
    }

    if msg.Size > ProtocolMaxMsgSize {
        return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
    }

    var status statusMsgData
    if err := msg.Decode(&status); err != nil {
        return self.protoError(ErrDecode, " %v: %v", msg, err)
    }

    if status.NetworkId != self.NetworkId {
        return self.protoError(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, self.NetworkId)
    }

    if Version != status.Version {
        return self.protoError(ErrVersionMismatch, "%d (!= %d)", status.Version, Version)
    }

    self.remoteAddr = self.peerAddr(status.Addr)
    log.Trace(fmt.Sprintf("self: advertised IP: %v, peer advertised: %v, local address: %v\npeer: advertised IP: %v, remote address: %v\n", self.selfAddr(), self.remoteAddr, self.peer.LocalAddr(), status.Addr.IP, self.peer.RemoteAddr()))

    if self.swapEnabled {
        // set remote profile for accounting
        self.swap, err = bzzswap.NewSwap(self.swapParams, status.Swap, self.backend, self)
        if err != nil {
            return self.protoError(ErrSwap, "%v", err)
        }
    }

    log.Info(fmt.Sprintf("Peer %08x is capable (%d/%d)", self.remoteAddr.Addr[:4], status.Version, status.NetworkId))
    err = self.hive.addPeer(&peer{bzz: self})
    if err != nil {
        return self.protoError(ErrUnwanted, "%v", err)
    }

    // hive sets syncstate so sync should start after node added
    log.Info(fmt.Sprintf("syncronisation request sent with %v", self.syncState))
    self.syncRequest()

    return nil
}

func (self *bzz) sync(state *syncState) error {
    // syncer setup
    if self.syncer != nil {
        return self.protoError(ErrSync, "sync request can only be sent once")
    }

    cnt := self.dbAccess.counter()
    remoteaddr := self.remoteAddr.Addr
    start, stop := self.hive.kad.KeyRange(remoteaddr)

    // an explicitly received nil syncstate disables synchronisation
    if state == nil {
        self.syncEnabled = false
        log.Warn(fmt.Sprintf("syncronisation disabled for peer %v", self))
        state = &syncState{DbSyncState: &storage.DbSyncState{}, Synced: true}
    } else {
        state.synced = make(chan bool)
        state.SessionAt = cnt
        if storage.IsZeroKey(state.Stop) && state.Synced {
            state.Start = storage.Key(start[:])
            state.Stop = storage.Key(stop[:])
        }
        log.Debug(fmt.Sprintf("syncronisation requested by peer %v at state %v", self, state))
    }
    var err error
    self.syncer, err = newSyncer(
        self.requestDb,
        storage.Key(remoteaddr[:]),
        self.dbAccess,
        self.unsyncedKeys, self.store,
        self.syncParams, state, func() bool { return self.syncEnabled },
    )
    if err != nil {
        return self.protoError(ErrSync, "%v", err)
    }
    log.Trace(fmt.Sprintf("syncer set for peer %v", self))
    return nil
}

func (self *bzz) String() string {
    return self.remoteAddr.String()
}

// repair reported address if IP missing
func (self *bzz) peerAddr(base *peerAddr) *peerAddr {
    if base.IP.IsUnspecified() {
        host, _, _ := net.SplitHostPort(self.peer.RemoteAddr().String())
        base.IP = net.ParseIP(host)
    }
    return base
}

// selfAddr returns the self-advertised node connection info (listening address with enode ID);
// the IP will get repaired on the other end if missing
// or resolved via the ID by discovery at dialout
func (self *bzz) selfAddr() *peerAddr {
    id := self.hive.id
    host, port, _ := net.SplitHostPort(self.hive.listenAddr())
    intport, _ := strconv.Atoi(port)
    addr := &peerAddr{
        Addr: self.hive.addr,
        ID:   id[:],
        IP:   net.ParseIP(host),
        Port: uint16(intport),
    }
    return addr
}

// outgoing messages
// send retrieveRequestMsg
func (self *bzz) retrieve(req *retrieveRequestMsgData) error {
    return self.send(retrieveRequestMsg, req)
}

// send storeRequestMsg
func (self *bzz) store(req *storeRequestMsgData) error {
    return self.send(storeRequestMsg, req)
}

func (self *bzz) syncRequest() error {
    req := &syncRequestMsgData{}
    if self.hive.syncEnabled {
        log.Debug(fmt.Sprintf("syncronisation request to peer %v at state %v", self, self.syncState))
        req.SyncState = self.syncState
    }
    if self.syncState == nil {
        log.Warn(fmt.Sprintf("syncronisation disabled for peer %v at state %v", self, self.syncState))
    }
    return self.send(syncRequestMsg, req)
}

// send deliveryRequestMsg requesting delivery of a batch of keys (queued as store requests in the request db)
func (self *bzz) deliveryRequest(reqs []*syncRequest) error {
    req := &deliveryRequestMsgData{
        Deliver: reqs,
    }
    return self.send(deliveryRequestMsg, req)
}

// batch of syncRequests to send off
func (self *bzz) unsyncedKeys(reqs []*syncRequest, state *syncState) error {
    req := &unsyncedKeysMsgData{
        Unsynced: reqs,
        State:    state,
    }
    return self.send(unsyncedKeysMsg, req)
}

// Pay is the hook called by the SWAP service to send a paymentMsg (units paid for, cheque as promise)
func (self *bzz) Pay(units int, promise swap.Promise) {
    req := &paymentMsgData{uint(units), promise.(*chequebook.Cheque)}
    self.payment(req)
}

// send paymentMsg
func (self *bzz) payment(req *paymentMsgData) error {
    return self.send(paymentMsg, req)
}

// send peersMsg
func (self *bzz) peers(req *peersMsgData) error {
    return self.send(peersMsg, req)
}

func (self *bzz) protoError(code int, format string, params ...interface{}) (err *errs.Error) {
    err = self.errors.New(code, format, params...)
    log.Info(err.Error())
    return
}

func (self *bzz) send(msg uint64, data interface{}) error {
    if self.hive.blockWrite {
        return fmt.Errorf("network write blocked")
    }
    log.Trace(fmt.Sprintf("-> %v: %v (%T) to %v", msg, data, data, self))
    err := p2p.Send(self.rw, msg, data)
    if err != nil {
        self.Drop()
    }
    return err
}