From a17a1f9208f858601f6660dbd7f1b77dd9a3f3d9 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 12 Dec 2014 22:23:42 +0100 Subject: Implemented watching using filter package * Added filters / watches * Removed event dep --- whisper/envelope.go | 23 ++++++++++------- whisper/filter.go | 10 ++++++++ whisper/main.go | 5 ++-- whisper/message.go | 20 +++++++++------ whisper/whisper.go | 66 +++++++++++++++++++++++++++++++++++++++++++------ whisper/whisper_test.go | 47 +++++++++++++++++++++++++++++++++++ 6 files changed, 145 insertions(+), 26 deletions(-) create mode 100644 whisper/filter.go create mode 100644 whisper/whisper_test.go diff --git a/whisper/envelope.go b/whisper/envelope.go index 359fa1568..683e88128 100644 --- a/whisper/envelope.go +++ b/whisper/envelope.go @@ -61,22 +61,27 @@ func (self *Envelope) Seal(pow time.Duration) { self.proveWork(pow) } -func (self *Envelope) Open(prv *ecdsa.PrivateKey) (*Message, error) { +func (self *Envelope) Open(prv *ecdsa.PrivateKey) (msg *Message, err error) { data := self.Data - if data[0] > 0 && len(data) < 66 { - return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < 66") - } - + var message Message + dataStart := 1 if data[0] > 0 { - payload, err := crypto.Decrypt(prv, data[66:]) + if len(data) < 66 { + return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < 66") + } + dataStart = 66 + message.Flags = data[0] + message.Signature = data[1:66] + } + message.Payload = data[dataStart:] + if prv != nil { + message.Payload, err = crypto.Decrypt(prv, message.Payload) if err != nil { return nil, fmt.Errorf("unable to open envelope. Decrypt failed: %v", err) } - - return NewMessage(payload), nil } - return NewMessage(data[1:]), nil + return &message, nil } func (self *Envelope) proveWork(dura time.Duration) { diff --git a/whisper/filter.go b/whisper/filter.go new file mode 100644 index 000000000..4315aa556 --- /dev/null +++ b/whisper/filter.go @@ -0,0 +1,10 @@ +package whisper + +import "crypto/ecdsa" + +type Filter struct { + To *ecdsa.PrivateKey + From *ecdsa.PublicKey + Topics [][]byte + Fn func(*Message) +} diff --git a/whisper/main.go b/whisper/main.go index 80050d899..2ee2f3ff1 100644 --- a/whisper/main.go +++ b/whisper/main.go @@ -8,6 +8,7 @@ import ( "net" "os" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/whisper" @@ -17,9 +18,9 @@ import ( func main() { logger.AddLogSystem(logger.NewStdLogSystem(os.Stdout, log.LstdFlags, logger.InfoLevel)) - pub, sec := secp256k1.GenerateKeyPair() + pub, _ := secp256k1.GenerateKeyPair() - whisper := whisper.New(sec) + whisper := whisper.New(&event.TypeMux{}) srv := p2p.Server{ MaxPeers: 10, diff --git a/whisper/message.go b/whisper/message.go index 8ce5d880b..db0110b4a 100644 --- a/whisper/message.go +++ b/whisper/message.go @@ -28,15 +28,11 @@ func (self *Message) sign(key *ecdsa.PrivateKey) (err error) { } func (self *Message) Recover() *ecdsa.PublicKey { + defer func() { recover() }() // in case of invalid sig return crypto.SigToPub(self.hash(), self.Signature) } -func (self *Message) Encrypt(from *ecdsa.PrivateKey, to *ecdsa.PublicKey) (err error) { - err = self.sign(from) - if err != nil { - return err - } - +func (self *Message) Encrypt(to *ecdsa.PublicKey) (err error) { self.Payload, err = crypto.Encrypt(to, self.Payload) if err != nil { return err @@ -57,8 +53,16 @@ type Opts struct { } func (self *Message) Seal(pow time.Duration, opts Opts) (*Envelope, error) { - if opts.To != nil && opts.From != nil { - if err := self.Encrypt(opts.From, opts.To); err != nil { + if opts.From != nil { + err := self.sign(opts.From) + if err != nil { + return nil, err + } + } + + if opts.To != nil { + err := self.Encrypt(opts.To) + if err != nil { return nil, err } } diff --git a/whisper/whisper.go b/whisper/whisper.go index 4d7a2a23e..356debd1c 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -9,6 +9,7 @@ import ( "time" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/event/filter" "github.com/ethereum/go-ethereum/p2p" "gopkg.in/fatih/set.v0" ) @@ -38,28 +39,38 @@ const ( envelopesMsg = 0x01 ) +type MessageEvent struct { + To *ecdsa.PrivateKey + From *ecdsa.PublicKey + Message *Message +} + const DefaultTtl = 50 * time.Second type Whisper struct { - key *ecdsa.PrivateKey protocol p2p.Protocol + filters *filter.Filters mmu sync.RWMutex messages map[Hash]*Envelope expiry map[uint32]*set.SetNonTS quit chan struct{} + + keys []*ecdsa.PrivateKey } -func New(sec []byte) *Whisper { +func New() *Whisper { whisper := &Whisper{ - key: crypto.ToECDSA(sec), messages: make(map[Hash]*Envelope), + filters: filter.New(), expiry: make(map[uint32]*set.SetNonTS), quit: make(chan struct{}), } + whisper.filters.Start() go whisper.update() + // XXX TODO REMOVE TESTING CODE msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now()))) envelope, _ := msg.Seal(DefaultPow, Opts{ Ttl: DefaultTtl, @@ -67,6 +78,7 @@ func New(sec []byte) *Whisper { if err := whisper.Send(envelope); err != nil { fmt.Println(err) } + // XXX TODO REMOVE TESTING CODE // p2p whisper sub protocol handler whisper.protocol = p2p.Protocol{ @@ -87,6 +99,35 @@ func (self *Whisper) Send(envelope *Envelope) error { return self.add(envelope) } +func (self *Whisper) NewIdentity() *ecdsa.PrivateKey { + key, err := crypto.GenerateKey() + if err != nil { + panic(err) + } + self.keys = append(self.keys, key) + + return key +} + +func (self *Whisper) HasIdentity(key *ecdsa.PrivateKey) bool { + for _, key := range self.keys { + if key.D.Cmp(key.D) == 0 { + return true + } + } + return false +} + +func (self *Whisper) Watch(opts Filter) int { + return self.filters.Install(filter.Generic{ + Str1: string(crypto.FromECDSA(opts.To)), + Str2: string(crypto.FromECDSAPub(opts.From)), + Fn: func(data interface{}) { + opts.Fn(data.(*Message)) + }, + }) +} + // Main handler for passing whisper messages to whisper peer objects func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { wpeer := NewPeer(self, peer, ws) @@ -122,7 +163,7 @@ func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { // takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed. func (self *Whisper) add(envelope *Envelope) error { if !envelope.valid() { - return errors.New("invalid pow for envelope") + return errors.New("invalid pow provided for envelope") } self.mmu.Lock() @@ -136,11 +177,9 @@ func (self *Whisper) add(envelope *Envelope) error { if !self.expiry[envelope.Expiry].Has(hash) { self.expiry[envelope.Expiry].Add(hash) - // TODO notify listeners (given that we had any ...) + self.postEvent(envelope) } - fmt.Println("add", envelope) - return nil } @@ -189,6 +228,19 @@ func (self *Whisper) envelopes() (envelopes []*Envelope) { return } +func (self *Whisper) postEvent(envelope *Envelope) { + for _, key := range self.keys { + if message, err := envelope.Open(key); err == nil { + // Create a custom filter? + self.filters.Notify(filter.Generic{ + Str1: string(crypto.FromECDSA(key)), Str2: string(crypto.FromECDSAPub(message.Recover())), + }, message) + } else { + fmt.Println(err) + } + } +} + func (self *Whisper) Protocol() p2p.Protocol { return self.protocol } diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go new file mode 100644 index 000000000..107cb8c97 --- /dev/null +++ b/whisper/whisper_test.go @@ -0,0 +1,47 @@ +package whisper + +import ( + "fmt" + "testing" + "time" +) + +func TestKeyManagement(t *testing.T) { + whisper := New() + + key := whisper.NewIdentity() + if !whisper.HasIdentity(key) { + t.Error("expected whisper to have identify") + } +} + +func TestEvent(t *testing.T) { + res := make(chan *Message, 1) + whisper := New() + id := whisper.NewIdentity() + whisper.Watch(Filter{ + To: id, + Fn: func(msg *Message) { + res <- msg + }, + }) + + msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now()))) + envelope, err := msg.Seal(DefaultPow, Opts{ + Ttl: DefaultTtl, + From: id, + To: &id.PublicKey, + }) + if err != nil { + fmt.Println(err) + t.FailNow() + } + + tick := time.NewTicker(time.Second) + whisper.postEvent(envelope) + select { + case <-res: + case <-tick.C: + t.Error("did not receive message") + } +} -- cgit v1.2.3