diff options
author | Felix Lange <fjl@twurst.com> | 2014-11-22 04:48:49 +0800 |
---|---|---|
committer | Felix Lange <fjl@twurst.com> | 2014-11-22 04:52:45 +0800 |
commit | 59b63caf5e4de64ceb7dcdf01551a080f53b1672 (patch) | |
tree | a4e79590284c5afe4d6927b422a5092b074e7938 /p2p/messenger.go | |
parent | e4a601c6444afdc11ce0cb80d7fd83116de2c8b9 (diff) | |
download | go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.gz go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.bz2 go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.lz go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.xz go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.zst go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.zip |
p2p: API cleanup and PoC 7 compatibility
Whoa, one more big commit. I didn't manage to untangle the
changes while working towards compatibility.
Diffstat (limited to 'p2p/messenger.go')
-rw-r--r-- | p2p/messenger.go | 221 |
1 files changed, 0 insertions, 221 deletions
diff --git a/p2p/messenger.go b/p2p/messenger.go deleted file mode 100644 index c7948a9ac..000000000 --- a/p2p/messenger.go +++ /dev/null @@ -1,221 +0,0 @@ -package p2p - -import ( - "bufio" - "bytes" - "fmt" - "io" - "io/ioutil" - "net" - "sync" - "time" -) - -type Handlers map[string]Protocol - -type proto struct { - in chan Msg - maxcode, offset MsgCode - messenger *messenger -} - -func (rw *proto) WriteMsg(msg Msg) error { - if msg.Code >= rw.maxcode { - return NewPeerError(InvalidMsgCode, "not handled") - } - msg.Code += rw.offset - return rw.messenger.writeMsg(msg) -} - -func (rw *proto) ReadMsg() (Msg, error) { - msg, ok := <-rw.in - if !ok { - return msg, io.EOF - } - msg.Code -= rw.offset - return msg, nil -} - -// eofSignal wraps a reader with eof signaling. -// the eof channel is closed when the wrapped reader -// reaches EOF. -type eofSignal struct { - wrapped io.Reader - eof chan struct{} -} - -func (r *eofSignal) Read(buf []byte) (int, error) { - n, err := r.wrapped.Read(buf) - if err != nil { - close(r.eof) // tell messenger that msg has been consumed - } - return n, err -} - -// messenger represents a message-oriented peer connection. -// It keeps track of the set of protocols understood -// by the remote peer. -type messenger struct { - peer *Peer - handlers Handlers - - // the mutex protects the connection - // so only one protocol can write at a time. - writeMu sync.Mutex - conn net.Conn - bufconn *bufio.ReadWriter - - protocolLock sync.RWMutex - protocols map[string]*proto - offsets map[MsgCode]*proto - protoWG sync.WaitGroup - - err chan error - pulse chan bool -} - -func newMessenger(peer *Peer, conn net.Conn, errchan chan error, handlers Handlers) *messenger { - return &messenger{ - conn: conn, - bufconn: bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)), - peer: peer, - handlers: handlers, - protocols: make(map[string]*proto), - err: errchan, - pulse: make(chan bool, 1), - } -} - -func (m *messenger) Start() { - m.protocols[""] = m.startProto(0, "", &baseProtocol{}) - go m.readLoop() -} - -func (m *messenger) Stop() { - m.conn.Close() - m.protoWG.Wait() -} - -const ( - // maximum amount of time allowed for reading a message - msgReadTimeout = 5 * time.Second - - // messages smaller than this many bytes will be read at - // once before passing them to a protocol. - wholePayloadSize = 64 * 1024 -) - -func (m *messenger) readLoop() { - defer m.closeProtocols() - for { - m.conn.SetReadDeadline(time.Now().Add(msgReadTimeout)) - msg, err := readMsg(m.bufconn) - if err != nil { - m.err <- err - return - } - // send ping to heartbeat channel signalling time of last message - m.pulse <- true - proto, err := m.getProto(msg.Code) - if err != nil { - m.err <- err - return - } - if msg.Size <= wholePayloadSize { - // optimization: msg is small enough, read all - // of it and move on to the next message - buf, err := ioutil.ReadAll(msg.Payload) - if err != nil { - m.err <- err - return - } - msg.Payload = bytes.NewReader(buf) - proto.in <- msg - } else { - pr := &eofSignal{msg.Payload, make(chan struct{})} - msg.Payload = pr - proto.in <- msg - <-pr.eof - } - } -} - -func (m *messenger) closeProtocols() { - m.protocolLock.RLock() - for _, p := range m.protocols { - close(p.in) - } - m.protocolLock.RUnlock() -} - -func (m *messenger) startProto(offset MsgCode, name string, impl Protocol) *proto { - proto := &proto{ - in: make(chan Msg), - offset: offset, - maxcode: impl.Offset(), - messenger: m, - } - m.protoWG.Add(1) - go func() { - if err := impl.Start(m.peer, proto); err != nil && err != io.EOF { - logger.Errorf("protocol %q error: %v\n", name, err) - m.err <- err - } - m.protoWG.Done() - }() - return proto -} - -// getProto finds the protocol responsible for handling -// the given message code. -func (m *messenger) getProto(code MsgCode) (*proto, error) { - m.protocolLock.RLock() - defer m.protocolLock.RUnlock() - for _, proto := range m.protocols { - if code >= proto.offset && code < proto.offset+proto.maxcode { - return proto, nil - } - } - return nil, NewPeerError(InvalidMsgCode, "%d", code) -} - -// setProtocols starts all subprotocols shared with the -// remote peer. the protocols must be sorted alphabetically. -func (m *messenger) setRemoteProtocols(protocols []string) { - m.protocolLock.Lock() - defer m.protocolLock.Unlock() - offset := baseProtocolOffset - for _, name := range protocols { - inst, ok := m.handlers[name] - if !ok { - continue // not handled - } - m.protocols[name] = m.startProto(offset, name, inst) - offset += inst.Offset() - } -} - -// writeProtoMsg sends the given message on behalf of the given named protocol. -func (m *messenger) writeProtoMsg(protoName string, msg Msg) error { - m.protocolLock.RLock() - proto, ok := m.protocols[protoName] - m.protocolLock.RUnlock() - if !ok { - return fmt.Errorf("protocol %s not handled by peer", protoName) - } - if msg.Code >= proto.maxcode { - return NewPeerError(InvalidMsgCode, "code %x is out of range for protocol %q", msg.Code, protoName) - } - msg.Code += proto.offset - return m.writeMsg(msg) -} - -// writeMsg writes a message to the connection. -func (m *messenger) writeMsg(msg Msg) error { - m.writeMu.Lock() - defer m.writeMu.Unlock() - if err := writeMsg(m.bufconn, msg); err != nil { - return err - } - return m.bufconn.Flush() -} |