package p2p

import (
	"bytes"
	// "fmt"
	"net"
	"time"

	"github.com/ethereum/eth-go/ethutil"
)

type Connection struct {
	conn net.Conn
	// conn       NetworkConnection
	timeout    time.Duration
	in         chan []byte
	out        chan []byte
	err        chan *PeerError
	closingIn  chan chan bool
	closingOut chan chan bool
}

// const readBufferLength = 2 //for testing

const readBufferLength = 1440
const partialsQueueSize = 10
const maxPendingQueueSize = 1
const defaultTimeout = 500

var magicToken = []byte{34, 64, 8, 145}

func (self *Connection) Open() {
	go self.startRead()
	go self.startWrite()
}

func (self *Connection) Close() {
	self.closeIn()
	self.closeOut()
}

func (self *Connection) closeIn() {
	errc := make(chan bool)
	self.closingIn <- errc
	<-errc
}

func (self *Connection) closeOut() {
	errc := make(chan bool)
	self.closingOut <- errc
	<-errc
}

func NewConnection(conn net.Conn, errchan chan *PeerError) *Connection {
	return &Connection{
		conn:       conn,
		timeout:    defaultTimeout,
		in:         make(chan []byte),
		out:        make(chan []byte),
		err:        errchan,
		closingIn:  make(chan chan bool, 1),
		closingOut: make(chan chan bool, 1),
	}
}

func (self *Connection) Read() <-chan []byte {
	return self.in
}

func (self *Connection) Write() chan<- []byte {
	return self.out
}

func (self *Connection) Error() <-chan *PeerError {
	return self.err
}

func (self *Connection) startRead() {
	payloads := make(chan []byte)
	done := make(chan *PeerError)
	pending := [][]byte{}
	var head []byte
	var wait time.Duration // initally 0 (no delay)
	read := time.After(wait * time.Millisecond)

	for {
		// if pending empty, nil channel blocks
		var in chan []byte
		if len(pending) > 0 {
			in = self.in // enable send case
			head = pending[0]
		} else {
			in = nil
		}

		select {
		case <-read:
			go self.read(payloads, done)
		case err := <-done:
			if err == nil { // no error but nothing to read
				if len(pending) < maxPendingQueueSize {
					wait = 100
				} else if wait == 0 {
					wait = 100
				} else {
					wait = 2 * wait
				}
			} else {
				self.err <- err // report error
				wait = 100
			}
			read = time.After(wait * time.Millisecond)
		case payload := <-payloads:
			pending = append(pending, payload)
			if len(pending) < maxPendingQueueSize {
				wait = 0
			} else {
				wait = 100
			}
			read = time.After(wait * time.Millisecond)
		case in <- head:
			pending = pending[1:]
		case errc := <-self.closingIn:
			errc <- true
			close(self.in)
			return
		}

	}
}

func (self *Connection) startWrite() {
	pending := [][]byte{}
	done := make(chan *PeerError)
	writing := false
	for {
		if len(pending) > 0 && !writing {
			writing = true
			go self.write(pending[0], done)
		}
		select {
		case payload := <-self.out:
			pending = append(pending, payload)
		case err := <-done:
			if err == nil {
				pending = pending[1:]
				writing = false
			} else {
				self.err <- err // report error
			}
		case errc := <-self.closingOut:
			errc <- true
			close(self.out)
			return
		}
	}
}

func pack(payload []byte) (packet []byte) {
	length := ethutil.NumberToBytes(uint32(len(payload)), 32)
	// return error if too long?
	// Write magic token and payload length (first 8 bytes)
	packet = append(magicToken, length...)
	packet = append(packet, payload...)
	return
}

func avoidPanic(done chan *PeerError) {
	if rec := recover(); rec != nil {
		err := NewPeerError(MiscError, " %v", rec)
		logger.Debugln(err)
		done <- err
	}
}

func (self *Connection) write(payload []byte, done chan *PeerError) {
	defer avoidPanic(done)
	var err *PeerError
	_, ok := self.conn.Write(pack(payload))
	if ok != nil {
		err = NewPeerError(WriteError, " %v", ok)
		logger.Debugln(err)
	}
	done <- err
}

func (self *Connection) read(payloads chan []byte, done chan *PeerError) {
	//defer avoidPanic(done)

	partials := make(chan []byte, partialsQueueSize)
	errc := make(chan *PeerError)
	go self.readPartials(partials, errc)

	packet := []byte{}
	length := 8
	start := true
	var err *PeerError
out:
	for {
		// appends partials read via connection until packet is
		// - either parseable (>=8bytes)
		// - or complete (payload fully consumed)
		for len(packet) < length {
			partial, ok := <-partials
			if !ok { // partials channel is closed
				err = <-errc
				if err == nil && len(packet) > 0 {
					if start {
						err = NewPeerError(PacketTooShort, "%v", packet)
					} else {
						err = NewPeerError(PayloadTooShort, "%d < %d", len(packet), length)
					}
				}
				break out
			}
			packet = append(packet, partial...)
		}
		if start {
			// at least 8 bytes read, can validate packet
			if bytes.Compare(magicToken, packet[:4]) != 0 {
				err = NewPeerError(MagicTokenMismatch, " received %v", packet[:4])
				break
			}
			length = int(ethutil.BytesToNumber(packet[4:8]))
			packet = packet[8:]

			if length > 0 {
				start = false // now consuming payload
			} else { //penalize peer but read on
				self.err <- NewPeerError(EmptyPayload, "")
				length = 8
			}
		} else {
			// packet complete (payload fully consumed)
			payloads <- packet[:length]
			packet = packet[length:] // resclice packet
			start = true
			length = 8
		}
	}

	// this stops partials read via the connection, should we?
	//if err != nil {
	//  select {
	//    case errc <- err
	//  default:
	//}
	done <- err
}

func (self *Connection) readPartials(partials chan []byte, errc chan *PeerError) {
	defer close(partials)
	for {
		// Give buffering some time
		self.conn.SetReadDeadline(time.Now().Add(self.timeout * time.Millisecond))
		buffer := make([]byte, readBufferLength)
		// read partial from connection
		bytesRead, err := self.conn.Read(buffer)
		if err == nil || err.Error() == "EOF" {
			if bytesRead > 0 {
				partials <- buffer[:bytesRead]
			}
			if err != nil && err.Error() == "EOF" {
				break
			}
		} else {
			// unexpected error, report to errc
			err := NewPeerError(ReadError, " %v", err)
			logger.Debugln(err)
			errc <- err
			return // will close partials channel
		}
	}
	close(errc)
}