diff options
author | zelig <viktor.tron@gmail.com> | 2014-10-23 23:57:54 +0800 |
---|---|---|
committer | zelig <viktor.tron@gmail.com> | 2014-10-23 23:57:54 +0800 |
commit | 771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb (patch) | |
tree | 15a966dbe15e2f8388f69b396e613c7759b06f6d /p2p/connection.go | |
parent | 119c5b40a7ed1aea1c871c0cb56956b8ef9303d9 (diff) | |
download | go-tangerine-771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb.tar go-tangerine-771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb.tar.gz go-tangerine-771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb.tar.bz2 go-tangerine-771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb.tar.lz go-tangerine-771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb.tar.xz go-tangerine-771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb.tar.zst go-tangerine-771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb.zip |
initial commit of p2p package
Diffstat (limited to 'p2p/connection.go')
-rw-r--r-- | p2p/connection.go | 275 |
1 files changed, 275 insertions, 0 deletions
diff --git a/p2p/connection.go b/p2p/connection.go new file mode 100644 index 000000000..e999cbe55 --- /dev/null +++ b/p2p/connection.go @@ -0,0 +1,275 @@ +package p2p + +import ( + "bytes" + // "fmt" + "net" + "time" + + "github.com/ethereum/eth-go/ethutil" +) + +type Connection struct { + conn net.Conn + // conn NetworkConnection + timeout time.Duration + in chan []byte + out chan []byte + err chan *PeerError + closingIn chan chan bool + closingOut chan chan bool +} + +// const readBufferLength = 2 //for testing + +const readBufferLength = 1440 +const partialsQueueSize = 10 +const maxPendingQueueSize = 1 +const defaultTimeout = 500 + +var magicToken = []byte{34, 64, 8, 145} + +func (self *Connection) Open() { + go self.startRead() + go self.startWrite() +} + +func (self *Connection) Close() { + self.closeIn() + self.closeOut() +} + +func (self *Connection) closeIn() { + errc := make(chan bool) + self.closingIn <- errc + <-errc +} + +func (self *Connection) closeOut() { + errc := make(chan bool) + self.closingOut <- errc + <-errc +} + +func NewConnection(conn net.Conn, errchan chan *PeerError) *Connection { + return &Connection{ + conn: conn, + timeout: defaultTimeout, + in: make(chan []byte), + out: make(chan []byte), + err: errchan, + closingIn: make(chan chan bool, 1), + closingOut: make(chan chan bool, 1), + } +} + +func (self *Connection) Read() <-chan []byte { + return self.in +} + +func (self *Connection) Write() chan<- []byte { + return self.out +} + +func (self *Connection) Error() <-chan *PeerError { + return self.err +} + +func (self *Connection) startRead() { + payloads := make(chan []byte) + done := make(chan *PeerError) + pending := [][]byte{} + var head []byte + var wait time.Duration // initally 0 (no delay) + read := time.After(wait * time.Millisecond) + + for { + // if pending empty, nil channel blocks + var in chan []byte + if len(pending) > 0 { + in = self.in // enable send case + head = pending[0] + } else { + in = nil + } + + select { + case <-read: + go self.read(payloads, done) + case err := <-done: + if err == nil { // no error but nothing to read + if len(pending) < maxPendingQueueSize { + wait = 100 + } else if wait == 0 { + wait = 100 + } else { + wait = 2 * wait + } + } else { + self.err <- err // report error + wait = 100 + } + read = time.After(wait * time.Millisecond) + case payload := <-payloads: + pending = append(pending, payload) + if len(pending) < maxPendingQueueSize { + wait = 0 + } else { + wait = 100 + } + read = time.After(wait * time.Millisecond) + case in <- head: + pending = pending[1:] + case errc := <-self.closingIn: + errc <- true + close(self.in) + return + } + + } +} + +func (self *Connection) startWrite() { + pending := [][]byte{} + done := make(chan *PeerError) + writing := false + for { + if len(pending) > 0 && !writing { + writing = true + go self.write(pending[0], done) + } + select { + case payload := <-self.out: + pending = append(pending, payload) + case err := <-done: + if err == nil { + pending = pending[1:] + writing = false + } else { + self.err <- err // report error + } + case errc := <-self.closingOut: + errc <- true + close(self.out) + return + } + } +} + +func pack(payload []byte) (packet []byte) { + length := ethutil.NumberToBytes(uint32(len(payload)), 32) + // return error if too long? + // Write magic token and payload length (first 8 bytes) + packet = append(magicToken, length...) + packet = append(packet, payload...) + return +} + +func avoidPanic(done chan *PeerError) { + if rec := recover(); rec != nil { + err := NewPeerError(MiscError, " %v", rec) + logger.Debugln(err) + done <- err + } +} + +func (self *Connection) write(payload []byte, done chan *PeerError) { + defer avoidPanic(done) + var err *PeerError + _, ok := self.conn.Write(pack(payload)) + if ok != nil { + err = NewPeerError(WriteError, " %v", ok) + logger.Debugln(err) + } + done <- err +} + +func (self *Connection) read(payloads chan []byte, done chan *PeerError) { + //defer avoidPanic(done) + + partials := make(chan []byte, partialsQueueSize) + errc := make(chan *PeerError) + go self.readPartials(partials, errc) + + packet := []byte{} + length := 8 + start := true + var err *PeerError +out: + for { + // appends partials read via connection until packet is + // - either parseable (>=8bytes) + // - or complete (payload fully consumed) + for len(packet) < length { + partial, ok := <-partials + if !ok { // partials channel is closed + err = <-errc + if err == nil && len(packet) > 0 { + if start { + err = NewPeerError(PacketTooShort, "%v", packet) + } else { + err = NewPeerError(PayloadTooShort, "%d < %d", len(packet), length) + } + } + break out + } + packet = append(packet, partial...) + } + if start { + // at least 8 bytes read, can validate packet + if bytes.Compare(magicToken, packet[:4]) != 0 { + err = NewPeerError(MagicTokenMismatch, " received %v", packet[:4]) + break + } + length = int(ethutil.BytesToNumber(packet[4:8])) + packet = packet[8:] + + if length > 0 { + start = false // now consuming payload + } else { //penalize peer but read on + self.err <- NewPeerError(EmptyPayload, "") + length = 8 + } + } else { + // packet complete (payload fully consumed) + payloads <- packet[:length] + packet = packet[length:] // resclice packet + start = true + length = 8 + } + } + + // this stops partials read via the connection, should we? + //if err != nil { + // select { + // case errc <- err + // default: + //} + done <- err +} + +func (self *Connection) readPartials(partials chan []byte, errc chan *PeerError) { + defer close(partials) + for { + // Give buffering some time + self.conn.SetReadDeadline(time.Now().Add(self.timeout * time.Millisecond)) + buffer := make([]byte, readBufferLength) + // read partial from connection + bytesRead, err := self.conn.Read(buffer) + if err == nil || err.Error() == "EOF" { + if bytesRead > 0 { + partials <- buffer[:bytesRead] + } + if err != nil && err.Error() == "EOF" { + break + } + } else { + // unexpected error, report to errc + err := NewPeerError(ReadError, " %v", err) + logger.Debugln(err) + errc <- err + return // will close partials channel + } + } + close(errc) +} |