aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/connection.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/connection.go')
-rw-r--r--p2p/connection.go275
1 files changed, 0 insertions, 275 deletions
diff --git a/p2p/connection.go b/p2p/connection.go
deleted file mode 100644
index be366235d..000000000
--- a/p2p/connection.go
+++ /dev/null
@@ -1,275 +0,0 @@
-package p2p
-
-import (
- "bytes"
- // "fmt"
- "net"
- "time"
-
- "github.com/ethereum/go-ethereum/ethutil"
-)
-
-type Connection struct {
- conn net.Conn
- // conn NetworkConnection
- timeout time.Duration
- in chan []byte
- out chan []byte
- err chan *PeerError
- closingIn chan chan bool
- closingOut chan chan bool
-}
-
-// const readBufferLength = 2 //for testing
-
-const readBufferLength = 1440
-const partialsQueueSize = 10
-const maxPendingQueueSize = 1
-const defaultTimeout = 500
-
-var magicToken = []byte{34, 64, 8, 145}
-
-func (self *Connection) Open() {
- go self.startRead()
- go self.startWrite()
-}
-
-func (self *Connection) Close() {
- self.closeIn()
- self.closeOut()
-}
-
-func (self *Connection) closeIn() {
- errc := make(chan bool)
- self.closingIn <- errc
- <-errc
-}
-
-func (self *Connection) closeOut() {
- errc := make(chan bool)
- self.closingOut <- errc
- <-errc
-}
-
-func NewConnection(conn net.Conn, errchan chan *PeerError) *Connection {
- return &Connection{
- conn: conn,
- timeout: defaultTimeout,
- in: make(chan []byte),
- out: make(chan []byte),
- err: errchan,
- closingIn: make(chan chan bool, 1),
- closingOut: make(chan chan bool, 1),
- }
-}
-
-func (self *Connection) Read() <-chan []byte {
- return self.in
-}
-
-func (self *Connection) Write() chan<- []byte {
- return self.out
-}
-
-func (self *Connection) Error() <-chan *PeerError {
- return self.err
-}
-
-func (self *Connection) startRead() {
- payloads := make(chan []byte)
- done := make(chan *PeerError)
- pending := [][]byte{}
- var head []byte
- var wait time.Duration // initally 0 (no delay)
- read := time.After(wait * time.Millisecond)
-
- for {
- // if pending empty, nil channel blocks
- var in chan []byte
- if len(pending) > 0 {
- in = self.in // enable send case
- head = pending[0]
- } else {
- in = nil
- }
-
- select {
- case <-read:
- go self.read(payloads, done)
- case err := <-done:
- if err == nil { // no error but nothing to read
- if len(pending) < maxPendingQueueSize {
- wait = 100
- } else if wait == 0 {
- wait = 100
- } else {
- wait = 2 * wait
- }
- } else {
- self.err <- err // report error
- wait = 100
- }
- read = time.After(wait * time.Millisecond)
- case payload := <-payloads:
- pending = append(pending, payload)
- if len(pending) < maxPendingQueueSize {
- wait = 0
- } else {
- wait = 100
- }
- read = time.After(wait * time.Millisecond)
- case in <- head:
- pending = pending[1:]
- case errc := <-self.closingIn:
- errc <- true
- close(self.in)
- return
- }
-
- }
-}
-
-func (self *Connection) startWrite() {
- pending := [][]byte{}
- done := make(chan *PeerError)
- writing := false
- for {
- if len(pending) > 0 && !writing {
- writing = true
- go self.write(pending[0], done)
- }
- select {
- case payload := <-self.out:
- pending = append(pending, payload)
- case err := <-done:
- if err == nil {
- pending = pending[1:]
- writing = false
- } else {
- self.err <- err // report error
- }
- case errc := <-self.closingOut:
- errc <- true
- close(self.out)
- return
- }
- }
-}
-
-func pack(payload []byte) (packet []byte) {
- length := ethutil.NumberToBytes(uint32(len(payload)), 32)
- // return error if too long?
- // Write magic token and payload length (first 8 bytes)
- packet = append(magicToken, length...)
- packet = append(packet, payload...)
- return
-}
-
-func avoidPanic(done chan *PeerError) {
- if rec := recover(); rec != nil {
- err := NewPeerError(MiscError, " %v", rec)
- logger.Debugln(err)
- done <- err
- }
-}
-
-func (self *Connection) write(payload []byte, done chan *PeerError) {
- defer avoidPanic(done)
- var err *PeerError
- _, ok := self.conn.Write(pack(payload))
- if ok != nil {
- err = NewPeerError(WriteError, " %v", ok)
- logger.Debugln(err)
- }
- done <- err
-}
-
-func (self *Connection) read(payloads chan []byte, done chan *PeerError) {
- //defer avoidPanic(done)
-
- partials := make(chan []byte, partialsQueueSize)
- errc := make(chan *PeerError)
- go self.readPartials(partials, errc)
-
- packet := []byte{}
- length := 8
- start := true
- var err *PeerError
-out:
- for {
- // appends partials read via connection until packet is
- // - either parseable (>=8bytes)
- // - or complete (payload fully consumed)
- for len(packet) < length {
- partial, ok := <-partials
- if !ok { // partials channel is closed
- err = <-errc
- if err == nil && len(packet) > 0 {
- if start {
- err = NewPeerError(PacketTooShort, "%v", packet)
- } else {
- err = NewPeerError(PayloadTooShort, "%d < %d", len(packet), length)
- }
- }
- break out
- }
- packet = append(packet, partial...)
- }
- if start {
- // at least 8 bytes read, can validate packet
- if bytes.Compare(magicToken, packet[:4]) != 0 {
- err = NewPeerError(MagicTokenMismatch, " received %v", packet[:4])
- break
- }
- length = int(ethutil.BytesToNumber(packet[4:8]))
- packet = packet[8:]
-
- if length > 0 {
- start = false // now consuming payload
- } else { //penalize peer but read on
- self.err <- NewPeerError(EmptyPayload, "")
- length = 8
- }
- } else {
- // packet complete (payload fully consumed)
- payloads <- packet[:length]
- packet = packet[length:] // resclice packet
- start = true
- length = 8
- }
- }
-
- // this stops partials read via the connection, should we?
- //if err != nil {
- // select {
- // case errc <- err
- // default:
- //}
- done <- err
-}
-
-func (self *Connection) readPartials(partials chan []byte, errc chan *PeerError) {
- defer close(partials)
- for {
- // Give buffering some time
- self.conn.SetReadDeadline(time.Now().Add(self.timeout * time.Millisecond))
- buffer := make([]byte, readBufferLength)
- // read partial from connection
- bytesRead, err := self.conn.Read(buffer)
- if err == nil || err.Error() == "EOF" {
- if bytesRead > 0 {
- partials <- buffer[:bytesRead]
- }
- if err != nil && err.Error() == "EOF" {
- break
- }
- } else {
- // unexpected error, report to errc
- err := NewPeerError(ReadError, " %v", err)
- logger.Debugln(err)
- errc <- err
- return // will close partials channel
- }
- }
- close(errc)
-}