diff options
Diffstat (limited to 'server.go')
-rw-r--r-- | server.go | 62 |
1 files changed, 48 insertions, 14 deletions
@@ -7,8 +7,8 @@ import ( "github.com/ethereum/ethwire-go" "log" "net" - "time" "sync/atomic" + "time" ) func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) { @@ -20,6 +20,9 @@ func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) { } } +const ( + processReapingTimeout = 60 // TODO increase +) type Server struct { // Channel for shutting down the server @@ -67,6 +70,13 @@ func (s *Server) AddPeer(conn net.Conn) { } } +func (s *Server) ProcessPeerList(addrs []string) { + for _, addr := range addrs { + // TODO Probably requires some sanity checks + s.ConnectToPeer(addr) + } +} + func (s *Server) ConnectToPeer(addr string) error { peer := NewOutboundPeer(addr, s) @@ -75,20 +85,44 @@ func (s *Server) ConnectToPeer(addr string) error { return nil } -func (s *Server) Broadcast(msgType string, data []byte) { +func (s *Server) OutboundPeers() []*Peer { + // Create a new peer slice with at least the length of the total peers + outboundPeers := make([]*Peer, s.peers.Len()) + length := 0 eachPeer(s.peers, func(p *Peer, e *list.Element) { - p.QueueMessage(ethwire.NewMessage(msgType, 0, data)) + if !p.inbound { + outboundPeers[length] = p + length++ + } }) + + return outboundPeers[:length] } -const ( - processReapingTimeout = 10 // TODO increase -) +func (s *Server) InboundPeers() []*Peer { + // Create a new peer slice with at least the length of the total peers + inboundPeers := make([]*Peer, s.peers.Len()) + length := 0 + eachPeer(s.peers, func(p *Peer, e *list.Element) { + if p.inbound { + inboundPeers[length] = p + length++ + } + }) + + return inboundPeers[:length] +} + +func (s *Server) Broadcast(msgType ethwire.MsgType, data []byte) { + eachPeer(s.peers, func(p *Peer, e *list.Element) { + p.QueueMessage(ethwire.NewMessage(msgType, data)) + }) +} func (s *Server) ReapDeadPeers() { for { eachPeer(s.peers, func(p *Peer, e *list.Element) { - if atomic.LoadInt32(&p.disconnect) == 1 { + if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) { log.Println("Dead peer found .. reaping") s.peers.Remove(e) @@ -139,13 +173,13 @@ func (s *Server) Start() { // TMP /* - go func() { - for { - s.Broadcast("block", s.blockManager.bc.GenesisBlock().MarshalRlp()) + go func() { + for { + s.Broadcast("block", s.blockManager.bc.GenesisBlock().RlpEncode()) - time.Sleep(1000 * time.Millisecond) - } - }() + time.Sleep(1000 * time.Millisecond) + } + }() */ } @@ -154,7 +188,7 @@ func (s *Server) Stop() { defer s.db.Close() eachPeer(s.peers, func(p *Peer, e *list.Element) { - p.Stop() + p.Stop() }) s.shutdownChan <- true |