aboutsummaryrefslogblamecommitdiffstats
path: root/swarm/network/hive.go
blob: 70652c45094ca610436c0d13d14220179adfaffb (plain) (tree)
























                                                                                  
                                             
                                                      
                                                     





































































































                                                                                                                   
                                                                                                         














                                                                                                   
                                                                                     












                                                                                                                                  
                                                                                                                                                  

                                                                      
                                                                        
                                 
                                                                         
                                
                                                                              





                                                 
                                                                                                                                           

















                                                                        
                                                                             






























                                                                                          
                                                  









                                                                                         
                                                                       





                                       
                                                   





                                  
                                                              














                                                                         
                                                  






















                                                                          
                                                                                      
                                                                                                               

                                
































                                                                            
                                                                                                  






                                                                                                    
                                                                                                                  








                                                                
                                                                                              

                              
                                                                                          





















                                                                                    
                                                                                                                                                            














                                                         
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package network

import (
    "fmt"
    "math/rand"
    "path/filepath"
    "time"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/log"
    "github.com/ethereum/go-ethereum/p2p/discover"
    "github.com/ethereum/go-ethereum/p2p/netutil"
    "github.com/ethereum/go-ethereum/swarm/network/kademlia"
    "github.com/ethereum/go-ethereum/swarm/storage"
)

// Hive is the logistic manager of the swarm
// it uses a generic kademlia nodetable to find best peer list
// for any target
// this is used by the netstore to search for content in the swarm
// the bzz protocol peersMsgData exchange is relayed to Kademlia
// for db storage and filtering
// connections and disconnections are reported and relayed
// to keep the nodetable uptodate

type Hive struct {
    listenAddr   func() string
    callInterval uint64
    id           discover.NodeID
    addr         kademlia.Address
    kad          *kademlia.Kademlia
    path         string
    quit         chan bool
    toggle       chan bool
    more         chan bool

    // for testing only
    swapEnabled bool
    syncEnabled bool
    blockRead   bool
    blockWrite  bool
}

const (
    callInterval = 3000000000
    // bucketSize   = 3
    // maxProx      = 8
    // proxBinSize  = 4
)

type HiveParams struct {
    CallInterval uint64
    KadDbPath    string
    *kademlia.KadParams
}

func NewHiveParams(path string) *HiveParams {
    kad := kademlia.NewKadParams()
    // kad.BucketSize = bucketSize
    // kad.MaxProx = maxProx
    // kad.ProxBinSize = proxBinSize

    return &HiveParams{
        CallInterval: callInterval,
        KadDbPath:    filepath.Join(path, "bzz-peers.json"),
        KadParams:    kad,
    }
}

func NewHive(addr common.Hash, params *HiveParams, swapEnabled, syncEnabled bool) *Hive {
    kad := kademlia.New(kademlia.Address(addr), params.KadParams)
    return &Hive{
        callInterval: params.CallInterval,
        kad:          kad,
        addr:         kad.Addr(),
        path:         params.KadDbPath,
        swapEnabled:  swapEnabled,
        syncEnabled:  syncEnabled,
    }
}

func (self *Hive) SyncEnabled(on bool) {
    self.syncEnabled = on
}

func (self *Hive) SwapEnabled(on bool) {
    self.swapEnabled = on
}

func (self *Hive) BlockNetworkRead(on bool) {
    self.blockRead = on
}

func (self *Hive) BlockNetworkWrite(on bool) {
    self.blockWrite = on
}

// public accessor to the hive base address
func (self *Hive) Addr() kademlia.Address {
    return self.addr
}

