From f6d1bfe45bf3709d7bad40bf563b5c09228622e3 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 14 Feb 2014 23:56:09 +0100 Subject: The great merge --- ethwire/messaging.go | 180 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 180 insertions(+) create mode 100644 ethwire/messaging.go (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go new file mode 100644 index 000000000..651bf4710 --- /dev/null +++ b/ethwire/messaging.go @@ -0,0 +1,180 @@ +package ethwire + +import ( + "bytes" + "errors" + "fmt" + "github.com/ethereum/eth-go/ethutil" + "net" + "time" +) + +// Message: +// [4 bytes token] RLP([TYPE, DATA]) +// Refer to http://wiki.ethereum.org/index.php/Wire_Protocol + +// The magic token which should be the first 4 bytes of every message. +var MagicToken = []byte{34, 64, 8, 145} + +type MsgType byte + +const ( + MsgHandshakeTy = 0x00 + MsgDiscTy = 0x01 + MsgPingTy = 0x02 + MsgPongTy = 0x03 + MsgGetPeersTy = 0x10 + MsgPeersTy = 0x11 + MsgTxTy = 0x12 + MsgBlockTy = 0x13 + MsgGetChainTy = 0x14 + MsgNotInChainTy = 0x15 + + MsgTalkTy = 0xff +) + +var msgTypeToString = map[MsgType]string{ + MsgHandshakeTy: "Handshake", + MsgDiscTy: "Disconnect", + MsgPingTy: "Ping", + MsgPongTy: "Pong", + MsgGetPeersTy: "Get peers", + MsgPeersTy: "Peers", + MsgTxTy: "Transactions", + MsgBlockTy: "Blocks", + MsgGetChainTy: "Get chain", + MsgNotInChainTy: "Not in chain", +} + +func (mt MsgType) String() string { + return msgTypeToString[mt] +} + +type Msg struct { + Type MsgType // Specifies how the encoded data should be interpreted + //Data []byte + Data *ethutil.Value +} + +func NewMessage(msgType MsgType, data interface{}) *Msg { + return &Msg{ + Type: msgType, + Data: ethutil.NewValue(data), + } +} + +func ReadMessage(data []byte) (msg *Msg, remaining []byte, done bool, err error) { + if len(data) == 0 { + return nil, nil, true, nil + } + + if len(data) <= 8 { + return nil, remaining, false, errors.New("Invalid message") + } + + // Check if the received 4 first bytes are the magic token + if bytes.Compare(MagicToken, data[:4]) != 0 { + return nil, nil, false, fmt.Errorf("MagicToken mismatch. Received %v", data[:4]) + } + + messageLength := ethutil.BytesToNumber(data[4:8]) + remaining = data[8+messageLength:] + if int(messageLength) > len(data[8:]) { + return nil, nil, false, fmt.Errorf("message length %d, expected %d", len(data[8:]), messageLength) + } + + message := data[8 : 8+messageLength] + decoder := ethutil.NewValueFromBytes(message) + // Type of message + t := decoder.Get(0).Uint() + // Actual data + d := decoder.SliceFrom(1) + + msg = &Msg{ + Type: MsgType(t), + Data: d, + } + + return +} + +func bufferedRead(conn net.Conn) ([]byte, error) { + return nil, nil +} + +// The basic message reader waits for data on the given connection, decoding +// and doing a few sanity checks such as if there's a data type and +// unmarhals the given data +func ReadMessages(conn net.Conn) (msgs []*Msg, err error) { + // The recovering function in case anything goes horribly wrong + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("ethwire.ReadMessage error: %v", r) + } + }() + + // Buff for writing network message to + //buff := make([]byte, 1440) + var buff []byte + var totalBytes int + for { + // Give buffering some time + conn.SetReadDeadline(time.Now().Add(20 * time.Millisecond)) + // Create a new temporarily buffer + b := make([]byte, 1440) + // Wait for a message from this peer + n, _ := conn.Read(b) + if err != nil && n == 0 { + if err.Error() != "EOF" { + fmt.Println("err now", err) + return nil, err + } else { + fmt.Println("IOF NOW") + break + } + + // Messages can't be empty + } else if n == 0 { + break + } + + buff = append(buff, b[:n]...) + totalBytes += n + } + + // Reslice buffer + buff = buff[:totalBytes] + msg, remaining, done, err := ReadMessage(buff) + for ; done != true; msg, remaining, done, err = ReadMessage(remaining) { + //log.Println("rx", msg) + + if msg != nil { + msgs = append(msgs, msg) + } + } + + return +} + +// The basic message writer takes care of writing data over the given +// connection and does some basic error checking +func WriteMessage(conn net.Conn, msg *Msg) error { + var pack []byte + + // Encode the type and the (RLP encoded) data for sending over the wire + encoded := ethutil.NewValue(append([]interface{}{byte(msg.Type)}, msg.Data.Slice()...)).Encode() + payloadLength := ethutil.NumberToBytes(uint32(len(encoded)), 32) + + // Write magic token and payload length (first 8 bytes) + pack = append(MagicToken, payloadLength...) + pack = append(pack, encoded...) + //fmt.Printf("payload %v (%v) %q\n", msg.Type, conn.RemoteAddr(), encoded) + + // Write to the connection + _, err := conn.Write(pack) + if err != nil { + return err + } + + return nil +} -- cgit v1.2.3 From 357b4bc14c82d206a8c813291fb3ead01ed29041 Mon Sep 17 00:00:00 2001 From: Sam Boyer Date: Tue, 18 Feb 2014 17:24:44 -0500 Subject: Add comment explaining why iota is not used. --- ethwire/messaging.go | 3 +++ 1 file changed, 3 insertions(+) (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go index 651bf4710..185faa341 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -19,6 +19,9 @@ var MagicToken = []byte{34, 64, 8, 145} type MsgType byte const ( + // Values are given explicitly instead of by iota because these values are + // defined by the wire protocol spec; it is easier for humans to ensure + // correctness when values are explicit. MsgHandshakeTy = 0x00 MsgDiscTy = 0x01 MsgPingTy = 0x02 -- cgit v1.2.3 From b888652201277ab86e9e8c280e75e23ced5e3d38 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 28 Mar 2014 11:20:07 +0100 Subject: Added missing GetTx (0x16) wire message --- ethwire/messaging.go | 2 ++ 1 file changed, 2 insertions(+) (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go index 185faa341..b622376f3 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -32,6 +32,7 @@ const ( MsgBlockTy = 0x13 MsgGetChainTy = 0x14 MsgNotInChainTy = 0x15 + MsgGetTxsTy = 0x16 MsgTalkTy = 0xff ) @@ -46,6 +47,7 @@ var msgTypeToString = map[MsgType]string{ MsgTxTy: "Transactions", MsgBlockTy: "Blocks", MsgGetChainTy: "Get chain", + MsgGetTxsTy: "Get Txs", MsgNotInChainTy: "Not in chain", } -- cgit v1.2.3 From 7c0df348f86d4ee47111b57b83fb1613e6338e05 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 14 May 2014 11:52:16 +0200 Subject: Increased deadline --- ethwire/messaging.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go index b622376f3..cc0e7a9a0 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -69,6 +69,12 @@ func NewMessage(msgType MsgType, data interface{}) *Msg { } func ReadMessage(data []byte) (msg *Msg, remaining []byte, done bool, err error) { + defer func() { + if r := recover(); r != nil { + panic(fmt.Sprintf("message error %d %v", len(data), data)) + } + }() + if len(data) == 0 { return nil, nil, true, nil } @@ -124,7 +130,7 @@ func ReadMessages(conn net.Conn) (msgs []*Msg, err error) { var totalBytes int for { // Give buffering some time - conn.SetReadDeadline(time.Now().Add(20 * time.Millisecond)) + conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond)) // Create a new temporarily buffer b := make([]byte, 1440) // Wait for a message from this peer @@ -134,7 +140,6 @@ func ReadMessages(conn net.Conn) (msgs []*Msg, err error) { fmt.Println("err now", err) return nil, err } else { - fmt.Println("IOF NOW") break } -- cgit v1.2.3 From 0512113bdd5cc55ae35abd442b668ab5ed7a116b Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 14 May 2014 11:56:06 +0200 Subject: Removed defer --- ethwire/messaging.go | 6 ------ 1 file changed, 6 deletions(-) (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go index cc0e7a9a0..cbcbbb8b7 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -69,12 +69,6 @@ func NewMessage(msgType MsgType, data interface{}) *Msg { } func ReadMessage(data []byte) (msg *Msg, remaining []byte, done bool, err error) { - defer func() { - if r := recover(); r != nil { - panic(fmt.Sprintf("message error %d %v", len(data), data)) - } - }() - if len(data) == 0 { return nil, nil, true, nil } -- cgit v1.2.3 From 1fbea2e438d56484ebfa509d7433cc418e17a79b Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 16 Jun 2014 00:51:21 +0200 Subject: Reworking messaging interface --- ethwire/messaging.go | 163 +++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 159 insertions(+), 4 deletions(-) (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go index cbcbbb8b7..f13b72353 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -1,3 +1,5 @@ +// Package ethwire provides low level access to the Ethereum network and allows +// you to broadcast data over the network. package ethwire import ( @@ -9,11 +11,13 @@ import ( "time" ) -// Message: -// [4 bytes token] RLP([TYPE, DATA]) -// Refer to http://wiki.ethereum.org/index.php/Wire_Protocol +// Connection interface describing the methods required to implement the wire protocol. +type Conn interface { + Write(typ MsgType, v ...interface{}) error + Read() *Msg +} -// The magic token which should be the first 4 bytes of every message. +// The magic token which should be the first 4 bytes of every message and can be used as separator between messages. var MagicToken = []byte{34, 64, 8, 145} type MsgType byte @@ -68,6 +72,157 @@ func NewMessage(msgType MsgType, data interface{}) *Msg { } } +type Messages []*Msg + +// The connection object allows you to set up a connection to the Ethereum network. +// The Connection object takes care of all encoding and sending objects properly over +// the network. +type Connection struct { + conn net.Conn + nTimeout time.Duration + pendingMessages Messages +} + +// Create a new connection to the Ethereum network +func New(conn net.Conn) *Connection { + return &Connection{conn: conn, nTimeout: 500} +} + +// Read, reads from the network. It will block until the next message is received. +func (self *Connection) Read() *Msg { + if len(self.pendingMessages) == 0 { + self.readMessages() + } + + ret := self.pendingMessages[0] + self.pendingMessages = self.pendingMessages[1:] + + return ret + +} + +// Write to the Ethereum network specifying the type of the message and +// the data. Data can be of type RlpEncodable or []interface{}. Returns +// nil or if something went wrong an error. +func (self *Connection) Write(typ MsgType, v ...interface{}) error { + var pack []byte + + slice := [][]interface{}{[]interface{}{byte(typ)}} + for _, value := range v { + if encodable, ok := value.(ethutil.RlpEncodable); ok { + slice = append(slice, encodable.RlpValue()) + } else if raw, ok := value.([]interface{}); ok { + slice = append(slice, raw) + } else { + panic(fmt.Sprintf("Unable to 'write' object of type %T", value)) + } + } + + // Encode the type and the (RLP encoded) data for sending over the wire + encoded := ethutil.NewValue(slice).Encode() + payloadLength := ethutil.NumberToBytes(uint32(len(encoded)), 32) + + // Write magic token and payload length (first 8 bytes) + pack = append(MagicToken, payloadLength...) + pack = append(pack, encoded...) + + // Write to the connection + _, err := self.conn.Write(pack) + if err != nil { + return err + } + + return nil +} + +func (self *Connection) readMessage(data []byte) (msg *Msg, remaining []byte, done bool, err error) { + if len(data) == 0 { + return nil, nil, true, nil + } + + if len(data) <= 8 { + return nil, remaining, false, errors.New("Invalid message") + } + + // Check if the received 4 first bytes are the magic token + if bytes.Compare(MagicToken, data[:4]) != 0 { + return nil, nil, false, fmt.Errorf("MagicToken mismatch. Received %v", data[:4]) + } + + messageLength := ethutil.BytesToNumber(data[4:8]) + remaining = data[8+messageLength:] + if int(messageLength) > len(data[8:]) { + return nil, nil, false, fmt.Errorf("message length %d, expected %d", len(data[8:]), messageLength) + } + + message := data[8 : 8+messageLength] + decoder := ethutil.NewValueFromBytes(message) + // Type of message + t := decoder.Get(0).Uint() + // Actual data + d := decoder.SliceFrom(1) + + msg = &Msg{ + Type: MsgType(t), + Data: d, + } + + return +} + +// The basic message reader waits for data on the given connection, decoding +// and doing a few sanity checks such as if there's a data type and +// unmarhals the given data +func (self *Connection) readMessages() (err error) { + // The recovering function in case anything goes horribly wrong + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("ethwire.ReadMessage error: %v", r) + } + }() + + // Buff for writing network message to + //buff := make([]byte, 1440) + var buff []byte + var totalBytes int + for { + // Give buffering some time + self.conn.SetReadDeadline(time.Now().Add(self.nTimeout * time.Millisecond)) + // Create a new temporarily buffer + b := make([]byte, 1440) + // Wait for a message from this peer + n, _ := self.conn.Read(b) + if err != nil && n == 0 { + if err.Error() != "EOF" { + fmt.Println("err now", err) + return err + } else { + break + } + + // Messages can't be empty + } else if n == 0 { + break + } + + buff = append(buff, b[:n]...) + totalBytes += n + } + + // Reslice buffer + buff = buff[:totalBytes] + msg, remaining, done, err := self.readMessage(buff) + for ; done != true; msg, remaining, done, err = self.readMessage(remaining) { + //log.Println("rx", msg) + + if msg != nil { + self.pendingMessages = append(self.pendingMessages, msg) + } + } + + return +} + func ReadMessage(data []byte) (msg *Msg, remaining []byte, done bool, err error) { if len(data) == 0 { return nil, nil, true, nil -- cgit v1.2.3 From cb7ebdf821adb4b022adcaea0973c8c7da2e2923 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 4 Jul 2014 00:12:21 +0200 Subject: Decreased timeout --- ethwire/messaging.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go index f13b72353..5319d0711 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -279,7 +279,7 @@ func ReadMessages(conn net.Conn) (msgs []*Msg, err error) { var totalBytes int for { // Give buffering some time - conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond)) + conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond)) // Create a new temporarily buffer b := make([]byte, 1440) // Wait for a message from this peer -- cgit v1.2.3 From 890745e8469b3d6ee7175db94edfb618b6bc8bc5 Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 7 Jul 2014 13:58:20 +0200 Subject: Increased timeout --- ethwire/messaging.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go index 5319d0711..a2e0651dc 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -279,7 +279,7 @@ func ReadMessages(conn net.Conn) (msgs []*Msg, err error) { var totalBytes int for { // Give buffering some time - conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond)) + conn.SetReadDeadline(time.Now().Add(200 * time.Millisecond)) // Create a new temporarily buffer b := make([]byte, 1440) // Wait for a message from this peer -- cgit v1.2.3 From bea468f1e595e3de9c8c479e9151c28f0f102973 Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 10 Jul 2014 15:03:48 +0200 Subject: Increased timeout to 500ms --- ethwire/messaging.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go index a2e0651dc..f13b72353 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -279,7 +279,7 @@ func ReadMessages(conn net.Conn) (msgs []*Msg, err error) { var totalBytes int for { // Give buffering some time - conn.SetReadDeadline(time.Now().Add(200 * time.Millisecond)) + conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond)) // Create a new temporarily buffer b := make([]byte, 1440) // Wait for a message from this peer -- cgit v1.2.3 From a760ce05b948e89bc564af20599dcf95698ac0eb Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 11 Aug 2014 16:23:38 +0200 Subject: Updated chain for filtering --- ethwire/messaging.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go index f13b72353..d114a1c9d 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -6,9 +6,10 @@ import ( "bytes" "errors" "fmt" - "github.com/ethereum/eth-go/ethutil" "net" "time" + + "github.com/ethereum/eth-go/ethutil" ) // Connection interface describing the methods required to implement the wire protocol. @@ -109,7 +110,7 @@ func (self *Connection) Write(typ MsgType, v ...interface{}) error { slice := [][]interface{}{[]interface{}{byte(typ)}} for _, value := range v { - if encodable, ok := value.(ethutil.RlpEncodable); ok { + if encodable, ok := value.(ethutil.RlpEncodeDecode); ok { slice = append(slice, encodable.RlpValue()) } else if raw, ok := value.([]interface{}); ok { slice = append(slice, raw) -- cgit v1.2.3 From eaa2e8900d1036e09b002c4e20fc6e4f9cd031bb Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 21 Aug 2014 14:47:58 +0200 Subject: PoC 6 networking code. * Added block pool for gathering blocks from the network (chunks) * Re wrote syncing --- ethwire/messaging.go | 52 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 30 insertions(+), 22 deletions(-) (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go index d114a1c9d..7ac0188a1 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -27,33 +27,41 @@ const ( // Values are given explicitly instead of by iota because these values are // defined by the wire protocol spec; it is easier for humans to ensure // correctness when values are explicit. - MsgHandshakeTy = 0x00 - MsgDiscTy = 0x01 - MsgPingTy = 0x02 - MsgPongTy = 0x03 - MsgGetPeersTy = 0x10 - MsgPeersTy = 0x11 - MsgTxTy = 0x12 - MsgBlockTy = 0x13 - MsgGetChainTy = 0x14 - MsgNotInChainTy = 0x15 - MsgGetTxsTy = 0x16 + MsgHandshakeTy = 0x00 + MsgDiscTy = 0x01 + MsgPingTy = 0x02 + MsgPongTy = 0x03 + MsgGetPeersTy = 0x10 + MsgPeersTy = 0x11 + MsgTxTy = 0x12 + MsgGetChainTy = 0x14 + MsgNotInChainTy = 0x15 + MsgGetTxsTy = 0x16 + MsgGetBlockHashesTy = 0x17 + MsgBlockHashesTy = 0x18 + MsgGetBlocksTy = 0x19 + MsgBlockTy = 0x13 + + MsgOldBlockTy = 0xbb MsgTalkTy = 0xff ) var msgTypeToString = map[MsgType]string{ - MsgHandshakeTy: "Handshake", - MsgDiscTy: "Disconnect", - MsgPingTy: "Ping", - MsgPongTy: "Pong", - MsgGetPeersTy: "Get peers", - MsgPeersTy: "Peers", - MsgTxTy: "Transactions", - MsgBlockTy: "Blocks", - MsgGetChainTy: "Get chain", - MsgGetTxsTy: "Get Txs", - MsgNotInChainTy: "Not in chain", + MsgHandshakeTy: "Handshake", + MsgDiscTy: "Disconnect", + MsgPingTy: "Ping", + MsgPongTy: "Pong", + MsgGetPeersTy: "Get peers", + MsgPeersTy: "Peers", + MsgTxTy: "Transactions", + MsgBlockTy: "Blocks", + MsgGetChainTy: "Get chain", + MsgGetTxsTy: "Get Txs", + MsgNotInChainTy: "Not in chain", + MsgGetBlockHashesTy: "Get block hashes", + MsgBlockHashesTy: "Block hashes", + MsgGetBlocksTy: "Get blocks", } func (mt MsgType) String() string { -- cgit v1.2.3 From 2f362509b813573f533a5be437c140355ddec7fc Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 10 Sep 2014 11:22:19 +0200 Subject: New p2p protocol. NOTE: Needs major refactoring. See #50 --- ethwire/messaging.go | 32 +++++++++++++------------------- 1 file changed, 13 insertions(+), 19 deletions(-) (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go index 7ac0188a1..c93c717a2 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -27,24 +27,20 @@ const ( // Values are given explicitly instead of by iota because these values are // defined by the wire protocol spec; it is easier for humans to ensure // correctness when values are explicit. - MsgHandshakeTy = 0x00 - MsgDiscTy = 0x01 - MsgPingTy = 0x02 - MsgPongTy = 0x03 - MsgGetPeersTy = 0x10 - MsgPeersTy = 0x11 + MsgHandshakeTy = 0x00 + MsgDiscTy = 0x01 + MsgPingTy = 0x02 + MsgPongTy = 0x03 + MsgGetPeersTy = 0x04 + MsgPeersTy = 0x05 + + MsgStatusTy = 0x10 + MsgGetTxsTy = 0x11 MsgTxTy = 0x12 - MsgGetChainTy = 0x14 - MsgNotInChainTy = 0x15 - MsgGetTxsTy = 0x16 - MsgGetBlockHashesTy = 0x17 - MsgBlockHashesTy = 0x18 - MsgGetBlocksTy = 0x19 - MsgBlockTy = 0x13 - - MsgOldBlockTy = 0xbb - - MsgTalkTy = 0xff + MsgGetBlockHashesTy = 0x13 + MsgBlockHashesTy = 0x14 + MsgGetBlocksTy = 0x15 + MsgBlockTy = 0x16 ) var msgTypeToString = map[MsgType]string{ @@ -56,9 +52,7 @@ var msgTypeToString = map[MsgType]string{ MsgPeersTy: "Peers", MsgTxTy: "Transactions", MsgBlockTy: "Blocks", - MsgGetChainTy: "Get chain", MsgGetTxsTy: "Get Txs", - MsgNotInChainTy: "Not in chain", MsgGetBlockHashesTy: "Get block hashes", MsgBlockHashesTy: "Block hashes", MsgGetBlocksTy: "Get blocks", -- cgit v1.2.3 From c0187930dc352c645c223e17364623a68413cb74 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 10 Sep 2014 11:39:11 +0200 Subject: Removed some commented code --- ethwire/messaging.go | 1 + 1 file changed, 1 insertion(+) (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go index c93c717a2..67a866f73 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -49,6 +49,7 @@ var msgTypeToString = map[MsgType]string{ MsgPingTy: "Ping", MsgPongTy: "Pong", MsgGetPeersTy: "Get peers", + MsgStatusTy: "Status", MsgPeersTy: "Peers", MsgTxTy: "Transactions", MsgBlockTy: "Blocks", -- cgit v1.2.3 From 399256b38403f2e95312250d49fca3cada8956b8 Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 15 Sep 2014 22:11:05 +0200 Subject: VM execution fixes Refactoring caused executing issues --- ethwire/messaging.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go index 67a866f73..bee6dd526 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -282,8 +282,10 @@ func ReadMessages(conn net.Conn) (msgs []*Msg, err error) { var buff []byte var totalBytes int for { + // This is a bit of a cheat actually to make buffering extremely fast. + defer recover() // Give buffering some time - conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond)) + conn.SetReadDeadline(time.Now().Add(5 * time.Millisecond)) // Create a new temporarily buffer b := make([]byte, 1440) // Wait for a message from this peer -- cgit v1.2.3 From 74de0f1f2ab342466556baddbab166a284f86891 Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 16 Sep 2014 16:06:38 +0200 Subject: Rewrote reading strategy --- ethwire/messaging.go | 241 ++++++++------------------------------------------- 1 file changed, 38 insertions(+), 203 deletions(-) (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go index bee6dd526..99f6be8db 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -4,7 +4,6 @@ package ethwire import ( "bytes" - "errors" "fmt" "net" "time" @@ -78,106 +77,10 @@ func NewMessage(msgType MsgType, data interface{}) *Msg { type Messages []*Msg -// The connection object allows you to set up a connection to the Ethereum network. -// The Connection object takes care of all encoding and sending objects properly over -// the network. -type Connection struct { - conn net.Conn - nTimeout time.Duration - pendingMessages Messages -} - -// Create a new connection to the Ethereum network -func New(conn net.Conn) *Connection { - return &Connection{conn: conn, nTimeout: 500} -} - -// Read, reads from the network. It will block until the next message is received. -func (self *Connection) Read() *Msg { - if len(self.pendingMessages) == 0 { - self.readMessages() - } - - ret := self.pendingMessages[0] - self.pendingMessages = self.pendingMessages[1:] - - return ret - -} - -// Write to the Ethereum network specifying the type of the message and -// the data. Data can be of type RlpEncodable or []interface{}. Returns -// nil or if something went wrong an error. -func (self *Connection) Write(typ MsgType, v ...interface{}) error { - var pack []byte - - slice := [][]interface{}{[]interface{}{byte(typ)}} - for _, value := range v { - if encodable, ok := value.(ethutil.RlpEncodeDecode); ok { - slice = append(slice, encodable.RlpValue()) - } else if raw, ok := value.([]interface{}); ok { - slice = append(slice, raw) - } else { - panic(fmt.Sprintf("Unable to 'write' object of type %T", value)) - } - } - - // Encode the type and the (RLP encoded) data for sending over the wire - encoded := ethutil.NewValue(slice).Encode() - payloadLength := ethutil.NumberToBytes(uint32(len(encoded)), 32) - - // Write magic token and payload length (first 8 bytes) - pack = append(MagicToken, payloadLength...) - pack = append(pack, encoded...) - - // Write to the connection - _, err := self.conn.Write(pack) - if err != nil { - return err - } - - return nil -} - -func (self *Connection) readMessage(data []byte) (msg *Msg, remaining []byte, done bool, err error) { - if len(data) == 0 { - return nil, nil, true, nil - } - - if len(data) <= 8 { - return nil, remaining, false, errors.New("Invalid message") - } - - // Check if the received 4 first bytes are the magic token - if bytes.Compare(MagicToken, data[:4]) != 0 { - return nil, nil, false, fmt.Errorf("MagicToken mismatch. Received %v", data[:4]) - } - - messageLength := ethutil.BytesToNumber(data[4:8]) - remaining = data[8+messageLength:] - if int(messageLength) > len(data[8:]) { - return nil, nil, false, fmt.Errorf("message length %d, expected %d", len(data[8:]), messageLength) - } - - message := data[8 : 8+messageLength] - decoder := ethutil.NewValueFromBytes(message) - // Type of message - t := decoder.Get(0).Uint() - // Actual data - d := decoder.SliceFrom(1) - - msg = &Msg{ - Type: MsgType(t), - Data: d, - } - - return -} - // The basic message reader waits for data on the given connection, decoding // and doing a few sanity checks such as if there's a data type and // unmarhals the given data -func (self *Connection) readMessages() (err error) { +func ReadMessages(conn net.Conn) (msgs []*Msg, err error) { // The recovering function in case anything goes horribly wrong defer func() { if r := recover(); r != nil { @@ -185,137 +88,69 @@ func (self *Connection) readMessages() (err error) { } }() - // Buff for writing network message to - //buff := make([]byte, 1440) - var buff []byte - var totalBytes int + var ( + buff []byte + messages [][]byte + msgLength int + ) + for { // Give buffering some time - self.conn.SetReadDeadline(time.Now().Add(self.nTimeout * time.Millisecond)) + conn.SetReadDeadline(time.Now().Add(5 * time.Millisecond)) // Create a new temporarily buffer b := make([]byte, 1440) - // Wait for a message from this peer - n, _ := self.conn.Read(b) + n, _ := conn.Read(b) if err != nil && n == 0 { if err.Error() != "EOF" { fmt.Println("err now", err) - return err + return nil, err } else { break } - - // Messages can't be empty - } else if n == 0 { - break } - buff = append(buff, b[:n]...) - totalBytes += n - } - - // Reslice buffer - buff = buff[:totalBytes] - msg, remaining, done, err := self.readMessage(buff) - for ; done != true; msg, remaining, done, err = self.readMessage(remaining) { - //log.Println("rx", msg) - - if msg != nil { - self.pendingMessages = append(self.pendingMessages, msg) + if n == 0 { + continue } - } - - return -} - -func ReadMessage(data []byte) (msg *Msg, remaining []byte, done bool, err error) { - if len(data) == 0 { - return nil, nil, true, nil - } - - if len(data) <= 8 { - return nil, remaining, false, errors.New("Invalid message") - } - - // Check if the received 4 first bytes are the magic token - if bytes.Compare(MagicToken, data[:4]) != 0 { - return nil, nil, false, fmt.Errorf("MagicToken mismatch. Received %v", data[:4]) - } - messageLength := ethutil.BytesToNumber(data[4:8]) - remaining = data[8+messageLength:] - if int(messageLength) > len(data[8:]) { - return nil, nil, false, fmt.Errorf("message length %d, expected %d", len(data[8:]), messageLength) - } + if msgLength == 0 { + // Check if the received 4 first bytes are the magic token + if bytes.Compare(MagicToken, b[:4]) != 0 { + return nil, fmt.Errorf("MagicToken mismatch. Received %v", b[:4]) + } - message := data[8 : 8+messageLength] - decoder := ethutil.NewValueFromBytes(message) - // Type of message - t := decoder.Get(0).Uint() - // Actual data - d := decoder.SliceFrom(1) + // Remove the token + b = b[4:] + // Read the length of the message + msgLength = int(ethutil.BytesToNumber(b[:4])) - msg = &Msg{ - Type: MsgType(t), - Data: d, - } + // Remove the length + b = b[4:] - return -} + n -= 8 + } -func bufferedRead(conn net.Conn) ([]byte, error) { - return nil, nil -} + buff = append(buff, b[:n]...) -// The basic message reader waits for data on the given connection, decoding -// and doing a few sanity checks such as if there's a data type and -// unmarhals the given data -func ReadMessages(conn net.Conn) (msgs []*Msg, err error) { - // The recovering function in case anything goes horribly wrong - defer func() { - if r := recover(); r != nil { - err = fmt.Errorf("ethwire.ReadMessage error: %v", r) - } - }() + if len(buff) >= msgLength { + messages = append(messages, buff[:msgLength]) + buff = buff[msgLength:] + msgLength = 0 - // Buff for writing network message to - //buff := make([]byte, 1440) - var buff []byte - var totalBytes int - for { - // This is a bit of a cheat actually to make buffering extremely fast. - defer recover() - // Give buffering some time - conn.SetReadDeadline(time.Now().Add(5 * time.Millisecond)) - // Create a new temporarily buffer - b := make([]byte, 1440) - // Wait for a message from this peer - n, _ := conn.Read(b) - if err != nil && n == 0 { - if err.Error() != "EOF" { - fmt.Println("err now", err) - return nil, err - } else { + if len(buff) == 0 { break } - - // Messages can't be empty - } else if n == 0 { - break } - - buff = append(buff, b[:n]...) - totalBytes += n } - // Reslice buffer - buff = buff[:totalBytes] - msg, remaining, done, err := ReadMessage(buff) - for ; done != true; msg, remaining, done, err = ReadMessage(remaining) { - //log.Println("rx", msg) + for _, m := range messages { + decoder := ethutil.NewValueFromBytes(m) + // Type of message + t := decoder.Get(0).Uint() + // Actual data + d := decoder.SliceFrom(1) - if msg != nil { - msgs = append(msgs, msg) - } + msgs = append(msgs, &Msg{Type: MsgType(t), Data: d}) } return -- cgit v1.2.3 From fb528c47c0c51a7a204a18227349e6500aba49ab Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 16 Sep 2014 16:19:48 +0200 Subject: Moved code --- ethwire/messaging.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go index 99f6be8db..4f4393a9d 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -109,29 +109,24 @@ func ReadMessages(conn net.Conn) (msgs []*Msg, err error) { } } - if n == 0 { + if n == 0 && len(buff) == 0 { continue } + buff = append(buff, b[:n]...) if msgLength == 0 { // Check if the received 4 first bytes are the magic token - if bytes.Compare(MagicToken, b[:4]) != 0 { - return nil, fmt.Errorf("MagicToken mismatch. Received %v", b[:4]) + if bytes.Compare(MagicToken, buff[:4]) != 0 { + return nil, fmt.Errorf("MagicToken mismatch. Received %v", buff[:4]) } - // Remove the token - b = b[4:] // Read the length of the message - msgLength = int(ethutil.BytesToNumber(b[:4])) - - // Remove the length - b = b[4:] + msgLength = int(ethutil.BytesToNumber(buff[4:8])) - n -= 8 + // Remove the token and length + buff = buff[8:] } - buff = append(buff, b[:n]...) - if len(buff) >= msgLength { messages = append(messages, buff[:msgLength]) buff = buff[msgLength:] -- cgit v1.2.3 From 66e309c5c4fb1faebe7338053481024eeb55a2d1 Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 16 Sep 2014 16:36:18 +0200 Subject: Up deadline to reduce cpu load --- ethwire/messaging.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go index 4f4393a9d..2161ce27f 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -96,7 +96,7 @@ func ReadMessages(conn net.Conn) (msgs []*Msg, err error) { for { // Give buffering some time - conn.SetReadDeadline(time.Now().Add(5 * time.Millisecond)) + conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) // Create a new temporarily buffer b := make([]byte, 1440) n, _ := conn.Read(b) -- cgit v1.2.3 From eb32fe20c8513b936faf908985d021caa87d5b0d Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 17 Sep 2014 15:57:32 +0200 Subject: Rewrite --- ethwire/messaging.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go index 2161ce27f..b1cefc0ae 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -96,7 +96,7 @@ func ReadMessages(conn net.Conn) (msgs []*Msg, err error) { for { // Give buffering some time - conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) + conn.SetReadDeadline(time.Now().Add(5 * time.Millisecond)) // Create a new temporarily buffer b := make([]byte, 1440) n, _ := conn.Read(b) @@ -110,6 +110,9 @@ func ReadMessages(conn net.Conn) (msgs []*Msg, err error) { } if n == 0 && len(buff) == 0 { + // If there's nothing on the wire wait for a bit + time.Sleep(200) + continue } -- cgit v1.2.3 From 2ae3bda029ec91e02fb164fd53175ad78d45a0ba Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 22 Sep 2014 18:15:10 +0200 Subject: Increased from 200 nano to milliseconds --- ethwire/messaging.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go index b1cefc0ae..2b3836e9c 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -109,9 +109,10 @@ func ReadMessages(conn net.Conn) (msgs []*Msg, err error) { } } + fmt.Println(n, len(buff)) if n == 0 && len(buff) == 0 { // If there's nothing on the wire wait for a bit - time.Sleep(200) + time.Sleep(200 * time.Millisecond) continue } -- cgit v1.2.3 From 7d08e4f7d14600ee4ed38fc9d435e9c2e0e0fdac Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 22 Sep 2014 18:15:32 +0200 Subject: Remove log --- ethwire/messaging.go | 1 - 1 file changed, 1 deletion(-) (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go index 2b3836e9c..2ef53c003 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -109,7 +109,6 @@ func ReadMessages(conn net.Conn) (msgs []*Msg, err error) { } } - fmt.Println(n, len(buff)) if n == 0 && len(buff) == 0 { // If there's nothing on the wire wait for a bit time.Sleep(200 * time.Millisecond) -- cgit v1.2.3 From 4de3ad1712ce0fdc62b1acc27a3922b192e943c6 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 8 Oct 2014 12:29:49 +0200 Subject: New block message --- ethwire/messaging.go | 1 + 1 file changed, 1 insertion(+) (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go index 2ef53c003..5013f1a97 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -40,6 +40,7 @@ const ( MsgBlockHashesTy = 0x14 MsgGetBlocksTy = 0x15 MsgBlockTy = 0x16 + MsgNewBlockTy = 0x17 ) var msgTypeToString = map[MsgType]string{ -- cgit v1.2.3 From 520fdfe346ab51708f4f1fdfd0b2e42cc919e613 Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 21 Oct 2014 13:25:31 +0200 Subject: PoC7 Net --- ethwire/messaging.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) (limited to 'ethwire/messaging.go') diff --git a/ethwire/messaging.go b/ethwire/messaging.go index 5013f1a97..cef520547 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -33,8 +33,8 @@ const ( MsgGetPeersTy = 0x04 MsgPeersTy = 0x05 - MsgStatusTy = 0x10 - MsgGetTxsTy = 0x11 + MsgStatusTy = 0x10 + //MsgGetTxsTy = 0x11 MsgTxTy = 0x12 MsgGetBlockHashesTy = 0x13 MsgBlockHashesTy = 0x14 @@ -44,16 +44,16 @@ const ( ) var msgTypeToString = map[MsgType]string{ - MsgHandshakeTy: "Handshake", - MsgDiscTy: "Disconnect", - MsgPingTy: "Ping", - MsgPongTy: "Pong", - MsgGetPeersTy: "Get peers", - MsgStatusTy: "Status", - MsgPeersTy: "Peers", - MsgTxTy: "Transactions", - MsgBlockTy: "Blocks", - MsgGetTxsTy: "Get Txs", + MsgHandshakeTy: "Handshake", + MsgDiscTy: "Disconnect", + MsgPingTy: "Ping", + MsgPongTy: "Pong", + MsgGetPeersTy: "Get peers", + MsgStatusTy: "Status", + MsgPeersTy: "Peers", + MsgTxTy: "Transactions", + MsgBlockTy: "Blocks", + //MsgGetTxsTy: "Get Txs", MsgGetBlockHashesTy: "Get block hashes", MsgBlockHashesTy: "Block hashes", MsgGetBlocksTy: "Get blocks", -- cgit v1.2.3