aboutsummaryrefslogtreecommitdiffstats
path: root/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'peer.go')
-rw-r--r--peer.go95
1 files changed, 78 insertions, 17 deletions
diff --git a/peer.go b/peer.go
index 0875c6e45..e3a4f74cb 100644
--- a/peer.go
+++ b/peer.go
@@ -5,6 +5,8 @@ import (
"github.com/ethereum/ethwire-go"
"log"
"net"
+ "sync/atomic"
+ "time"
)
type Peer struct {
@@ -16,8 +18,12 @@ type Peer struct {
outputQueue chan *ethwire.InOutMsg
// Quit channel
quit chan bool
-
- inbound bool // Determines whether it's an inbound or outbound peer
+ // Determines whether it's an inbound or outbound peer
+ inbound bool
+ // Flag for checking the peer's connectivity state
+ connected int32
+ disconnect int32
+ lastSend time.Time
}
func NewPeer(conn net.Conn, server *Server, inbound bool) *Peer {
@@ -27,12 +33,57 @@ func NewPeer(conn net.Conn, server *Server, inbound bool) *Peer {
server: server,
conn: conn,
inbound: inbound,
+ disconnect: 0,
+ connected: 1,
}
}
+func NewOutboundPeer(addr string, server *Server) *Peer {
+ p := &Peer{
+ outputQueue: make(chan *ethwire.InOutMsg, 1), // Buffered chan of 1 is enough
+ quit: make(chan bool),
+ server: server,
+ inbound: false,
+ connected: 0,
+ disconnect: 1,
+ }
+
+ // Set up the connection in another goroutine so we don't block the main thread
+ go func() {
+ conn, err := net.Dial("tcp", addr)
+ if err != nil {
+ p.Stop()
+ }
+ p.conn = conn
+
+ // Atomically set the connection state
+ atomic.StoreInt32(&p.connected, 1)
+ atomic.StoreInt32(&p.disconnect, 0)
+
+ log.Println("Connected to peer ::", conn.RemoteAddr())
+ }()
+
+ return p
+}
+
// Outputs any RLP encoded data to the peer
func (p *Peer) QueueMessage(msg *ethwire.InOutMsg) {
- p.outputQueue <- msg //ethwire.InOutMsg{MsgType: msgType, Nonce: ethutil.RandomUint64(), Data: data}
+ p.outputQueue <- msg
+}
+
+func (p *Peer) writeMessage(msg *ethwire.InOutMsg) {
+ // Ignore the write if we're not connected
+ if atomic.LoadInt32(&p.connected) != 1 {
+ return
+ }
+
+ err := ethwire.WriteMessage(p.conn, msg)
+ if err != nil {
+ log.Println("Can't send message:", err)
+ // Stop the client if there was an error writing to it
+ p.Stop()
+ return
+ }
}
// Outbound message handler. Outbound messages are handled here
@@ -42,28 +93,32 @@ out:
select {
// Main message queue. All outbound messages are processed through here
case msg := <-p.outputQueue:
- // TODO Message checking and handle accordingly
- err := ethwire.WriteMessage(p.conn, msg)
- if err != nil {
- log.Println(err)
-
- // Stop the client if there was an error writing to it
- p.Stop()
- }
+ p.writeMessage(msg)
+ p.lastSend = time.Now()
// Break out of the for loop if a quit message is posted
case <-p.quit:
break out
}
}
+
+clean:
+ // This loop is for draining the output queue and anybody waiting for us
+ for {
+ select {
+ case <- p.outputQueue:
+ // TODO
+ default:
+ break clean
+ }
+ }
}
// Inbound handler. Inbound messages are received here and passed to the appropriate methods
func (p *Peer) HandleInbound() {
- defer p.Stop()
out:
- for {
+ for atomic.LoadInt32(&p.disconnect) == 0 {
// Wait for a message from the peer
msg, err := ethwire.ReadMessage(p.conn)
if err != nil {
@@ -90,8 +145,7 @@ out:
}
}
- // Notify the out handler we're quiting
- p.quit <- true
+ p.Stop()
}
func (p *Peer) Start() {
@@ -111,9 +165,16 @@ func (p *Peer) Start() {
}
func (p *Peer) Stop() {
- p.conn.Close()
+ if atomic.AddInt32(&p.disconnect, 1) != 1 {
+ return
+ }
+
+ close(p.quit)
+ if atomic.LoadInt32(&p.connected) != 0 {
+ p.conn.Close()
+ }
- p.quit <- true
+ log.Println("Peer shutdown")
}
func (p *Peer) pushVersionAck() error {