aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/server.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2014-11-04 20:21:44 +0800
committerFelix Lange <fjl@twurst.com>2014-11-22 04:52:45 +0800
commitf38052c499c1fee61423efeddb1f52677f1442e9 (patch)
tree6cc4c4e9739d61edeba9dc62781b2ebdeb0faf11 /p2p/server.go
parent8cf9ed0ea588e97f2baf0f834248727e8fbca18f (diff)
downloadgo-tangerine-f38052c499c1fee61423efeddb1f52677f1442e9.tar
go-tangerine-f38052c499c1fee61423efeddb1f52677f1442e9.tar.gz
go-tangerine-f38052c499c1fee61423efeddb1f52677f1442e9.tar.bz2
go-tangerine-f38052c499c1fee61423efeddb1f52677f1442e9.tar.lz
go-tangerine-f38052c499c1fee61423efeddb1f52677f1442e9.tar.xz
go-tangerine-f38052c499c1fee61423efeddb1f52677f1442e9.tar.zst
go-tangerine-f38052c499c1fee61423efeddb1f52677f1442e9.zip
p2p: rework protocol API
Diffstat (limited to 'p2p/server.go')
-rw-r--r--p2p/server.go150
1 files changed, 77 insertions, 73 deletions
diff --git a/p2p/server.go b/p2p/server.go
index 91bc4af5c..54d2cde30 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -80,12 +80,12 @@ type Server struct {
quit chan chan bool
peersLock sync.RWMutex
- maxPeers int
- peers []*Peer
- peerSlots chan int
- peersTable map[string]int
- peersMsg *Msg
- peerCount int
+ maxPeers int
+ peers []*Peer
+ peerSlots chan int
+ peersTable map[string]int
+ peerCount int
+ cachedEncodedPeers []byte
peerConnect chan net.Addr
peerDisconnect chan DisconnectRequest
@@ -147,27 +147,6 @@ func (self *Server) ClientIdentity() ClientIdentity {
return self.identity
}
-func (self *Server) PeersMessage() (msg *Msg, err error) {
- // TODO: memoize and reset when peers change
- self.peersLock.RLock()
- defer self.peersLock.RUnlock()
- msg = self.peersMsg
- if msg == nil {
- var peerData []interface{}
- for _, i := range self.peersTable {
- peer := self.peers[i]
- peerData = append(peerData, peer.Encode())
- }
- if len(peerData) == 0 {
- err = fmt.Errorf("no peers")
- } else {
- msg, err = NewMsg(PeersMsg, peerData...)
- self.peersMsg = msg //memoize
- }
- }
- return
-}
-
func (self *Server) Peers() (peers []*Peer) {
self.peersLock.RLock()
defer self.peersLock.RUnlock()
@@ -185,8 +164,6 @@ func (self *Server) PeerCount() int {
return self.peerCount
}
-var getPeersMsg, _ = NewMsg(GetPeersMsg)
-
func (self *Server) PeerConnect(addr net.Addr) {
// TODO: should buffer, filter and uniq
// send GetPeersMsg if not blocking
@@ -209,12 +186,21 @@ func (self *Server) Handlers() Handlers {
return self.handlers
}
-func (self *Server) Broadcast(protocol string, msg *Msg) {
+func (self *Server) Broadcast(protocol string, code MsgCode, data ...interface{}) {
+ var payload []byte
+ if data != nil {
+ payload = encodePayload(data...)
+ }
self.peersLock.RLock()
defer self.peersLock.RUnlock()
for _, peer := range self.peers {
if peer != nil {
- peer.Write(protocol, msg)
+ var msg = Msg{Code: code}
+ if data != nil {
+ msg.Payload = bytes.NewReader(payload)
+ msg.Size = uint32(len(payload))
+ }
+ peer.messenger.writeProtoMsg(protocol, msg)
}
}
}
@@ -296,7 +282,7 @@ FOR:
select {
case slot := <-self.peerSlots:
i++
- fmt.Printf("%v: found slot %v", i, slot)
+ fmt.Printf("%v: found slot %v\n", i, slot)
if i == self.maxPeers {
break FOR
}
@@ -358,70 +344,68 @@ func (self *Server) outboundPeerHandler(dialer Dialer) {
}
// check if peer address already connected
-func (self *Server) connected(address net.Addr) (err error) {
+func (self *Server) isConnected(address net.Addr) bool {
self.peersLock.RLock()
defer self.peersLock.RUnlock()
- // fmt.Printf("address: %v\n", address)
- slot, found := self.peersTable[address.String()]
- if found {
- err = fmt.Errorf("already connected as peer %v (%v)", slot, address)
- }
- return
+ _, found := self.peersTable[address.String()]
+ return found
}
// connect to peer via listener.Accept()
func (self *Server) connectInboundPeer(listener net.Listener, slot int) {
var address net.Addr
conn, err := listener.Accept()
- if err == nil {
- address = conn.RemoteAddr()
- err = self.connected(address)
- if err != nil {
- conn.Close()
- }
- }
if err != nil {
logger.Debugln(err)
self.peerSlots <- slot
- } else {
- fmt.Printf("adding %v\n", address)
- go self.addPeer(conn, address, true, slot)
+ return
+ }
+ address = conn.RemoteAddr()
+ // XXX: this won't work because the remote socket
+ // address does not identify the peer. we should
+ // probably get rid of this check and rely on public
+ // key detection in the base protocol.
+ if self.isConnected(address) {
+ conn.Close()
+ self.peerSlots <- slot
+ return
}
+ fmt.Printf("adding %v\n", address)
+ go self.addPeer(conn, address, true, slot)
}
// connect to peer via dial out
func (self *Server) connectOutboundPeer(dialer Dialer, address net.Addr, slot int) {
- var conn net.Conn
- err := self.connected(address)
- if err == nil {
- conn, err = dialer.Dial(address.Network(), address.String())
+ if self.isConnected(address) {
+ return
}
+ conn, err := dialer.Dial(address.Network(), address.String())
if err != nil {
- logger.Debugln(err)
self.peerSlots <- slot
- } else {
- go self.addPeer(conn, address, false, slot)
+ return
}
+ go self.addPeer(conn, address, false, slot)
}
// creates the new peer object and inserts it into its slot
-func (self *Server) addPeer(conn net.Conn, address net.Addr, inbound bool, slot int) {
+func (self *Server) addPeer(conn net.Conn, address net.Addr, inbound bool, slot int) *Peer {
self.peersLock.Lock()
defer self.peersLock.Unlock()
if self.closed {
fmt.Println("oopsy, not no longer need peer")
conn.Close() //oopsy our bad
self.peerSlots <- slot // release slot
- } else {
- peer := NewPeer(conn, address, inbound, self)
- self.peers[slot] = peer
- self.peersTable[address.String()] = slot
- self.peerCount++
- // reset peersmsg
- self.peersMsg = nil
- fmt.Printf("added peer %v %v (slot %v)\n", address, peer, slot)
- peer.Start()
+ return nil
}
+ logger.Infoln("adding new peer", address)
+ peer := NewPeer(conn, address, inbound, self)
+ self.peers[slot] = peer
+ self.peersTable[address.String()] = slot
+ self.peerCount++
+ self.cachedEncodedPeers = nil
+ fmt.Printf("added peer %v %v (slot %v)\n", address, peer, slot)
+ peer.Start()
+ return peer
}
// removes peer: sending disconnect msg, stop peer, remove rom list/table, release slot
@@ -441,13 +425,12 @@ func (self *Server) removePeer(request DisconnectRequest) {
self.peerCount--
self.peers[slot] = nil
delete(self.peersTable, address.String())
- // reset peersmsg
- self.peersMsg = nil
+ self.cachedEncodedPeers = nil
fmt.Printf("removed peer %v (slot %v)\n", peer, slot)
self.peersLock.Unlock()
// sending disconnect message
- disconnectMsg, _ := NewMsg(DiscMsg, request.reason)
+ disconnectMsg := NewMsg(discMsg, request.reason)
peer.Write("", disconnectMsg)
// be nice and wait
time.Sleep(disconnectGracePeriod * time.Second)
@@ -459,11 +442,32 @@ func (self *Server) removePeer(request DisconnectRequest) {
self.peerSlots <- slot
}
+// encodedPeerList returns an RLP-encoded list of peers.
+// the returned slice will be nil if there are no peers.
+func (self *Server) encodedPeerList() []byte {
+ // TODO: memoize and reset when peers change
+ self.peersLock.RLock()
+ defer self.peersLock.RUnlock()
+ if self.cachedEncodedPeers == nil && self.peerCount > 0 {
+ var peerData []interface{}
+ for _, i := range self.peersTable {
+ peer := self.peers[i]
+ peerData = append(peerData, peer.Encode())
+ }
+ self.cachedEncodedPeers = encodePayload(peerData)
+ }
+ return self.cachedEncodedPeers
+}
+
// fix handshake message to push to peers
-func (self *Server) Handshake() *Msg {
- fmt.Println(self.identity.Pubkey()[1:])
- msg, _ := NewMsg(HandshakeMsg, P2PVersion, []byte(self.identity.String()), []interface{}{self.protocols}, self.port, self.identity.Pubkey()[1:])
- return msg
+func (self *Server) handshakeMsg() Msg {
+ return NewMsg(handshakeMsg,
+ p2pVersion,
+ []byte(self.identity.String()),
+ []interface{}{self.protocols},
+ self.port,
+ self.identity.Pubkey()[1:],
+ )
}
func (self *Server) RegisterPubkey(candidate *Peer, pubkey []byte) error {