diff options
author | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-04-28 16:54:05 +0800 |
---|---|---|
committer | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-04-28 16:54:05 +0800 |
commit | a05c420371aa56657b86ba3dce6ebb087adb708d (patch) | |
tree | 3e0853dcff756c503764d0f73c1757c585be2aa9 | |
parent | 182d484aa70bcd5b22117f02333b1fd3b1535dcb (diff) | |
parent | 978ffd3097242a5faeb7b23b9c72590170004dc6 (diff) | |
download | go-tangerine-a05c420371aa56657b86ba3dce6ebb087adb708d.tar go-tangerine-a05c420371aa56657b86ba3dce6ebb087adb708d.tar.gz go-tangerine-a05c420371aa56657b86ba3dce6ebb087adb708d.tar.bz2 go-tangerine-a05c420371aa56657b86ba3dce6ebb087adb708d.tar.lz go-tangerine-a05c420371aa56657b86ba3dce6ebb087adb708d.tar.xz go-tangerine-a05c420371aa56657b86ba3dce6ebb087adb708d.tar.zst go-tangerine-a05c420371aa56657b86ba3dce6ebb087adb708d.zip |
Merge pull request #738 from karalabe/whisper-cleanup
Whisper cleanup, part 3
-rw-r--r-- | eth/backend.go | 6 | ||||
-rw-r--r-- | rpc/api.go | 36 | ||||
-rw-r--r-- | rpc/args.go | 69 | ||||
-rw-r--r-- | rpc/args_test.go | 2 | ||||
-rw-r--r-- | ui/qt/qwhisper/whisper.go | 2 | ||||
-rw-r--r-- | whisper/envelope.go | 3 | ||||
-rw-r--r-- | whisper/envelope_test.go | 142 | ||||
-rw-r--r-- | whisper/filter.go | 113 | ||||
-rw-r--r-- | whisper/filter_test.go | 199 | ||||
-rw-r--r-- | whisper/message.go | 23 | ||||
-rw-r--r-- | whisper/message_test.go | 4 | ||||
-rw-r--r-- | whisper/peer.go | 11 | ||||
-rw-r--r-- | whisper/topic.go | 79 | ||||
-rw-r--r-- | whisper/topic_test.go | 154 | ||||
-rw-r--r-- | whisper/whisper.go | 57 | ||||
-rw-r--r-- | whisper/whisper_test.go | 2 | ||||
-rw-r--r-- | xeth/whisper.go | 149 | ||||
-rw-r--r-- | xeth/whisper_filter.go | 84 | ||||
-rw-r--r-- | xeth/whisper_message.go | 37 | ||||
-rw-r--r-- | xeth/xeth.go | 68 |
20 files changed, 1024 insertions, 216 deletions
diff --git a/eth/backend.go b/eth/backend.go index 783f33908..466912899 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -222,10 +222,12 @@ func New(config *Config) (*Ethereum, error) { eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit) eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux()) eth.chainManager.SetProcessor(eth.blockProcessor) - eth.whisper = whisper.New() - eth.shhVersionId = int(eth.whisper.Version()) eth.miner = miner.New(eth, eth.pow, config.MinerThreads) eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager, eth.downloader) + if config.Shh { + eth.whisper = whisper.New() + eth.shhVersionId = int(eth.whisper.Version()) + } netprv, err := config.nodeKey() if err != nil { diff --git a/rpc/api.go b/rpc/api.go index 5930a4c7b..4a9eb5963 100644 --- a/rpc/api.go +++ b/rpc/api.go @@ -406,65 +406,67 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err res, _ := api.xeth().DbGet([]byte(args.Database + args.Key)) *reply = newHexData(res) + case "shh_version": + // Retrieves the currently running whisper protocol version *reply = api.xeth().WhisperVersion() + case "shh_post": + // Injects a new message into the whisper network args := new(WhisperMessageArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err } - err := api.xeth().Whisper().Post(args.Payload, args.To, args.From, args.Topics, args.Priority, args.Ttl) if err != nil { return err } - *reply = true + case "shh_newIdentity": + // Creates a new whisper identity to use for sending/receiving messages *reply = api.xeth().Whisper().NewIdentity() - // case "shh_removeIdentity": - // args := new(WhisperIdentityArgs) - // if err := json.Unmarshal(req.Params, &args); err != nil { - // return err - // } - // *reply = api.xeth().Whisper().RemoveIdentity(args.Identity) + case "shh_hasIdentity": + // Checks if an identity if owned or not args := new(WhisperIdentityArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err } *reply = api.xeth().Whisper().HasIdentity(args.Identity) - case "shh_newGroup", "shh_addToGroup": - return NewNotImplementedError(req.Method) + case "shh_newFilter": + // Create a new filter to watch and match messages with args := new(WhisperFilterArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err } - opts := new(xeth.Options) - // opts.From = args.From - opts.To = args.To - opts.Topics = args.Topics - id := api.xeth().NewWhisperFilter(opts) + id := api.xeth().NewWhisperFilter(args.To, args.From, args.Topics) *reply = newHexNum(big.NewInt(int64(id)).Bytes()) + case "shh_uninstallFilter": + // Remove an existing filter watching messages args := new(FilterIdArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err } *reply = api.xeth().UninstallWhisperFilter(args.Id) + case "shh_getFilterChanges": + // Retrieve all the new messages arrived since the last request args := new(FilterIdArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err } - *reply = api.xeth().MessagesChanged(args.Id) + *reply = api.xeth().WhisperMessagesChanged(args.Id) + case "shh_getMessages": + // Retrieve all the cached messages matching a specific, existing filter args := new(FilterIdArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err } - *reply = api.xeth().Whisper().Messages(args.Id) + *reply = api.xeth().WhisperMessages(args.Id) // case "eth_register": // // Placeholder for actual type diff --git a/rpc/args.go b/rpc/args.go index 7694a3d3f..4bd48e6d6 100644 --- a/rpc/args.go +++ b/rpc/args.go @@ -1010,25 +1010,27 @@ func (args *WhisperIdentityArgs) UnmarshalJSON(b []byte) (err error) { } type WhisperFilterArgs struct { - To string `json:"to"` + To string From string - Topics []string + Topics [][]string } +// UnmarshalJSON implements the json.Unmarshaler interface, invoked to convert a +// JSON message blob into a WhisperFilterArgs structure. func (args *WhisperFilterArgs) UnmarshalJSON(b []byte) (err error) { + // Unmarshal the JSON message and sanity check var obj []struct { - To interface{} - Topics []interface{} + To interface{} `json:"to"` + From interface{} `json:"from"` + Topics interface{} `json:"topics"` } - - if err = json.Unmarshal(b, &obj); err != nil { + if err := json.Unmarshal(b, &obj); err != nil { return NewDecodeParamError(err.Error()) } - if len(obj) < 1 { return NewInsufficientParamsError(len(obj), 1) } - + // Retrieve the simple data contents of the filter arguments if obj[0].To == nil { args.To = "" } else { @@ -1038,17 +1040,52 @@ func (args *WhisperFilterArgs) UnmarshalJSON(b []byte) (err error) { } args.To = argstr } - - t := make([]string, len(obj[0].Topics)) - for i, j := range obj[0].Topics { - argstr, ok := j.(string) + if obj[0].From == nil { + args.From = "" + } else { + argstr, ok := obj[0].From.(string) if !ok { - return NewInvalidTypeError("topics["+string(i)+"]", "is not a string") + return NewInvalidTypeError("from", "is not a string") } - t[i] = argstr + args.From = argstr + } + // Construct the nested topic array + if obj[0].Topics != nil { + // Make sure we have an actual topic array + list, ok := obj[0].Topics.([]interface{}) + if !ok { + return NewInvalidTypeError("topics", "is not an array") + } + // Iterate over each topic and handle nil, string or array + topics := make([][]string, len(list)) + for idx, field := range list { + switch value := field.(type) { + case nil: + topics[idx] = []string{} + + case string: + topics[idx] = []string{value} + + case []interface{}: + topics[idx] = make([]string, len(value)) + for i, nested := range value { + switch value := nested.(type) { + case nil: + topics[idx][i] = "" + + case string: + topics[idx][i] = value + + default: + return NewInvalidTypeError(fmt.Sprintf("topic[%d][%d]", idx, i), "is not a string") + } + } + default: + return NewInvalidTypeError(fmt.Sprintf("topic[%d]", idx), "not a string or array") + } + } + args.Topics = topics } - args.Topics = t - return nil } diff --git a/rpc/args_test.go b/rpc/args_test.go index 2f011bfd9..f5949b7a2 100644 --- a/rpc/args_test.go +++ b/rpc/args_test.go @@ -1943,7 +1943,7 @@ func TestWhisperFilterArgs(t *testing.T) { input := `[{"topics": ["0x68656c6c6f20776f726c64"], "to": "0x34ag445g3455b34"}]` expected := new(WhisperFilterArgs) expected.To = "0x34ag445g3455b34" - expected.Topics = []string{"0x68656c6c6f20776f726c64"} + expected.Topics = [][]string{[]string{"0x68656c6c6f20776f726c64"}} args := new(WhisperFilterArgs) if err := json.Unmarshal([]byte(input), &args); err != nil { diff --git a/ui/qt/qwhisper/whisper.go b/ui/qt/qwhisper/whisper.go index 50b0626f5..4ab6d2e5a 100644 --- a/ui/qt/qwhisper/whisper.go +++ b/ui/qt/qwhisper/whisper.go @@ -106,7 +106,7 @@ func filterFromMap(opts map[string]interface{}) (f whisper.Filter) { if topicList, ok := opts["topics"].(*qml.List); ok { var topics []string topicList.Convert(&topics) - f.Topics = whisper.NewTopicsFromStrings(topics...) + f.Topics = whisper.NewFilterTopicsFromStringsFlat(topics...) } return diff --git a/whisper/envelope.go b/whisper/envelope.go index 07762c300..a4e2fa031 100644 --- a/whisper/envelope.go +++ b/whisper/envelope.go @@ -72,6 +72,9 @@ func (self *Envelope) Open(key *ecdsa.PrivateKey) (msg *Message, err error) { message := &Message{ Flags: data[0], + Sent: time.Unix(int64(self.Expiry-self.TTL), 0), + TTL: time.Duration(self.TTL) * time.Second, + Hash: self.Hash(), } data = data[1:] diff --git a/whisper/envelope_test.go b/whisper/envelope_test.go new file mode 100644 index 000000000..b64767b2e --- /dev/null +++ b/whisper/envelope_test.go @@ -0,0 +1,142 @@ +package whisper + +import ( + "bytes" + "testing" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/ecies" +) + +func TestEnvelopeOpen(t *testing.T) { + payload := []byte("hello world") + message := NewMessage(payload) + + envelope, err := message.Wrap(DefaultPoW, Options{}) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + opened, err := envelope.Open(nil) + if err != nil { + t.Fatalf("failed to open envelope: %v", err) + } + if opened.Flags != message.Flags { + t.Fatalf("flags mismatch: have %d, want %d", opened.Flags, message.Flags) + } + if bytes.Compare(opened.Signature, message.Signature) != 0 { + t.Fatalf("signature mismatch: have 0x%x, want 0x%x", opened.Signature, message.Signature) + } + if bytes.Compare(opened.Payload, message.Payload) != 0 { + t.Fatalf("payload mismatch: have 0x%x, want 0x%x", opened.Payload, message.Payload) + } + if opened.Sent.Unix() != message.Sent.Unix() { + t.Fatalf("send time mismatch: have %d, want %d", opened.Sent, message.Sent) + } + if opened.TTL/time.Second != DefaultTTL/time.Second { + t.Fatalf("message TTL mismatch: have %v, want %v", opened.TTL, DefaultTTL) + } + + if opened.Hash != envelope.Hash() { + t.Fatalf("message hash mismatch: have 0x%x, want 0x%x", opened.Hash, envelope.Hash()) + } +} + +func TestEnvelopeAnonymousOpenUntargeted(t *testing.T) { + payload := []byte("hello envelope") + envelope, err := NewMessage(payload).Wrap(DefaultPoW, Options{}) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + opened, err := envelope.Open(nil) + if err != nil { + t.Fatalf("failed to open envelope: %v", err) + } + if opened.To != nil { + t.Fatalf("recipient mismatch: have 0x%x, want nil", opened.To) + } + if bytes.Compare(opened.Payload, payload) != 0 { + t.Fatalf("payload mismatch: have 0x%x, want 0x%x", opened.Payload, payload) + } +} + +func TestEnvelopeAnonymousOpenTargeted(t *testing.T) { + key, err := crypto.GenerateKey() + if err != nil { + t.Fatalf("failed to generate test identity: %v", err) + } + + payload := []byte("hello envelope") + envelope, err := NewMessage(payload).Wrap(DefaultPoW, Options{ + To: &key.PublicKey, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + opened, err := envelope.Open(nil) + if err != nil { + t.Fatalf("failed to open envelope: %v", err) + } + if opened.To != nil { + t.Fatalf("recipient mismatch: have 0x%x, want nil", opened.To) + } + if bytes.Compare(opened.Payload, payload) == 0 { + t.Fatalf("payload match, should have been encrypted: 0x%x", opened.Payload) + } +} + +func TestEnvelopeIdentifiedOpenUntargeted(t *testing.T) { + key, err := crypto.GenerateKey() + if err != nil { + t.Fatalf("failed to generate test identity: %v", err) + } + + payload := []byte("hello envelope") + envelope, err := NewMessage(payload).Wrap(DefaultPoW, Options{}) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + opened, err := envelope.Open(key) + switch err { + case nil: + t.Fatalf("envelope opened with bad key: %v", opened) + + case ecies.ErrInvalidPublicKey: + // Ok, key mismatch but opened + + default: + t.Fatalf("failed to open envelope: %v", err) + } + + if opened.To != nil { + t.Fatalf("recipient mismatch: have 0x%x, want nil", opened.To) + } + if bytes.Compare(opened.Payload, payload) != 0 { + t.Fatalf("payload mismatch: have 0x%x, want 0x%x", opened.Payload, payload) + } +} + +func TestEnvelopeIdentifiedOpenTargeted(t *testing.T) { + key, err := crypto.GenerateKey() + if err != nil { + t.Fatalf("failed to generate test identity: %v", err) + } + + payload := []byte("hello envelope") + envelope, err := NewMessage(payload).Wrap(DefaultPoW, Options{ + To: &key.PublicKey, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + opened, err := envelope.Open(key) + if err != nil { + t.Fatalf("failed to open envelope: %v", err) + } + if opened.To != nil { + t.Fatalf("recipient mismatch: have 0x%x, want nil", opened.To) + } + if bytes.Compare(opened.Payload, payload) != 0 { + t.Fatalf("payload mismatch: have 0x%x, want 0x%x", opened.Payload, payload) + } +} diff --git a/whisper/filter.go b/whisper/filter.go index 8fcc45afd..c946d9380 100644 --- a/whisper/filter.go +++ b/whisper/filter.go @@ -2,12 +2,115 @@ package whisper -import "crypto/ecdsa" +import ( + "crypto/ecdsa" + + "github.com/ethereum/go-ethereum/event/filter" +) // Filter is used to subscribe to specific types of whisper messages. type Filter struct { - To *ecdsa.PublicKey // Recipient of the message - From *ecdsa.PublicKey // Sender of the message - Topics []Topic // Topics to watch messages on - Fn func(*Message) // Handler in case of a match + To *ecdsa.PublicKey // Recipient of the message + From *ecdsa.PublicKey // Sender of the message + Topics [][]Topic // Topics to filter messages with + Fn func(msg *Message) // Handler in case of a match +} + +// NewFilterTopics creates a 2D topic array used by whisper.Filter from binary +// data elements. +func NewFilterTopics(data ...[][]byte) [][]Topic { + filter := make([][]Topic, len(data)) + for i, condition := range data { + // Handle the special case of condition == [[]byte{}] + if len(condition) == 1 && len(condition[0]) == 0 { + filter[i] = []Topic{} + continue + } + // Otherwise flatten normally + filter[i] = NewTopics(condition...) + } + return filter +} + +// NewFilterTopicsFlat creates a 2D topic array used by whisper.Filter from flat +// binary data elements. +func NewFilterTopicsFlat(data ...[]byte) [][]Topic { + filter := make([][]Topic, len(data)) + for i, element := range data { + // Only add non-wildcard topics + filter[i] = make([]Topic, 0, 1) + if len(element) > 0 { + filter[i] = append(filter[i], NewTopic(element)) + } + } + return filter +} + +// NewFilterTopicsFromStrings creates a 2D topic array used by whisper.Filter +// from textual data elements. +func NewFilterTopicsFromStrings(data ...[]string) [][]Topic { + filter := make([][]Topic, len(data)) + for i, condition := range data { + // Handle the special case of condition == [""] + if len(condition) == 1 && condition[0] == "" { + filter[i] = []Topic{} + continue + } + // Otherwise flatten normally + filter[i] = NewTopicsFromStrings(condition...) + } + return filter +} + +// NewFilterTopicsFromStringsFlat creates a 2D topic array used by whisper.Filter from flat +// binary data elements. +func NewFilterTopicsFromStringsFlat(data ...string) [][]Topic { + filter := make([][]Topic, len(data)) + for i, element := range data { + // Only add non-wildcard topics + filter[i] = make([]Topic, 0, 1) + if element != "" { + filter[i] = append(filter[i], NewTopicFromString(element)) + } + } + return filter +} + +// filterer is the internal, fully initialized filter ready to match inbound +// messages to a variety of criteria. +type filterer struct { + to string // Recipient of the message + from string // Sender of the message + matcher *topicMatcher // Topics to filter messages with + fn func(data interface{}) // Handler in case of a match +} + +// Compare checks if the specified filter matches the current one. +func (self filterer) Compare(f filter.Filter) bool { + filter := f.(filterer) + + // Check the message sender and recipient + if len(self.to) > 0 && self.to != filter.to { + return false + } + if len(self.from) > 0 && self.from != filter.from { + return false + } + // Check the topic filtering + topics := make([]Topic, len(filter.matcher.conditions)) + for i, group := range filter.matcher.conditions { + // Message should contain a single topic entry, extract + for topics[i], _ = range group { + break + } + } + if !self.matcher.Matches(topics) { + return false + } + return true +} + +// Trigger is called when a filter successfully matches an inbound message. +func (self filterer) Trigger(data interface{}) { + self.fn(data) } diff --git a/whisper/filter_test.go b/whisper/filter_test.go new file mode 100644 index 000000000..ca28fd83c --- /dev/null +++ b/whisper/filter_test.go @@ -0,0 +1,199 @@ +package whisper + +import ( + "bytes" + + "testing" +) + +var filterTopicsCreationTests = []struct { + topics [][]string + filter [][][4]byte +}{ + { // Simple topic filter + topics: [][]string{ + {"abc", "def", "ghi"}, + {"def"}, + {"ghi", "abc"}, + }, + filter: [][][4]byte{ + {{0x4e, 0x03, 0x65, 0x7a}, {0x34, 0x60, 0x7c, 0x9b}, {0x21, 0x41, 0x7d, 0xf9}}, + {{0x34, 0x60, 0x7c, 0x9b}}, + {{0x21, 0x41, 0x7d, 0xf9}, {0x4e, 0x03, 0x65, 0x7a}}, + }, + }, + { // Wild-carded topic filter + topics: [][]string{ + {"abc", "def", "ghi"}, + {}, + {""}, + {"def"}, + }, + filter: [][][4]byte{ + {{0x4e, 0x03, 0x65, 0x7a}, {0x34, 0x60, 0x7c, 0x9b}, {0x21, 0x41, 0x7d, 0xf9}}, + {}, + {}, + {{0x34, 0x60, 0x7c, 0x9b}}, + }, + }, +} + +var filterTopicsCreationFlatTests = []struct { + topics []string + filter [][][4]byte +}{ + { // Simple topic list + topics: []string{"abc", "def", "ghi"}, + filter: [][][4]byte{ + {{0x4e, 0x03, 0x65, 0x7a}}, + {{0x34, 0x60, 0x7c, 0x9b}}, + {{0x21, 0x41, 0x7d, 0xf9}}, + }, + }, + { // Wild-carded topic list + topics: []string{"abc", "", "ghi"}, + filter: [][][4]byte{ + {{0x4e, 0x03, 0x65, 0x7a}}, + {}, + {{0x21, 0x41, 0x7d, 0xf9}}, + }, + }, +} + +func TestFilterTopicsCreation(t *testing.T) { + // Check full filter creation + for i, tt := range filterTopicsCreationTests { + // Check the textual creation + filter := NewFilterTopicsFromStrings(tt.topics...) + if len(filter) != len(tt.topics) { + t.Errorf("test %d: condition count mismatch: have %v, want %v", i, len(filter), len(tt.topics)) + continue + } + for j, condition := range filter { + if len(condition) != len(tt.filter[j]) { + t.Errorf("test %d, condition %d: size mismatch: have %v, want %v", i, j, len(condition), len(tt.filter[j])) + continue + } + for k := 0; k < len(condition); k++ { + if bytes.Compare(condition[k][:], tt.filter[j][k][:]) != 0 { + t.Errorf("test %d, condition %d, segment %d: filter mismatch: have 0x%x, want 0x%x", i, j, k, condition[k], tt.filter[j][k]) + } + } + } + // Check the binary creation + binary := make([][][]byte, len(tt.topics)) + for j, condition := range tt.topics { + binary[j] = make([][]byte, len(condition)) + for k, segment := range condition { + binary[j][k] = []byte(segment) + } + } + filter = NewFilterTopics(binary...) + if len(filter) != len(tt.topics) { + t.Errorf("test %d: condition count mismatch: have %v, want %v", i, len(filter), len(tt.topics)) + continue + } + for j, condition := range filter { + if len(condition) != len(tt.filter[j]) { + t.Errorf("test %d, condition %d: size mismatch: have %v, want %v", i, j, len(condition), len(tt.filter[j])) + continue + } + for k := 0; k < len(condition); k++ { + if bytes.Compare(condition[k][:], tt.filter[j][k][:]) != 0 { + t.Errorf("test %d, condition %d, segment %d: filter mismatch: have 0x%x, want 0x%x", i, j, k, condition[k], tt.filter[j][k]) + } + } + } + } + // Check flat filter creation + for i, tt := range filterTopicsCreationFlatTests { + // Check the textual creation + filter := NewFilterTopicsFromStringsFlat(tt.topics...) + if len(filter) != len(tt.topics) { + t.Errorf("test %d: condition count mismatch: have %v, want %v", i, len(filter), len(tt.topics)) + continue + } + for j, condition := range filter { + if len(condition) != len(tt.filter[j]) { + t.Errorf("test %d, condition %d: size mismatch: have %v, want %v", i, j, len(condition), len(tt.filter[j])) + continue + } + for k := 0; k < len(condition); k++ { + if bytes.Compare(condition[k][:], tt.filter[j][k][:]) != 0 { + t.Errorf("test %d, condition %d, segment %d: filter mismatch: have 0x%x, want 0x%x", i, j, k, condition[k], tt.filter[j][k]) + } + } + } + // Check the binary creation + binary := make([][]byte, len(tt.topics)) + for j, topic := range tt.topics { + binary[j] = []byte(topic) + } + filter = NewFilterTopicsFlat(binary...) + if len(filter) != len(tt.topics) { + t.Errorf("test %d: condition count mismatch: have %v, want %v", i, len(filter), len(tt.topics)) + continue + } + for j, condition := range filter { + if len(condition) != len(tt.filter[j]) { + t.Errorf("test %d, condition %d: size mismatch: have %v, want %v", i, j, len(condition), len(tt.filter[j])) + continue + } + for k := 0; k < len(condition); k++ { + if bytes.Compare(condition[k][:], tt.filter[j][k][:]) != 0 { + t.Errorf("test %d, condition %d, segment %d: filter mismatch: have 0x%x, want 0x%x", i, j, k, condition[k], tt.filter[j][k]) + } + } + } + } +} + +var filterCompareTests = []struct { + matcher filterer + message filterer + match bool +}{ + { // Wild-card filter matching anything + matcher: filterer{to: "", from: "", matcher: newTopicMatcher()}, + message: filterer{to: "to", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + match: true, + }, + { // Filter matching the to field + matcher: filterer{to: "to", from: "", matcher: newTopicMatcher()}, + message: filterer{to: "to", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + match: true, + }, + { // Filter rejecting the to field + matcher: filterer{to: "to", from: "", matcher: newTopicMatcher()}, + message: filterer{to: "", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + match: false, + }, + { // Filter matching the from field + matcher: filterer{to: "", from: "from", matcher: newTopicMatcher()}, + message: filterer{to: "to", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + match: true, + }, + { // Filter rejecting the from field + matcher: filterer{to: "", from: "from", matcher: newTopicMatcher()}, + message: filterer{to: "to", from: "", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + match: false, + }, + { // Filter matching the topic field + matcher: filterer{to: "", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + message: filterer{to: "to", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + match: true, + }, + { // Filter rejecting the topic field + matcher: filterer{to: "", from: "", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + message: filterer{to: "to", from: "from", matcher: newTopicMatcher()}, + match: false, + }, +} + +func TestFilterCompare(t *testing.T) { + for i, tt := range filterCompareTests { + if match := tt.matcher.Compare(tt.message); match != tt.match { + t.Errorf("test %d: match mismatch: have %v, want %v", i, match, tt.match) + } + } +} diff --git a/whisper/message.go b/whisper/message.go index 07c673567..a80380a92 100644 --- a/whisper/message.go +++ b/whisper/message.go @@ -8,21 +8,25 @@ import ( "math/rand" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" ) -// Message represents an end-user data packet to trasmit through the Whisper +// Message represents an end-user data packet to transmit through the Whisper // protocol. These are wrapped into Envelopes that need not be understood by // intermediate nodes, just forwarded. type Message struct { Flags byte // First bit is signature presence, rest reserved and should be random Signature []byte Payload []byte - Sent int64 - To *ecdsa.PublicKey + Sent time.Time // Time when the message was posted into the network + TTL time.Duration // Maximum time to live allowed for the message + + To *ecdsa.PublicKey // Message recipient (identity used to decode the message) + Hash common.Hash // Message envelope hash to act as a unique id } // Options specifies the exact way a message should be wrapped into an Envelope. @@ -43,7 +47,7 @@ func NewMessage(payload []byte) *Message { return &Message{ Flags: flags, Payload: payload, - Sent: time.Now().Unix(), + Sent: time.Now(), } } @@ -64,6 +68,8 @@ func (self *Message) Wrap(pow time.Duration, options Options) (*Envelope, error) if options.TTL == 0 { options.TTL = DefaultTTL } + self.TTL = options.TTL + // Sign and encrypt the message if requested if options.From != nil { if err := self.sign(options.From); err != nil { @@ -114,9 +120,12 @@ func (self *Message) encrypt(key *ecdsa.PublicKey) (err error) { } // decrypt decrypts an encrypted payload with a private key. -func (self *Message) decrypt(key *ecdsa.PrivateKey) (err error) { - self.Payload, err = crypto.Decrypt(key, self.Payload) - return +func (self *Message) decrypt(key *ecdsa.PrivateKey) error { + cleartext, err := crypto.Decrypt(key, self.Payload) + if err == nil { + self.Payload = cleartext + } + return err } // hash calculates the SHA3 checksum of the message flags and payload. diff --git a/whisper/message_test.go b/whisper/message_test.go index 18a254e5c..0b4a24c24 100644 --- a/whisper/message_test.go +++ b/whisper/message_test.go @@ -4,6 +4,7 @@ import ( "bytes" "crypto/elliptic" "testing" + "time" "github.com/ethereum/go-ethereum/crypto" ) @@ -25,6 +26,9 @@ func TestMessageSimpleWrap(t *testing.T) { if bytes.Compare(msg.Payload, payload) != 0 { t.Fatalf("payload mismatch after wrapping: have 0x%x, want 0x%x", msg.Payload, payload) } + if msg.TTL/time.Second != DefaultTTL/time.Second { + t.Fatalf("message TTL mismatch: have %v, want %v", msg.TTL, DefaultTTL) + } } // Tests whether a message can be signed, and wrapped in plain-text. diff --git a/whisper/peer.go b/whisper/peer.go index 28abf4260..9fdc28434 100644 --- a/whisper/peer.go +++ b/whisper/peer.go @@ -21,20 +21,15 @@ type peer struct { quit chan struct{} } -// newPeer creates and initializes a new whisper peer connection, returning either -// the newly constructed link or a failure reason. -func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) (*peer, error) { - p := &peer{ +// newPeer creates a new whisper peer object, but does not run the handshake itself. +func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *peer { + return &peer{ host: host, peer: remote, ws: rw, known: set.New(), quit: make(chan struct{}), } - if err := p.handshake(); err != nil { - return nil, err - } - return p, nil } // start initiates the peer updater, periodically broadcasting the whisper packets diff --git a/whisper/topic.go b/whisper/topic.go index a965c7cc2..c47c94ae1 100644 --- a/whisper/topic.go +++ b/whisper/topic.go @@ -11,6 +11,8 @@ import "github.com/ethereum/go-ethereum/crypto" type Topic [4]byte // NewTopic creates a topic from the 4 byte prefix of the SHA3 hash of the data. +// +// Note, empty topics are considered the wildcard, and cannot be used in messages. func NewTopic(data []byte) Topic { prefix := [4]byte{} copy(prefix[:], crypto.Sha3(data)[:4]) @@ -48,14 +50,75 @@ func (self *Topic) String() string { return string(self[:]) } -// TopicSet represents a hash set to check if a topic exists or not. -type topicSet map[string]struct{} +// topicMatcher is a filter expression to verify if a list of topics contained +// in an arriving message matches some topic conditions. The topic matcher is +// built up of a list of conditions, each of which must be satisfied by the +// corresponding topic in the message. Each condition may require: a) an exact +// topic match; b) a match from a set of topics; or c) a wild-card matching all. +// +// If a message contains more topics than required by the matcher, those beyond +// the condition count are ignored and assumed to match. +// +// Consider the following sample topic matcher: +// sample := { +// {TopicA1, TopicA2, TopicA3}, +// {TopicB}, +// nil, +// {TopicD1, TopicD2} +// } +// In order for a message to pass this filter, it should enumerate at least 4 +// topics, the first any of [TopicA1, TopicA2, TopicA3], the second mandatory +// "TopicB", the third is ignored by the filter and the fourth either "TopicD1" +// or "TopicD2". If the message contains further topics, the filter will match +// them too. +type topicMatcher struct { + conditions []map[Topic]struct{} +} + +// newTopicMatcher create a topic matcher from a list of topic conditions. +func newTopicMatcher(topics ...[]Topic) *topicMatcher { + matcher := make([]map[Topic]struct{}, len(topics)) + for i, condition := range topics { + matcher[i] = make(map[Topic]struct{}) + for _, topic := range condition { + matcher[i][topic] = struct{}{} + } + } + return &topicMatcher{conditions: matcher} +} -// NewTopicSet creates a topic hash set from a slice of topics. -func newTopicSet(topics []Topic) topicSet { - set := make(map[string]struct{}) - for _, topic := range topics { - set[topic.String()] = struct{}{} +// newTopicMatcherFromBinary create a topic matcher from a list of binary conditions. +func newTopicMatcherFromBinary(data ...[][]byte) *topicMatcher { + topics := make([][]Topic, len(data)) + for i, condition := range data { + topics[i] = NewTopics(condition...) + } + return newTopicMatcher(topics...) +} + +// newTopicMatcherFromStrings creates a topic matcher from a list of textual +// conditions. +func newTopicMatcherFromStrings(data ...[]string) *topicMatcher { + topics := make([][]Topic, len(data)) + for i, condition := range data { + topics[i] = NewTopicsFromStrings(condition...) + } + return newTopicMatcher(topics...) +} + +// Matches checks if a list of topics matches this particular condition set. +func (self *topicMatcher) Matches(topics []Topic) bool { + // Mismatch if there aren't enough topics + if len(self.conditions) > len(topics) { + return false + } + // Check each topic condition for existence (skip wild-cards) + for i := 0; i < len(topics) && i < len(self.conditions); i++ { + if len(self.conditions[i]) > 0 { + if _, ok := self.conditions[i][topics[i]]; !ok { + return false + } + } } - return topicSet(set) + return true } diff --git a/whisper/topic_test.go b/whisper/topic_test.go index 4015079dc..976f3e88d 100644 --- a/whisper/topic_test.go +++ b/whisper/topic_test.go @@ -9,9 +9,8 @@ var topicCreationTests = []struct { data []byte hash [4]byte }{ - {hash: [4]byte{0xc5, 0xd2, 0x46, 0x01}, data: nil}, - {hash: [4]byte{0xc5, 0xd2, 0x46, 0x01}, data: []byte{}}, {hash: [4]byte{0x8f, 0x9a, 0x2b, 0x7d}, data: []byte("test name")}, + {hash: [4]byte{0xf2, 0x6e, 0x77, 0x79}, data: []byte("some other test")}, } func TestTopicCreation(t *testing.T) { @@ -52,16 +51,149 @@ func TestTopicCreation(t *testing.T) { } } -func TestTopicSetCreation(t *testing.T) { - topics := make([]Topic, len(topicCreationTests)) - for i, tt := range topicCreationTests { - topics[i] = NewTopic(tt.data) +var topicMatcherCreationTest = struct { + binary [][][]byte + textual [][]string + matcher []map[[4]byte]struct{} +}{ + binary: [][][]byte{ + [][]byte{}, + [][]byte{ + []byte("Topic A"), + }, + [][]byte{ + []byte("Topic B1"), + []byte("Topic B2"), + []byte("Topic B3"), + }, + }, + textual: [][]string{ + []string{}, + []string{"Topic A"}, + []string{"Topic B1", "Topic B2", "Topic B3"}, + }, + matcher: []map[[4]byte]struct{}{ + map[[4]byte]struct{}{}, + map[[4]byte]struct{}{ + [4]byte{0x25, 0xfc, 0x95, 0x66}: struct{}{}, + }, + map[[4]byte]struct{}{ + [4]byte{0x93, 0x6d, 0xec, 0x09}: struct{}{}, + [4]byte{0x25, 0x23, 0x34, 0xd3}: struct{}{}, + [4]byte{0x6b, 0xc2, 0x73, 0xd1}: struct{}{}, + }, + }, +} + +func TestTopicMatcherCreation(t *testing.T) { + test := topicMatcherCreationTest + + matcher := newTopicMatcherFromBinary(test.binary...) + for i, cond := range matcher.conditions { + for topic, _ := range cond { + if _, ok := test.matcher[i][topic]; !ok { + t.Errorf("condition %d; extra topic found: 0x%x", i, topic[:]) + } + } } - set := newTopicSet(topics) - for i, tt := range topicCreationTests { - topic := NewTopic(tt.data) - if _, ok := set[topic.String()]; !ok { - t.Errorf("topic %d: not found in set", i) + for i, cond := range test.matcher { + for topic, _ := range cond { + if _, ok := matcher.conditions[i][topic]; !ok { + t.Errorf("condition %d; topic not found: 0x%x", i, topic[:]) + } + } + } + + matcher = newTopicMatcherFromStrings(test.textual...) + for i, cond := range matcher.conditions { + for topic, _ := range cond { + if _, ok := test.matcher[i][topic]; !ok { + t.Errorf("condition %d; extra topic found: 0x%x", i, topic[:]) + } + } + } + for i, cond := range test.matcher { + for topic, _ := range cond { + if _, ok := matcher.conditions[i][topic]; !ok { + t.Errorf("condition %d; topic not found: 0x%x", i, topic[:]) + } + } + } +} + +var topicMatcherTests = []struct { + filter [][]string + topics []string + match bool +}{ + // Empty topic matcher should match everything + { + filter: [][]string{}, + topics: []string{}, + match: true, + }, + { + filter: [][]string{}, + topics: []string{"a", "b", "c"}, + match: true, + }, + // Fixed topic matcher should match strictly, but only prefix + { + filter: [][]string{[]string{"a"}, []string{"b"}}, + topics: []string{"a"}, + match: false, + }, + { + filter: [][]string{[]string{"a"}, []string{"b"}}, + topics: []string{"a", "b"}, + match: true, + }, + { + filter: [][]string{[]string{"a"}, []string{"b"}}, + topics: []string{"a", "b", "c"}, + match: true, + }, + // Multi-matcher should match any from a sub-group + { + filter: [][]string{[]string{"a1", "a2"}}, + topics: []string{"a"}, + match: false, + }, + { + filter: [][]string{[]string{"a1", "a2"}}, + topics: []string{"a1"}, + match: true, + }, + { + filter: [][]string{[]string{"a1", "a2"}}, + topics: []string{"a2"}, + match: true, + }, + // Wild-card condition should match anything + { + filter: [][]string{[]string{}, []string{"b"}}, + topics: []string{"a"}, + match: false, + }, + { + filter: [][]string{[]string{}, []string{"b"}}, + topics: []string{"a", "b"}, + match: true, + }, + { + filter: [][]string{[]string{}, []string{"b"}}, + topics: []string{"b", "b"}, + match: true, + }, +} + +func TestTopicMatcher(t *testing.T) { + for i, tt := range topicMatcherTests { + topics := NewTopicsFromStrings(tt.topics...) + + matcher := newTopicMatcherFromStrings(tt.filter...) + if match := matcher.Matches(topics); match != tt.match { + t.Errorf("test %d: match mismatch: have %v, want %v", i, match, tt.match) } } } diff --git a/whisper/whisper.go b/whisper/whisper.go index 9317fad50..a48e1e380 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -58,6 +58,8 @@ type Whisper struct { quit chan struct{} } +// New creates a Whisper client ready to communicate through the Ethereum P2P +// network. func New() *Whisper { whisper := &Whisper{ filters: filter.New(), @@ -116,11 +118,11 @@ func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey { // Watch installs a new message handler to run in case a matching packet arrives // from the whisper network. func (self *Whisper) Watch(options Filter) int { - filter := filter.Generic{ - Str1: string(crypto.FromECDSAPub(options.To)), - Str2: string(crypto.FromECDSAPub(options.From)), - Data: newTopicSet(options.Topics), - Fn: func(data interface{}) { + filter := filterer{ + to: string(crypto.FromECDSAPub(options.To)), + from: string(crypto.FromECDSAPub(options.From)), + matcher: newTopicMatcher(options.Topics...), + fn: func(data interface{}) { options.Fn(data.(*Message)) }, } @@ -148,7 +150,7 @@ func (self *Whisper) Stop() { glog.V(logger.Info).Infoln("Whisper stopped") } -// Messages retrieves the currently pooled messages matching a filter id. +// Messages retrieves all the currently pooled messages matching a filter id. func (self *Whisper) Messages(id int) []*Message { messages := make([]*Message, 0) if filter := self.filters.Get(id); filter != nil { @@ -163,27 +165,12 @@ func (self *Whisper) Messages(id int) []*Message { return messages } -// func (self *Whisper) RemoveIdentity(key *ecdsa.PublicKey) bool { -// k := string(crypto.FromECDSAPub(key)) -// if _, ok := self.keys[k]; ok { -// delete(self.keys, k) -// return true -// } -// return false -// } - // handlePeer is called by the underlying P2P layer when the whisper sub-protocol // connection is negotiated. func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { - // Create, initialize and start the whisper peer - whisperPeer, err := newPeer(self, peer, rw) - if err != nil { - return err - } - whisperPeer.start() - defer whisperPeer.stop() + // Create the new peer and start tracking it + whisperPeer := newPeer(self, peer, rw) - // Start tracking the active peer self.peerMu.Lock() self.peers[whisperPeer] = struct{}{} self.peerMu.Unlock() @@ -193,6 +180,14 @@ func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { delete(self.peers, whisperPeer) self.peerMu.Unlock() }() + + // Run the peer handshake and state updates + if err := whisperPeer.handshake(); err != nil { + return err + } + whisperPeer.start() + defer whisperPeer.stop() + // Read and process inbound messages directly to merge into client-global state for { // Fetch the next packet and decode the contained envelopes @@ -267,9 +262,11 @@ func (self *Whisper) open(envelope *Envelope) *Message { // Iterate over the keys and try to decrypt the message for _, key := range self.keys { message, err := envelope.Open(key) - if err == nil || err == ecies.ErrInvalidPublicKey { + if err == nil { message.To = &key.PublicKey return message + } else if err == ecies.ErrInvalidPublicKey { + return message } } // Failed to decrypt, don't return anything @@ -278,10 +275,14 @@ func (self *Whisper) open(envelope *Envelope) *Message { // createFilter creates a message filter to check against installed handlers. func createFilter(message *Message, topics []Topic) filter.Filter { - return filter.Generic{ - Str1: string(crypto.FromECDSAPub(message.To)), - Str2: string(crypto.FromECDSAPub(message.Recover())), - Data: newTopicSet(topics), + matcher := make([][]Topic, len(topics)) + for i, topic := range topics { + matcher[i] = []Topic{topic} + } + return filterer{ + to: string(crypto.FromECDSAPub(message.To)), + from: string(crypto.FromECDSAPub(message.Recover())), + matcher: newTopicMatcher(matcher...), } } diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go index def8e68d8..7c5067f51 100644 --- a/whisper/whisper_test.go +++ b/whisper/whisper_test.go @@ -129,7 +129,7 @@ func testBroadcast(anonymous bool, t *testing.T) { dones[i] = done targets[i].Watch(Filter{ - Topics: NewTopicsFromStrings("broadcast topic"), + Topics: NewFilterTopicsFromStringsFlat("broadcast topic"), Fn: func(msg *Message) { close(done) }, diff --git a/xeth/whisper.go b/xeth/whisper.go index 342910b5c..edb62c748 100644 --- a/xeth/whisper.go +++ b/xeth/whisper.go @@ -1,7 +1,9 @@ +// Contains the external API to the whisper sub-protocol. + package xeth import ( - "errors" + "fmt" "time" "github.com/ethereum/go-ethereum/common" @@ -12,109 +14,92 @@ import ( var qlogger = logger.NewLogger("XSHH") +// Whisper represents the API wrapper around the internal whisper implementation. type Whisper struct { *whisper.Whisper } +// NewWhisper wraps an internal whisper client into an external API version. func NewWhisper(w *whisper.Whisper) *Whisper { return &Whisper{w} } -func (self *Whisper) Post(payload string, to, from string, topics []string, priority, ttl uint32) error { - if priority == 0 { - priority = 1000 - } - - if ttl == 0 { - ttl = 100 - } - - pk := crypto.ToECDSAPub(common.FromHex(from)) - if key := self.Whisper.GetIdentity(pk); key != nil || len(from) == 0 { - msg := whisper.NewMessage(common.FromHex(payload)) - envelope, err := msg.Wrap(time.Duration(priority*100000), whisper.Options{ - TTL: time.Duration(ttl) * time.Second, - To: crypto.ToECDSAPub(common.FromHex(to)), - From: key, - Topics: whisper.NewTopicsFromStrings(topics...), - }) - - if err != nil { - return err - } - - if err := self.Whisper.Send(envelope); err != nil { - return err - } - } else { - return errors.New("unmatched pub / priv for seal") - } - - return nil -} - +// NewIdentity generates a new cryptographic identity for the client, and injects +// it into the known identities for message decryption. func (self *Whisper) NewIdentity() string { - key := self.Whisper.NewIdentity() - - return common.ToHex(crypto.FromECDSAPub(&key.PublicKey)) + identity := self.Whisper.NewIdentity() + return common.ToHex(crypto.FromECDSAPub(&identity.PublicKey)) } +// HasIdentity checks if the the whisper node is configured with the private key +// of the specified public pair. func (self *Whisper) HasIdentity(key string) bool { return self.Whisper.HasIdentity(crypto.ToECDSAPub(common.FromHex(key))) } -// func (self *Whisper) RemoveIdentity(key string) bool { -// return self.Whisper.RemoveIdentity(crypto.ToECDSAPub(common.FromHex(key))) -// } - -func (self *Whisper) Watch(opts *Options) int { - filter := whisper.Filter{ - To: crypto.ToECDSAPub(common.FromHex(opts.To)), - From: crypto.ToECDSAPub(common.FromHex(opts.From)), - Topics: whisper.NewTopicsFromStrings(opts.Topics...), +// Post injects a message into the whisper network for distribution. +func (self *Whisper) Post(payload string, to, from string, topics []string, priority, ttl uint32) error { + // Decode the topic strings + topicsDecoded := make([][]byte, len(topics)) + for i, topic := range topics { + topicsDecoded[i] = common.FromHex(topic) } - - var i int - filter.Fn = func(msg *whisper.Message) { - opts.Fn(NewWhisperMessage(msg)) + // Construct the whisper message and transmission options + message := whisper.NewMessage(common.FromHex(payload)) + options := whisper.Options{ + To: crypto.ToECDSAPub(common.FromHex(to)), + TTL: time.Duration(ttl) * time.Second, + Topics: whisper.NewTopics(topicsDecoded...), } - - i = self.Whisper.Watch(filter) - - return i -} - -func (self *Whisper) Messages(id int) (messages []WhisperMessage) { - msgs := self.Whisper.Messages(id) - messages = make([]WhisperMessage, len(msgs)) - for i, message := range msgs { - messages[i] = NewWhisperMessage(message) + if len(from) != 0 { + if key := self.Whisper.GetIdentity(crypto.ToECDSAPub(common.FromHex(from))); key != nil { + options.From = key + } else { + return fmt.Errorf("unknown identity to send from: %s", from) + } } - - return + // Wrap and send the message + pow := time.Duration(priority) * time.Millisecond + envelope, err := message.Wrap(pow, options) + if err != nil { + return err + } + if err := self.Whisper.Send(envelope); err != nil { + return err + } + return nil } -type Options struct { - To string - From string - Topics []string - Fn func(msg WhisperMessage) +// Watch installs a new message handler to run in case a matching packet arrives +// from the whisper network. +func (self *Whisper) Watch(to, from string, topics [][]string, fn func(WhisperMessage)) int { + // Decode the topic strings + topicsDecoded := make([][][]byte, len(topics)) + for i, condition := range topics { + topicsDecoded[i] = make([][]byte, len(condition)) + for j, topic := range condition { + topicsDecoded[i][j] = common.FromHex(topic) + } + } + // Assemble and inject the filter into the whisper client + filter := whisper.Filter{ + To: crypto.ToECDSAPub(common.FromHex(to)), + From: crypto.ToECDSAPub(common.FromHex(from)), + Topics: whisper.NewFilterTopics(topicsDecoded...), + } + filter.Fn = func(message *whisper.Message) { + fn(NewWhisperMessage(message)) + } + return self.Whisper.Watch(filter) } -type WhisperMessage struct { - ref *whisper.Message - Payload string `json:"payload"` - To string `json:"to"` - From string `json:"from"` - Sent int64 `json:"sent"` -} +// Messages retrieves all the currently pooled messages matching a filter id. +func (self *Whisper) Messages(id int) []WhisperMessage { + pool := self.Whisper.Messages(id) -func NewWhisperMessage(msg *whisper.Message) WhisperMessage { - return WhisperMessage{ - ref: msg, - Payload: common.ToHex(msg.Payload), - From: common.ToHex(crypto.FromECDSAPub(msg.Recover())), - To: common.ToHex(crypto.FromECDSAPub(msg.To)), - Sent: msg.Sent, + messages := make([]WhisperMessage, len(pool)) + for i, message := range pool { + messages[i] = NewWhisperMessage(message) } + return messages } diff --git a/xeth/whisper_filter.go b/xeth/whisper_filter.go new file mode 100644 index 000000000..52e70e041 --- /dev/null +++ b/xeth/whisper_filter.go @@ -0,0 +1,84 @@ +// Contains the external API side message filter for watching, pooling and polling +// matched whisper messages, also serializing data access to avoid duplications. + +package xeth + +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" +) + +// whisperFilter is the message cache matching a specific filter, accumulating +// inbound messages until the are requested by the client. +type whisperFilter struct { + id int // Filter identifier for old message retrieval + ref *Whisper // Whisper reference for old message retrieval + + cache []WhisperMessage // Cache of messages not yet polled + skip map[common.Hash]struct{} // List of retrieved messages to avoid duplication + update time.Time // Time of the last message query + + lock sync.RWMutex // Lock protecting the filter internals +} + +// newWhisperFilter creates a new serialized, poll based whisper topic filter. +func newWhisperFilter(id int, ref *Whisper) *whisperFilter { + return &whisperFilter{ + id: id, + ref: ref, + + update: time.Now(), + skip: make(map[common.Hash]struct{}), + } +} + +// messages retrieves all the cached messages from the entire pool matching the +// filter, resetting the filter's change buffer. +func (w *whisperFilter) messages() []WhisperMessage { + w.lock.Lock() + defer w.lock.Unlock() + + w.cache = nil + w.update = time.Now() + + w.skip = make(map[common.Hash]struct{}) + messages := w.ref.Messages(w.id) + for _, message := range messages { + w.skip[message.ref.Hash] = struct{}{} + } + return messages +} + +// insert injects a new batch of messages into the filter cache. +func (w *whisperFilter) insert(messages ...WhisperMessage) { + w.lock.Lock() + defer w.lock.Unlock() + + for _, message := range messages { + if _, ok := w.skip[message.ref.Hash]; !ok { + w.cache = append(w.cache, messages...) + } + } +} + +// retrieve fetches all the cached messages from the filter. +func (w *whisperFilter) retrieve() (messages []WhisperMessage) { + w.lock.Lock() + defer w.lock.Unlock() + + messages, w.cache = w.cache, nil + w.update = time.Now() + + return +} + +// activity returns the last time instance when client requests were executed on +// the filter. +func (w *whisperFilter) activity() time.Time { + w.lock.RLock() + defer w.lock.RUnlock() + + return w.update +} diff --git a/xeth/whisper_message.go b/xeth/whisper_message.go new file mode 100644 index 000000000..c8195cec1 --- /dev/null +++ b/xeth/whisper_message.go @@ -0,0 +1,37 @@ +// Contains the external API representation of a whisper message. + +package xeth + +import ( + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/whisper" +) + +// WhisperMessage is the external API representation of a whisper.Message. +type WhisperMessage struct { + ref *whisper.Message + + Payload string `json:"payload"` + To string `json:"to"` + From string `json:"from"` + Sent int64 `json:"sent"` + TTL int64 `json:"ttl"` + Hash string `json:"hash"` +} + +// NewWhisperMessage converts an internal message into an API version. +func NewWhisperMessage(message *whisper.Message) WhisperMessage { + return WhisperMessage{ + ref: message, + + Payload: common.ToHex(message.Payload), + From: common.ToHex(crypto.FromECDSAPub(message.Recover())), + To: common.ToHex(crypto.FromECDSAPub(message.To)), + Sent: message.Sent.Unix(), + TTL: int64(message.TTL / time.Second), + Hash: common.ToHex(message.Hash.Bytes()), + } +} diff --git a/xeth/xeth.go b/xeth/xeth.go index 693acb910..e4040d9d8 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -97,7 +97,7 @@ done: } for id, filter := range self.messages { - if time.Since(filter.timeout) > filterTickerTime { + if time.Since(filter.activity()) > filterTickerTime { self.Whisper().Unwatch(id) delete(self.messages, id) } @@ -452,35 +452,61 @@ func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []strin return filter.Find() } -func (p *XEth) NewWhisperFilter(opts *Options) int { +// NewWhisperFilter creates and registers a new message filter to watch for +// inbound whisper messages. All parameters at this point are assumed to be +// HEX encoded. +func (p *XEth) NewWhisperFilter(to, from string, topics [][]string) int { + // Pre-define the id to be filled later var id int - opts.Fn = func(msg WhisperMessage) { - p.messagesMut.Lock() - defer p.messagesMut.Unlock() - p.messages[id].add(msg) // = append(p.messages[id], msg) + + // Callback to delegate core whisper messages to this xeth filter + callback := func(msg WhisperMessage) { + p.messagesMut.RLock() // Only read lock to the filter pool + defer p.messagesMut.RUnlock() + p.messages[id].insert(msg) } - id = p.Whisper().Watch(opts) - p.messages[id] = &whisperFilter{timeout: time.Now()} + // Initialize the core whisper filter and wrap into xeth + id = p.Whisper().Watch(to, from, topics, callback) + + p.messagesMut.Lock() + p.messages[id] = newWhisperFilter(id, p.Whisper()) + p.messagesMut.Unlock() + return id } +// UninstallWhisperFilter disables and removes an existing filter. func (p *XEth) UninstallWhisperFilter(id int) bool { + p.messagesMut.Lock() + defer p.messagesMut.Unlock() + if _, ok := p.messages[id]; ok { delete(p.messages, id) return true } - return false } -func (self *XEth) MessagesChanged(id int) []WhisperMessage { - self.messagesMut.Lock() - defer self.messagesMut.Unlock() +// WhisperMessages retrieves all the known messages that match a specific filter. +func (self *XEth) WhisperMessages(id int) []WhisperMessage { + self.messagesMut.RLock() + defer self.messagesMut.RUnlock() if self.messages[id] != nil { - return self.messages[id].get() + return self.messages[id].messages() } + return nil +} + +// WhisperMessagesChanged retrieves all the new messages matched by a filter +// since the last retrieval +func (self *XEth) WhisperMessagesChanged(id int) []WhisperMessage { + self.messagesMut.RLock() + defer self.messagesMut.RUnlock() + if self.messages[id] != nil { + return self.messages[id].retrieve() + } return nil } @@ -731,22 +757,6 @@ func (m callmsg) Gas() *big.Int { return m.gas } func (m callmsg) Value() *big.Int { return m.value } func (m callmsg) Data() []byte { return m.data } -type whisperFilter struct { - messages []WhisperMessage - timeout time.Time - id int -} - -func (w *whisperFilter) add(msgs ...WhisperMessage) { - w.messages = append(w.messages, msgs...) -} -func (w *whisperFilter) get() []WhisperMessage { - w.timeout = time.Now() - tmp := w.messages - w.messages = nil - return tmp -} - type logFilter struct { logs state.Logs timeout time.Time |