// Start receives network info only at startup
// listedAddr is a function to retrieve listening address to advertise to peers
// connectPeer is a function to connect to a peer based on its NodeID or enode URL
// there are called on the p2p.Server which runs on the node
func (self *Hive) Start(id discover.NodeID, listenAddr func() string, connectPeer func(string) error) (err error) {
    self.toggle = make(chan bool)
    self.more = make(chan bool)
    self.quit = make(chan bool)
    self.id = id
    self.listenAddr = listenAddr
    err = self.kad.Load(self.path, nil)
    if err != nil {
        log.Warn(fmt.Sprintf("Warning: error reading kaddb '%s' (skipping): %v", self.path, err))
        err = nil
    }
    // this loop is doing bootstrapping and maintains a healthy table
    go self.keepAlive()
    go func() {
        // whenever toggled ask kademlia about most preferred peer
        for alive := range self.more {
            if !alive {
                // receiving false closes the loop while allowing parallel routines
                // to attempt to write to more (remove Peer when shutting down)
                return
            }
            node, need, proxLimit := self.kad.Suggest()

            if node != nil && len(node.Url) > 0 {
                log.Trace(fmt.Sprintf("call known bee %v", node.Url))
                // enode or any lower level connection address is unnecessary in future
                // discovery table is used to look it up.
                connectPeer(node.Url)
            }
            if need {
                // a random peer is taken from the table
                peers := self.kad.FindClosest(kademlia.RandomAddressAt(self.addr, rand.Intn(self.kad.MaxProx)), 1)
                if len(peers) > 0 {
                    // a random address at prox bin 0 is sent for lookup
                    randAddr := kademlia.RandomAddressAt(self.addr, proxLimit)
                    req := &retrieveRequestMsgData{
                        Key: storage.Key(randAddr[:]),
                    }
                    log.Trace(fmt.Sprintf("call any bee near %v (PO%03d) - messenger bee: %v", randAddr, proxLimit, peers[0]))
                    peers[0].(*peer).retrieve(req)
                } else {
                    log.Warn(fmt.Sprintf("no peer"))
                }
                log.Trace(fmt.Sprintf("buzz kept alive"))
            } else {
                log.Info(fmt.Sprintf("no need for more bees"))
            }
            select {
            case self.toggle <- need:
            case <-self.quit:
                return
            }
            log.Debug(fmt.Sprintf("queen's address: %v, population: %d (%d)", self.addr, self.kad.Count(), self.kad.DBCount()))
        }
    }()
    return
}

// keepAlive is a forever loop
// in its awake state it periodically triggers connection attempts
// by writing to self.more until Kademlia Table is saturated
// wake state is toggled by writing to self.toggle
// it restarts if the table becomes non-full again due to disconnections
func (self *Hive) keepAlive() {
    alarm := time.NewTicker(time.Duration(self.callInterval)).C
    for {
        select {
        case <-alarm:
            if self.kad.DBCount() > 0 {
                select {
                case self.more <- true:
                    log.Debug(fmt.Sprintf("buzz wakeup"))
                default:
                }
            }
        case need := <-self.toggle:
            if alarm == nil && need {
                alarm = time.NewTicker(time.Duration(self.callInterval)).C
            }
            if alarm != nil && !need {
                alarm = nil

            }
        case <-self.quit:
            return
        }
    }
}

func (self *Hive) Stop() error {
    // closing toggle channel quits the updateloop
    close(self.quit)
    return self.kad.Save(self.path, saveSync)
}

// called at the end of a successful protocol handshake
func (self *Hive) addPeer(p *peer) error {
    defer func() {
        select {
        case self.more <- true:
        default:
        }
    }()
    log.Trace(fmt.Sprintf("hi new bee %v", p))
    err := self.kad.On(p, loadSync)
    if err != nil {
        return err
    }
    // self lookup (can be encoded as nil/zero key since peers addr known) + no id ()
    // the most common way of saying hi in bzz is initiation of gossip
    // let me know about anyone new from my hood , here is the storageradius
    // to send the 6 byte self lookup
    // we do not record as request or forward it, just reply with peers
    p.retrieve(&retrieveRequestMsgData{})
    log.Trace(fmt.Sprintf("'whatsup wheresdaparty' sent to %v", p))

    return nil
}

// called after peer disconnected
func (self *Hive) removePeer(p *peer) {
    log.Debug(fmt.Sprintf("bee %v removed", p))
    self.kad.Off(p, saveSync)
    select {
    case self.more <- true:
    default:
    }
    if self.kad.Count() == 0 {
        log.Debug(fmt.Sprintf("empty, all bees gone"))
    }
}

// Retrieve a list of live peers that are closer to target than us
func (self *Hive) getPeers(target storage.Key, max int) (peers []*peer) {
    var addr kademlia.Address
    copy(addr[:], target[:])
    for _, node := range self.kad.FindClosest(addr, max) {
        peers = append(peers, node.(*peer))
    }
    return
}

