diff options
Diffstat (limited to 'whisper/whisperv2')
-rw-r--r-- | whisper/whisperv2/api.go | 413 | ||||
-rw-r--r-- | whisper/whisperv2/doc.go | 32 | ||||
-rw-r--r-- | whisper/whisperv2/envelope.go | 147 | ||||
-rw-r--r-- | whisper/whisperv2/envelope_test.go | 158 | ||||
-rw-r--r-- | whisper/whisperv2/filter.go | 132 | ||||
-rw-r--r-- | whisper/whisperv2/filter_test.go | 215 | ||||
-rw-r--r-- | whisper/whisperv2/main.go | 106 | ||||
-rw-r--r-- | whisper/whisperv2/message.go | 156 | ||||
-rw-r--r-- | whisper/whisperv2/message_test.go | 159 | ||||
-rw-r--r-- | whisper/whisperv2/peer.go | 175 | ||||
-rw-r--r-- | whisper/whisperv2/peer_test.go | 261 | ||||
-rw-r--r-- | whisper/whisperv2/topic.go | 140 | ||||
-rw-r--r-- | whisper/whisperv2/topic_test.go | 215 | ||||
-rw-r--r-- | whisper/whisperv2/whisper.go | 378 | ||||
-rw-r--r-- | whisper/whisperv2/whisper_test.go | 216 |
15 files changed, 2903 insertions, 0 deletions
diff --git a/whisper/whisperv2/api.go b/whisper/whisperv2/api.go new file mode 100644 index 000000000..9c9c6a84c --- /dev/null +++ b/whisper/whisperv2/api.go @@ -0,0 +1,413 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package whisperv2 + +import ( + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/rpc" +) + +// PublicWhisperAPI provides the whisper RPC service. +type PublicWhisperAPI struct { + w *Whisper + + messagesMu sync.RWMutex + messages map[int]*whisperFilter +} + +type whisperOfflineError struct{} + +func (e *whisperOfflineError) Error() string { + return "whisper is offline" +} + +// whisperOffLineErr is returned when the node doesn't offer the shh service. +var whisperOffLineErr = new(whisperOfflineError) + +// NewPublicWhisperAPI create a new RPC whisper service. +func NewPublicWhisperAPI(w *Whisper) *PublicWhisperAPI { + return &PublicWhisperAPI{w: w, messages: make(map[int]*whisperFilter)} +} + +// Version returns the Whisper version this node offers. +func (s *PublicWhisperAPI) Version() (*rpc.HexNumber, error) { + if s.w == nil { + return rpc.NewHexNumber(0), whisperOffLineErr + } + return rpc.NewHexNumber(s.w.Version()), nil +} + +// HasIdentity checks if the the whisper node is configured with the private key +// of the specified public pair. +func (s *PublicWhisperAPI) HasIdentity(identity string) (bool, error) { + if s.w == nil { + return false, whisperOffLineErr + } + return s.w.HasIdentity(crypto.ToECDSAPub(common.FromHex(identity))), nil +} + +// NewIdentity generates a new cryptographic identity for the client, and injects +// it into the known identities for message decryption. +func (s *PublicWhisperAPI) NewIdentity() (string, error) { + if s.w == nil { + return "", whisperOffLineErr + } + + identity := s.w.NewIdentity() + return common.ToHex(crypto.FromECDSAPub(&identity.PublicKey)), nil +} + +type NewFilterArgs struct { + To string + From string + Topics [][][]byte +} + +// NewWhisperFilter creates and registers a new message filter to watch for inbound whisper messages. +func (s *PublicWhisperAPI) NewFilter(args NewFilterArgs) (*rpc.HexNumber, error) { + if s.w == nil { + return nil, whisperOffLineErr + } + + var id int + filter := Filter{ + To: crypto.ToECDSAPub(common.FromHex(args.To)), + From: crypto.ToECDSAPub(common.FromHex(args.From)), + Topics: NewFilterTopics(args.Topics...), + Fn: func(message *Message) { + wmsg := NewWhisperMessage(message) + s.messagesMu.RLock() // Only read lock to the filter pool + defer s.messagesMu.RUnlock() + if s.messages[id] != nil { + s.messages[id].insert(wmsg) + } + }, + } + + id = s.w.Watch(filter) + + s.messagesMu.Lock() + s.messages[id] = newWhisperFilter(id, s.w) + s.messagesMu.Unlock() + + return rpc.NewHexNumber(id), nil +} + +// GetFilterChanges retrieves all the new messages matched by a filter since the last retrieval. +func (s *PublicWhisperAPI) GetFilterChanges(filterId rpc.HexNumber) []WhisperMessage { + s.messagesMu.RLock() + defer s.messagesMu.RUnlock() + + if s.messages[filterId.Int()] != nil { + if changes := s.messages[filterId.Int()].retrieve(); changes != nil { + return changes + } + } + return returnWhisperMessages(nil) +} + +// UninstallFilter disables and removes an existing filter. +func (s *PublicWhisperAPI) UninstallFilter(filterId rpc.HexNumber) bool { + s.messagesMu.Lock() + defer s.messagesMu.Unlock() + + if _, ok := s.messages[filterId.Int()]; ok { + delete(s.messages, filterId.Int()) + return true + } + return false +} + +// GetMessages retrieves all the known messages that match a specific filter. +func (s *PublicWhisperAPI) GetMessages(filterId rpc.HexNumber) []WhisperMessage { + // Retrieve all the cached messages matching a specific, existing filter + s.messagesMu.RLock() + defer s.messagesMu.RUnlock() + + var messages []*Message + if s.messages[filterId.Int()] != nil { + messages = s.messages[filterId.Int()].messages() + } + + return returnWhisperMessages(messages) +} + +// returnWhisperMessages converts aNhisper message to a RPC whisper message. +func returnWhisperMessages(messages []*Message) []WhisperMessage { + msgs := make([]WhisperMessage, len(messages)) + for i, msg := range messages { + msgs[i] = NewWhisperMessage(msg) + } + return msgs +} + +type PostArgs struct { + From string `json:"from"` + To string `json:"to"` + Topics [][]byte `json:"topics"` + Payload string `json:"payload"` + Priority int64 `json:"priority"` + TTL int64 `json:"ttl"` +} + +// Post injects a message into the whisper network for distribution. +func (s *PublicWhisperAPI) Post(args PostArgs) (bool, error) { + if s.w == nil { + return false, whisperOffLineErr + } + + // construct whisper message with transmission options + message := NewMessage(common.FromHex(args.Payload)) + options := Options{ + To: crypto.ToECDSAPub(common.FromHex(args.To)), + TTL: time.Duration(args.TTL) * time.Second, + Topics: NewTopics(args.Topics...), + } + + // set sender identity + if len(args.From) > 0 { + if key := s.w.GetIdentity(crypto.ToECDSAPub(common.FromHex(args.From))); key != nil { + options.From = key + } else { + return false, fmt.Errorf("unknown identity to send from: %s", args.From) + } + } + + // Wrap and send the message + pow := time.Duration(args.Priority) * time.Millisecond + envelope, err := message.Wrap(pow, options) + if err != nil { + return false, err + } + + return true, s.w.Send(envelope) +} + +// WhisperMessage is the RPC representation of a whisper message. +type WhisperMessage struct { + ref *Message + + Payload string `json:"payload"` + To string `json:"to"` + From string `json:"from"` + Sent int64 `json:"sent"` + TTL int64 `json:"ttl"` + Hash string `json:"hash"` +} + +func (args *PostArgs) UnmarshalJSON(data []byte) (err error) { + var obj struct { + From string `json:"from"` + To string `json:"to"` + Topics []string `json:"topics"` + Payload string `json:"payload"` + Priority rpc.HexNumber `json:"priority"` + TTL rpc.HexNumber `json:"ttl"` + } + + if err := json.Unmarshal(data, &obj); err != nil { + return err + } + + args.From = obj.From + args.To = obj.To + args.Payload = obj.Payload + args.Priority = obj.Priority.Int64() + args.TTL = obj.TTL.Int64() + + // decode topic strings + args.Topics = make([][]byte, len(obj.Topics)) + for i, topic := range obj.Topics { + args.Topics[i] = common.FromHex(topic) + } + + return nil +} + +// UnmarshalJSON implements the json.Unmarshaler interface, invoked to convert a +// JSON message blob into a WhisperFilterArgs structure. +func (args *NewFilterArgs) UnmarshalJSON(b []byte) (err error) { + // Unmarshal the JSON message and sanity check + var obj struct { + To interface{} `json:"to"` + From interface{} `json:"from"` + Topics interface{} `json:"topics"` + } + if err := json.Unmarshal(b, &obj); err != nil { + return err + } + + // Retrieve the simple data contents of the filter arguments + if obj.To == nil { + args.To = "" + } else { + argstr, ok := obj.To.(string) + if !ok { + return fmt.Errorf("to is not a string") + } + args.To = argstr + } + if obj.From == nil { + args.From = "" + } else { + argstr, ok := obj.From.(string) + if !ok { + return fmt.Errorf("from is not a string") + } + args.From = argstr + } + // Construct the nested topic array + if obj.Topics != nil { + // Make sure we have an actual topic array + list, ok := obj.Topics.([]interface{}) + if !ok { + return fmt.Errorf("topics is not an array") + } + // Iterate over each topic and handle nil, string or array + topics := make([][]string, len(list)) + for idx, field := range list { + switch value := field.(type) { + case nil: + topics[idx] = []string{} + + case string: + topics[idx] = []string{value} + + case []interface{}: + topics[idx] = make([]string, len(value)) + for i, nested := range value { + switch value := nested.(type) { + case nil: + topics[idx][i] = "" + + case string: + topics[idx][i] = value + + default: + return fmt.Errorf("topic[%d][%d] is not a string", idx, i) + } + } + default: + return fmt.Errorf("topic[%d] not a string or array", idx) + } + } + + topicsDecoded := make([][][]byte, len(topics)) + for i, condition := range topics { + topicsDecoded[i] = make([][]byte, len(condition)) + for j, topic := range condition { + topicsDecoded[i][j] = common.FromHex(topic) + } + } + + args.Topics = topicsDecoded + } + return nil +} + +// whisperFilter is the message cache matching a specific filter, accumulating +// inbound messages until the are requested by the client. +type whisperFilter struct { + id int // Filter identifier for old message retrieval + ref *Whisper // Whisper reference for old message retrieval + + cache []WhisperMessage // Cache of messages not yet polled + skip map[common.Hash]struct{} // List of retrieved messages to avoid duplication + update time.Time // Time of the last message query + + lock sync.RWMutex // Lock protecting the filter internals +} + +// messages retrieves all the cached messages from the entire pool matching the +// filter, resetting the filter's change buffer. +func (w *whisperFilter) messages() []*Message { + w.lock.Lock() + defer w.lock.Unlock() + + w.cache = nil + w.update = time.Now() + + w.skip = make(map[common.Hash]struct{}) + messages := w.ref.Messages(w.id) + for _, message := range messages { + w.skip[message.Hash] = struct{}{} + } + return messages +} + +// insert injects a new batch of messages into the filter cache. +func (w *whisperFilter) insert(messages ...WhisperMessage) { + w.lock.Lock() + defer w.lock.Unlock() + + for _, message := range messages { + if _, ok := w.skip[message.ref.Hash]; !ok { + w.cache = append(w.cache, messages...) + } + } +} + +// retrieve fetches all the cached messages from the filter. +func (w *whisperFilter) retrieve() (messages []WhisperMessage) { + w.lock.Lock() + defer w.lock.Unlock() + + messages, w.cache = w.cache, nil + w.update = time.Now() + + return +} + +// activity returns the last time instance when client requests were executed on +// the filter. +func (w *whisperFilter) activity() time.Time { + w.lock.RLock() + defer w.lock.RUnlock() + + return w.update +} + +// newWhisperFilter creates a new serialized, poll based whisper topic filter. +func newWhisperFilter(id int, ref *Whisper) *whisperFilter { + return &whisperFilter{ + id: id, + ref: ref, + + update: time.Now(), + skip: make(map[common.Hash]struct{}), + } +} + +// NewWhisperMessage converts an internal message into an API version. +func NewWhisperMessage(message *Message) WhisperMessage { + return WhisperMessage{ + ref: message, + + Payload: common.ToHex(message.Payload), + From: common.ToHex(crypto.FromECDSAPub(message.Recover())), + To: common.ToHex(crypto.FromECDSAPub(message.To)), + Sent: message.Sent.Unix(), + TTL: int64(message.TTL / time.Second), + Hash: common.ToHex(message.Hash.Bytes()), + } +} diff --git a/whisper/whisperv2/doc.go b/whisper/whisperv2/doc.go new file mode 100644 index 000000000..7252f44b1 --- /dev/null +++ b/whisper/whisperv2/doc.go @@ -0,0 +1,32 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +/* +Package whisper implements the Whisper PoC-1. + +(https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec) + +Whisper combines aspects of both DHTs and datagram messaging systems (e.g. UDP). +As such it may be likened and compared to both, not dissimilar to the +matter/energy duality (apologies to physicists for the blatant abuse of a +fundamental and beautiful natural principle). + +Whisper is a pure identity-based messaging system. Whisper provides a low-level +(non-application-specific) but easily-accessible API without being based upon +or prejudiced by the low-level hardware attributes and characteristics, +particularly the notion of singular endpoints. +*/ +package whisperv2 diff --git a/whisper/whisperv2/envelope.go b/whisper/whisperv2/envelope.go new file mode 100644 index 000000000..7110ab457 --- /dev/null +++ b/whisper/whisperv2/envelope.go @@ -0,0 +1,147 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Contains the Whisper protocol Envelope element. For formal details please see +// the specs at https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec#envelopes. + +package whisperv2 + +import ( + "crypto/ecdsa" + "encoding/binary" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/ecies" + "github.com/ethereum/go-ethereum/rlp" +) + +// Envelope represents a clear-text data packet to transmit through the Whisper +// network. Its contents may or may not be encrypted and signed. +type Envelope struct { + Expiry uint32 // Whisper protocol specifies int32, really should be int64 + TTL uint32 // ^^^^^^ + Topics []Topic + Data []byte + Nonce uint32 + + hash common.Hash // Cached hash of the envelope to avoid rehashing every time +} + +// NewEnvelope wraps a Whisper message with expiration and destination data +// included into an envelope for network forwarding. +func NewEnvelope(ttl time.Duration, topics []Topic, msg *Message) *Envelope { + return &Envelope{ + Expiry: uint32(time.Now().Add(ttl).Unix()), + TTL: uint32(ttl.Seconds()), + Topics: topics, + Data: msg.bytes(), + Nonce: 0, + } +} + +// Seal closes the envelope by spending the requested amount of time as a proof +// of work on hashing the data. +func (self *Envelope) Seal(pow time.Duration) { + d := make([]byte, 64) + copy(d[:32], self.rlpWithoutNonce()) + + finish, bestBit := time.Now().Add(pow).UnixNano(), 0 + for nonce := uint32(0); time.Now().UnixNano() < finish; { + for i := 0; i < 1024; i++ { + binary.BigEndian.PutUint32(d[60:], nonce) + + firstBit := common.FirstBitSet(common.BigD(crypto.Keccak256(d))) + if firstBit > bestBit { + self.Nonce, bestBit = nonce, firstBit + } + nonce++ + } + } +} + +// rlpWithoutNonce returns the RLP encoded envelope contents, except the nonce. +func (self *Envelope) rlpWithoutNonce() []byte { + enc, _ := rlp.EncodeToBytes([]interface{}{self.Expiry, self.TTL, self.Topics, self.Data}) + return enc +} + +// Open extracts the message contained within a potentially encrypted envelope. +func (self *Envelope) Open(key *ecdsa.PrivateKey) (msg *Message, err error) { + // Split open the payload into a message construct + data := self.Data + + message := &Message{ + Flags: data[0], + Sent: time.Unix(int64(self.Expiry-self.TTL), 0), + TTL: time.Duration(self.TTL) * time.Second, + Hash: self.Hash(), + } + data = data[1:] + + if message.Flags&signatureFlag == signatureFlag { + if len(data) < signatureLength { + return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < len(signature)") + } + message.Signature, data = data[:signatureLength], data[signatureLength:] + } + message.Payload = data + + // Decrypt the message, if requested + if key == nil { + return message, nil + } + err = message.decrypt(key) + switch err { + case nil: + return message, nil + + case ecies.ErrInvalidPublicKey: // Payload isn't encrypted + return message, err + + default: + return nil, fmt.Errorf("unable to open envelope, decrypt failed: %v", err) + } +} + +// Hash returns the SHA3 hash of the envelope, calculating it if not yet done. +func (self *Envelope) Hash() common.Hash { + if (self.hash == common.Hash{}) { + enc, _ := rlp.EncodeToBytes(self) + self.hash = crypto.Keccak256Hash(enc) + } + return self.hash +} + +// DecodeRLP decodes an Envelope from an RLP data stream. +func (self *Envelope) DecodeRLP(s *rlp.Stream) error { + raw, err := s.Raw() + if err != nil { + return err + } + // The decoding of Envelope uses the struct fields but also needs + // to compute the hash of the whole RLP-encoded envelope. This + // type has the same structure as Envelope but is not an + // rlp.Decoder so we can reuse the Envelope struct definition. + type rlpenv Envelope + if err := rlp.DecodeBytes(raw, (*rlpenv)(self)); err != nil { + return err + } + self.hash = crypto.Keccak256Hash(raw) + return nil +} diff --git a/whisper/whisperv2/envelope_test.go b/whisper/whisperv2/envelope_test.go new file mode 100644 index 000000000..75e2fbe8a --- /dev/null +++ b/whisper/whisperv2/envelope_test.go @@ -0,0 +1,158 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package whisperv2 + +import ( + "bytes" + "testing" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/ecies" +) + +func TestEnvelopeOpen(t *testing.T) { + payload := []byte("hello world") + message := NewMessage(payload) + + envelope, err := message.Wrap(DefaultPoW, Options{}) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + opened, err := envelope.Open(nil) + if err != nil { + t.Fatalf("failed to open envelope: %v", err) + } + if opened.Flags != message.Flags { + t.Fatalf("flags mismatch: have %d, want %d", opened.Flags, message.Flags) + } + if bytes.Compare(opened.Signature, message.Signature) != 0 { + t.Fatalf("signature mismatch: have 0x%x, want 0x%x", opened.Signature, message.Signature) + } + if bytes.Compare(opened.Payload, message.Payload) != 0 { + t.Fatalf("payload mismatch: have 0x%x, want 0x%x", opened.Payload, message.Payload) + } + if opened.Sent.Unix() != message.Sent.Unix() { + t.Fatalf("send time mismatch: have %d, want %d", opened.Sent, message.Sent) + } + if opened.TTL/time.Second != DefaultTTL/time.Second { + t.Fatalf("message TTL mismatch: have %v, want %v", opened.TTL, DefaultTTL) + } + + if opened.Hash != envelope.Hash() { + t.Fatalf("message hash mismatch: have 0x%x, want 0x%x", opened.Hash, envelope.Hash()) + } +} + +func TestEnvelopeAnonymousOpenUntargeted(t *testing.T) { + payload := []byte("hello envelope") + envelope, err := NewMessage(payload).Wrap(DefaultPoW, Options{}) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + opened, err := envelope.Open(nil) + if err != nil { + t.Fatalf("failed to open envelope: %v", err) + } + if opened.To != nil { + t.Fatalf("recipient mismatch: have 0x%x, want nil", opened.To) + } + if bytes.Compare(opened.Payload, payload) != 0 { + t.Fatalf("payload mismatch: have 0x%x, want 0x%x", opened.Payload, payload) + } +} + +func TestEnvelopeAnonymousOpenTargeted(t *testing.T) { + key, err := crypto.GenerateKey() + if err != nil { + t.Fatalf("failed to generate test identity: %v", err) + } + + payload := []byte("hello envelope") + envelope, err := NewMessage(payload).Wrap(DefaultPoW, Options{ + To: &key.PublicKey, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + opened, err := envelope.Open(nil) + if err != nil { + t.Fatalf("failed to open envelope: %v", err) + } + if opened.To != nil { + t.Fatalf("recipient mismatch: have 0x%x, want nil", opened.To) + } + if bytes.Compare(opened.Payload, payload) == 0 { + t.Fatalf("payload match, should have been encrypted: 0x%x", opened.Payload) + } +} + +func TestEnvelopeIdentifiedOpenUntargeted(t *testing.T) { + key, err := crypto.GenerateKey() + if err != nil { + t.Fatalf("failed to generate test identity: %v", err) + } + + payload := []byte("hello envelope") + envelope, err := NewMessage(payload).Wrap(DefaultPoW, Options{}) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + opened, err := envelope.Open(key) + switch err { + case nil: + t.Fatalf("envelope opened with bad key: %v", opened) + + case ecies.ErrInvalidPublicKey: + // Ok, key mismatch but opened + + default: + t.Fatalf("failed to open envelope: %v", err) + } + + if opened.To != nil { + t.Fatalf("recipient mismatch: have 0x%x, want nil", opened.To) + } + if bytes.Compare(opened.Payload, payload) != 0 { + t.Fatalf("payload mismatch: have 0x%x, want 0x%x", opened.Payload, payload) + } +} + +func TestEnvelopeIdentifiedOpenTargeted(t *testing.T) { + key, err := crypto.GenerateKey() + if err != nil { + t.Fatalf("failed to generate test identity: %v", err) + } + + payload := []byte("hello envelope") + envelope, err := NewMessage(payload).Wrap(DefaultPoW, Options{ + To: &key.PublicKey, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + opened, err := envelope.Open(key) + if err != nil { + t.Fatalf("failed to open envelope: %v", err) + } + if opened.To != nil { + t.Fatalf("recipient mismatch: have 0x%x, want nil", opened.To) + } + if bytes.Compare(opened.Payload, payload) != 0 { + t.Fatalf("payload mismatch: have 0x%x, want 0x%x", opened.Payload, payload) + } +} diff --git a/whisper/whisperv2/filter.go b/whisper/whisperv2/filter.go new file mode 100644 index 000000000..8ce4a54fb --- /dev/null +++ b/whisper/whisperv2/filter.go @@ -0,0 +1,132 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Contains the message filter for fine grained subscriptions. + +package whisperv2 + +import ( + "crypto/ecdsa" + + "github.com/ethereum/go-ethereum/event/filter" +) + +// Filter is used to subscribe to specific types of whisper messages. +type Filter struct { + To *ecdsa.PublicKey // Recipient of the message + From *ecdsa.PublicKey // Sender of the message + Topics [][]Topic // Topics to filter messages with + Fn func(msg *Message) // Handler in case of a match +} + +// NewFilterTopics creates a 2D topic array used by whisper.Filter from binary +// data elements. +func NewFilterTopics(data ...[][]byte) [][]Topic { + filter := make([][]Topic, len(data)) + for i, condition := range data { + // Handle the special case of condition == [[]byte{}] + if len(condition) == 1 && len(condition[0]) == 0 { + filter[i] = []Topic{} + continue + } + // Otherwise flatten normally + filter[i] = NewTopics(condition...) + } + return filter +} + +// NewFilterTopicsFlat creates a 2D topic array used by whisper.Filter from flat +// binary data elements. +func NewFilterTopicsFlat(data ...[]byte) [][]Topic { + filter := make([][]Topic, len(data)) + for i, element := range data { + // Only add non-wildcard topics + filter[i] = make([]Topic, 0, 1) + if len(element) > 0 { + filter[i] = append(filter[i], NewTopic(element)) + } + } + return filter +} + +// NewFilterTopicsFromStrings creates a 2D topic array used by whisper.Filter +// from textual data elements. +func NewFilterTopicsFromStrings(data ...[]string) [][]Topic { + filter := make([][]Topic, len(data)) + for i, condition := range data { + // Handle the special case of condition == [""] + if len(condition) == 1 && condition[0] == "" { + filter[i] = []Topic{} + continue + } + // Otherwise flatten normally + filter[i] = NewTopicsFromStrings(condition...) + } + return filter +} + +// NewFilterTopicsFromStringsFlat creates a 2D topic array used by whisper.Filter from flat +// binary data elements. +func NewFilterTopicsFromStringsFlat(data ...string) [][]Topic { + filter := make([][]Topic, len(data)) + for i, element := range data { + // Only add non-wildcard topics + filter[i] = make([]Topic, 0, 1) + if element != "" { + filter[i] = append(filter[i], NewTopicFromString(element)) + } + } + return filter +} + +// filterer is the internal, fully initialized filter ready to match inbound +// messages to a variety of criteria. +type filterer struct { + to string // Recipient of the message + from string // Sender of the message + matcher *topicMatcher // Topics to filter messages with + fn func(data interface{}) // Handler in case of a match +} + +// Compare checks if the specified filter matches the current one. +func (self filterer) Compare(f filter.Filter) bool { + filter := f.(filterer) + + // Check the message sender and recipient + if len(self.to) > 0 && self.to != filter.to { + return false + } + if len(self.from) > 0 && self.from != filter.from { + return false + } + // Check the topic filtering + topics := make([]Topic, len(filter.matcher.conditions)) + for i, group := range filter.matcher.conditions { + // Message should contain a single topic entry, extract + for topics[i], _ = range group { + break + } + } + if !self.matcher.Matches(topics) { + return false + } + return true +} + +// Trigger is called when a filter successfully matches an inbound message. +func (self filterer) Trigger(data interface{}) { + self.fn(data) +} diff --git a/whisper/whisperv2/filter_test.go b/whisper/whisperv2/filter_test.go new file mode 100644 index 000000000..5a14a84bb --- /dev/null +++ b/whisper/whisperv2/filter_test.go @@ -0,0 +1,215 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package whisperv2 + +import ( + "bytes" + + "testing" +) + +var filterTopicsCreationTests = []struct { + topics [][]string + filter [][][4]byte +}{ + { // Simple topic filter + topics: [][]string{ + {"abc", "def", "ghi"}, + {"def"}, + {"ghi", "abc"}, + }, + filter: [][][4]byte{ + {{0x4e, 0x03, 0x65, 0x7a}, {0x34, 0x60, 0x7c, 0x9b}, {0x21, 0x41, 0x7d, 0xf9}}, + {{0x34, 0x60, 0x7c, 0x9b}}, + {{0x21, 0x41, 0x7d, 0xf9}, {0x4e, 0x03, 0x65, 0x7a}}, + }, + }, + { // Wild-carded topic filter + topics: [][]string{ + {"abc", "def", "ghi"}, + {}, + {""}, + {"def"}, + }, + filter: [][][4]byte{ + {{0x4e, 0x03, 0x65, 0x7a}, {0x34, 0x60, 0x7c, 0x9b}, {0x21, 0x41, 0x7d, 0xf9}}, + {}, + {}, + {{0x34, 0x60, 0x7c, 0x9b}}, + }, + }, +} + +var filterTopicsCreationFlatTests = []struct { + topics []string + filter [][][4]byte +}{ + { // Simple topic list + topics: []string{"abc", "def", "ghi"}, + filter: [][][4]byte{ + {{0x4e, 0x03, 0x65, 0x7a}}, + {{0x34, 0x60, 0x7c, 0x9b}}, + {{0x21, 0x41, 0x7d, 0xf9}}, + }, + }, + { // Wild-carded topic list + topics: []string{"abc", "", "ghi"}, + filter: [][][4]byte{ + {{0x4e, 0x03, 0x65, 0x7a}}, + {}, + {{0x21, 0x41, 0x7d, 0xf9}}, + }, + }, +} + +func TestFilterTopicsCreation(t *testing.T) { + // Check full filter creation + for i, tt := range filterTopicsCreationTests { + // Check the textual creation + filter := NewFilterTopicsFromStrings(tt.topics...) + if len(filter) != len(tt.topics) { + t.Errorf("test %d: condition count mismatch: have %v, want %v", i, len(filter), len(tt.topics)) + continue + } + for j, condition := range filter { + if len(condition) != len(tt.filter[j]) { + t.Errorf("test %d, condition %d: size mismatch: have %v, want %v", i, j, len(condition), len(tt.filter[j])) + continue + } + for k := 0; k < len(condition); k++ { + if bytes.Compare(condition[k][:], tt.filter[j][k][:]) != 0 { + t.Errorf("test %d, condition %d, segment %d: filter mismatch: have 0x%x, want 0x%x", i, j, k, condition[k], tt.filter[j][k]) + } + } + } + // Check the binary creation + binary := make([][][]byte, len(tt.topics)) + for j, condition := range tt.topics { + binary[j] = make([][]byte, len(condition)) + for k, segment := range condition { + binary[j][k] = []byte(segment) + } + } + filter = NewFilterTopics(binary...) + if len(filter) != len(tt.topics) { + t.Errorf("test %d: condition count mismatch: have %v, want %v", i, len(filter), len(tt.topics)) + continue + } + for j, condition := range filter { + if len(condition) != len(tt.filter[j]) { + t.Errorf("test %d, condition %d: size mismatch: have %v, want %v", i, j, len(condition), len(tt.filter[j])) + continue + } + for k := 0; k < len(condition); k++ { + if bytes.Compare(condition[k][:], tt.filter[j][k][:]) != 0 { + t.Errorf("test %d, condition %d, segment %d: filter mismatch: have 0x%x, want 0x%x", i, j, k, condition[k], tt.filter[j][k]) + } + } + } + } + // Check flat filter creation + for i, tt := range filterTopicsCreationFlatTests { + // Check the textual creation + filter := NewFilterTopicsFromStringsFlat(tt.topics...) + if len(filter) != len(tt.topics) { + t.Errorf("test %d: condition count mismatch: have %v, want %v", i, len(filter), len(tt.topics)) + continue + } + for j, condition := range filter { + if len(condition) != len(tt.filter[j]) { + t.Errorf("test %d, condition %d: size mismatch: have %v, want %v", i, j, len(condition), len(tt.filter[j])) + continue + } + for k := 0; k < len(condition); k++ { + if bytes.Compare(condition[k][:], tt.filter[j][k][:]) != 0 { + t.Errorf("test %d, condition %d, segment %d: filter mismatch: have 0x%x, want 0x%x", i, j, k, condition[k], tt.filter[j][k]) + } + } + } + // Check the binary creation + binary := make([][]byte, len(tt.topics)) + for j, topic := range tt.topics { + binary[j] = []byte(topic) + } + filter = NewFilterTopicsFlat(binary...) + if len(filter) != len(tt.topics) { + t.Errorf("test %d: condition count mismatch: have %v, want %v", i, len(filter), len(tt.topics)) + continue + } + for j, condition := range filter { + if len(condition) != len(tt.filter[j]) { + t.Errorf("test %d, condition %d: size mismatch: have %v, want %v", i, j, len(condition), len(tt.filter[j])) + continue + } + for k := 0; k < len(condition); k++ { + if bytes.Compare(condition[k][:], tt.filter[j][k][:]) != 0 { + t.Errorf("test %d, condition %d, segment %d: filter mismatch: have 0x%x, want 0x%x", i, j, k, condition[k], tt.filter[j][k]) + } + } + } + } +} + +var filterCompareTests = []struct { + matcher filterer + message filterer + match bool +}{ + { // Wild-card filter matching anything + matcher: filterer{to: "", from: "", matcher: newTopicMatcher()}, + message: filterer{to: "to", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + match: true, + }, + { // Filter matching the to field + matcher: filterer{to: "to", from: "", matcher: newTopicMatcher()}, + message: filterer{to: "to", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + match: true, + }, + { // Filter rejecting the to field + matcher: filterer{to: "to", from: "", matcher: newTopicMatcher()}, + message: filterer{to: "", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + match: false, + }, + { // Filter matching the from field + matcher: filterer{to: "", from: "from", matcher: newTopicMatcher()}, + message: filterer{to: "to", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + match: true, + }, + { // Filter rejecting the from field + matcher: filterer{to: "", from: "from", matcher: newTopicMatcher()}, + message: filterer{to: "to", from: "", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + match: false, + }, + { // Filter matching the topic field + matcher: filterer{to: "", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + message: filterer{to: "to", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + match: true, + }, + { // Filter rejecting the topic field + matcher: filterer{to: "", from: "", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + message: filterer{to: "to", from: "from", matcher: newTopicMatcher()}, + match: false, + }, +} + +func TestFilterCompare(t *testing.T) { + for i, tt := range filterCompareTests { + if match := tt.matcher.Compare(tt.message); match != tt.match { + t.Errorf("test %d: match mismatch: have %v, want %v", i, match, tt.match) + } + } +} diff --git a/whisper/whisperv2/main.go b/whisper/whisperv2/main.go new file mode 100644 index 000000000..be4160489 --- /dev/null +++ b/whisper/whisperv2/main.go @@ -0,0 +1,106 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// +build none + +// Contains a simple whisper peer setup and self messaging to allow playing +// around with the protocol and API without a fancy client implementation. + +package main + +import ( + "fmt" + "log" + "os" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/nat" + "github.com/ethereum/go-ethereum/whisper" +) + +func main() { + logger.AddLogSystem(logger.NewStdLogSystem(os.Stdout, log.LstdFlags, logger.InfoLevel)) + + // Generate the peer identity + key, err := crypto.GenerateKey() + if err != nil { + fmt.Printf("Failed to generate peer key: %v.\n", err) + os.Exit(-1) + } + name := common.MakeName("whisper-go", "1.0") + shh := whisper.New() + + // Create an Ethereum peer to communicate through + server := p2p.Server{ + PrivateKey: key, + MaxPeers: 10, + Name: name, + Protocols: []p2p.Protocol{shh.Protocol()}, + ListenAddr: ":30300", + NAT: nat.Any(), + } + fmt.Println("Starting Ethereum peer...") + if err := server.Start(); err != nil { + fmt.Printf("Failed to start Ethereum peer: %v.\n", err) + os.Exit(1) + } + + // Send a message to self to check that something works + payload := fmt.Sprintf("Hello world, this is %v. In case you're wondering, the time is %v", name, time.Now()) + if err := selfSend(shh, []byte(payload)); err != nil { + fmt.Printf("Failed to self message: %v.\n", err) + os.Exit(-1) + } +} + +// SendSelf wraps a payload into a Whisper envelope and forwards it to itself. +func selfSend(shh *whisper.Whisper, payload []byte) error { + ok := make(chan struct{}) + + // Start watching for self messages, output any arrivals + id := shh.NewIdentity() + shh.Watch(whisper.Filter{ + To: &id.PublicKey, + Fn: func(msg *whisper.Message) { + fmt.Printf("Message received: %s, signed with 0x%x.\n", string(msg.Payload), msg.Signature) + close(ok) + }, + }) + // Wrap the payload and encrypt it + msg := whisper.NewMessage(payload) + envelope, err := msg.Wrap(whisper.DefaultPoW, whisper.Options{ + From: id, + To: &id.PublicKey, + TTL: whisper.DefaultTTL, + }) + if err != nil { + return fmt.Errorf("failed to seal message: %v", err) + } + // Dump the message into the system and wait for it to pop back out + if err := shh.Send(envelope); err != nil { + return fmt.Errorf("failed to send self-message: %v", err) + } + select { + case <-ok: + case <-time.After(time.Second): + return fmt.Errorf("failed to receive message in time") + } + return nil +} diff --git a/whisper/whisperv2/message.go b/whisper/whisperv2/message.go new file mode 100644 index 000000000..7ef9d0912 --- /dev/null +++ b/whisper/whisperv2/message.go @@ -0,0 +1,156 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Contains the Whisper protocol Message element. For formal details please see +// the specs at https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec#messages. + +package whisperv2 + +import ( + "crypto/ecdsa" + "math/rand" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" +) + +// Message represents an end-user data packet to transmit through the Whisper +// protocol. These are wrapped into Envelopes that need not be understood by +// intermediate nodes, just forwarded. +type Message struct { + Flags byte // First bit is signature presence, rest reserved and should be random + Signature []byte + Payload []byte + + Sent time.Time // Time when the message was posted into the network + TTL time.Duration // Maximum time to live allowed for the message + + To *ecdsa.PublicKey // Message recipient (identity used to decode the message) + Hash common.Hash // Message envelope hash to act as a unique id +} + +// Options specifies the exact way a message should be wrapped into an Envelope. +type Options struct { + From *ecdsa.PrivateKey + To *ecdsa.PublicKey + TTL time.Duration + Topics []Topic +} + +// NewMessage creates and initializes a non-signed, non-encrypted Whisper message. +func NewMessage(payload []byte) *Message { + // Construct an initial flag set: no signature, rest random + flags := byte(rand.Intn(256)) + flags &= ^signatureFlag + + // Assemble and return the message + return &Message{ + Flags: flags, + Payload: payload, + Sent: time.Now(), + } +} + +// Wrap bundles the message into an Envelope to transmit over the network. +// +// pow (Proof Of Work) controls how much time to spend on hashing the message, +// inherently controlling its priority through the network (smaller hash, bigger +// priority). +// +// The user can control the amount of identity, privacy and encryption through +// the options parameter as follows: +// - options.From == nil && options.To == nil: anonymous broadcast +// - options.From != nil && options.To == nil: signed broadcast (known sender) +// - options.From == nil && options.To != nil: encrypted anonymous message +// - options.From != nil && options.To != nil: encrypted signed message +func (self *Message) Wrap(pow time.Duration, options Options) (*Envelope, error) { + // Use the default TTL if non was specified + if options.TTL == 0 { + options.TTL = DefaultTTL + } + self.TTL = options.TTL + + // Sign and encrypt the message if requested + if options.From != nil { + if err := self.sign(options.From); err != nil { + return nil, err + } + } + if options.To != nil { + if err := self.encrypt(options.To); err != nil { + return nil, err + } + } + // Wrap the processed message, seal it and return + envelope := NewEnvelope(options.TTL, options.Topics, self) + envelope.Seal(pow) + + return envelope, nil +} + +// sign calculates and sets the cryptographic signature for the message , also +// setting the sign flag. +func (self *Message) sign(key *ecdsa.PrivateKey) (err error) { + self.Flags |= signatureFlag + self.Signature, err = crypto.Sign(self.hash(), key) + return +} + +// Recover retrieves the public key of the message signer. +func (self *Message) Recover() *ecdsa.PublicKey { + defer func() { recover() }() // in case of invalid signature + + // Short circuit if no signature is present + if self.Signature == nil { + return nil + } + // Otherwise try and recover the signature + pub, err := crypto.SigToPub(self.hash(), self.Signature) + if err != nil { + glog.V(logger.Error).Infof("Could not get public key from signature: %v", err) + return nil + } + return pub +} + +// encrypt encrypts a message payload with a public key. +func (self *Message) encrypt(key *ecdsa.PublicKey) (err error) { + self.Payload, err = crypto.Encrypt(key, self.Payload) + return +} + +// decrypt decrypts an encrypted payload with a private key. +func (self *Message) decrypt(key *ecdsa.PrivateKey) error { + cleartext, err := crypto.Decrypt(key, self.Payload) + if err == nil { + self.Payload = cleartext + } + return err +} + +// hash calculates the SHA3 checksum of the message flags and payload. +func (self *Message) hash() []byte { + return crypto.Keccak256(append([]byte{self.Flags}, self.Payload...)) +} + +// bytes flattens the message contents (flags, signature and payload) into a +// single binary blob. +func (self *Message) bytes() []byte { + return append([]byte{self.Flags}, append(self.Signature, self.Payload...)...) +} diff --git a/whisper/whisperv2/message_test.go b/whisper/whisperv2/message_test.go new file mode 100644 index 000000000..efa64e431 --- /dev/null +++ b/whisper/whisperv2/message_test.go @@ -0,0 +1,159 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package whisperv2 + +import ( + "bytes" + "crypto/elliptic" + "testing" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/secp256k1" +) + +// Tests whether a message can be wrapped without any identity or encryption. +func TestMessageSimpleWrap(t *testing.T) { + payload := []byte("hello world") + + msg := NewMessage(payload) + if _, err := msg.Wrap(DefaultPoW, Options{}); err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if msg.Flags&signatureFlag != 0 { + t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, 0) + } + if len(msg.Signature) != 0 { + t.Fatalf("signature found for simple wrapping: 0x%x", msg.Signature) + } + if bytes.Compare(msg.Payload, payload) != 0 { + t.Fatalf("payload mismatch after wrapping: have 0x%x, want 0x%x", msg.Payload, payload) + } + if msg.TTL/time.Second != DefaultTTL/time.Second { + t.Fatalf("message TTL mismatch: have %v, want %v", msg.TTL, DefaultTTL) + } +} + +// Tests whether a message can be signed, and wrapped in plain-text. +func TestMessageCleartextSignRecover(t *testing.T) { + key, err := crypto.GenerateKey() + if err != nil { + t.Fatalf("failed to create crypto key: %v", err) + } + payload := []byte("hello world") + + msg := NewMessage(payload) + if _, err := msg.Wrap(DefaultPoW, Options{ + From: key, + }); err != nil { + t.Fatalf("failed to sign message: %v", err) + } + if msg.Flags&signatureFlag != signatureFlag { + t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, signatureFlag) + } + if bytes.Compare(msg.Payload, payload) != 0 { + t.Fatalf("payload mismatch after signing: have 0x%x, want 0x%x", msg.Payload, payload) + } + + pubKey := msg.Recover() + if pubKey == nil { + t.Fatalf("failed to recover public key") + } + p1 := elliptic.Marshal(secp256k1.S256(), key.PublicKey.X, key.PublicKey.Y) + p2 := elliptic.Marshal(secp256k1.S256(), pubKey.X, pubKey.Y) + if !bytes.Equal(p1, p2) { + t.Fatalf("public key mismatch: have 0x%x, want 0x%x", p2, p1) + } +} + +// Tests whether a message can be encrypted and decrypted using an anonymous +// sender (i.e. no signature). +func TestMessageAnonymousEncryptDecrypt(t *testing.T) { + key, err := crypto.GenerateKey() + if err != nil { + t.Fatalf("failed to create recipient crypto key: %v", err) + } + payload := []byte("hello world") + + msg := NewMessage(payload) + envelope, err := msg.Wrap(DefaultPoW, Options{ + To: &key.PublicKey, + }) + if err != nil { + t.Fatalf("failed to encrypt message: %v", err) + } + if msg.Flags&signatureFlag != 0 { + t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, 0) + } + if len(msg.Signature) != 0 { + t.Fatalf("signature found for anonymous message: 0x%x", msg.Signature) + } + + out, err := envelope.Open(key) + if err != nil { + t.Fatalf("failed to open encrypted message: %v", err) + } + if !bytes.Equal(out.Payload, payload) { + t.Errorf("payload mismatch: have 0x%x, want 0x%x", out.Payload, payload) + } +} + +// Tests whether a message can be properly signed and encrypted. +func TestMessageFullCrypto(t *testing.T) { + fromKey, err := crypto.GenerateKey() + if err != nil { + t.Fatalf("failed to create sender crypto key: %v", err) + } + toKey, err := crypto.GenerateKey() + if err != nil { + t.Fatalf("failed to create recipient crypto key: %v", err) + } + + payload := []byte("hello world") + msg := NewMessage(payload) + envelope, err := msg.Wrap(DefaultPoW, Options{ + From: fromKey, + To: &toKey.PublicKey, + }) + if err != nil { + t.Fatalf("failed to encrypt message: %v", err) + } + if msg.Flags&signatureFlag != signatureFlag { + t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, signatureFlag) + } + if len(msg.Signature) == 0 { + t.Fatalf("no signature found for signed message") + } + + out, err := envelope.Open(toKey) + if err != nil { + t.Fatalf("failed to open encrypted message: %v", err) + } + if !bytes.Equal(out.Payload, payload) { + t.Errorf("payload mismatch: have 0x%x, want 0x%x", out.Payload, payload) + } + + pubKey := out.Recover() + if pubKey == nil { + t.Fatalf("failed to recover public key") + } + p1 := elliptic.Marshal(secp256k1.S256(), fromKey.PublicKey.X, fromKey.PublicKey.Y) + p2 := elliptic.Marshal(secp256k1.S256(), pubKey.X, pubKey.Y) + if !bytes.Equal(p1, p2) { + t.Fatalf("public key mismatch: have 0x%x, want 0x%x", p2, p1) + } +} diff --git a/whisper/whisperv2/peer.go b/whisper/whisperv2/peer.go new file mode 100644 index 000000000..404ebd513 --- /dev/null +++ b/whisper/whisperv2/peer.go @@ -0,0 +1,175 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package whisperv2 + +import ( + "fmt" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rlp" + "gopkg.in/fatih/set.v0" +) + +// peer represents a whisper protocol peer connection. +type peer struct { + host *Whisper + peer *p2p.Peer + ws p2p.MsgReadWriter + + known *set.Set // Messages already known by the peer to avoid wasting bandwidth + + quit chan struct{} +} + +// newPeer creates a new whisper peer object, but does not run the handshake itself. +func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *peer { + return &peer{ + host: host, + peer: remote, + ws: rw, + known: set.New(), + quit: make(chan struct{}), + } +} + +// start initiates the peer updater, periodically broadcasting the whisper packets +// into the network. +func (self *peer) start() { + go self.update() + glog.V(logger.Debug).Infof("%v: whisper started", self.peer) +} + +// stop terminates the peer updater, stopping message forwarding to it. +func (self *peer) stop() { + close(self.quit) + glog.V(logger.Debug).Infof("%v: whisper stopped", self.peer) +} + +// handshake sends the protocol initiation status message to the remote peer and +// verifies the remote status too. +func (self *peer) handshake() error { + // Send the handshake status message asynchronously + errc := make(chan error, 1) + go func() { + errc <- p2p.SendItems(self.ws, statusCode, protocolVersion) + }() + // Fetch the remote status packet and verify protocol match + packet, err := self.ws.ReadMsg() + if err != nil { + return err + } + if packet.Code != statusCode { + return fmt.Errorf("peer sent %x before status packet", packet.Code) + } + s := rlp.NewStream(packet.Payload, uint64(packet.Size)) + if _, err := s.List(); err != nil { + return fmt.Errorf("bad status message: %v", err) + } + peerVersion, err := s.Uint() + if err != nil { + return fmt.Errorf("bad status message: %v", err) + } + if peerVersion != protocolVersion { + return fmt.Errorf("protocol version mismatch %d != %d", peerVersion, protocolVersion) + } + // Wait until out own status is consumed too + if err := <-errc; err != nil { + return fmt.Errorf("failed to send status packet: %v", err) + } + return nil +} + +// update executes periodic operations on the peer, including message transmission +// and expiration. +func (self *peer) update() { + // Start the tickers for the updates + expire := time.NewTicker(expirationCycle) + transmit := time.NewTicker(transmissionCycle) + + // Loop and transmit until termination is requested + for { + select { + case <-expire.C: + self.expire() + + case <-transmit.C: + if err := self.broadcast(); err != nil { + glog.V(logger.Info).Infof("%v: broadcast failed: %v", self.peer, err) + return + } + + case <-self.quit: + return + } + } +} + +// mark marks an envelope known to the peer so that it won't be sent back. +func (self *peer) mark(envelope *Envelope) { + self.known.Add(envelope.Hash()) +} + +// marked checks if an envelope is already known to the remote peer. +func (self *peer) marked(envelope *Envelope) bool { + return self.known.Has(envelope.Hash()) +} + +// expire iterates over all the known envelopes in the host and removes all +// expired (unknown) ones from the known list. +func (self *peer) expire() { + // Assemble the list of available envelopes + available := set.NewNonTS() + for _, envelope := range self.host.envelopes() { + available.Add(envelope.Hash()) + } + // Cross reference availability with known status + unmark := make(map[common.Hash]struct{}) + self.known.Each(func(v interface{}) bool { + if !available.Has(v.(common.Hash)) { + unmark[v.(common.Hash)] = struct{}{} + } + return true + }) + // Dump all known but unavailable + for hash, _ := range unmark { + self.known.Remove(hash) + } +} + +// broadcast iterates over the collection of envelopes and transmits yet unknown +// ones over the network. +func (self *peer) broadcast() error { + // Fetch the envelopes and collect the unknown ones + envelopes := self.host.envelopes() + transmit := make([]*Envelope, 0, len(envelopes)) + for _, envelope := range envelopes { + if !self.marked(envelope) { + transmit = append(transmit, envelope) + self.mark(envelope) + } + } + // Transmit the unknown batch (potentially empty) + if err := p2p.Send(self.ws, messagesCode, transmit); err != nil { + return err + } + glog.V(logger.Detail).Infoln(self.peer, "broadcasted", len(transmit), "message(s)") + return nil +} diff --git a/whisper/whisperv2/peer_test.go b/whisper/whisperv2/peer_test.go new file mode 100644 index 000000000..9755e134c --- /dev/null +++ b/whisper/whisperv2/peer_test.go @@ -0,0 +1,261 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package whisperv2 + +import ( + "testing" + "time" + + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" +) + +type testPeer struct { + client *Whisper + stream *p2p.MsgPipeRW + termed chan struct{} +} + +func startTestPeer() *testPeer { + // Create a simulated P2P remote peer and data streams to it + remote := p2p.NewPeer(discover.NodeID{}, "", nil) + tester, tested := p2p.MsgPipe() + + // Create a whisper client and connect with it to the tester peer + client := New() + client.Start(nil) + + termed := make(chan struct{}) + go func() { + defer client.Stop() + defer close(termed) + defer tested.Close() + + client.handlePeer(remote, tested) + }() + + return &testPeer{ + client: client, + stream: tester, + termed: termed, + } +} + +func startTestPeerInited() (*testPeer, error) { + peer := startTestPeer() + + if err := p2p.ExpectMsg(peer.stream, statusCode, []uint64{protocolVersion}); err != nil { + peer.stream.Close() + return nil, err + } + if err := p2p.SendItems(peer.stream, statusCode, protocolVersion); err != nil { + peer.stream.Close() + return nil, err + } + return peer, nil +} + +func TestPeerStatusMessage(t *testing.T) { + tester := startTestPeer() + + // Wait for the handshake status message and check it + if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil { + t.Fatalf("status message mismatch: %v", err) + } + // Terminate the node + tester.stream.Close() + + select { + case <-tester.termed: + case <-time.After(time.Second): + t.Fatalf("local close timed out") + } +} + +func TestPeerHandshakeFail(t *testing.T) { + tester := startTestPeer() + + // Wait for and check the handshake + if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil { + t.Fatalf("status message mismatch: %v", err) + } + // Send an invalid handshake status and verify disconnect + if err := p2p.SendItems(tester.stream, messagesCode); err != nil { + t.Fatalf("failed to send malformed status: %v", err) + } + select { + case <-tester.termed: + case <-time.After(time.Second): + t.Fatalf("remote close timed out") + } +} + +func TestPeerHandshakeSuccess(t *testing.T) { + tester := startTestPeer() + + // Wait for and check the handshake + if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil { + t.Fatalf("status message mismatch: %v", err) + } + // Send a valid handshake status and make sure connection stays live + if err := p2p.SendItems(tester.stream, statusCode, protocolVersion); err != nil { + t.Fatalf("failed to send status: %v", err) + } + select { + case <-tester.termed: + t.Fatalf("valid handshake disconnected") + + case <-time.After(100 * time.Millisecond): + } + // Clean up the test + tester.stream.Close() + + select { + case <-tester.termed: + case <-time.After(time.Second): + t.Fatalf("local close timed out") + } +} + +func TestPeerSend(t *testing.T) { + // Start a tester and execute the handshake + tester, err := startTestPeerInited() + if err != nil { + t.Fatalf("failed to start initialized peer: %v", err) + } + defer tester.stream.Close() + + // Construct a message and inject into the tester + message := NewMessage([]byte("peer broadcast test message")) + envelope, err := message.Wrap(DefaultPoW, Options{ + TTL: DefaultTTL, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := tester.client.Send(envelope); err != nil { + t.Fatalf("failed to send message: %v", err) + } + // Check that the message is eventually forwarded + payload := []interface{}{envelope} + if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil { + t.Fatalf("message mismatch: %v", err) + } + // Make sure that even with a re-insert, an empty batch is received + if err := tester.client.Send(envelope); err != nil { + t.Fatalf("failed to send message: %v", err) + } + if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil { + t.Fatalf("message mismatch: %v", err) + } +} + +func TestPeerDeliver(t *testing.T) { + // Start a tester and execute the handshake + tester, err := startTestPeerInited() + if err != nil { + t.Fatalf("failed to start initialized peer: %v", err) + } + defer tester.stream.Close() + + // Watch for all inbound messages + arrived := make(chan struct{}, 1) + tester.client.Watch(Filter{ + Fn: func(message *Message) { + arrived <- struct{}{} + }, + }) + // Construct a message and deliver it to the tester peer + message := NewMessage([]byte("peer broadcast test message")) + envelope, err := message.Wrap(DefaultPoW, Options{ + TTL: DefaultTTL, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil { + t.Fatalf("failed to transfer message: %v", err) + } + // Check that the message is delivered upstream + select { + case <-arrived: + case <-time.After(time.Second): + t.Fatalf("message delivery timeout") + } + // Check that a resend is not delivered + if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil { + t.Fatalf("failed to transfer message: %v", err) + } + select { + case <-time.After(2 * transmissionCycle): + case <-arrived: + t.Fatalf("repeating message arrived") + } +} + +func TestPeerMessageExpiration(t *testing.T) { + // Start a tester and execute the handshake + tester, err := startTestPeerInited() + if err != nil { + t.Fatalf("failed to start initialized peer: %v", err) + } + defer tester.stream.Close() + + // Fetch the peer instance for later inspection + tester.client.peerMu.RLock() + if peers := len(tester.client.peers); peers != 1 { + t.Fatalf("peer pool size mismatch: have %v, want %v", peers, 1) + } + var peer *peer + for peer, _ = range tester.client.peers { + break + } + tester.client.peerMu.RUnlock() + + // Construct a message and pass it through the tester + message := NewMessage([]byte("peer test message")) + envelope, err := message.Wrap(DefaultPoW, Options{ + TTL: time.Second, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := tester.client.Send(envelope); err != nil { + t.Fatalf("failed to send message: %v", err) + } + payload := []interface{}{envelope} + if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil { + // A premature empty message may have been broadcast, check the next too + if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil { + t.Fatalf("message mismatch: %v", err) + } + } + // Check that the message is inside the cache + if !peer.known.Has(envelope.Hash()) { + t.Fatalf("message not found in cache") + } + // Discard messages until expiration and check cache again + exp := time.Now().Add(time.Second + 2*expirationCycle + 100*time.Millisecond) + for time.Now().Before(exp) { + if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil { + t.Fatalf("message mismatch: %v", err) + } + } + if peer.known.Has(envelope.Hash()) { + t.Fatalf("message not expired from cache") + } +} diff --git a/whisper/whisperv2/topic.go b/whisper/whisperv2/topic.go new file mode 100644 index 000000000..3e2b47bd3 --- /dev/null +++ b/whisper/whisperv2/topic.go @@ -0,0 +1,140 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Contains the Whisper protocol Topic element. For formal details please see +// the specs at https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec#topics. + +package whisperv2 + +import "github.com/ethereum/go-ethereum/crypto" + +// Topic represents a cryptographically secure, probabilistic partial +// classifications of a message, determined as the first (left) 4 bytes of the +// SHA3 hash of some arbitrary data given by the original author of the message. +type Topic [4]byte + +// NewTopic creates a topic from the 4 byte prefix of the SHA3 hash of the data. +// +// Note, empty topics are considered the wildcard, and cannot be used in messages. +func NewTopic(data []byte) Topic { + prefix := [4]byte{} + copy(prefix[:], crypto.Keccak256(data)[:4]) + return Topic(prefix) +} + +// NewTopics creates a list of topics from a list of binary data elements, by +// iteratively calling NewTopic on each of them. +func NewTopics(data ...[]byte) []Topic { + topics := make([]Topic, len(data)) + for i, element := range data { + topics[i] = NewTopic(element) + } + return topics +} + +// NewTopicFromString creates a topic using the binary data contents of the +// specified string. +func NewTopicFromString(data string) Topic { + return NewTopic([]byte(data)) +} + +// NewTopicsFromStrings creates a list of topics from a list of textual data +// elements, by iteratively calling NewTopicFromString on each of them. +func NewTopicsFromStrings(data ...string) []Topic { + topics := make([]Topic, len(data)) + for i, element := range data { + topics[i] = NewTopicFromString(element) + } + return topics +} + +// String converts a topic byte array to a string representation. +func (self *Topic) String() string { + return string(self[:]) +} + +// topicMatcher is a filter expression to verify if a list of topics contained +// in an arriving message matches some topic conditions. The topic matcher is +// built up of a list of conditions, each of which must be satisfied by the +// corresponding topic in the message. Each condition may require: a) an exact +// topic match; b) a match from a set of topics; or c) a wild-card matching all. +// +// If a message contains more topics than required by the matcher, those beyond +// the condition count are ignored and assumed to match. +// +// Consider the following sample topic matcher: +// sample := { +// {TopicA1, TopicA2, TopicA3}, +// {TopicB}, +// nil, +// {TopicD1, TopicD2} +// } +// In order for a message to pass this filter, it should enumerate at least 4 +// topics, the first any of [TopicA1, TopicA2, TopicA3], the second mandatory +// "TopicB", the third is ignored by the filter and the fourth either "TopicD1" +// or "TopicD2". If the message contains further topics, the filter will match +// them too. +type topicMatcher struct { + conditions []map[Topic]struct{} +} + +// newTopicMatcher create a topic matcher from a list of topic conditions. +func newTopicMatcher(topics ...[]Topic) *topicMatcher { + matcher := make([]map[Topic]struct{}, len(topics)) + for i, condition := range topics { + matcher[i] = make(map[Topic]struct{}) + for _, topic := range condition { + matcher[i][topic] = struct{}{} + } + } + return &topicMatcher{conditions: matcher} +} + +// newTopicMatcherFromBinary create a topic matcher from a list of binary conditions. +func newTopicMatcherFromBinary(data ...[][]byte) *topicMatcher { + topics := make([][]Topic, len(data)) + for i, condition := range data { + topics[i] = NewTopics(condition...) + } + return newTopicMatcher(topics...) +} + +// newTopicMatcherFromStrings creates a topic matcher from a list of textual +// conditions. +func newTopicMatcherFromStrings(data ...[]string) *topicMatcher { + topics := make([][]Topic, len(data)) + for i, condition := range data { + topics[i] = NewTopicsFromStrings(condition...) + } + return newTopicMatcher(topics...) +} + +// Matches checks if a list of topics matches this particular condition set. +func (self *topicMatcher) Matches(topics []Topic) bool { + // Mismatch if there aren't enough topics + if len(self.conditions) > len(topics) { + return false + } + // Check each topic condition for existence (skip wild-cards) + for i := 0; i < len(topics) && i < len(self.conditions); i++ { + if len(self.conditions[i]) > 0 { + if _, ok := self.conditions[i][topics[i]]; !ok { + return false + } + } + } + return true +} diff --git a/whisper/whisperv2/topic_test.go b/whisper/whisperv2/topic_test.go new file mode 100644 index 000000000..efd4a2c61 --- /dev/null +++ b/whisper/whisperv2/topic_test.go @@ -0,0 +1,215 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package whisperv2 + +import ( + "bytes" + "testing" +) + +var topicCreationTests = []struct { + data []byte + hash [4]byte +}{ + {hash: [4]byte{0x8f, 0x9a, 0x2b, 0x7d}, data: []byte("test name")}, + {hash: [4]byte{0xf2, 0x6e, 0x77, 0x79}, data: []byte("some other test")}, +} + +func TestTopicCreation(t *testing.T) { + // Create the topics individually + for i, tt := range topicCreationTests { + topic := NewTopic(tt.data) + if bytes.Compare(topic[:], tt.hash[:]) != 0 { + t.Errorf("binary test %d: hash mismatch: have %v, want %v.", i, topic, tt.hash) + } + } + for i, tt := range topicCreationTests { + topic := NewTopicFromString(string(tt.data)) + if bytes.Compare(topic[:], tt.hash[:]) != 0 { + t.Errorf("textual test %d: hash mismatch: have %v, want %v.", i, topic, tt.hash) + } + } + // Create the topics in batches + binaryData := make([][]byte, len(topicCreationTests)) + for i, tt := range topicCreationTests { + binaryData[i] = tt.data + } + textualData := make([]string, len(topicCreationTests)) + for i, tt := range topicCreationTests { + textualData[i] = string(tt.data) + } + + topics := NewTopics(binaryData...) + for i, tt := range topicCreationTests { + if bytes.Compare(topics[i][:], tt.hash[:]) != 0 { + t.Errorf("binary batch test %d: hash mismatch: have %v, want %v.", i, topics[i], tt.hash) + } + } + topics = NewTopicsFromStrings(textualData...) + for i, tt := range topicCreationTests { + if bytes.Compare(topics[i][:], tt.hash[:]) != 0 { + t.Errorf("textual batch test %d: hash mismatch: have %v, want %v.", i, topics[i], tt.hash) + } + } +} + +var topicMatcherCreationTest = struct { + binary [][][]byte + textual [][]string + matcher []map[[4]byte]struct{} +}{ + binary: [][][]byte{ + [][]byte{}, + [][]byte{ + []byte("Topic A"), + }, + [][]byte{ + []byte("Topic B1"), + []byte("Topic B2"), + []byte("Topic B3"), + }, + }, + textual: [][]string{ + []string{}, + []string{"Topic A"}, + []string{"Topic B1", "Topic B2", "Topic B3"}, + }, + matcher: []map[[4]byte]struct{}{ + map[[4]byte]struct{}{}, + map[[4]byte]struct{}{ + [4]byte{0x25, 0xfc, 0x95, 0x66}: struct{}{}, + }, + map[[4]byte]struct{}{ + [4]byte{0x93, 0x6d, 0xec, 0x09}: struct{}{}, + [4]byte{0x25, 0x23, 0x34, 0xd3}: struct{}{}, + [4]byte{0x6b, 0xc2, 0x73, 0xd1}: struct{}{}, + }, + }, +} + +func TestTopicMatcherCreation(t *testing.T) { + test := topicMatcherCreationTest + + matcher := newTopicMatcherFromBinary(test.binary...) + for i, cond := range matcher.conditions { + for topic, _ := range cond { + if _, ok := test.matcher[i][topic]; !ok { + t.Errorf("condition %d; extra topic found: 0x%x", i, topic[:]) + } + } + } + for i, cond := range test.matcher { + for topic, _ := range cond { + if _, ok := matcher.conditions[i][topic]; !ok { + t.Errorf("condition %d; topic not found: 0x%x", i, topic[:]) + } + } + } + + matcher = newTopicMatcherFromStrings(test.textual...) + for i, cond := range matcher.conditions { + for topic, _ := range cond { + if _, ok := test.matcher[i][topic]; !ok { + t.Errorf("condition %d; extra topic found: 0x%x", i, topic[:]) + } + } + } + for i, cond := range test.matcher { + for topic, _ := range cond { + if _, ok := matcher.conditions[i][topic]; !ok { + t.Errorf("condition %d; topic not found: 0x%x", i, topic[:]) + } + } + } +} + +var topicMatcherTests = []struct { + filter [][]string + topics []string + match bool +}{ + // Empty topic matcher should match everything + { + filter: [][]string{}, + topics: []string{}, + match: true, + }, + { + filter: [][]string{}, + topics: []string{"a", "b", "c"}, + match: true, + }, + // Fixed topic matcher should match strictly, but only prefix + { + filter: [][]string{[]string{"a"}, []string{"b"}}, + topics: []string{"a"}, + match: false, + }, + { + filter: [][]string{[]string{"a"}, []string{"b"}}, + topics: []string{"a", "b"}, + match: true, + }, + { + filter: [][]string{[]string{"a"}, []string{"b"}}, + topics: []string{"a", "b", "c"}, + match: true, + }, + // Multi-matcher should match any from a sub-group + { + filter: [][]string{[]string{"a1", "a2"}}, + topics: []string{"a"}, + match: false, + }, + { + filter: [][]string{[]string{"a1", "a2"}}, + topics: []string{"a1"}, + match: true, + }, + { + filter: [][]string{[]string{"a1", "a2"}}, + topics: []string{"a2"}, + match: true, + }, + // Wild-card condition should match anything + { + filter: [][]string{[]string{}, []string{"b"}}, + topics: []string{"a"}, + match: false, + }, + { + filter: [][]string{[]string{}, []string{"b"}}, + topics: []string{"a", "b"}, + match: true, + }, + { + filter: [][]string{[]string{}, []string{"b"}}, + topics: []string{"b", "b"}, + match: true, + }, +} + +func TestTopicMatcher(t *testing.T) { + for i, tt := range topicMatcherTests { + topics := NewTopicsFromStrings(tt.topics...) + + matcher := newTopicMatcherFromStrings(tt.filter...) + if match := matcher.Matches(topics); match != tt.match { + t.Errorf("test %d: match mismatch: have %v, want %v", i, match, tt.match) + } + } +} diff --git a/whisper/whisperv2/whisper.go b/whisper/whisperv2/whisper.go new file mode 100644 index 000000000..d9054959e --- /dev/null +++ b/whisper/whisperv2/whisper.go @@ -0,0 +1,378 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package whisperv2 + +import ( + "crypto/ecdsa" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/ecies" + "github.com/ethereum/go-ethereum/event/filter" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rpc" + + "gopkg.in/fatih/set.v0" +) + +const ( + statusCode = 0x00 + messagesCode = 0x01 + + protocolVersion uint64 = 0x02 + protocolName = "shh" + + signatureFlag = byte(1 << 7) + signatureLength = 65 + + expirationCycle = 800 * time.Millisecond + transmissionCycle = 300 * time.Millisecond +) + +const ( + DefaultTTL = 50 * time.Second + DefaultPoW = 50 * time.Millisecond +) + +type MessageEvent struct { + To *ecdsa.PrivateKey + From *ecdsa.PublicKey + Message *Message +} + +// Whisper represents a dark communication interface through the Ethereum +// network, using its very own P2P communication layer. +type Whisper struct { + protocol p2p.Protocol + filters *filter.Filters + + keys map[string]*ecdsa.PrivateKey + + messages map[common.Hash]*Envelope // Pool of messages currently tracked by this node + expirations map[uint32]*set.SetNonTS // Message expiration pool (TODO: something lighter) + poolMu sync.RWMutex // Mutex to sync the message and expiration pools + + peers map[*peer]struct{} // Set of currently active peers + peerMu sync.RWMutex // Mutex to sync the active peer set + + quit chan struct{} +} + +// New creates a Whisper client ready to communicate through the Ethereum P2P +// network. +func New() *Whisper { + whisper := &Whisper{ + filters: filter.New(), + keys: make(map[string]*ecdsa.PrivateKey), + messages: make(map[common.Hash]*Envelope), + expirations: make(map[uint32]*set.SetNonTS), + peers: make(map[*peer]struct{}), + quit: make(chan struct{}), + } + whisper.filters.Start() + + // p2p whisper sub protocol handler + whisper.protocol = p2p.Protocol{ + Name: protocolName, + Version: uint(protocolVersion), + Length: 2, + Run: whisper.handlePeer, + } + + return whisper +} + +// APIs returns the RPC descriptors the Whisper implementation offers +func (s *Whisper) APIs() []rpc.API { + return []rpc.API{ + { + Namespace: "shh", + Version: "1.0", + Service: NewPublicWhisperAPI(s), + Public: true, + }, + } +} + +// Protocols returns the whisper sub-protocols ran by this particular client. +func (self *Whisper) Protocols() []p2p.Protocol { + return []p2p.Protocol{self.protocol} +} + +// Version returns the whisper sub-protocols version number. +func (self *Whisper) Version() uint { + return self.protocol.Version +} + +// NewIdentity generates a new cryptographic identity for the client, and injects +// it into the known identities for message decryption. +func (self *Whisper) NewIdentity() *ecdsa.PrivateKey { + key, err := crypto.GenerateKey() + if err != nil { + panic(err) + } + self.keys[string(crypto.FromECDSAPub(&key.PublicKey))] = key + + return key +} + +// HasIdentity checks if the the whisper node is configured with the private key +// of the specified public pair. +func (self *Whisper) HasIdentity(key *ecdsa.PublicKey) bool { + return self.keys[string(crypto.FromECDSAPub(key))] != nil +} + +// GetIdentity retrieves the private key of the specified public identity. +func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey { + return self.keys[string(crypto.FromECDSAPub(key))] +} + +// Watch installs a new message handler to run in case a matching packet arrives +// from the whisper network. +func (self *Whisper) Watch(options Filter) int { + filter := filterer{ + to: string(crypto.FromECDSAPub(options.To)), + from: string(crypto.FromECDSAPub(options.From)), + matcher: newTopicMatcher(options.Topics...), + fn: func(data interface{}) { + options.Fn(data.(*Message)) + }, + } + return self.filters.Install(filter) +} + +// Unwatch removes an installed message handler. +func (self *Whisper) Unwatch(id int) { + self.filters.Uninstall(id) +} + +// Send injects a message into the whisper send queue, to be distributed in the +// network in the coming cycles. +func (self *Whisper) Send(envelope *Envelope) error { + return self.add(envelope) +} + +// Start implements node.Service, starting the background data propagation thread +// of the Whisper protocol. +func (self *Whisper) Start(*p2p.Server) error { + glog.V(logger.Info).Infoln("Whisper started") + go self.update() + return nil +} + +// Stop implements node.Service, stopping the background data propagation thread +// of the Whisper protocol. +func (self *Whisper) Stop() error { + close(self.quit) + glog.V(logger.Info).Infoln("Whisper stopped") + return nil +} + +// Messages retrieves all the currently pooled messages matching a filter id. +func (self *Whisper) Messages(id int) []*Message { + messages := make([]*Message, 0) + if filter := self.filters.Get(id); filter != nil { + for _, envelope := range self.messages { + if message := self.open(envelope); message != nil { + if self.filters.Match(filter, createFilter(message, envelope.Topics)) { + messages = append(messages, message) + } + } + } + } + return messages +} + +// handlePeer is called by the underlying P2P layer when the whisper sub-protocol +// connection is negotiated. +func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { + // Create the new peer and start tracking it + whisperPeer := newPeer(self, peer, rw) + + self.peerMu.Lock() + self.peers[whisperPeer] = struct{}{} + self.peerMu.Unlock() + + defer func() { + self.peerMu.Lock() + delete(self.peers, whisperPeer) + self.peerMu.Unlock() + }() + + // Run the peer handshake and state updates + if err := whisperPeer.handshake(); err != nil { + return err + } + whisperPeer.start() + defer whisperPeer.stop() + + // Read and process inbound messages directly to merge into client-global state + for { + // Fetch the next packet and decode the contained envelopes + packet, err := rw.ReadMsg() + if err != nil { + return err + } + var envelopes []*Envelope + if err := packet.Decode(&envelopes); err != nil { + glog.V(logger.Info).Infof("%v: failed to decode envelope: %v", peer, err) + continue + } + // Inject all envelopes into the internal pool + for _, envelope := range envelopes { + if err := self.add(envelope); err != nil { + // TODO Punish peer here. Invalid envelope. + glog.V(logger.Debug).Infof("%v: failed to pool envelope: %v", peer, err) + } + whisperPeer.mark(envelope) + } + } +} + +// add inserts a new envelope into the message pool to be distributed within the +// whisper network. It also inserts the envelope into the expiration pool at the +// appropriate time-stamp. +func (self *Whisper) add(envelope *Envelope) error { + self.poolMu.Lock() + defer self.poolMu.Unlock() + + // short circuit when a received envelope has already expired + if envelope.Expiry < uint32(time.Now().Unix()) { + return nil + } + + // Insert the message into the tracked pool + hash := envelope.Hash() + if _, ok := self.messages[hash]; ok { + glog.V(logger.Detail).Infof("whisper envelope already cached: %x\n", envelope) + return nil + } + self.messages[hash] = envelope + + // Insert the message into the expiration pool for later removal + if self.expirations[envelope.Expiry] == nil { + self.expirations[envelope.Expiry] = set.NewNonTS() + } + if !self.expirations[envelope.Expiry].Has(hash) { + self.expirations[envelope.Expiry].Add(hash) + + // Notify the local node of a message arrival + go self.postEvent(envelope) + } + glog.V(logger.Detail).Infof("cached whisper envelope %x\n", envelope) + return nil +} + +// postEvent opens an envelope with the configured identities and delivers the +// message upstream from application processing. +func (self *Whisper) postEvent(envelope *Envelope) { + if message := self.open(envelope); message != nil { + self.filters.Notify(createFilter(message, envelope.Topics), message) + } +} + +// open tries to decrypt a whisper envelope with all the configured identities, +// returning the decrypted message and the key used to achieve it. If not keys +// are configured, open will return the payload as if non encrypted. +func (self *Whisper) open(envelope *Envelope) *Message { + // Short circuit if no identity is set, and assume clear-text + if len(self.keys) == 0 { + if message, err := envelope.Open(nil); err == nil { + return message + } + } + // Iterate over the keys and try to decrypt the message + for _, key := range self.keys { + message, err := envelope.Open(key) + if err == nil { + message.To = &key.PublicKey + return message + } else if err == ecies.ErrInvalidPublicKey { + return message + } + } + // Failed to decrypt, don't return anything + return nil +} + +// createFilter creates a message filter to check against installed handlers. +func createFilter(message *Message, topics []Topic) filter.Filter { + matcher := make([][]Topic, len(topics)) + for i, topic := range topics { + matcher[i] = []Topic{topic} + } + return filterer{ + to: string(crypto.FromECDSAPub(message.To)), + from: string(crypto.FromECDSAPub(message.Recover())), + matcher: newTopicMatcher(matcher...), + } +} + +// update loops until the lifetime of the whisper node, updating its internal +// state by expiring stale messages from the pool. +func (self *Whisper) update() { + // Start a ticker to check for expirations + expire := time.NewTicker(expirationCycle) + + // Repeat updates until termination is requested + for { + select { + case <-expire.C: + self.expire() + + case <-self.quit: + return + } + } +} + +// expire iterates over all the expiration timestamps, removing all stale +// messages from the pools. +func (self *Whisper) expire() { + self.poolMu.Lock() + defer self.poolMu.Unlock() + + now := uint32(time.Now().Unix()) + for then, hashSet := range self.expirations { + // Short circuit if a future time + if then > now { + continue + } + // Dump all expired messages and remove timestamp + hashSet.Each(func(v interface{}) bool { + delete(self.messages, v.(common.Hash)) + return true + }) + self.expirations[then].Clear() + } +} + +// envelopes retrieves all the messages currently pooled by the node. +func (self *Whisper) envelopes() []*Envelope { + self.poolMu.RLock() + defer self.poolMu.RUnlock() + + envelopes := make([]*Envelope, 0, len(self.messages)) + for _, envelope := range self.messages { + envelopes = append(envelopes, envelope) + } + return envelopes +} diff --git a/whisper/whisperv2/whisper_test.go b/whisper/whisperv2/whisper_test.go new file mode 100644 index 000000000..1e0d3f85d --- /dev/null +++ b/whisper/whisperv2/whisper_test.go @@ -0,0 +1,216 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package whisperv2 + +import ( + "testing" + "time" + + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" +) + +func startTestCluster(n int) []*Whisper { + // Create the batch of simulated peers + nodes := make([]*p2p.Peer, n) + for i := 0; i < n; i++ { + nodes[i] = p2p.NewPeer(discover.NodeID{}, "", nil) + } + whispers := make([]*Whisper, n) + for i := 0; i < n; i++ { + whispers[i] = New() + whispers[i].Start(nil) + } + // Wire all the peers to the root one + for i := 1; i < n; i++ { + src, dst := p2p.MsgPipe() + + go whispers[0].handlePeer(nodes[i], src) + go whispers[i].handlePeer(nodes[0], dst) + } + return whispers +} + +func TestSelfMessage(t *testing.T) { + // Start the single node cluster + client := startTestCluster(1)[0] + + // Start watching for self messages, signal any arrivals + self := client.NewIdentity() + done := make(chan struct{}) + + client.Watch(Filter{ + To: &self.PublicKey, + Fn: func(msg *Message) { + close(done) + }, + }) + // Send a dummy message to oneself + msg := NewMessage([]byte("self whisper")) + envelope, err := msg.Wrap(DefaultPoW, Options{ + From: self, + To: &self.PublicKey, + TTL: DefaultTTL, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + // Dump the message into the system and wait for it to pop back out + if err := client.Send(envelope); err != nil { + t.Fatalf("failed to send self-message: %v", err) + } + select { + case <-done: + case <-time.After(time.Second): + t.Fatalf("self-message receive timeout") + } +} + +func TestDirectMessage(t *testing.T) { + // Start the sender-recipient cluster + cluster := startTestCluster(2) + + sender := cluster[0] + senderId := sender.NewIdentity() + + recipient := cluster[1] + recipientId := recipient.NewIdentity() + + // Watch for arriving messages on the recipient + done := make(chan struct{}) + recipient.Watch(Filter{ + To: &recipientId.PublicKey, + Fn: func(msg *Message) { + close(done) + }, + }) + // Send a dummy message from the sender + msg := NewMessage([]byte("direct whisper")) + envelope, err := msg.Wrap(DefaultPoW, Options{ + From: senderId, + To: &recipientId.PublicKey, + TTL: DefaultTTL, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := sender.Send(envelope); err != nil { + t.Fatalf("failed to send direct message: %v", err) + } + // Wait for an arrival or a timeout + select { + case <-done: + case <-time.After(time.Second): + t.Fatalf("direct message receive timeout") + } +} + +func TestAnonymousBroadcast(t *testing.T) { + testBroadcast(true, t) +} + +func TestIdentifiedBroadcast(t *testing.T) { + testBroadcast(false, t) +} + +func testBroadcast(anonymous bool, t *testing.T) { + // Start the single sender multi recipient cluster + cluster := startTestCluster(3) + + sender := cluster[1] + targets := cluster[1:] + for _, target := range targets { + if !anonymous { + target.NewIdentity() + } + } + // Watch for arriving messages on the recipients + dones := make([]chan struct{}, len(targets)) + for i := 0; i < len(targets); i++ { + done := make(chan struct{}) // need for the closure + dones[i] = done + + targets[i].Watch(Filter{ + Topics: NewFilterTopicsFromStringsFlat("broadcast topic"), + Fn: func(msg *Message) { + close(done) + }, + }) + } + // Send a dummy message from the sender + msg := NewMessage([]byte("broadcast whisper")) + envelope, err := msg.Wrap(DefaultPoW, Options{ + Topics: NewTopicsFromStrings("broadcast topic"), + TTL: DefaultTTL, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := sender.Send(envelope); err != nil { + t.Fatalf("failed to send broadcast message: %v", err) + } + // Wait for an arrival on each recipient, or timeouts + timeout := time.After(time.Second) + for _, done := range dones { + select { + case <-done: + case <-timeout: + t.Fatalf("broadcast message receive timeout") + } + } +} + +func TestMessageExpiration(t *testing.T) { + // Start the single node cluster and inject a dummy message + node := startTestCluster(1)[0] + + message := NewMessage([]byte("expiring message")) + envelope, err := message.Wrap(DefaultPoW, Options{TTL: time.Second}) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := node.Send(envelope); err != nil { + t.Fatalf("failed to inject message: %v", err) + } + // Check that the message is inside the cache + node.poolMu.RLock() + _, found := node.messages[envelope.Hash()] + node.poolMu.RUnlock() + + if !found { + t.Fatalf("message not found in cache") + } + // Wait for expiration and check cache again + time.Sleep(time.Second) // wait for expiration + time.Sleep(2 * expirationCycle) // wait for cleanup cycle + + node.poolMu.RLock() + _, found = node.messages[envelope.Hash()] + node.poolMu.RUnlock() + if found { + t.Fatalf("message not expired from cache") + } + + // Check that adding an expired envelope doesn't do anything. + node.add(envelope) + node.poolMu.RLock() + _, found = node.messages[envelope.Hash()] + node.poolMu.RUnlock() + if found { + t.Fatalf("message was added to cache") + } +} |