package whisper import ( "bytes" "errors" "fmt" "sync" "time" "github.com/ethereum/go-ethereum/p2p" "gopkg.in/fatih/set.v0" ) // MOVE ME type Hash struct { hash string } var EmptyHash Hash func H(hash []byte) Hash { return Hash{string(hash)} } func HS(hash string) Hash { return Hash{hash} } func (self Hash) Compare(other Hash) int { return bytes.Compare([]byte(self.hash), []byte(other.hash)) } // MOVE ME END const ( statusMsg = 0x0 envelopesMsg = 0x01 ) const DefaultTtl = 50 * time.Second type Whisper struct { pub, sec []byte protocol p2p.Protocol mmu sync.RWMutex messages map[Hash]*Envelope expiry map[uint32]*set.SetNonTS quit chan struct{} } func New(pub, sec []byte) *Whisper { whisper := &Whisper{ pub: pub, sec: sec, messages: make(map[Hash]*Envelope), expiry: make(map[uint32]*set.SetNonTS), quit: make(chan struct{}), } go whisper.update() msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now()))) envelope, _ := msg.Seal(DefaultPow, Opts{ Ttl: DefaultTtl, }) if err := whisper.Send(envelope); err != nil { fmt.Println(err) } // p2p whisper sub protocol handler whisper.protocol = p2p.Protocol{ Name: "shh", Version: 2, Length: 2, Run: whisper.msgHandler, } return whisper } func (self *Whisper) Stop() { close(self.quit) } func (self *Whisper) Send(envelope *Envelope) error { return self.add(envelope) } // Main handler for passing whisper messages to whisper peer objects func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { wpeer := NewPeer(self, peer, ws) // initialise whisper peer (handshake/status) if err := wpeer.init(); err != nil { return err } // kick of the main handler for broadcasting/managing envelopes go wpeer.start() defer wpeer.stop() // Main *read* loop. Writing is done by the peer it self. for { msg, err := ws.ReadMsg() if err != nil { return err } envelope, err := NewEnvelopeFromReader(msg.Payload) if err != nil { peer.Infoln(err) continue } if err := self.add(envelope); err != nil { // TODO Punish peer here. Invalid envelope. peer.Infoln(err) } wpeer.addKnown(envelope) } } // takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed. func (self *Whisper) add(envelope *Envelope) error { if !envelope.valid() { return errors.New("invalid pow for envelope") } self.mmu.Lock() defer self.mmu.Unlock() hash := envelope.Hash() self.messages[hash] = envelope if self.expiry[envelope.Expiry] == nil { self.expiry[envelope.Expiry] = set.NewNonTS() } if !self.expiry[envelope.Expiry].Has(hash) { self.expiry[envelope.Expiry].Add(hash) // TODO notify listeners (given that we had any ...) } fmt.Println("add", envelope) return nil } func (self *Whisper) update() { expire := time.NewTicker(800 * time.Millisecond) out: for { select { case <-expire.C: self.expire() case <-self.quit: break out } } } func (self *Whisper) expire() { self.mmu.Lock() defer self.mmu.Unlock() now := uint32(time.Now().Unix()) for then, hashSet := range self.expiry { if then > now { continue } hashSet.Each(func(v interface{}) bool { delete(self.messages, v.(Hash)) return true }) self.expiry[then].Clear() } } func (self *Whisper) envelopes() (envelopes []*Envelope) { self.mmu.RLock() defer self.mmu.RUnlock() envelopes = make([]*Envelope, len(self.messages)) i := 0 for _, envelope := range self.messages { envelopes[i] = envelope i++ } return } func (self *Whisper) Protocol() p2p.Protocol { return self.protocol }