From 4914a78c8c650d7fc74570f25a682598aaeb6973 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 31 Oct 2014 14:53:42 +0100 Subject: ethwire => wire --- wire/.gitignore | 12 +++ wire/README.md | 36 ++++++++ wire/client_identity.go | 56 ++++++++++++ wire/client_identity_test.go | 30 +++++++ wire/messages2.go | 199 +++++++++++++++++++++++++++++++++++++++++++ wire/messaging.go | 179 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 512 insertions(+) create mode 100644 wire/.gitignore create mode 100644 wire/README.md create mode 100644 wire/client_identity.go create mode 100644 wire/client_identity_test.go create mode 100644 wire/messages2.go create mode 100644 wire/messaging.go (limited to 'wire') diff --git a/wire/.gitignore b/wire/.gitignore new file mode 100644 index 000000000..f725d58d1 --- /dev/null +++ b/wire/.gitignore @@ -0,0 +1,12 @@ +# See http://help.github.com/ignore-files/ for more about ignoring files. +# +# If you find yourself ignoring temporary files generated by your text editor +# or operating system, you probably want to add a global ignore instead: +# git config --global core.excludesfile ~/.gitignore_global + +/tmp +*/**/*un~ +*un~ +.DS_Store +*/**/.DS_Store + diff --git a/wire/README.md b/wire/README.md new file mode 100644 index 000000000..7f63688b3 --- /dev/null +++ b/wire/README.md @@ -0,0 +1,36 @@ +# ethwire + +The ethwire package contains the ethereum wire protocol. The ethwire +package is required to write and read from the ethereum network. + +# Installation + +`go get github.com/ethereum/ethwire-go` + +# Messaging overview + +The Ethereum Wire protocol defines the communication between the nodes +running Ethereum. Further reader reading can be done on the +[Wiki](http://wiki.ethereum.org/index.php/Wire_Protocol). + +# Reading Messages + +```go +// Read and validate the next eth message from the provided connection. +// returns a error message with the details. +msg, err := ethwire.ReadMessage(conn) +if err != nil { + // Handle error +} +``` + +# Writing Messages + +```go +// Constructs a message which can be interpreted by the eth network. +// Write the inventory to network +err := ethwire.WriteMessage(conn, &Msg{ + Type: ethwire.MsgInvTy, + Data : []interface{}{...}, +}) +``` diff --git a/wire/client_identity.go b/wire/client_identity.go new file mode 100644 index 000000000..0a268024a --- /dev/null +++ b/wire/client_identity.go @@ -0,0 +1,56 @@ +package wire + +import ( + "fmt" + "runtime" +) + +// should be used in Peer handleHandshake, incorporate Caps, ProtocolVersion, Pubkey etc. +type ClientIdentity interface { + String() string +} + +type SimpleClientIdentity struct { + clientIdentifier string + version string + customIdentifier string + os string + implementation string +} + +func NewSimpleClientIdentity(clientIdentifier string, version string, customIdentifier string) *SimpleClientIdentity { + clientIdentity := &SimpleClientIdentity{ + clientIdentifier: clientIdentifier, + version: version, + customIdentifier: customIdentifier, + os: runtime.GOOS, + implementation: runtime.Version(), + } + + return clientIdentity +} + +func (c *SimpleClientIdentity) init() { +} + +func (c *SimpleClientIdentity) String() string { + var id string + if len(c.customIdentifier) > 0 { + id = "/" + c.customIdentifier + } + + return fmt.Sprintf("%s/v%s%s/%s/%s", + c.clientIdentifier, + c.version, + id, + c.os, + c.implementation) +} + +func (c *SimpleClientIdentity) SetCustomIdentifier(customIdentifier string) { + c.customIdentifier = customIdentifier +} + +func (c *SimpleClientIdentity) GetCustomIdentifier() string { + return c.customIdentifier +} diff --git a/wire/client_identity_test.go b/wire/client_identity_test.go new file mode 100644 index 000000000..c0e7a0159 --- /dev/null +++ b/wire/client_identity_test.go @@ -0,0 +1,30 @@ +package wire + +import ( + "fmt" + "runtime" + "testing" +) + +func TestClientIdentity(t *testing.T) { + clientIdentity := NewSimpleClientIdentity("Ethereum(G)", "0.5.16", "test") + clientString := clientIdentity.String() + expected := fmt.Sprintf("Ethereum(G)/v0.5.16/test/%s/%s", runtime.GOOS, runtime.Version()) + if clientString != expected { + t.Errorf("Expected clientIdentity to be %q, got %q", expected, clientString) + } + customIdentifier := clientIdentity.GetCustomIdentifier() + if customIdentifier != "test" { + t.Errorf("Expected clientIdentity.GetCustomIdentifier() to be 'test', got %q", customIdentifier) + } + clientIdentity.SetCustomIdentifier("test2") + customIdentifier = clientIdentity.GetCustomIdentifier() + if customIdentifier != "test2" { + t.Errorf("Expected clientIdentity.GetCustomIdentifier() to be 'test2', got %q", customIdentifier) + } + clientString = clientIdentity.String() + expected = fmt.Sprintf("Ethereum(G)/v0.5.16/test2/%s/%s", runtime.GOOS, runtime.Version()) + if clientString != expected { + t.Errorf("Expected clientIdentity to be %q, got %q", expected, clientString) + } +} diff --git a/wire/messages2.go b/wire/messages2.go new file mode 100644 index 000000000..acbd9e0d5 --- /dev/null +++ b/wire/messages2.go @@ -0,0 +1,199 @@ +package wire + +import ( + "bytes" + "errors" + "fmt" + "net" + "time" + + "github.com/ethereum/go-ethereum/ethutil" +) + +// The connection object allows you to set up a connection to the Ethereum network. +// The Connection object takes care of all encoding and sending objects properly over +// the network. +type Connection struct { + conn net.Conn + nTimeout time.Duration + pendingMessages Messages +} + +// Create a new connection to the Ethereum network +func New(conn net.Conn) *Connection { + return &Connection{conn: conn, nTimeout: 500} +} + +// Read, reads from the network. It will block until the next message is received. +func (self *Connection) Read() *Msg { + if len(self.pendingMessages) == 0 { + self.readMessages() + } + + ret := self.pendingMessages[0] + self.pendingMessages = self.pendingMessages[1:] + + return ret + +} + +// Write to the Ethereum network specifying the type of the message and +// the data. Data can be of type RlpEncodable or []interface{}. Returns +// nil or if something went wrong an error. +func (self *Connection) Write(typ MsgType, v ...interface{}) error { + var pack []byte + + slice := [][]interface{}{[]interface{}{byte(typ)}} + for _, value := range v { + if encodable, ok := value.(ethutil.RlpEncodeDecode); ok { + slice = append(slice, encodable.RlpValue()) + } else if raw, ok := value.([]interface{}); ok { + slice = append(slice, raw) + } else { + panic(fmt.Sprintf("Unable to 'write' object of type %T", value)) + } + } + + // Encode the type and the (RLP encoded) data for sending over the wire + encoded := ethutil.NewValue(slice).Encode() + payloadLength := ethutil.NumberToBytes(uint32(len(encoded)), 32) + + // Write magic token and payload length (first 8 bytes) + pack = append(MagicToken, payloadLength...) + pack = append(pack, encoded...) + + // Write to the connection + _, err := self.conn.Write(pack) + if err != nil { + return err + } + + return nil +} + +func (self *Connection) readMessage(data []byte) (msg *Msg, remaining []byte, done bool, err error) { + if len(data) == 0 { + return nil, nil, true, nil + } + + if len(data) <= 8 { + return nil, remaining, false, errors.New("Invalid message") + } + + // Check if the received 4 first bytes are the magic token + if bytes.Compare(MagicToken, data[:4]) != 0 { + return nil, nil, false, fmt.Errorf("MagicToken mismatch. Received %v", data[:4]) + } + + messageLength := ethutil.BytesToNumber(data[4:8]) + remaining = data[8+messageLength:] + if int(messageLength) > len(data[8:]) { + return nil, nil, false, fmt.Errorf("message length %d, expected %d", len(data[8:]), messageLength) + } + + message := data[8 : 8+messageLength] + decoder := ethutil.NewValueFromBytes(message) + // Type of message + t := decoder.Get(0).Uint() + // Actual data + d := decoder.SliceFrom(1) + + msg = &Msg{ + Type: MsgType(t), + Data: d, + } + + return +} + +// The basic message reader waits for data on the given connection, decoding +// and doing a few sanity checks such as if there's a data type and +// unmarhals the given data +func (self *Connection) readMessages() (err error) { + // The recovering function in case anything goes horribly wrong + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("wire.ReadMessage error: %v", r) + } + }() + + // Buff for writing network message to + //buff := make([]byte, 1440) + var buff []byte + var totalBytes int + for { + // Give buffering some time + self.conn.SetReadDeadline(time.Now().Add(self.nTimeout * time.Millisecond)) + // Create a new temporarily buffer + b := make([]byte, 1440) + // Wait for a message from this peer + n, _ := self.conn.Read(b) + if err != nil && n == 0 { + if err.Error() != "EOF" { + fmt.Println("err now", err) + return err + } else { + break + } + + // Messages can't be empty + } else if n == 0 { + break + } + + buff = append(buff, b[:n]...) + totalBytes += n + } + + // Reslice buffer + buff = buff[:totalBytes] + msg, remaining, done, err := self.readMessage(buff) + for ; done != true; msg, remaining, done, err = self.readMessage(remaining) { + //log.Println("rx", msg) + + if msg != nil { + self.pendingMessages = append(self.pendingMessages, msg) + } + } + + return +} + +func ReadMessage(data []byte) (msg *Msg, remaining []byte, done bool, err error) { + if len(data) == 0 { + return nil, nil, true, nil + } + + if len(data) <= 8 { + return nil, remaining, false, errors.New("Invalid message") + } + + // Check if the received 4 first bytes are the magic token + if bytes.Compare(MagicToken, data[:4]) != 0 { + return nil, nil, false, fmt.Errorf("MagicToken mismatch. Received %v", data[:4]) + } + + messageLength := ethutil.BytesToNumber(data[4:8]) + remaining = data[8+messageLength:] + if int(messageLength) > len(data[8:]) { + return nil, nil, false, fmt.Errorf("message length %d, expected %d", len(data[8:]), messageLength) + } + + message := data[8 : 8+messageLength] + decoder := ethutil.NewValueFromBytes(message) + // Type of message + t := decoder.Get(0).Uint() + // Actual data + d := decoder.SliceFrom(1) + + msg = &Msg{ + Type: MsgType(t), + Data: d, + } + + return +} + +func bufferedRead(conn net.Conn) ([]byte, error) { + return nil, nil +} diff --git a/wire/messaging.go b/wire/messaging.go new file mode 100644 index 000000000..b919aa0f4 --- /dev/null +++ b/wire/messaging.go @@ -0,0 +1,179 @@ +// Package wire provides low level access to the Ethereum network and allows +// you to broadcast data over the network. +package wire + +import ( + "bytes" + "fmt" + "net" + "time" + + "github.com/ethereum/go-ethereum/ethutil" +) + +// Connection interface describing the methods required to implement the wire protocol. +type Conn interface { + Write(typ MsgType, v ...interface{}) error + Read() *Msg +} + +// The magic token which should be the first 4 bytes of every message and can be used as separator between messages. +var MagicToken = []byte{34, 64, 8, 145} + +type MsgType byte + +const ( + // Values are given explicitly instead of by iota because these values are + // defined by the wire protocol spec; it is easier for humans to ensure + // correctness when values are explicit. + MsgHandshakeTy = 0x00 + MsgDiscTy = 0x01 + MsgPingTy = 0x02 + MsgPongTy = 0x03 + MsgGetPeersTy = 0x04 + MsgPeersTy = 0x05 + + MsgStatusTy = 0x10 + //MsgGetTxsTy = 0x11 + MsgTxTy = 0x12 + MsgGetBlockHashesTy = 0x13 + MsgBlockHashesTy = 0x14 + MsgGetBlocksTy = 0x15 + MsgBlockTy = 0x16 + MsgNewBlockTy = 0x17 +) + +var msgTypeToString = map[MsgType]string{ + MsgHandshakeTy: "Handshake", + MsgDiscTy: "Disconnect", + MsgPingTy: "Ping", + MsgPongTy: "Pong", + MsgGetPeersTy: "Get peers", + MsgStatusTy: "Status", + MsgPeersTy: "Peers", + MsgTxTy: "Transactions", + MsgBlockTy: "Blocks", + //MsgGetTxsTy: "Get Txs", + MsgGetBlockHashesTy: "Get block hashes", + MsgBlockHashesTy: "Block hashes", + MsgGetBlocksTy: "Get blocks", +} + +func (mt MsgType) String() string { + return msgTypeToString[mt] +} + +type Msg struct { + Type MsgType // Specifies how the encoded data should be interpreted + //Data []byte + Data *ethutil.Value +} + +func NewMessage(msgType MsgType, data interface{}) *Msg { + return &Msg{ + Type: msgType, + Data: ethutil.NewValue(data), + } +} + +type Messages []*Msg + +// The basic message reader waits for data on the given connection, decoding +// and doing a few sanity checks such as if there's a data type and +// unmarhals the given data +func ReadMessages(conn net.Conn) (msgs []*Msg, err error) { + // The recovering function in case anything goes horribly wrong + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("wire.ReadMessage error: %v", r) + } + }() + + var ( + buff []byte + messages [][]byte + msgLength int + ) + + for { + // Give buffering some time + conn.SetReadDeadline(time.Now().Add(5 * time.Millisecond)) + // Create a new temporarily buffer + b := make([]byte, 1440) + n, _ := conn.Read(b) + if err != nil && n == 0 { + if err.Error() != "EOF" { + fmt.Println("err now", err) + return nil, err + } else { + break + } + } + + if n == 0 && len(buff) == 0 { + // If there's nothing on the wire wait for a bit + time.Sleep(200 * time.Millisecond) + + continue + } + + buff = append(buff, b[:n]...) + if msgLength == 0 { + // Check if the received 4 first bytes are the magic token + if bytes.Compare(MagicToken, buff[:4]) != 0 { + return nil, fmt.Errorf("MagicToken mismatch. Received %v", buff[:4]) + } + + // Read the length of the message + msgLength = int(ethutil.BytesToNumber(buff[4:8])) + + // Remove the token and length + buff = buff[8:] + } + + if len(buff) >= msgLength { + messages = append(messages, buff[:msgLength]) + buff = buff[msgLength:] + msgLength = 0 + + if len(buff) == 0 { + break + } + } + } + + for _, m := range messages { + decoder := ethutil.NewValueFromBytes(m) + // Type of message + t := decoder.Get(0).Uint() + // Actual data + d := decoder.SliceFrom(1) + + msgs = append(msgs, &Msg{Type: MsgType(t), Data: d}) + } + + return +} + +// The basic message writer takes care of writing data over the given +// connection and does some basic error checking +func WriteMessage(conn net.Conn, msg *Msg) error { + var pack []byte + + // Encode the type and the (RLP encoded) data for sending over the wire + encoded := ethutil.NewValue(append([]interface{}{byte(msg.Type)}, msg.Data.Slice()...)).Encode() + payloadLength := ethutil.NumberToBytes(uint32(len(encoded)), 32) + + // Write magic token and payload length (first 8 bytes) + pack = append(MagicToken, payloadLength...) + pack = append(pack, encoded...) + //fmt.Printf("payload %v (%v) %q\n", msg.Type, conn.RemoteAddr(), encoded) + + // Write to the connection + _, err := conn.Write(pack) + if err != nil { + return err + } + + return nil +} -- cgit v1.2.3