aboutsummaryrefslogtreecommitdiffstats
path: root/les/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'les/server.go')
-rw-r--r--les/server.go198
1 files changed, 83 insertions, 115 deletions
diff --git a/les/server.go b/les/server.go
index 8b2730714..d8f93cd87 100644
--- a/les/server.go
+++ b/les/server.go
@@ -18,10 +18,11 @@
package les
import (
+ "crypto/ecdsa"
"encoding/binary"
+ "fmt"
"math"
"sync"
- "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
@@ -34,7 +35,6 @@ import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/rlp"
- "github.com/ethereum/go-ethereum/trie"
)
type LesServer struct {
@@ -42,23 +42,55 @@ type LesServer struct {
fcManager *flowcontrol.ClientManager // nil if our node is client only
fcCostStats *requestCostStats
defParams *flowcontrol.ServerParams
- lesTopic discv5.Topic
+ lesTopics []discv5.Topic
+ privateKey *ecdsa.PrivateKey
quitSync chan struct{}
+
+ chtIndexer, bloomTrieIndexer *core.ChainIndexer
}
func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) {
quitSync := make(chan struct{})
- pm, err := NewProtocolManager(eth.BlockChain().Config(), false, config.NetworkId, eth.EventMux(), eth.Engine(), newPeerSet(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil, quitSync, new(sync.WaitGroup))
+ pm, err := NewProtocolManager(eth.BlockChain().Config(), false, ServerProtocolVersions, config.NetworkId, eth.EventMux(), eth.Engine(), newPeerSet(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil, quitSync, new(sync.WaitGroup))
if err != nil {
return nil, err
}
- pm.blockLoop()
+
+ lesTopics := make([]discv5.Topic, len(ServerProtocolVersions))
+ for i, pv := range ServerProtocolVersions {
+ lesTopics[i] = lesTopic(eth.BlockChain().Genesis().Hash(), pv)
+ }
srv := &LesServer{
- protocolManager: pm,
- quitSync: quitSync,
- lesTopic: lesTopic(eth.BlockChain().Genesis().Hash()),
+ protocolManager: pm,
+ quitSync: quitSync,
+ lesTopics: lesTopics,
+ chtIndexer: light.NewChtIndexer(eth.ChainDb(), false),
+ bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), false),
+ }
+ logger := log.New()
+
+ chtV1SectionCount, _, _ := srv.chtIndexer.Sections() // indexer still uses LES/1 4k section size for backwards server compatibility
+ chtV2SectionCount := chtV1SectionCount / (light.ChtFrequency / light.ChtV1Frequency)
+ if chtV2SectionCount != 0 {
+ // convert to LES/2 section
+ chtLastSection := chtV2SectionCount - 1
+ // convert last LES/2 section index back to LES/1 index for chtIndexer.SectionHead
+ chtLastSectionV1 := (chtLastSection+1)*(light.ChtFrequency/light.ChtV1Frequency) - 1
+ chtSectionHead := srv.chtIndexer.SectionHead(chtLastSectionV1)
+ chtRoot := light.GetChtV2Root(pm.chainDb, chtLastSection, chtSectionHead)
+ logger.Info("CHT", "section", chtLastSection, "sectionHead", fmt.Sprintf("%064x", chtSectionHead), "root", fmt.Sprintf("%064x", chtRoot))
+ }
+
+ bloomTrieSectionCount, _, _ := srv.bloomTrieIndexer.Sections()
+ if bloomTrieSectionCount != 0 {
+ bloomTrieLastSection := bloomTrieSectionCount - 1
+ bloomTrieSectionHead := srv.bloomTrieIndexer.SectionHead(bloomTrieLastSection)
+ bloomTrieRoot := light.GetBloomTrieRoot(pm.chainDb, bloomTrieLastSection, bloomTrieSectionHead)
+ logger.Info("BloomTrie", "section", bloomTrieLastSection, "sectionHead", fmt.Sprintf("%064x", bloomTrieSectionHead), "root", fmt.Sprintf("%064x", bloomTrieRoot))
}
+
+ srv.chtIndexer.Start(eth.BlockChain())
pm.server = srv
srv.defParams = &flowcontrol.ServerParams{
@@ -77,17 +109,28 @@ func (s *LesServer) Protocols() []p2p.Protocol {
// Start starts the LES server
func (s *LesServer) Start(srvr *p2p.Server) {
s.protocolManager.Start()
- go func() {
- logger := log.New("topic", s.lesTopic)
- logger.Info("Starting topic registration")
- defer logger.Info("Terminated topic registration")
+ for _, topic := range s.lesTopics {
+ topic := topic
+ go func() {
+ logger := log.New("topic", topic)
+ logger.Info("Starting topic registration")
+ defer logger.Info("Terminated topic registration")
+
+ srvr.DiscV5.RegisterTopic(topic, s.quitSync)
+ }()
+ }
+ s.privateKey = srvr.PrivateKey
+ s.protocolManager.blockLoop()
+}
- srvr.DiscV5.RegisterTopic(s.lesTopic, s.quitSync)
- }()
+func (s *LesServer) SetBloomBitsIndexer(bloomIndexer *core.ChainIndexer) {
+ bloomIndexer.AddChildIndexer(s.bloomTrieIndexer)
}
// Stop stops the LES service
func (s *LesServer) Stop() {
+ s.chtIndexer.Close()
+ // bloom trie indexer is closed by parent bloombits indexer
s.fcCostStats.store()
s.fcManager.Stop()
go func() {
@@ -273,10 +316,7 @@ func (pm *ProtocolManager) blockLoop() {
pm.wg.Add(1)
headCh := make(chan core.ChainHeadEvent, 10)
headSub := pm.blockchain.SubscribeChainHeadEvent(headCh)
- newCht := make(chan struct{}, 10)
- newCht <- struct{}{}
go func() {
- var mu sync.Mutex
var lastHead *types.Header
lastBroadcastTd := common.Big0
for {
@@ -299,26 +339,37 @@ func (pm *ProtocolManager) blockLoop() {
log.Debug("Announcing block to peers", "number", number, "hash", hash, "td", td, "reorg", reorg)
announce := announceData{Hash: hash, Number: number, Td: td, ReorgDepth: reorg}
+ var (
+ signed bool
+ signedAnnounce announceData
+ )
+
for _, p := range peers {
- select {
- case p.announceChn <- announce:
- default:
- pm.removePeer(p.id)
+ switch p.announceType {
+
+ case announceTypeSimple:
+ select {
+ case p.announceChn <- announce:
+ default:
+ pm.removePeer(p.id)
+ }
+
+ case announceTypeSigned:
+ if !signed {
+ signedAnnounce = announce
+ signedAnnounce.sign(pm.server.privateKey)
+ signed = true
+ }
+
+ select {
+ case p.announceChn <- signedAnnounce:
+ default:
+ pm.removePeer(p.id)
+ }
}
}
}
}
- newCht <- struct{}{}
- case <-newCht:
- go func() {
- mu.Lock()
- more := makeCht(pm.chainDb)
- mu.Unlock()
- if more {
- time.Sleep(time.Millisecond * 10)
- newCht <- struct{}{}
- }
- }()
case <-pm.quitSync:
headSub.Unsubscribe()
pm.wg.Done()
@@ -327,86 +378,3 @@ func (pm *ProtocolManager) blockLoop() {
}
}()
}
-
-var (
- lastChtKey = []byte("LastChtNumber") // chtNum (uint64 big endian)
- chtPrefix = []byte("cht") // chtPrefix + chtNum (uint64 big endian) -> trie root hash
-)
-
-func getChtRoot(db ethdb.Database, num uint64) common.Hash {
- var encNumber [8]byte
- binary.BigEndian.PutUint64(encNumber[:], num)
- data, _ := db.Get(append(chtPrefix, encNumber[:]...))
- return common.BytesToHash(data)
-}
-
-func storeChtRoot(db ethdb.Database, num uint64, root common.Hash) {
- var encNumber [8]byte
- binary.BigEndian.PutUint64(encNumber[:], num)
- db.Put(append(chtPrefix, encNumber[:]...), root[:])
-}
-
-func makeCht(db ethdb.Database) bool {
- headHash := core.GetHeadBlockHash(db)
- headNum := core.GetBlockNumber(db, headHash)
-
- var newChtNum uint64
- if headNum > light.ChtConfirmations {
- newChtNum = (headNum - light.ChtConfirmations) / light.ChtFrequency
- }
-
- var lastChtNum uint64
- data, _ := db.Get(lastChtKey)
- if len(data) == 8 {
- lastChtNum = binary.BigEndian.Uint64(data[:])
- }
- if newChtNum <= lastChtNum {
- return false
- }
-
- var t *trie.Trie
- if lastChtNum > 0 {
- var err error
- t, err = trie.New(getChtRoot(db, lastChtNum), db)
- if err != nil {
- lastChtNum = 0
- }
- }
- if lastChtNum == 0 {
- t, _ = trie.New(common.Hash{}, db)
- }
-
- for num := lastChtNum * light.ChtFrequency; num < (lastChtNum+1)*light.ChtFrequency; num++ {
- hash := core.GetCanonicalHash(db, num)
- if hash == (common.Hash{}) {
- panic("Canonical hash not found")
- }
- td := core.GetTd(db, hash, num)
- if td == nil {
- panic("TD not found")
- }
- var encNumber [8]byte
- binary.BigEndian.PutUint64(encNumber[:], num)
- var node light.ChtNode
- node.Hash = hash
- node.Td = td
- data, _ := rlp.EncodeToBytes(node)
- t.Update(encNumber[:], data)
- }
-
- root, err := t.Commit()
- if err != nil {
- lastChtNum = 0
- } else {
- lastChtNum++
-
- log.Trace("Generated CHT", "number", lastChtNum, "root", root.Hex())
-
- storeChtRoot(db, lastChtNum, root)
- var data [8]byte
- binary.BigEndian.PutUint64(data[:], lastChtNum)
- db.Put(lastChtKey, data[:])
- }
-
- return newChtNum > lastChtNum
-}