diff options
author | obscuren <geffobscura@gmail.com> | 2014-12-20 09:34:12 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2014-12-20 09:34:12 +0800 |
commit | 3983dd2428137211f84f299f9ce8690c22f50afd (patch) | |
tree | 3a2dc53b365e6f377fc82a3514150d1297fe549c /p2p/server.go | |
parent | 7daa8c2f6eb25511c6a54ad420709af911fc6748 (diff) | |
parent | 0a9dc1536c5d776844d6947a0090ff7e1a7c6ab4 (diff) | |
download | go-tangerine-3983dd2428137211f84f299f9ce8690c22f50afd.tar go-tangerine-3983dd2428137211f84f299f9ce8690c22f50afd.tar.gz go-tangerine-3983dd2428137211f84f299f9ce8690c22f50afd.tar.bz2 go-tangerine-3983dd2428137211f84f299f9ce8690c22f50afd.tar.lz go-tangerine-3983dd2428137211f84f299f9ce8690c22f50afd.tar.xz go-tangerine-3983dd2428137211f84f299f9ce8690c22f50afd.tar.zst go-tangerine-3983dd2428137211f84f299f9ce8690c22f50afd.zip |
Merge branch 'release/v0.7.10'vv0.7.10
Diffstat (limited to 'p2p/server.go')
-rw-r--r-- | p2p/server.go | 467 |
1 files changed, 467 insertions, 0 deletions
diff --git a/p2p/server.go b/p2p/server.go new file mode 100644 index 000000000..8a6087566 --- /dev/null +++ b/p2p/server.go @@ -0,0 +1,467 @@ +package p2p + +import ( + "bytes" + "errors" + "fmt" + "net" + "sync" + "time" + + "github.com/ethereum/go-ethereum/logger" +) + +const ( + outboundAddressPoolSize = 500 + defaultDialTimeout = 10 * time.Second + portMappingUpdateInterval = 15 * time.Minute + portMappingTimeout = 20 * time.Minute +) + +var srvlog = logger.NewLogger("P2P Server") + +// Server manages all peer connections. +// +// The fields of Server are used as configuration parameters. +// You should set them before starting the Server. Fields may not be +// modified while the server is running. +type Server struct { + // This field must be set to a valid client identity. + Identity ClientIdentity + + // MaxPeers is the maximum number of peers that can be + // connected. It must be greater than zero. + MaxPeers int + + // Protocols should contain the protocols supported + // by the server. Matching protocols are launched for + // each peer. + Protocols []Protocol + + // If Blacklist is set to a non-nil value, the given Blacklist + // is used to verify peer connections. + Blacklist Blacklist + + // If ListenAddr is set to a non-nil address, the server + // will listen for incoming connections. + // + // If the port is zero, the operating system will pick a port. The + // ListenAddr field will be updated with the actual address when + // the server is started. + ListenAddr string + + // If set to a non-nil value, the given NAT port mapper + // is used to make the listening port available to the + // Internet. + NAT NAT + + // If Dialer is set to a non-nil value, the given Dialer + // is used to dial outbound peer connections. + Dialer *net.Dialer + + // If NoDial is true, the server will not dial any peers. + NoDial bool + + // Hook for testing. This is useful because we can inhibit + // the whole protocol stack. + newPeerFunc peerFunc + + lock sync.RWMutex + running bool + listener net.Listener + laddr *net.TCPAddr // real listen addr + peers []*Peer + peerSlots chan int + peerCount int + + quit chan struct{} + wg sync.WaitGroup + peerConnect chan *peerAddr + peerDisconnect chan *Peer +} + +// NAT is implemented by NAT traversal methods. +type NAT interface { + GetExternalAddress() (net.IP, error) + AddPortMapping(protocol string, extport, intport int, name string, lifetime time.Duration) error + DeletePortMapping(protocol string, extport, intport int) error + + // Should return name of the method. + String() string +} + +type peerFunc func(srv *Server, c net.Conn, dialAddr *peerAddr) *Peer + +// Peers returns all connected peers. +func (srv *Server) Peers() (peers []*Peer) { + srv.lock.RLock() + defer srv.lock.RUnlock() + for _, peer := range srv.peers { + if peer != nil { + peers = append(peers, peer) + } + } + return +} + +// PeerCount returns the number of connected peers. +func (srv *Server) PeerCount() int { + srv.lock.RLock() + defer srv.lock.RUnlock() + return srv.peerCount +} + +// SuggestPeer injects an address into the outbound address pool. +func (srv *Server) SuggestPeer(ip net.IP, port int, nodeID []byte) { + select { + case srv.peerConnect <- &peerAddr{ip, uint64(port), nodeID}: + default: // don't block + } +} + +// Broadcast sends an RLP-encoded message to all connected peers. +// This method is deprecated and will be removed later. +func (srv *Server) Broadcast(protocol string, code uint64, data ...interface{}) { + var payload []byte + if data != nil { + payload = encodePayload(data...) + } + srv.lock.RLock() + defer srv.lock.RUnlock() + for _, peer := range srv.peers { + if peer != nil { + var msg = Msg{Code: code} + if data != nil { + msg.Payload = bytes.NewReader(payload) + msg.Size = uint32(len(payload)) + } + peer.writeProtoMsg(protocol, msg) + } + } +} + +// Start starts running the server. +// Servers can be re-used and started again after stopping. +func (srv *Server) Start() (err error) { + srv.lock.Lock() + defer srv.lock.Unlock() + if srv.running { + return errors.New("server already running") + } + srvlog.Infoln("Starting Server") + + // initialize fields + if srv.Identity == nil { + return fmt.Errorf("Server.Identity must be set to a non-nil identity") + } + if srv.MaxPeers <= 0 { + return fmt.Errorf("Server.MaxPeers must be > 0") + } + srv.quit = make(chan struct{}) + srv.peers = make([]*Peer, srv.MaxPeers) + srv.peerSlots = make(chan int, srv.MaxPeers) + srv.peerConnect = make(chan *peerAddr, outboundAddressPoolSize) + srv.peerDisconnect = make(chan *Peer) + if srv.newPeerFunc == nil { + srv.newPeerFunc = newServerPeer + } + if srv.Blacklist == nil { + srv.Blacklist = NewBlacklist() + } + if srv.Dialer == nil { + srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout} + } + + if srv.ListenAddr != "" { + if err := srv.startListening(); err != nil { + return err + } + } + if !srv.NoDial { + srv.wg.Add(1) + go srv.dialLoop() + } + if srv.NoDial && srv.ListenAddr == "" { + srvlog.Warnln("I will be kind-of useless, neither dialing nor listening.") + } + + // make all slots available + for i := range srv.peers { + srv.peerSlots <- i + } + // note: discLoop is not part of WaitGroup + go srv.discLoop() + srv.running = true + return nil +} + +func (srv *Server) startListening() error { + listener, err := net.Listen("tcp", srv.ListenAddr) + if err != nil { + return err + } + srv.ListenAddr = listener.Addr().String() + srv.laddr = listener.Addr().(*net.TCPAddr) + srv.listener = listener + srv.wg.Add(1) + go srv.listenLoop() + if !srv.laddr.IP.IsLoopback() && srv.NAT != nil { + srv.wg.Add(1) + go srv.natLoop(srv.laddr.Port) + } + return nil +} + +// Stop terminates the server and all active peer connections. +// It blocks until all active connections have been closed. +func (srv *Server) Stop() { + srv.lock.Lock() + if !srv.running { + srv.lock.Unlock() + return + } + srv.running = false + srv.lock.Unlock() + + srvlog.Infoln("Stopping server") + if srv.listener != nil { + // this unblocks listener Accept + srv.listener.Close() + } + close(srv.quit) + for _, peer := range srv.Peers() { + peer.Disconnect(DiscQuitting) + } + srv.wg.Wait() + + // wait till they actually disconnect + // this is checked by claiming all peerSlots. + // slots become available as the peers disconnect. + for i := 0; i < cap(srv.peerSlots); i++ { + <-srv.peerSlots + } + // terminate discLoop + close(srv.peerDisconnect) +} + +func (srv *Server) discLoop() { + for peer := range srv.peerDisconnect { + // peer has just disconnected. free up its slot. + srvlog.Infof("%v is gone", peer) + srv.peerSlots <- peer.slot + srv.lock.Lock() + srv.peers[peer.slot] = nil + srv.lock.Unlock() + } +} + +// main loop for adding connections via listening +func (srv *Server) listenLoop() { + defer srv.wg.Done() + + srvlog.Infoln("Listening on", srv.listener.Addr()) + for { + select { + case slot := <-srv.peerSlots: + conn, err := srv.listener.Accept() + if err != nil { + srv.peerSlots <- slot + return + } + srvlog.Debugf("Accepted conn %v (slot %d)\n", conn.RemoteAddr(), slot) + srv.addPeer(conn, nil, slot) + case <-srv.quit: + return + } + } +} + +func (srv *Server) natLoop(port int) { + defer srv.wg.Done() + for { + srv.updatePortMapping(port) + select { + case <-time.After(portMappingUpdateInterval): + // one more round + case <-srv.quit: + srv.removePortMapping(port) + return + } + } +} + +func (srv *Server) updatePortMapping(port int) { + srvlog.Infoln("Attempting to map port", port, "with", srv.NAT) + err := srv.NAT.AddPortMapping("tcp", port, port, "ethereum p2p", portMappingTimeout) + if err != nil { + srvlog.Errorln("Port mapping error:", err) + return + } + extip, err := srv.NAT.GetExternalAddress() + if err != nil { + srvlog.Errorln("Error getting external IP:", err) + return + } + srv.lock.Lock() + extaddr := *(srv.listener.Addr().(*net.TCPAddr)) + extaddr.IP = extip + srvlog.Infoln("Mapped port, external addr is", &extaddr) + srv.laddr = &extaddr + srv.lock.Unlock() +} + +func (srv *Server) removePortMapping(port int) { + srvlog.Infoln("Removing port mapping for", port, "with", srv.NAT) + srv.NAT.DeletePortMapping("tcp", port, port) +} + +func (srv *Server) dialLoop() { + defer srv.wg.Done() + var ( + suggest chan *peerAddr + slot *int + slots = srv.peerSlots + ) + for { + select { + case i := <-slots: + // we need a peer in slot i, slot reserved + slot = &i + // now we can watch for candidate peers in the next loop + suggest = srv.peerConnect + // do not consume more until candidate peer is found + slots = nil + + case desc := <-suggest: + // candidate peer found, will dial out asyncronously + // if connection fails slot will be released + go srv.dialPeer(desc, *slot) + // we can watch if more peers needed in the next loop + slots = srv.peerSlots + // until then we dont care about candidate peers + suggest = nil + + case <-srv.quit: + // give back the currently reserved slot + if slot != nil { + srv.peerSlots <- *slot + } + return + } + } +} + +// connect to peer via dial out +func (srv *Server) dialPeer(desc *peerAddr, slot int) { + srvlog.Debugf("Dialing %v (slot %d)\n", desc, slot) + conn, err := srv.Dialer.Dial(desc.Network(), desc.String()) + if err != nil { + srvlog.Errorf("Dial error: %v", err) + srv.peerSlots <- slot + return + } + go srv.addPeer(conn, desc, slot) +} + +// creates the new peer object and inserts it into its slot +func (srv *Server) addPeer(conn net.Conn, desc *peerAddr, slot int) *Peer { + srv.lock.Lock() + defer srv.lock.Unlock() + if !srv.running { + conn.Close() + srv.peerSlots <- slot // release slot + return nil + } + peer := srv.newPeerFunc(srv, conn, desc) + peer.slot = slot + srv.peers[slot] = peer + srv.peerCount++ + go func() { peer.loop(); srv.peerDisconnect <- peer }() + return peer +} + +// removes peer: sending disconnect msg, stop peer, remove rom list/table, release slot +func (srv *Server) removePeer(peer *Peer) { + srv.lock.Lock() + defer srv.lock.Unlock() + srvlog.Debugf("Removing peer %v %v (slot %v)\n", peer, peer.slot) + if srv.peers[peer.slot] != peer { + srvlog.Warnln("Invalid peer to remove:", peer) + return + } + // remove from list and index + srv.peerCount-- + srv.peers[peer.slot] = nil + // release slot to signal need for a new peer, last! + srv.peerSlots <- peer.slot +} + +func (srv *Server) verifyPeer(addr *peerAddr) error { + if srv.Blacklist.Exists(addr.Pubkey) { + return errors.New("blacklisted") + } + if bytes.Equal(srv.Identity.Pubkey()[1:], addr.Pubkey) { + return newPeerError(errPubkeyForbidden, "not allowed to connect to srv") + } + srv.lock.RLock() + defer srv.lock.RUnlock() + for _, peer := range srv.peers { + if peer != nil { + id := peer.Identity() + if id != nil && bytes.Equal(id.Pubkey(), addr.Pubkey) { + return errors.New("already connected") + } + } + } + return nil +} + +type Blacklist interface { + Get([]byte) (bool, error) + Put([]byte) error + Delete([]byte) error + Exists(pubkey []byte) (ok bool) +} + +type BlacklistMap struct { + blacklist map[string]bool + lock sync.RWMutex +} + +func NewBlacklist() *BlacklistMap { + return &BlacklistMap{ + blacklist: make(map[string]bool), + } +} + +func (self *BlacklistMap) Get(pubkey []byte) (bool, error) { + self.lock.RLock() + defer self.lock.RUnlock() + v, ok := self.blacklist[string(pubkey)] + var err error + if !ok { + err = fmt.Errorf("not found") + } + return v, err +} + +func (self *BlacklistMap) Exists(pubkey []byte) (ok bool) { + self.lock.RLock() + defer self.lock.RUnlock() + _, ok = self.blacklist[string(pubkey)] + return +} + +func (self *BlacklistMap) Put(pubkey []byte) error { + self.lock.RLock() + defer self.lock.RUnlock() + self.blacklist[string(pubkey)] = true + return nil +} + +func (self *BlacklistMap) Delete(pubkey []byte) error { + self.lock.RLock() + defer self.lock.RUnlock() + delete(self.blacklist, string(pubkey)) + return nil +} |