aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/server.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2014-11-22 04:48:49 +0800
committerFelix Lange <fjl@twurst.com>2014-11-22 04:52:45 +0800
commit59b63caf5e4de64ceb7dcdf01551a080f53b1672 (patch)
treea4e79590284c5afe4d6927b422a5092b074e7938 /p2p/server.go
parente4a601c6444afdc11ce0cb80d7fd83116de2c8b9 (diff)
downloaddexon-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar
dexon-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.gz
dexon-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.bz2
dexon-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.lz
dexon-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.xz
dexon-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.zst
dexon-59b63caf5e4de64ceb7dcdf01551a080f53b1672.zip
p2p: API cleanup and PoC 7 compatibility
Whoa, one more big commit. I didn't manage to untangle the changes while working towards compatibility.
Diffstat (limited to 'p2p/server.go')
-rw-r--r--p2p/server.go713
1 files changed, 346 insertions, 367 deletions
diff --git a/p2p/server.go b/p2p/server.go
index 54d2cde30..8a6087566 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -2,155 +2,101 @@ package p2p
import (
"bytes"
+ "errors"
"fmt"
"net"
- "sort"
- "strconv"
"sync"
"time"
- logpkg "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger"
)
const (
- outboundAddressPoolSize = 10
- disconnectGracePeriod = 2
+ outboundAddressPoolSize = 500
+ defaultDialTimeout = 10 * time.Second
+ portMappingUpdateInterval = 15 * time.Minute
+ portMappingTimeout = 20 * time.Minute
)
-type Blacklist interface {
- Get([]byte) (bool, error)
- Put([]byte) error
- Delete([]byte) error
- Exists(pubkey []byte) (ok bool)
-}
-
-type BlacklistMap struct {
- blacklist map[string]bool
- lock sync.RWMutex
-}
-
-func NewBlacklist() *BlacklistMap {
- return &BlacklistMap{
- blacklist: make(map[string]bool),
- }
-}
-
-func (self *BlacklistMap) Get(pubkey []byte) (bool, error) {
- self.lock.RLock()
- defer self.lock.RUnlock()
- v, ok := self.blacklist[string(pubkey)]
- var err error
- if !ok {
- err = fmt.Errorf("not found")
- }
- return v, err
-}
-
-func (self *BlacklistMap) Exists(pubkey []byte) (ok bool) {
- self.lock.RLock()
- defer self.lock.RUnlock()
- _, ok = self.blacklist[string(pubkey)]
- return
-}
-
-func (self *BlacklistMap) Put(pubkey []byte) error {
- self.lock.RLock()
- defer self.lock.RUnlock()
- self.blacklist[string(pubkey)] = true
- return nil
-}
-
-func (self *BlacklistMap) Delete(pubkey []byte) error {
- self.lock.RLock()
- defer self.lock.RUnlock()
- delete(self.blacklist, string(pubkey))
- return nil
-}
+var srvlog = logger.NewLogger("P2P Server")
+// Server manages all peer connections.
+//
+// The fields of Server are used as configuration parameters.
+// You should set them before starting the Server. Fields may not be
+// modified while the server is running.
type Server struct {
- network Network
- listening bool //needed?
- dialing bool //needed?
- closed bool
- identity ClientIdentity
- addr net.Addr
- port uint16
- protocols []string
-
- quit chan chan bool
- peersLock sync.RWMutex
-
- maxPeers int
- peers []*Peer
- peerSlots chan int
- peersTable map[string]int
- peerCount int
- cachedEncodedPeers []byte
-
- peerConnect chan net.Addr
- peerDisconnect chan DisconnectRequest
- blacklist Blacklist
- handlers Handlers
-}
-
-var logger = logpkg.NewLogger("P2P")
-
-func New(network Network, addr net.Addr, identity ClientIdentity, handlers Handlers, maxPeers int, blacklist Blacklist) *Server {
- // get alphabetical list of protocol names from handlers map
- protocols := []string{}
- for protocol := range handlers {
- protocols = append(protocols, protocol)
- }
- sort.Strings(protocols)
-
- _, port, _ := net.SplitHostPort(addr.String())
- intport, _ := strconv.Atoi(port)
-
- self := &Server{
- // NewSimpleClientIdentity(clientIdentifier, version, customIdentifier)
- network: network,
- identity: identity,
- addr: addr,
- port: uint16(intport),
- protocols: protocols,
-
- quit: make(chan chan bool),
-
- maxPeers: maxPeers,
- peers: make([]*Peer, maxPeers),
- peerSlots: make(chan int, maxPeers),
- peersTable: make(map[string]int),
+ // This field must be set to a valid client identity.
+ Identity ClientIdentity
+
+ // MaxPeers is the maximum number of peers that can be
+ // connected. It must be greater than zero.
+ MaxPeers int
+
+ // Protocols should contain the protocols supported
+ // by the server. Matching protocols are launched for
+ // each peer.
+ Protocols []Protocol
+
+ // If Blacklist is set to a non-nil value, the given Blacklist
+ // is used to verify peer connections.
+ Blacklist Blacklist
+
+ // If ListenAddr is set to a non-nil address, the server
+ // will listen for incoming connections.
+ //
+ // If the port is zero, the operating system will pick a port. The
+ // ListenAddr field will be updated with the actual address when
+ // the server is started.
+ ListenAddr string
+
+ // If set to a non-nil value, the given NAT port mapper
+ // is used to make the listening port available to the
+ // Internet.
+ NAT NAT
+
+ // If Dialer is set to a non-nil value, the given Dialer
+ // is used to dial outbound peer connections.
+ Dialer *net.Dialer
+
+ // If NoDial is true, the server will not dial any peers.
+ NoDial bool
+
+ // Hook for testing. This is useful because we can inhibit
+ // the whole protocol stack.
+ newPeerFunc peerFunc
- peerConnect: make(chan net.Addr, outboundAddressPoolSize),
- peerDisconnect: make(chan DisconnectRequest),
- blacklist: blacklist,
-
- handlers: handlers,
- }
- for i := 0; i < maxPeers; i++ {
- self.peerSlots <- i // fill up with indexes
- }
- return self
+ lock sync.RWMutex
+ running bool
+ listener net.Listener
+ laddr *net.TCPAddr // real listen addr
+ peers []*Peer
+ peerSlots chan int
+ peerCount int
+
+ quit chan struct{}
+ wg sync.WaitGroup
+ peerConnect chan *peerAddr
+ peerDisconnect chan *Peer
}
-func (self *Server) NewAddr(host string, port int) (addr net.Addr, err error) {
- addr, err = self.network.NewAddr(host, port)
- return
-}
+// NAT is implemented by NAT traversal methods.
+type NAT interface {
+ GetExternalAddress() (net.IP, error)
+ AddPortMapping(protocol string, extport, intport int, name string, lifetime time.Duration) error
+ DeletePortMapping(protocol string, extport, intport int) error
-func (self *Server) ParseAddr(address string) (addr net.Addr, err error) {
- addr, err = self.network.ParseAddr(address)
- return
+ // Should return name of the method.
+ String() string
}
-func (self *Server) ClientIdentity() ClientIdentity {
- return self.identity
-}
+type peerFunc func(srv *Server, c net.Conn, dialAddr *peerAddr) *Peer
-func (self *Server) Peers() (peers []*Peer) {
- self.peersLock.RLock()
- defer self.peersLock.RUnlock()
- for _, peer := range self.peers {
+// Peers returns all connected peers.
+func (srv *Server) Peers() (peers []*Peer) {
+ srv.lock.RLock()
+ defer srv.lock.RUnlock()
+ for _, peer := range srv.peers {
if peer != nil {
peers = append(peers, peer)
}
@@ -158,331 +104,364 @@ func (self *Server) Peers() (peers []*Peer) {
return
}
-func (self *Server) PeerCount() int {
- self.peersLock.RLock()
- defer self.peersLock.RUnlock()
- return self.peerCount
+// PeerCount returns the number of connected peers.
+func (srv *Server) PeerCount() int {
+ srv.lock.RLock()
+ defer srv.lock.RUnlock()
+ return srv.peerCount
}
-func (self *Server) PeerConnect(addr net.Addr) {
- // TODO: should buffer, filter and uniq
- // send GetPeersMsg if not blocking
+// SuggestPeer injects an address into the outbound address pool.
+func (srv *Server) SuggestPeer(ip net.IP, port int, nodeID []byte) {
select {
- case self.peerConnect <- addr: // not enough peers
- self.Broadcast("", getPeersMsg)
- default: // we dont care
+ case srv.peerConnect <- &peerAddr{ip, uint64(port), nodeID}:
+ default: // don't block
}
}
-func (self *Server) PeerDisconnect() chan DisconnectRequest {
- return self.peerDisconnect
-}
-
-func (self *Server) Blacklist() Blacklist {
- return self.blacklist
-}
-
-func (self *Server) Handlers() Handlers {
- return self.handlers
-}
-
-func (self *Server) Broadcast(protocol string, code MsgCode, data ...interface{}) {
+// Broadcast sends an RLP-encoded message to all connected peers.
+// This method is deprecated and will be removed later.
+func (srv *Server) Broadcast(protocol string, code uint64, data ...interface{}) {
var payload []byte
if data != nil {
payload = encodePayload(data...)
}
- self.peersLock.RLock()
- defer self.peersLock.RUnlock()
- for _, peer := range self.peers {
+ srv.lock.RLock()
+ defer srv.lock.RUnlock()
+ for _, peer := range srv.peers {
if peer != nil {
var msg = Msg{Code: code}
if data != nil {
msg.Payload = bytes.NewReader(payload)
msg.Size = uint32(len(payload))
}
- peer.messenger.writeProtoMsg(protocol, msg)
+ peer.writeProtoMsg(protocol, msg)
}
}
}
-// Start the server
-func (self *Server) Start(listen bool, dial bool) {
- self.network.Start()
- if listen {
- listener, err := self.network.Listener(self.addr)
- if err != nil {
- logger.Warnf("Error initializing listener: %v", err)
- logger.Warnf("Connection listening disabled")
- self.listening = false
- } else {
- self.listening = true
- logger.Infoln("Listen on %v: ready and accepting connections", listener.Addr())
- go self.inboundPeerHandler(listener)
- }
+// Start starts running the server.
+// Servers can be re-used and started again after stopping.
+func (srv *Server) Start() (err error) {
+ srv.lock.Lock()
+ defer srv.lock.Unlock()
+ if srv.running {
+ return errors.New("server already running")
+ }
+ srvlog.Infoln("Starting Server")
+
+ // initialize fields
+ if srv.Identity == nil {
+ return fmt.Errorf("Server.Identity must be set to a non-nil identity")
}
- if dial {
- dialer, err := self.network.Dialer(self.addr)
- if err != nil {
- logger.Warnf("Error initializing dialer: %v", err)
- logger.Warnf("Connection dialout disabled")
- self.dialing = false
- } else {
- self.dialing = true
- logger.Infoln("Dial peers watching outbound address pool")
- go self.outboundPeerHandler(dialer)
+ if srv.MaxPeers <= 0 {
+ return fmt.Errorf("Server.MaxPeers must be > 0")
+ }
+ srv.quit = make(chan struct{})
+ srv.peers = make([]*Peer, srv.MaxPeers)
+ srv.peerSlots = make(chan int, srv.MaxPeers)
+ srv.peerConnect = make(chan *peerAddr, outboundAddressPoolSize)
+ srv.peerDisconnect = make(chan *Peer)
+ if srv.newPeerFunc == nil {
+ srv.newPeerFunc = newServerPeer
+ }
+ if srv.Blacklist == nil {
+ srv.Blacklist = NewBlacklist()
+ }
+ if srv.Dialer == nil {
+ srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
+ }
+
+ if srv.ListenAddr != "" {
+ if err := srv.startListening(); err != nil {
+ return err
}
}
- logger.Infoln("server started")
+ if !srv.NoDial {
+ srv.wg.Add(1)
+ go srv.dialLoop()
+ }
+ if srv.NoDial && srv.ListenAddr == "" {
+ srvlog.Warnln("I will be kind-of useless, neither dialing nor listening.")
+ }
+
+ // make all slots available
+ for i := range srv.peers {
+ srv.peerSlots <- i
+ }
+ // note: discLoop is not part of WaitGroup
+ go srv.discLoop()
+ srv.running = true
+ return nil
}
-func (self *Server) Stop() {
- logger.Infoln("server stopping...")
- // // quit one loop if dialing
- if self.dialing {
- logger.Infoln("stop dialout...")
- dialq := make(chan bool)
- self.quit <- dialq
- <-dialq
- fmt.Println("quit another")
- }
- // quit the other loop if listening
- if self.listening {
- logger.Infoln("stop listening...")
- listenq := make(chan bool)
- self.quit <- listenq
- <-listenq
- fmt.Println("quit one")
- }
-
- fmt.Println("quit waited")
-
- logger.Infoln("stopping peers...")
- peers := []net.Addr{}
- self.peersLock.RLock()
- self.closed = true
- for _, peer := range self.peers {
- if peer != nil {
- peers = append(peers, peer.Address)
- }
+func (srv *Server) startListening() error {
+ listener, err := net.Listen("tcp", srv.ListenAddr)
+ if err != nil {
+ return err
+ }
+ srv.ListenAddr = listener.Addr().String()
+ srv.laddr = listener.Addr().(*net.TCPAddr)
+ srv.listener = listener
+ srv.wg.Add(1)
+ go srv.listenLoop()
+ if !srv.laddr.IP.IsLoopback() && srv.NAT != nil {
+ srv.wg.Add(1)
+ go srv.natLoop(srv.laddr.Port)
+ }
+ return nil
+}
+
+// Stop terminates the server and all active peer connections.
+// It blocks until all active connections have been closed.
+func (srv *Server) Stop() {
+ srv.lock.Lock()
+ if !srv.running {
+ srv.lock.Unlock()
+ return
}
- self.peersLock.RUnlock()
- for _, address := range peers {
- go self.removePeer(DisconnectRequest{
- addr: address,
- reason: DiscQuitting,
- })
+ srv.running = false
+ srv.lock.Unlock()
+
+ srvlog.Infoln("Stopping server")
+ if srv.listener != nil {
+ // this unblocks listener Accept
+ srv.listener.Close()
+ }
+ close(srv.quit)
+ for _, peer := range srv.Peers() {
+ peer.Disconnect(DiscQuitting)
}
+ srv.wg.Wait()
+
// wait till they actually disconnect
- // this is checked by draining the peerSlots (slots are released back if a peer is removed)
- i := 0
- fmt.Println("draining peers")
+ // this is checked by claiming all peerSlots.
+ // slots become available as the peers disconnect.
+ for i := 0; i < cap(srv.peerSlots); i++ {
+ <-srv.peerSlots
+ }
+ // terminate discLoop
+ close(srv.peerDisconnect)
+}
+
+func (srv *Server) discLoop() {
+ for peer := range srv.peerDisconnect {
+ // peer has just disconnected. free up its slot.
+ srvlog.Infof("%v is gone", peer)
+ srv.peerSlots <- peer.slot
+ srv.lock.Lock()
+ srv.peers[peer.slot] = nil
+ srv.lock.Unlock()
+ }
+}
-FOR:
+// main loop for adding connections via listening
+func (srv *Server) listenLoop() {
+ defer srv.wg.Done()
+
+ srvlog.Infoln("Listening on", srv.listener.Addr())
for {
select {
- case slot := <-self.peerSlots:
- i++
- fmt.Printf("%v: found slot %v\n", i, slot)
- if i == self.maxPeers {
- break FOR
+ case slot := <-srv.peerSlots:
+ conn, err := srv.listener.Accept()
+ if err != nil {
+ srv.peerSlots <- slot
+ return
}
+ srvlog.Debugf("Accepted conn %v (slot %d)\n", conn.RemoteAddr(), slot)
+ srv.addPeer(conn, nil, slot)
+ case <-srv.quit:
+ return
}
}
- logger.Infoln("server stopped")
}
-// main loop for adding connections via listening
-func (self *Server) inboundPeerHandler(listener net.Listener) {
+func (srv *Server) natLoop(port int) {
+ defer srv.wg.Done()
for {
+ srv.updatePortMapping(port)
select {
- case slot := <-self.peerSlots:
- go self.connectInboundPeer(listener, slot)
- case errc := <-self.quit:
- listener.Close()
- fmt.Println("quit listenloop")
- errc <- true
+ case <-time.After(portMappingUpdateInterval):
+ // one more round
+ case <-srv.quit:
+ srv.removePortMapping(port)
return
}
}
}
-// main loop for adding outbound peers based on peerConnect address pool
-// this same loop handles peer disconnect requests as well
-func (self *Server) outboundPeerHandler(dialer Dialer) {
- // addressChan initially set to nil (only watches peerConnect if we need more peers)
- var addressChan chan net.Addr
- slots := self.peerSlots
- var slot *int
+func (srv *Server) updatePortMapping(port int) {
+ srvlog.Infoln("Attempting to map port", port, "with", srv.NAT)
+ err := srv.NAT.AddPortMapping("tcp", port, port, "ethereum p2p", portMappingTimeout)
+ if err != nil {
+ srvlog.Errorln("Port mapping error:", err)
+ return
+ }
+ extip, err := srv.NAT.GetExternalAddress()
+ if err != nil {
+ srvlog.Errorln("Error getting external IP:", err)
+ return
+ }
+ srv.lock.Lock()
+ extaddr := *(srv.listener.Addr().(*net.TCPAddr))
+ extaddr.IP = extip
+ srvlog.Infoln("Mapped port, external addr is", &extaddr)
+ srv.laddr = &extaddr
+ srv.lock.Unlock()
+}
+
+func (srv *Server) removePortMapping(port int) {
+ srvlog.Infoln("Removing port mapping for", port, "with", srv.NAT)
+ srv.NAT.DeletePortMapping("tcp", port, port)
+}
+
+func (srv *Server) dialLoop() {
+ defer srv.wg.Done()
+ var (
+ suggest chan *peerAddr
+ slot *int
+ slots = srv.peerSlots
+ )
for {
select {
case i := <-slots:
// we need a peer in slot i, slot reserved
slot = &i
// now we can watch for candidate peers in the next loop
- addressChan = self.peerConnect
+ suggest = srv.peerConnect
// do not consume more until candidate peer is found
slots = nil
- case address := <-addressChan:
+
+ case desc := <-suggest:
// candidate peer found, will dial out asyncronously
// if connection fails slot will be released
- go self.connectOutboundPeer(dialer, address, *slot)
+ go srv.dialPeer(desc, *slot)
// we can watch if more peers needed in the next loop
- slots = self.peerSlots
+ slots = srv.peerSlots
// until then we dont care about candidate peers
- addressChan = nil
- case request := <-self.peerDisconnect:
- go self.removePeer(request)
- case errc := <-self.quit:
- if addressChan != nil && slot != nil {
- self.peerSlots <- *slot
+ suggest = nil
+
+ case <-srv.quit:
+ // give back the currently reserved slot
+ if slot != nil {
+ srv.peerSlots <- *slot
}
- fmt.Println("quit dialloop")
- errc <- true
return
}
}
}
-// check if peer address already connected
-func (self *Server) isConnected(address net.Addr) bool {
- self.peersLock.RLock()
- defer self.peersLock.RUnlock()
- _, found := self.peersTable[address.String()]
- return found
-}
-
-// connect to peer via listener.Accept()
-func (self *Server) connectInboundPeer(listener net.Listener, slot int) {
- var address net.Addr
- conn, err := listener.Accept()
- if err != nil {
- logger.Debugln(err)
- self.peerSlots <- slot
- return
- }
- address = conn.RemoteAddr()
- // XXX: this won't work because the remote socket
- // address does not identify the peer. we should
- // probably get rid of this check and rely on public
- // key detection in the base protocol.
- if self.isConnected(address) {
- conn.Close()
- self.peerSlots <- slot
- return
- }
- fmt.Printf("adding %v\n", address)
- go self.addPeer(conn, address, true, slot)
-}
-
// connect to peer via dial out
-func (self *Server) connectOutboundPeer(dialer Dialer, address net.Addr, slot int) {
- if self.isConnected(address) {
- return
- }
- conn, err := dialer.Dial(address.Network(), address.String())
+func (srv *Server) dialPeer(desc *peerAddr, slot int) {
+ srvlog.Debugf("Dialing %v (slot %d)\n", desc, slot)
+ conn, err := srv.Dialer.Dial(desc.Network(), desc.String())
if err != nil {
- self.peerSlots <- slot
+ srvlog.Errorf("Dial error: %v", err)
+ srv.peerSlots <- slot
return
}
- go self.addPeer(conn, address, false, slot)
+ go srv.addPeer(conn, desc, slot)
}
// creates the new peer object and inserts it into its slot
-func (self *Server) addPeer(conn net.Conn, address net.Addr, inbound bool, slot int) *Peer {
- self.peersLock.Lock()
- defer self.peersLock.Unlock()
- if self.closed {
- fmt.Println("oopsy, not no longer need peer")
- conn.Close() //oopsy our bad
- self.peerSlots <- slot // release slot
+func (srv *Server) addPeer(conn net.Conn, desc *peerAddr, slot int) *Peer {
+ srv.lock.Lock()
+ defer srv.lock.Unlock()
+ if !srv.running {
+ conn.Close()
+ srv.peerSlots <- slot // release slot
return nil
}
- logger.Infoln("adding new peer", address)
- peer := NewPeer(conn, address, inbound, self)
- self.peers[slot] = peer
- self.peersTable[address.String()] = slot
- self.peerCount++
- self.cachedEncodedPeers = nil
- fmt.Printf("added peer %v %v (slot %v)\n", address, peer, slot)
- peer.Start()
+ peer := srv.newPeerFunc(srv, conn, desc)
+ peer.slot = slot
+ srv.peers[slot] = peer
+ srv.peerCount++
+ go func() { peer.loop(); srv.peerDisconnect <- peer }()
return peer
}
// removes peer: sending disconnect msg, stop peer, remove rom list/table, release slot
-func (self *Server) removePeer(request DisconnectRequest) {
- self.peersLock.Lock()
-
- address := request.addr
- slot := self.peersTable[address.String()]
- peer := self.peers[slot]
- fmt.Printf("removing peer %v %v (slot %v)\n", address, peer, slot)
- if peer == nil {
- logger.Debugf("already removed peer on %v", address)
- self.peersLock.Unlock()
+func (srv *Server) removePeer(peer *Peer) {
+ srv.lock.Lock()
+ defer srv.lock.Unlock()
+ srvlog.Debugf("Removing peer %v %v (slot %v)\n", peer, peer.slot)
+ if srv.peers[peer.slot] != peer {
+ srvlog.Warnln("Invalid peer to remove:", peer)
return
}
// remove from list and index
- self.peerCount--
- self.peers[slot] = nil
- delete(self.peersTable, address.String())
- self.cachedEncodedPeers = nil
- fmt.Printf("removed peer %v (slot %v)\n", peer, slot)
- self.peersLock.Unlock()
-
- // sending disconnect message
- disconnectMsg := NewMsg(discMsg, request.reason)
- peer.Write("", disconnectMsg)
- // be nice and wait
- time.Sleep(disconnectGracePeriod * time.Second)
- // switch off peer and close connections etc.
- fmt.Println("stopping peer")
- peer.Stop()
- fmt.Println("stopped peer")
+ srv.peerCount--
+ srv.peers[peer.slot] = nil
// release slot to signal need for a new peer, last!
- self.peerSlots <- slot
+ srv.peerSlots <- peer.slot
}
-// encodedPeerList returns an RLP-encoded list of peers.
-// the returned slice will be nil if there are no peers.
-func (self *Server) encodedPeerList() []byte {
- // TODO: memoize and reset when peers change
- self.peersLock.RLock()
- defer self.peersLock.RUnlock()
- if self.cachedEncodedPeers == nil && self.peerCount > 0 {
- var peerData []interface{}
- for _, i := range self.peersTable {
- peer := self.peers[i]
- peerData = append(peerData, peer.Encode())
+func (srv *Server) verifyPeer(addr *peerAddr) error {
+ if srv.Blacklist.Exists(addr.Pubkey) {
+ return errors.New("blacklisted")
+ }
+ if bytes.Equal(srv.Identity.Pubkey()[1:], addr.Pubkey) {
+ return newPeerError(errPubkeyForbidden, "not allowed to connect to srv")
+ }
+ srv.lock.RLock()
+ defer srv.lock.RUnlock()
+ for _, peer := range srv.peers {
+ if peer != nil {
+ id := peer.Identity()
+ if id != nil && bytes.Equal(id.Pubkey(), addr.Pubkey) {
+ return errors.New("already connected")
+ }
}
- self.cachedEncodedPeers = encodePayload(peerData)
}
- return self.cachedEncodedPeers
+ return nil
}
-// fix handshake message to push to peers
-func (self *Server) handshakeMsg() Msg {
- return NewMsg(handshakeMsg,
- p2pVersion,
- []byte(self.identity.String()),
- []interface{}{self.protocols},
- self.port,
- self.identity.Pubkey()[1:],
- )
+type Blacklist interface {
+ Get([]byte) (bool, error)
+ Put([]byte) error
+ Delete([]byte) error
+ Exists(pubkey []byte) (ok bool)
+}
+
+type BlacklistMap struct {
+ blacklist map[string]bool
+ lock sync.RWMutex
}
-func (self *Server) RegisterPubkey(candidate *Peer, pubkey []byte) error {
- // Check for blacklisting
- if self.blacklist.Exists(pubkey) {
- return fmt.Errorf("blacklisted")
+func NewBlacklist() *BlacklistMap {
+ return &BlacklistMap{
+ blacklist: make(map[string]bool),
}
+}
- self.peersLock.RLock()
- defer self.peersLock.RUnlock()
- for _, peer := range self.peers {
- if peer != nil && peer != candidate && bytes.Compare(peer.Pubkey, pubkey) == 0 {
- return fmt.Errorf("already connected")
- }
+func (self *BlacklistMap) Get(pubkey []byte) (bool, error) {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ v, ok := self.blacklist[string(pubkey)]
+ var err error
+ if !ok {
+ err = fmt.Errorf("not found")
}
- candidate.Pubkey = pubkey
+ return v, err
+}
+
+func (self *BlacklistMap) Exists(pubkey []byte) (ok bool) {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ _, ok = self.blacklist[string(pubkey)]
+ return
+}
+
+func (self *BlacklistMap) Put(pubkey []byte) error {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ self.blacklist[string(pubkey)] = true
+ return nil
+}
+
+func (self *BlacklistMap) Delete(pubkey []byte) error {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ delete(self.blacklist, string(pubkey))
return nil
}