From a17a1f9208f858601f6660dbd7f1b77dd9a3f3d9 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 12 Dec 2014 22:23:42 +0100 Subject: Implemented watching using filter package * Added filters / watches * Removed event dep --- whisper/whisper.go | 66 ++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 59 insertions(+), 7 deletions(-) (limited to 'whisper/whisper.go') diff --git a/whisper/whisper.go b/whisper/whisper.go index 4d7a2a23e..356debd1c 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -9,6 +9,7 @@ import ( "time" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/event/filter" "github.com/ethereum/go-ethereum/p2p" "gopkg.in/fatih/set.v0" ) @@ -38,28 +39,38 @@ const ( envelopesMsg = 0x01 ) +type MessageEvent struct { + To *ecdsa.PrivateKey + From *ecdsa.PublicKey + Message *Message +} + const DefaultTtl = 50 * time.Second type Whisper struct { - key *ecdsa.PrivateKey protocol p2p.Protocol + filters *filter.Filters mmu sync.RWMutex messages map[Hash]*Envelope expiry map[uint32]*set.SetNonTS quit chan struct{} + + keys []*ecdsa.PrivateKey } -func New(sec []byte) *Whisper { +func New() *Whisper { whisper := &Whisper{ - key: crypto.ToECDSA(sec), messages: make(map[Hash]*Envelope), + filters: filter.New(), expiry: make(map[uint32]*set.SetNonTS), quit: make(chan struct{}), } + whisper.filters.Start() go whisper.update() + // XXX TODO REMOVE TESTING CODE msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now()))) envelope, _ := msg.Seal(DefaultPow, Opts{ Ttl: DefaultTtl, @@ -67,6 +78,7 @@ func New(sec []byte) *Whisper { if err := whisper.Send(envelope); err != nil { fmt.Println(err) } + // XXX TODO REMOVE TESTING CODE // p2p whisper sub protocol handler whisper.protocol = p2p.Protocol{ @@ -87,6 +99,35 @@ func (self *Whisper) Send(envelope *Envelope) error { return self.add(envelope) } +func (self *Whisper) NewIdentity() *ecdsa.PrivateKey { + key, err := crypto.GenerateKey() + if err != nil { + panic(err) + } + self.keys = append(self.keys, key) + + return key +} + +func (self *Whisper) HasIdentity(key *ecdsa.PrivateKey) bool { + for _, key := range self.keys { + if key.D.Cmp(key.D) == 0 { + return true + } + } + return false +} + +func (self *Whisper) Watch(opts Filter) int { + return self.filters.Install(filter.Generic{ + Str1: string(crypto.FromECDSA(opts.To)), + Str2: string(crypto.FromECDSAPub(opts.From)), + Fn: func(data interface{}) { + opts.Fn(data.(*Message)) + }, + }) +} + // Main handler for passing whisper messages to whisper peer objects func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { wpeer := NewPeer(self, peer, ws) @@ -122,7 +163,7 @@ func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { // takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed. func (self *Whisper) add(envelope *Envelope) error { if !envelope.valid() { - return errors.New("invalid pow for envelope") + return errors.New("invalid pow provided for envelope") } self.mmu.Lock() @@ -136,11 +177,9 @@ func (self *Whisper) add(envelope *Envelope) error { if !self.expiry[envelope.Expiry].Has(hash) { self.expiry[envelope.Expiry].Add(hash) - // TODO notify listeners (given that we had any ...) + self.postEvent(envelope) } - fmt.Println("add", envelope) - return nil } @@ -189,6 +228,19 @@ func (self *Whisper) envelopes() (envelopes []*Envelope) { return } +func (self *Whisper) postEvent(envelope *Envelope) { + for _, key := range self.keys { + if message, err := envelope.Open(key); err == nil { + // Create a custom filter? + self.filters.Notify(filter.Generic{ + Str1: string(crypto.FromECDSA(key)), Str2: string(crypto.FromECDSAPub(message.Recover())), + }, message) + } else { + fmt.Println(err) + } + } +} + func (self *Whisper) Protocol() p2p.Protocol { return self.protocol } -- cgit v1.2.3