diff options
Diffstat (limited to 'les/server.go')
-rw-r--r-- | les/server.go | 198 |
1 files changed, 83 insertions, 115 deletions
diff --git a/les/server.go b/les/server.go index 8b2730714..d8f93cd87 100644 --- a/les/server.go +++ b/les/server.go @@ -18,10 +18,11 @@ package les import ( + "crypto/ecdsa" "encoding/binary" + "fmt" "math" "sync" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -34,7 +35,6 @@ import ( "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discv5" "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/trie" ) type LesServer struct { @@ -42,23 +42,55 @@ type LesServer struct { fcManager *flowcontrol.ClientManager // nil if our node is client only fcCostStats *requestCostStats defParams *flowcontrol.ServerParams - lesTopic discv5.Topic + lesTopics []discv5.Topic + privateKey *ecdsa.PrivateKey quitSync chan struct{} + + chtIndexer, bloomTrieIndexer *core.ChainIndexer } func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) { quitSync := make(chan struct{}) - pm, err := NewProtocolManager(eth.BlockChain().Config(), false, config.NetworkId, eth.EventMux(), eth.Engine(), newPeerSet(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil, quitSync, new(sync.WaitGroup)) + pm, err := NewProtocolManager(eth.BlockChain().Config(), false, ServerProtocolVersions, config.NetworkId, eth.EventMux(), eth.Engine(), newPeerSet(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil, quitSync, new(sync.WaitGroup)) if err != nil { return nil, err } - pm.blockLoop() + + lesTopics := make([]discv5.Topic, len(ServerProtocolVersions)) + for i, pv := range ServerProtocolVersions { + lesTopics[i] = lesTopic(eth.BlockChain().Genesis().Hash(), pv) + } srv := &LesServer{ - protocolManager: pm, - quitSync: quitSync, - lesTopic: lesTopic(eth.BlockChain().Genesis().Hash()), + protocolManager: pm, + quitSync: quitSync, + lesTopics: lesTopics, + chtIndexer: light.NewChtIndexer(eth.ChainDb(), false), + bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), false), + } + logger := log.New() + + chtV1SectionCount, _, _ := srv.chtIndexer.Sections() // indexer still uses LES/1 4k section size for backwards server compatibility + chtV2SectionCount := chtV1SectionCount / (light.ChtFrequency / light.ChtV1Frequency) + if chtV2SectionCount != 0 { + // convert to LES/2 section + chtLastSection := chtV2SectionCount - 1 + // convert last LES/2 section index back to LES/1 index for chtIndexer.SectionHead + chtLastSectionV1 := (chtLastSection+1)*(light.ChtFrequency/light.ChtV1Frequency) - 1 + chtSectionHead := srv.chtIndexer.SectionHead(chtLastSectionV1) + chtRoot := light.GetChtV2Root(pm.chainDb, chtLastSection, chtSectionHead) + logger.Info("CHT", "section", chtLastSection, "sectionHead", fmt.Sprintf("%064x", chtSectionHead), "root", fmt.Sprintf("%064x", chtRoot)) + } + + bloomTrieSectionCount, _, _ := srv.bloomTrieIndexer.Sections() + if bloomTrieSectionCount != 0 { + bloomTrieLastSection := bloomTrieSectionCount - 1 + bloomTrieSectionHead := srv.bloomTrieIndexer.SectionHead(bloomTrieLastSection) + bloomTrieRoot := light.GetBloomTrieRoot(pm.chainDb, bloomTrieLastSection, bloomTrieSectionHead) + logger.Info("BloomTrie", "section", bloomTrieLastSection, "sectionHead", fmt.Sprintf("%064x", bloomTrieSectionHead), "root", fmt.Sprintf("%064x", bloomTrieRoot)) } + + srv.chtIndexer.Start(eth.BlockChain()) pm.server = srv srv.defParams = &flowcontrol.ServerParams{ @@ -77,17 +109,28 @@ func (s *LesServer) Protocols() []p2p.Protocol { // Start starts the LES server func (s *LesServer) Start(srvr *p2p.Server) { s.protocolManager.Start() - go func() { - logger := log.New("topic", s.lesTopic) - logger.Info("Starting topic registration") - defer logger.Info("Terminated topic registration") + for _, topic := range s.lesTopics { + topic := topic + go func() { + logger := log.New("topic", topic) + logger.Info("Starting topic registration") + defer logger.Info("Terminated topic registration") + + srvr.DiscV5.RegisterTopic(topic, s.quitSync) + }() + } + s.privateKey = srvr.PrivateKey + s.protocolManager.blockLoop() +} - srvr.DiscV5.RegisterTopic(s.lesTopic, s.quitSync) - }() +func (s *LesServer) SetBloomBitsIndexer(bloomIndexer *core.ChainIndexer) { + bloomIndexer.AddChildIndexer(s.bloomTrieIndexer) } // Stop stops the LES service func (s *LesServer) Stop() { + s.chtIndexer.Close() + // bloom trie indexer is closed by parent bloombits indexer s.fcCostStats.store() s.fcManager.Stop() go func() { @@ -273,10 +316,7 @@ func (pm *ProtocolManager) blockLoop() { pm.wg.Add(1) headCh := make(chan core.ChainHeadEvent, 10) headSub := pm.blockchain.SubscribeChainHeadEvent(headCh) - newCht := make(chan struct{}, 10) - newCht <- struct{}{} go func() { - var mu sync.Mutex var lastHead *types.Header lastBroadcastTd := common.Big0 for { @@ -299,26 +339,37 @@ func (pm *ProtocolManager) blockLoop() { log.Debug("Announcing block to peers", "number", number, "hash", hash, "td", td, "reorg", reorg) announce := announceData{Hash: hash, Number: number, Td: td, ReorgDepth: reorg} + var ( + signed bool + signedAnnounce announceData + ) + for _, p := range peers { - select { - case p.announceChn <- announce: - default: - pm.removePeer(p.id) + switch p.announceType { + + case announceTypeSimple: + select { + case p.announceChn <- announce: + default: + pm.removePeer(p.id) + } + + case announceTypeSigned: + if !signed { + signedAnnounce = announce + signedAnnounce.sign(pm.server.privateKey) + signed = true + } + + select { + case p.announceChn <- signedAnnounce: + default: + pm.removePeer(p.id) + } } } } } - newCht <- struct{}{} - case <-newCht: - go func() { - mu.Lock() - more := makeCht(pm.chainDb) - mu.Unlock() - if more { - time.Sleep(time.Millisecond * 10) - newCht <- struct{}{} - } - }() case <-pm.quitSync: headSub.Unsubscribe() pm.wg.Done() @@ -327,86 +378,3 @@ func (pm *ProtocolManager) blockLoop() { } }() } - -var ( - lastChtKey = []byte("LastChtNumber") // chtNum (uint64 big endian) - chtPrefix = []byte("cht") // chtPrefix + chtNum (uint64 big endian) -> trie root hash -) - -func getChtRoot(db ethdb.Database, num uint64) common.Hash { - var encNumber [8]byte - binary.BigEndian.PutUint64(encNumber[:], num) - data, _ := db.Get(append(chtPrefix, encNumber[:]...)) - return common.BytesToHash(data) -} - -func storeChtRoot(db ethdb.Database, num uint64, root common.Hash) { - var encNumber [8]byte - binary.BigEndian.PutUint64(encNumber[:], num) - db.Put(append(chtPrefix, encNumber[:]...), root[:]) -} - -func makeCht(db ethdb.Database) bool { - headHash := core.GetHeadBlockHash(db) - headNum := core.GetBlockNumber(db, headHash) - - var newChtNum uint64 - if headNum > light.ChtConfirmations { - newChtNum = (headNum - light.ChtConfirmations) / light.ChtFrequency - } - - var lastChtNum uint64 - data, _ := db.Get(lastChtKey) - if len(data) == 8 { - lastChtNum = binary.BigEndian.Uint64(data[:]) - } - if newChtNum <= lastChtNum { - return false - } - - var t *trie.Trie - if lastChtNum > 0 { - var err error - t, err = trie.New(getChtRoot(db, lastChtNum), db) - if err != nil { - lastChtNum = 0 - } - } - if lastChtNum == 0 { - t, _ = trie.New(common.Hash{}, db) - } - - for num := lastChtNum * light.ChtFrequency; num < (lastChtNum+1)*light.ChtFrequency; num++ { - hash := core.GetCanonicalHash(db, num) - if hash == (common.Hash{}) { - panic("Canonical hash not found") - } - td := core.GetTd(db, hash, num) - if td == nil { - panic("TD not found") - } - var encNumber [8]byte - binary.BigEndian.PutUint64(encNumber[:], num) - var node light.ChtNode - node.Hash = hash - node.Td = td - data, _ := rlp.EncodeToBytes(node) - t.Update(encNumber[:], data) - } - - root, err := t.Commit() - if err != nil { - lastChtNum = 0 - } else { - lastChtNum++ - - log.Trace("Generated CHT", "number", lastChtNum, "root", root.Hex()) - - storeChtRoot(db, lastChtNum, root) - var data [8]byte - binary.BigEndian.PutUint64(data[:], lastChtNum) - db.Put(lastChtKey, data[:]) - } - - return newChtNum > lastChtNum -} |