From 59b63caf5e4de64ceb7dcdf01551a080f53b1672 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 21 Nov 2014 21:48:49 +0100 Subject: p2p: API cleanup and PoC 7 compatibility Whoa, one more big commit. I didn't manage to untangle the changes while working towards compatibility. --- p2p/server.go | 713 ++++++++++++++++++++++++++++------------------------------ 1 file changed, 346 insertions(+), 367 deletions(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index 54d2cde30..8a6087566 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -2,155 +2,101 @@ package p2p import ( "bytes" + "errors" "fmt" "net" - "sort" - "strconv" "sync" "time" - logpkg "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger" ) const ( - outboundAddressPoolSize = 10 - disconnectGracePeriod = 2 + outboundAddressPoolSize = 500 + defaultDialTimeout = 10 * time.Second + portMappingUpdateInterval = 15 * time.Minute + portMappingTimeout = 20 * time.Minute ) -type Blacklist interface { - Get([]byte) (bool, error) - Put([]byte) error - Delete([]byte) error - Exists(pubkey []byte) (ok bool) -} - -type BlacklistMap struct { - blacklist map[string]bool - lock sync.RWMutex -} - -func NewBlacklist() *BlacklistMap { - return &BlacklistMap{ - blacklist: make(map[string]bool), - } -} - -func (self *BlacklistMap) Get(pubkey []byte) (bool, error) { - self.lock.RLock() - defer self.lock.RUnlock() - v, ok := self.blacklist[string(pubkey)] - var err error - if !ok { - err = fmt.Errorf("not found") - } - return v, err -} - -func (self *BlacklistMap) Exists(pubkey []byte) (ok bool) { - self.lock.RLock() - defer self.lock.RUnlock() - _, ok = self.blacklist[string(pubkey)] - return -} - -func (self *BlacklistMap) Put(pubkey []byte) error { - self.lock.RLock() - defer self.lock.RUnlock() - self.blacklist[string(pubkey)] = true - return nil -} - -func (self *BlacklistMap) Delete(pubkey []byte) error { - self.lock.RLock() - defer self.lock.RUnlock() - delete(self.blacklist, string(pubkey)) - return nil -} +var srvlog = logger.NewLogger("P2P Server") +// Server manages all peer connections. +// +// The fields of Server are used as configuration parameters. +// You should set them before starting the Server. Fields may not be +// modified while the server is running. type Server struct { - network Network - listening bool //needed? - dialing bool //needed? - closed bool - identity ClientIdentity - addr net.Addr - port uint16 - protocols []string - - quit chan chan bool - peersLock sync.RWMutex - - maxPeers int - peers []*Peer - peerSlots chan int - peersTable map[string]int - peerCount int - cachedEncodedPeers []byte - - peerConnect chan net.Addr - peerDisconnect chan DisconnectRequest - blacklist Blacklist - handlers Handlers -} - -var logger = logpkg.NewLogger("P2P") - -func New(network Network, addr net.Addr, identity ClientIdentity, handlers Handlers, maxPeers int, blacklist Blacklist) *Server { - // get alphabetical list of protocol names from handlers map - protocols := []string{} - for protocol := range handlers { - protocols = append(protocols, protocol) - } - sort.Strings(protocols) - - _, port, _ := net.SplitHostPort(addr.String()) - intport, _ := strconv.Atoi(port) - - self := &Server{ - // NewSimpleClientIdentity(clientIdentifier, version, customIdentifier) - network: network, - identity: identity, - addr: addr, - port: uint16(intport), - protocols: protocols, - - quit: make(chan chan bool), - - maxPeers: maxPeers, - peers: make([]*Peer, maxPeers), - peerSlots: make(chan int, maxPeers), - peersTable: make(map[string]int), + // This field must be set to a valid client identity. + Identity ClientIdentity + + // MaxPeers is the maximum number of peers that can be + // connected. It must be greater than zero. + MaxPeers int + + // Protocols should contain the protocols supported + // by the server. Matching protocols are launched for + // each peer. + Protocols []Protocol + + // If Blacklist is set to a non-nil value, the given Blacklist + // is used to verify peer connections. + Blacklist Blacklist + + // If ListenAddr is set to a non-nil address, the server + // will listen for incoming connections. + // + // If the port is zero, the operating system will pick a port. The + // ListenAddr field will be updated with the actual address when + // the server is started. + ListenAddr string + + // If set to a non-nil value, the given NAT port mapper + // is used to make the listening port available to the + // Internet. + NAT NAT + + // If Dialer is set to a non-nil value, the given Dialer + // is used to dial outbound peer connections. + Dialer *net.Dialer + + // If NoDial is true, the server will not dial any peers. + NoDial bool + + // Hook for testing. This is useful because we can inhibit + // the whole protocol stack. + newPeerFunc peerFunc - peerConnect: make(chan net.Addr, outboundAddressPoolSize), - peerDisconnect: make(chan DisconnectRequest), - blacklist: blacklist, - - handlers: handlers, - } - for i := 0; i < maxPeers; i++ { - self.peerSlots <- i // fill up with indexes - } - return self + lock sync.RWMutex + running bool + listener net.Listener + laddr *net.TCPAddr // real listen addr + peers []*Peer + peerSlots chan int + peerCount int + + quit chan struct{} + wg sync.WaitGroup + peerConnect chan *peerAddr + peerDisconnect chan *Peer } -func (self *Server) NewAddr(host string, port int) (addr net.Addr, err error) { - addr, err = self.network.NewAddr(host, port) - return -} +// NAT is implemented by NAT traversal methods. +type NAT interface { + GetExternalAddress() (net.IP, error) + AddPortMapping(protocol string, extport, intport int, name string, lifetime time.Duration) error + DeletePortMapping(protocol string, extport, intport int) error -func (self *Server) ParseAddr(address string) (addr net.Addr, err error) { - addr, err = self.network.ParseAddr(address) - return + // Should return name of the method. + String() string } -func (self *Server) ClientIdentity() ClientIdentity { - return self.identity -} +type peerFunc func(srv *Server, c net.Conn, dialAddr *peerAddr) *Peer -func (self *Server) Peers() (peers []*Peer) { - self.peersLock.RLock() - defer self.peersLock.RUnlock() - for _, peer := range self.peers { +// Peers returns all connected peers. +func (srv *Server) Peers() (peers []*Peer) { + srv.lock.RLock() + defer srv.lock.RUnlock() + for _, peer := range srv.peers { if peer != nil { peers = append(peers, peer) } @@ -158,331 +104,364 @@ func (self *Server) Peers() (peers []*Peer) { return } -func (self *Server) PeerCount() int { - self.peersLock.RLock() - defer self.peersLock.RUnlock() - return self.peerCount +// PeerCount returns the number of connected peers. +func (srv *Server) PeerCount() int { + srv.lock.RLock() + defer srv.lock.RUnlock() + return srv.peerCount } -func (self *Server) PeerConnect(addr net.Addr) { - // TODO: should buffer, filter and uniq - // send GetPeersMsg if not blocking +// SuggestPeer injects an address into the outbound address pool. +func (srv *Server) SuggestPeer(ip net.IP, port int, nodeID []byte) { select { - case self.peerConnect <- addr: // not enough peers - self.Broadcast("", getPeersMsg) - default: // we dont care + case srv.peerConnect <- &peerAddr{ip, uint64(port), nodeID}: + default: // don't block } } -func (self *Server) PeerDisconnect() chan DisconnectRequest { - return self.peerDisconnect -} - -func (self *Server) Blacklist() Blacklist { - return self.blacklist -} - -func (self *Server) Handlers() Handlers { - return self.handlers -} - -func (self *Server) Broadcast(protocol string, code MsgCode, data ...interface{}) { +// Broadcast sends an RLP-encoded message to all connected peers. +// This method is deprecated and will be removed later. +func (srv *Server) Broadcast(protocol string, code uint64, data ...interface{}) { var payload []byte if data != nil { payload = encodePayload(data...) } - self.peersLock.RLock() - defer self.peersLock.RUnlock() - for _, peer := range self.peers { + srv.lock.RLock() + defer srv.lock.RUnlock() + for _, peer := range srv.peers { if peer != nil { var msg = Msg{Code: code} if data != nil { msg.Payload = bytes.NewReader(payload) msg.Size = uint32(len(payload)) } - peer.messenger.writeProtoMsg(protocol, msg) + peer.writeProtoMsg(protocol, msg) } } } -// Start the server -func (self *Server) Start(listen bool, dial bool) { - self.network.Start() - if listen { - listener, err := self.network.Listener(self.addr) - if err != nil { - logger.Warnf("Error initializing listener: %v", err) - logger.Warnf("Connection listening disabled") - self.listening = false - } else { - self.listening = true - logger.Infoln("Listen on %v: ready and accepting connections", listener.Addr()) - go self.inboundPeerHandler(listener) - } +// Start starts running the server. +// Servers can be re-used and started again after stopping. +func (srv *Server) Start() (err error) { + srv.lock.Lock() + defer srv.lock.Unlock() + if srv.running { + return errors.New("server already running") + } + srvlog.Infoln("Starting Server") + + // initialize fields + if srv.Identity == nil { + return fmt.Errorf("Server.Identity must be set to a non-nil identity") } - if dial { - dialer, err := self.network.Dialer(self.addr) - if err != nil { - logger.Warnf("Error initializing dialer: %v", err) - logger.Warnf("Connection dialout disabled") - self.dialing = false - } else { - self.dialing = true - logger.Infoln("Dial peers watching outbound address pool") - go self.outboundPeerHandler(dialer) + if srv.MaxPeers <= 0 { + return fmt.Errorf("Server.MaxPeers must be > 0") + } + srv.quit = make(chan struct{}) + srv.peers = make([]*Peer, srv.MaxPeers) + srv.peerSlots = make(chan int, srv.MaxPeers) + srv.peerConnect = make(chan *peerAddr, outboundAddressPoolSize) + srv.peerDisconnect = make(chan *Peer) + if srv.newPeerFunc == nil { + srv.newPeerFunc = newServerPeer + } + if srv.Blacklist == nil { + srv.Blacklist = NewBlacklist() + } + if srv.Dialer == nil { + srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout} + } + + if srv.ListenAddr != "" { + if err := srv.startListening(); err != nil { + return err } } - logger.Infoln("server started") + if !srv.NoDial { + srv.wg.Add(1) + go srv.dialLoop() + } + if srv.NoDial && srv.ListenAddr == "" { + srvlog.Warnln("I will be kind-of useless, neither dialing nor listening.") + } + + // make all slots available + for i := range srv.peers { + srv.peerSlots <- i + } + // note: discLoop is not part of WaitGroup + go srv.discLoop() + srv.running = true + return nil } -func (self *Server) Stop() { - logger.Infoln("server stopping...") - // // quit one loop if dialing - if self.dialing { - logger.Infoln("stop dialout...") - dialq := make(chan bool) - self.quit <- dialq - <-dialq - fmt.Println("quit another") - } - // quit the other loop if listening - if self.listening { - logger.Infoln("stop listening...") - listenq := make(chan bool) - self.quit <- listenq - <-listenq - fmt.Println("quit one") - } - - fmt.Println("quit waited") - - logger.Infoln("stopping peers...") - peers := []net.Addr{} - self.peersLock.RLock() - self.closed = true - for _, peer := range self.peers { - if peer != nil { - peers = append(peers, peer.Address) - } +func (srv *Server) startListening() error { + listener, err := net.Listen("tcp", srv.ListenAddr) + if err != nil { + return err + } + srv.ListenAddr = listener.Addr().String() + srv.laddr = listener.Addr().(*net.TCPAddr) + srv.listener = listener + srv.wg.Add(1) + go srv.listenLoop() + if !srv.laddr.IP.IsLoopback() && srv.NAT != nil { + srv.wg.Add(1) + go srv.natLoop(srv.laddr.Port) + } + return nil +} + +// Stop terminates the server and all active peer connections. +// It blocks until all active connections have been closed. +func (srv *Server) Stop() { + srv.lock.Lock() + if !srv.running { + srv.lock.Unlock() + return } - self.peersLock.RUnlock() - for _, address := range peers { - go self.removePeer(DisconnectRequest{ - addr: address, - reason: DiscQuitting, - }) + srv.running = false + srv.lock.Unlock() + + srvlog.Infoln("Stopping server") + if srv.listener != nil { + // this unblocks listener Accept + srv.listener.Close() + } + close(srv.quit) + for _, peer := range srv.Peers() { + peer.Disconnect(DiscQuitting) } + srv.wg.Wait() + // wait till they actually disconnect - // this is checked by draining the peerSlots (slots are released back if a peer is removed) - i := 0 - fmt.Println("draining peers") + // this is checked by claiming all peerSlots. + // slots become available as the peers disconnect. + for i := 0; i < cap(srv.peerSlots); i++ { + <-srv.peerSlots + } + // terminate discLoop + close(srv.peerDisconnect) +} + +func (srv *Server) discLoop() { + for peer := range srv.peerDisconnect { + // peer has just disconnected. free up its slot. + srvlog.Infof("%v is gone", peer) + srv.peerSlots <- peer.slot + srv.lock.Lock() + srv.peers[peer.slot] = nil + srv.lock.Unlock() + } +} -FOR: +// main loop for adding connections via listening +func (srv *Server) listenLoop() { + defer srv.wg.Done() + + srvlog.Infoln("Listening on", srv.listener.Addr()) for { select { - case slot := <-self.peerSlots: - i++ - fmt.Printf("%v: found slot %v\n", i, slot) - if i == self.maxPeers { - break FOR + case slot := <-srv.peerSlots: + conn, err := srv.listener.Accept() + if err != nil { + srv.peerSlots <- slot + return } + srvlog.Debugf("Accepted conn %v (slot %d)\n", conn.RemoteAddr(), slot) + srv.addPeer(conn, nil, slot) + case <-srv.quit: + return } } - logger.Infoln("server stopped") } -// main loop for adding connections via listening -func (self *Server) inboundPeerHandler(listener net.Listener) { +func (srv *Server) natLoop(port int) { + defer srv.wg.Done() for { + srv.updatePortMapping(port) select { - case slot := <-self.peerSlots: - go self.connectInboundPeer(listener, slot) - case errc := <-self.quit: - listener.Close() - fmt.Println("quit listenloop") - errc <- true + case <-time.After(portMappingUpdateInterval): + // one more round + case <-srv.quit: + srv.removePortMapping(port) return } } } -// main loop for adding outbound peers based on peerConnect address pool -// this same loop handles peer disconnect requests as well -func (self *Server) outboundPeerHandler(dialer Dialer) { - // addressChan initially set to nil (only watches peerConnect if we need more peers) - var addressChan chan net.Addr - slots := self.peerSlots - var slot *int +func (srv *Server) updatePortMapping(port int) { + srvlog.Infoln("Attempting to map port", port, "with", srv.NAT) + err := srv.NAT.AddPortMapping("tcp", port, port, "ethereum p2p", portMappingTimeout) + if err != nil { + srvlog.Errorln("Port mapping error:", err) + return + } + extip, err := srv.NAT.GetExternalAddress() + if err != nil { + srvlog.Errorln("Error getting external IP:", err) + return + } + srv.lock.Lock() + extaddr := *(srv.listener.Addr().(*net.TCPAddr)) + extaddr.IP = extip + srvlog.Infoln("Mapped port, external addr is", &extaddr) + srv.laddr = &extaddr + srv.lock.Unlock() +} + +func (srv *Server) removePortMapping(port int) { + srvlog.Infoln("Removing port mapping for", port, "with", srv.NAT) + srv.NAT.DeletePortMapping("tcp", port, port) +} + +func (srv *Server) dialLoop() { + defer srv.wg.Done() + var ( + suggest chan *peerAddr + slot *int + slots = srv.peerSlots + ) for { select { case i := <-slots: // we need a peer in slot i, slot reserved slot = &i // now we can watch for candidate peers in the next loop - addressChan = self.peerConnect + suggest = srv.peerConnect // do not consume more until candidate peer is found slots = nil - case address := <-addressChan: + + case desc := <-suggest: // candidate peer found, will dial out asyncronously // if connection fails slot will be released - go self.connectOutboundPeer(dialer, address, *slot) + go srv.dialPeer(desc, *slot) // we can watch if more peers needed in the next loop - slots = self.peerSlots + slots = srv.peerSlots // until then we dont care about candidate peers - addressChan = nil - case request := <-self.peerDisconnect: - go self.removePeer(request) - case errc := <-self.quit: - if addressChan != nil && slot != nil { - self.peerSlots <- *slot + suggest = nil + + case <-srv.quit: + // give back the currently reserved slot + if slot != nil { + srv.peerSlots <- *slot } - fmt.Println("quit dialloop") - errc <- true return } } } -// check if peer address already connected -func (self *Server) isConnected(address net.Addr) bool { - self.peersLock.RLock() - defer self.peersLock.RUnlock() - _, found := self.peersTable[address.String()] - return found -} - -// connect to peer via listener.Accept() -func (self *Server) connectInboundPeer(listener net.Listener, slot int) { - var address net.Addr - conn, err := listener.Accept() - if err != nil { - logger.Debugln(err) - self.peerSlots <- slot - return - } - address = conn.RemoteAddr() - // XXX: this won't work because the remote socket - // address does not identify the peer. we should - // probably get rid of this check and rely on public - // key detection in the base protocol. - if self.isConnected(address) { - conn.Close() - self.peerSlots <- slot - return - } - fmt.Printf("adding %v\n", address) - go self.addPeer(conn, address, true, slot) -} - // connect to peer via dial out -func (self *Server) connectOutboundPeer(dialer Dialer, address net.Addr, slot int) { - if self.isConnected(address) { - return - } - conn, err := dialer.Dial(address.Network(), address.String()) +func (srv *Server) dialPeer(desc *peerAddr, slot int) { + srvlog.Debugf("Dialing %v (slot %d)\n", desc, slot) + conn, err := srv.Dialer.Dial(desc.Network(), desc.String()) if err != nil { - self.peerSlots <- slot + srvlog.Errorf("Dial error: %v", err) + srv.peerSlots <- slot return } - go self.addPeer(conn, address, false, slot) + go srv.addPeer(conn, desc, slot) } // creates the new peer object and inserts it into its slot -func (self *Server) addPeer(conn net.Conn, address net.Addr, inbound bool, slot int) *Peer { - self.peersLock.Lock() - defer self.peersLock.Unlock() - if self.closed { - fmt.Println("oopsy, not no longer need peer") - conn.Close() //oopsy our bad - self.peerSlots <- slot // release slot +func (srv *Server) addPeer(conn net.Conn, desc *peerAddr, slot int) *Peer { + srv.lock.Lock() + defer srv.lock.Unlock() + if !srv.running { + conn.Close() + srv.peerSlots <- slot // release slot return nil } - logger.Infoln("adding new peer", address) - peer := NewPeer(conn, address, inbound, self) - self.peers[slot] = peer - self.peersTable[address.String()] = slot - self.peerCount++ - self.cachedEncodedPeers = nil - fmt.Printf("added peer %v %v (slot %v)\n", address, peer, slot) - peer.Start() + peer := srv.newPeerFunc(srv, conn, desc) + peer.slot = slot + srv.peers[slot] = peer + srv.peerCount++ + go func() { peer.loop(); srv.peerDisconnect <- peer }() return peer } // removes peer: sending disconnect msg, stop peer, remove rom list/table, release slot -func (self *Server) removePeer(request DisconnectRequest) { - self.peersLock.Lock() - - address := request.addr - slot := self.peersTable[address.String()] - peer := self.peers[slot] - fmt.Printf("removing peer %v %v (slot %v)\n", address, peer, slot) - if peer == nil { - logger.Debugf("already removed peer on %v", address) - self.peersLock.Unlock() +func (srv *Server) removePeer(peer *Peer) { + srv.lock.Lock() + defer srv.lock.Unlock() + srvlog.Debugf("Removing peer %v %v (slot %v)\n", peer, peer.slot) + if srv.peers[peer.slot] != peer { + srvlog.Warnln("Invalid peer to remove:", peer) return } // remove from list and index - self.peerCount-- - self.peers[slot] = nil - delete(self.peersTable, address.String()) - self.cachedEncodedPeers = nil - fmt.Printf("removed peer %v (slot %v)\n", peer, slot) - self.peersLock.Unlock() - - // sending disconnect message - disconnectMsg := NewMsg(discMsg, request.reason) - peer.Write("", disconnectMsg) - // be nice and wait - time.Sleep(disconnectGracePeriod * time.Second) - // switch off peer and close connections etc. - fmt.Println("stopping peer") - peer.Stop() - fmt.Println("stopped peer") + srv.peerCount-- + srv.peers[peer.slot] = nil // release slot to signal need for a new peer, last! - self.peerSlots <- slot + srv.peerSlots <- peer.slot } -// encodedPeerList returns an RLP-encoded list of peers. -// the returned slice will be nil if there are no peers. -func (self *Server) encodedPeerList() []byte { - // TODO: memoize and reset when peers change - self.peersLock.RLock() - defer self.peersLock.RUnlock() - if self.cachedEncodedPeers == nil && self.peerCount > 0 { - var peerData []interface{} - for _, i := range self.peersTable { - peer := self.peers[i] - peerData = append(peerData, peer.Encode()) +func (srv *Server) verifyPeer(addr *peerAddr) error { + if srv.Blacklist.Exists(addr.Pubkey) { + return errors.New("blacklisted") + } + if bytes.Equal(srv.Identity.Pubkey()[1:], addr.Pubkey) { + return newPeerError(errPubkeyForbidden, "not allowed to connect to srv") + } + srv.lock.RLock() + defer srv.lock.RUnlock() + for _, peer := range srv.peers { + if peer != nil { + id := peer.Identity() + if id != nil && bytes.Equal(id.Pubkey(), addr.Pubkey) { + return errors.New("already connected") + } } - self.cachedEncodedPeers = encodePayload(peerData) } - return self.cachedEncodedPeers + return nil } -// fix handshake message to push to peers -func (self *Server) handshakeMsg() Msg { - return NewMsg(handshakeMsg, - p2pVersion, - []byte(self.identity.String()), - []interface{}{self.protocols}, - self.port, - self.identity.Pubkey()[1:], - ) +type Blacklist interface { + Get([]byte) (bool, error) + Put([]byte) error + Delete([]byte) error + Exists(pubkey []byte) (ok bool) +} + +type BlacklistMap struct { + blacklist map[string]bool + lock sync.RWMutex } -func (self *Server) RegisterPubkey(candidate *Peer, pubkey []byte) error { - // Check for blacklisting - if self.blacklist.Exists(pubkey) { - return fmt.Errorf("blacklisted") +func NewBlacklist() *BlacklistMap { + return &BlacklistMap{ + blacklist: make(map[string]bool), } +} - self.peersLock.RLock() - defer self.peersLock.RUnlock() - for _, peer := range self.peers { - if peer != nil && peer != candidate && bytes.Compare(peer.Pubkey, pubkey) == 0 { - return fmt.Errorf("already connected") - } +func (self *BlacklistMap) Get(pubkey []byte) (bool, error) { + self.lock.RLock() + defer self.lock.RUnlock() + v, ok := self.blacklist[string(pubkey)] + var err error + if !ok { + err = fmt.Errorf("not found") } - candidate.Pubkey = pubkey + return v, err +} + +func (self *BlacklistMap) Exists(pubkey []byte) (ok bool) { + self.lock.RLock() + defer self.lock.RUnlock() + _, ok = self.blacklist[string(pubkey)] + return +} + +func (self *BlacklistMap) Put(pubkey []byte) error { + self.lock.RLock() + defer self.lock.RUnlock() + self.blacklist[string(pubkey)] = true + return nil +} + +func (self *BlacklistMap) Delete(pubkey []byte) error { + self.lock.RLock() + defer self.lock.RUnlock() + delete(self.blacklist, string(pubkey)) return nil } -- cgit v1.2.3