aboutsummaryrefslogtreecommitdiffstats
path: root/whisper/whisperv5/whisper.go
diff options
context:
space:
mode:
Diffstat (limited to 'whisper/whisperv5/whisper.go')
-rw-r--r--whisper/whisperv5/whisper.go145
1 files changed, 103 insertions, 42 deletions
diff --git a/whisper/whisperv5/whisper.go b/whisper/whisperv5/whisper.go
index f2aad08ef..cc6c65a9f 100644
--- a/whisper/whisperv5/whisper.go
+++ b/whisper/whisperv5/whisper.go
@@ -26,6 +26,8 @@ import (
"sync"
"time"
+ "sync/atomic"
+
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
@@ -44,6 +46,38 @@ type Statistics struct {
totalMessagesCleared int
}
+type settingType byte
+type settingsMap map[settingType]interface{}
+
+const (
+ minPowIdx settingType = iota // Minimal PoW required by the whisper node
+ maxMsgSizeIdx settingType = iota // Maximal message length allowed by the whisper node
+ OverflowIdx settingType = iota // Indicator of message queue overflow
+)
+
+type settingsVault struct {
+ vaultMu sync.Mutex
+ vault atomic.Value
+}
+
+func (s *settingsVault) get(idx settingType) interface{} {
+ m := s.vault.Load().(settingsMap)
+ return m[idx]
+}
+
+func (s *settingsVault) store(idx settingType, val interface{}) {
+ s.vaultMu.Lock()
+ defer s.vaultMu.Unlock()
+
+ m1 := s.vault.Load().(settingsMap)
+ m2 := make(settingsMap)
+ for k, v := range m1 {
+ m2[k] = v
+ }
+ m2[idx] = val
+ s.vault.Store(m2)
+}
+
// Whisper represents a dark communication interface through the Ethereum
// network, using its very own P2P communication layer.
type Whisper struct {
@@ -54,28 +88,27 @@ type Whisper struct {
symKeys map[string][]byte // Symmetric key storage
keyMu sync.RWMutex // Mutex associated with key storages
+ poolMu sync.RWMutex // Mutex to sync the message and expiration pools
envelopes map[common.Hash]*Envelope // Pool of envelopes currently tracked by this node
expirations map[uint32]*set.SetNonTS // Message expiration pool
- poolMu sync.RWMutex // Mutex to sync the message and expiration pools
- peers map[*Peer]struct{} // Set of currently active peers
peerMu sync.RWMutex // Mutex to sync the active peer set
+ peers map[*Peer]struct{} // Set of currently active peers
messageQueue chan *Envelope // Message queue for normal whisper messages
p2pMsgQueue chan *Envelope // Message queue for peer-to-peer messages (not to be forwarded any further)
quit chan struct{} // Channel used for graceful exit
- minPoW float64 // Minimal PoW required by the whisper node
- maxMsgLength int // Maximal message length allowed by the whisper node
- overflow bool // Indicator of message queue overflow
+ settings settingsVault // holds configuration settings that can be dynamically changed
- stats Statistics // Statistics of whisper node
+ statsMu sync.Mutex // guard stats
+ stats Statistics // Statistics of whisper node
mailServer MailServer // MailServer interface
}
// New creates a Whisper client ready to communicate through the Ethereum P2P network.
-func New() *Whisper {
+func New(cfg *Config) *Whisper {
whisper := &Whisper{
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
@@ -85,22 +118,47 @@ func New() *Whisper {
messageQueue: make(chan *Envelope, messageQueueLimit),
p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
quit: make(chan struct{}),
- minPoW: DefaultMinimumPoW,
- maxMsgLength: DefaultMaxMessageLength,
}
+
whisper.filters = NewFilters(whisper)
+ whisper.settings.vault.Store(make(settingsMap))
+ whisper.settings.store(minPowIdx, cfg.MinimumAcceptedPOW)
+ whisper.settings.store(maxMsgSizeIdx, cfg.MaxMessageSize)
+ whisper.settings.store(OverflowIdx, false)
+
// p2p whisper sub protocol handler
whisper.protocol = p2p.Protocol{
Name: ProtocolName,
Version: uint(ProtocolVersion),
Length: NumberOfMessageCodes,
Run: whisper.HandlePeer,
+ NodeInfo: func() interface{} {
+ return map[string]interface{}{
+ "version": ProtocolVersionStr,
+ "maxMessageSize": whisper.MaxMessageSize(),
+ "minimumPoW": whisper.MinPow(),
+ }
+ },
}
return whisper
}
+func (w *Whisper) MinPow() float64 {
+ return w.settings.get(minPowIdx).(float64)
+}
+
+// MaxMessageSize returns the maximum accepted message size.
+func (w *Whisper) MaxMessageSize() uint32 {
+ return w.settings.get(maxMsgSizeIdx).(uint32)
+}
+
+// Overflow returns an indication if the message queue is full.
+func (w *Whisper) Overflow() bool {
+ return w.settings.get(OverflowIdx).(bool)
+}
+
// APIs returns the RPC descriptors the Whisper implementation offers
func (w *Whisper) APIs() []rpc.API {
return []rpc.API{
@@ -129,12 +187,12 @@ func (w *Whisper) Version() uint {
return w.protocol.Version
}
-// SetMaxMessageLength sets the maximal message length allowed by this node
-func (w *Whisper) SetMaxMessageLength(val int) error {
- if val <= 0 {
- return fmt.Errorf("invalid message length: %d", val)
+// SetMaxMessageSize sets the maximal message size allowed by this node
+func (w *Whisper) SetMaxMessageSize(size uint32) error {
+ if size > MaxMessageSize {
+ return fmt.Errorf("message size too large [%d>%d]", size, MaxMessageSize)
}
- w.maxMsgLength = val
+ w.settings.store(maxMsgSizeIdx, uint32(size))
return nil
}
@@ -143,7 +201,7 @@ func (w *Whisper) SetMinimumPoW(val float64) error {
if val <= 0.0 {
return fmt.Errorf("invalid PoW: %f", val)
}
- w.minPoW = val
+ w.settings.store(minPowIdx, val)
return nil
}
@@ -240,6 +298,20 @@ func (w *Whisper) DeleteKeyPair(key string) bool {
return false
}
+// AddKeyPair imports a asymmetric private key and returns it identifier.
+func (w *Whisper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) {
+ id, err := GenerateRandomID()
+ if err != nil {
+ return "", fmt.Errorf("failed to generate ID: %s", err)
+ }
+
+ w.keyMu.Lock()
+ w.privateKeys[id] = key
+ w.keyMu.Unlock()
+
+ return id, nil
+}
+
// HasKeyPair checks if the the whisper node is configured with the private key
// of the specified public pair.
func (w *Whisper) HasKeyPair(id string) bool {
@@ -451,7 +523,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
log.Warn("message loop", "peer", p.peer.ID(), "err", err)
return err
}
- if packet.Size > uint32(wh.maxMsgLength) {
+ if packet.Size > wh.MaxMessageSize() {
log.Warn("oversized message received", "peer", p.peer.ID())
return errors.New("oversized message received")
}
@@ -532,7 +604,7 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
}
}
- if envelope.size() > wh.maxMsgLength {
+ if uint32(envelope.size()) > wh.MaxMessageSize() {
return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash())
}
@@ -547,7 +619,7 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
return false, fmt.Errorf("wrong size of AESNonce: %d bytes [env: %x]", aesNonceSize, envelope.Hash())
}
- if envelope.PoW() < wh.minPoW {
+ if envelope.PoW() < wh.MinPow() {
log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex())
return false, nil // drop envelope without error
}
@@ -571,7 +643,9 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex())
} else {
log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex())
+ wh.statsMu.Lock()
wh.stats.memoryUsed += envelope.size()
+ wh.statsMu.Unlock()
wh.postEvent(envelope, false) // notify the local node about the new message
if wh.mailServer != nil {
wh.mailServer.Archive(envelope)
@@ -600,13 +674,13 @@ func (w *Whisper) checkOverflow() {
queueSize := len(w.messageQueue)
if queueSize == messageQueueLimit {
- if !w.overflow {
- w.overflow = true
+ if !w.Overflow() {
+ w.settings.store(OverflowIdx, true)
log.Warn("message queue overflow")
}
} else if queueSize <= messageQueueLimit/2 {
- if w.overflow {
- w.overflow = false
+ if w.Overflow() {
+ w.settings.store(OverflowIdx, false)
log.Warn("message queue overflow fixed (back to normal)")
}
}
@@ -653,6 +727,8 @@ func (w *Whisper) expire() {
w.poolMu.Lock()
defer w.poolMu.Unlock()
+ w.statsMu.Lock()
+ defer w.statsMu.Unlock()
w.stats.reset()
now := uint32(time.Now().Unix())
for expiry, hashSet := range w.expirations {
@@ -673,17 +749,11 @@ func (w *Whisper) expire() {
}
// Stats returns the whisper node statistics.
-func (w *Whisper) Stats() string {
- result := fmt.Sprintf("Memory usage: %d bytes. Average messages cleared per expiry cycle: %d. Total messages cleared: %d.",
- w.stats.memoryUsed, w.stats.totalMessagesCleared/w.stats.cycles, w.stats.totalMessagesCleared)
- if w.stats.messagesCleared > 0 {
- result += fmt.Sprintf(" Latest expiry cycle cleared %d messages (%d bytes).",
- w.stats.messagesCleared, w.stats.memoryCleared)
- }
- if w.overflow {
- result += " Message queue state: overflow."
- }
- return result
+func (w *Whisper) Stats() Statistics {
+ w.statsMu.Lock()
+ defer w.statsMu.Unlock()
+
+ return w.stats
}
// Envelopes retrieves all the messages currently pooled by the node.
@@ -734,15 +804,6 @@ func (s *Statistics) reset() {
s.messagesCleared = 0
}
-// ValidateKeyID checks the format of key id.
-func ValidateKeyID(id string) error {
- const target = keyIdSize * 2
- if len(id) != target {
- return fmt.Errorf("wrong size of key ID (expected %d bytes, got %d)", target, len(id))
- }
- return nil
-}
-
// ValidatePublicKey checks the format of the given public key.
func ValidatePublicKey(k *ecdsa.PublicKey) bool {
return k != nil && k.X != nil && k.Y != nil && k.X.Sign() != 0 && k.Y.Sign() != 0