// disconnects all the peers
func (self *Hive) DropAll() {
    log.Info(fmt.Sprintf("dropping all bees"))
    for _, node := range self.kad.FindClosest(kademlia.Address{}, 0) {
        node.Drop()
    }
}

// contructor for kademlia.NodeRecord based on peer address alone
// TODO: should go away and only addr passed to kademlia
func newNodeRecord(addr *peerAddr) *kademlia.NodeRecord {
    now := time.Now()
    return &kademlia.NodeRecord{
        Addr:  addr.Addr,
        Url:   addr.String(),
        Seen:  now,
        After: now,
    }
}

// called by the protocol when receiving peerset (for target address)
// peersMsgData is converted to a slice of NodeRecords for Kademlia
// this is to store all thats needed
func (self *Hive) HandlePeersMsg(req *peersMsgData, from *peer) {
    var nrs []*kademlia.NodeRecord
    for _, p := range req.Peers {
        if err := netutil.CheckRelayIP(from.remoteAddr.IP, p.IP); err != nil {
            log.Trace(fmt.Sprintf("invalid peer IP %v from %v: %v", from.remoteAddr.IP, p.IP, err))
            continue
        }
        nrs = append(nrs, newNodeRecord(p))
    }
    self.kad.Add(nrs)
}

// peer wraps the protocol instance to represent a connected peer
// it implements kademlia.Node interface
type peer struct {
    *bzz // protocol instance running on peer connection
}

// protocol instance implements kademlia.Node interface (embedded peer)
func (self *peer) Addr() kademlia.Address {
    return self.remoteAddr.Addr
}

func (self *peer) Url() string {
    return self.remoteAddr.String()
}

// TODO take into account traffic
func (self *peer) LastActive() time.Time {
    return self.lastActive
}

// reads the serialised form of sync state persisted as the 'Meta' attribute
// and sets the decoded syncState on the online node
func loadSync(record *kademlia.NodeRecord, node kademlia.Node) error {
    p, ok := node.(*peer)
    if !ok {
        return fmt.Errorf("invalid type")
    }
    if record.Meta == nil {
        log.Debug(fmt.Sprintf("no sync state for node record %v setting default", record))
        p.syncState = &syncState{DbSyncState: &storage.DbSyncState{}}
        return nil
    }
    state, err := decodeSync(record.Meta)
    if err != nil {
        return fmt.Errorf("error decoding kddb record meta info into a sync state: %v", err)
    }
    log.Trace(fmt.Sprintf("sync state for node record %v read from Meta: %s", record, string(*(record.Meta))))
    p.syncState = state
    return err
}

// callback when saving a sync state
func saveSync(record *kademlia.NodeRecord, node kademlia.Node) {
    if p, ok := node.(*peer); ok {
        meta, err := encodeSync(p.syncState)
        if err != nil {
            log.Warn(fmt.Sprintf("error saving sync state for %v: %v", node, err))
            return
        }
        log.Trace(fmt.Sprintf("saved sync state for %v: %s", node, string(*meta)))
        record.Meta = meta
    }
}

// the immediate response to a retrieve request,
// sends relevant peer data given by the kademlia hive to the requester
// TODO: remember peers sent for duration of the session, only new peers sent
func (self *Hive) peers(req *retrieveRequestMsgData) {
    if req != nil && req.MaxPeers >= 0 {
        var addrs []*peerAddr
        if req.timeout == nil || time.Now().Before(*(req.timeout)) {
            key := req.Key
            // self lookup from remote peer
            if storage.IsZeroKey(key) {
                addr := req.from.Addr()
                key = storage.Key(addr[:])
                req.Key = nil
            }
            // get peer addresses from hive
            for _, peer := range self.getPeers(key, int(req.MaxPeers)) {
                addrs = append(addrs, peer.remoteAddr)
            }
            log.Debug(fmt.Sprintf("Hive sending %d peer addresses to %v. req.Id: %v, req.Key: %v", len(addrs), req.from, req.Id, req.Key.Log()))

            peersData := &peersMsgData{
                Peers: addrs,
                Key:   req.Key,
                Id:    req.Id,
            }
            peersData.setTimeout(req.timeout)
            req.from.peers(peersData)
        }
    }
}

func (self *Hive) String() string {
    return self.kad.String()
}