From a6e8ee4d4800a1978eb474a01091f83937743718 Mon Sep 17 00:00:00 2001 From: Mission Liao Date: Tue, 2 Oct 2018 17:21:00 +0800 Subject: core: remove shard (#161) --- core/lattice.go | 555 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 555 insertions(+) create mode 100644 core/lattice.go (limited to 'core/lattice.go') diff --git a/core/lattice.go b/core/lattice.go new file mode 100644 index 0000000..2da32ba --- /dev/null +++ b/core/lattice.go @@ -0,0 +1,555 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// . + +package core + +import ( + "fmt" + "sync" + "time" + + "github.com/dexon-foundation/dexon-consensus-core/common" + "github.com/dexon-foundation/dexon-consensus-core/core/blockdb" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto" + "github.com/dexon-foundation/dexon-consensus-core/core/types" +) + +// Errors for sanity check error. +var ( + ErrAckingBlockNotExists = fmt.Errorf("acking block not exists") + ErrInvalidParentChain = fmt.Errorf("invalid parent chain") + ErrDuplicatedAckOnOneChain = fmt.Errorf("duplicated ack on one chain") + ErrChainStatusCorrupt = fmt.Errorf("chain status corrupt") + ErrInvalidChainID = fmt.Errorf("invalid chain id") + ErrInvalidProposerID = fmt.Errorf("invalid proposer id") + ErrInvalidTimestamp = fmt.Errorf("invalid timestamp") + ErrForkBlock = fmt.Errorf("fork block") + ErrNotAckParent = fmt.Errorf("not ack parent") + ErrDoubleAck = fmt.Errorf("double ack") + ErrInvalidBlockHeight = fmt.Errorf("invalid block height") + ErrAlreadyInLattice = fmt.Errorf("block already in lattice") + ErrIncorrectBlockTime = fmt.Errorf("block timestampe is incorrect") +) + +// latticeData is a module for storing lattice. +type latticeData struct { + // chains stores chains' blocks and other info. + chains []*chainStatus + + // blockByHash stores blocks, indexed by block hash. + blockByHash map[common.Hash]*types.Block + + // Block interval specifies reasonable time difference between + // parent/child blocks. + minBlockTimeInterval time.Duration + maxBlockTimeInterval time.Duration +} + +// newLatticeData creates a new latticeData struct. +func newLatticeData( + chainNum uint32, + minBlockTimeInterval time.Duration, + maxBlockTimeInterval time.Duration) (data *latticeData) { + data = &latticeData{ + chains: make([]*chainStatus, chainNum), + blockByHash: make(map[common.Hash]*types.Block), + minBlockTimeInterval: minBlockTimeInterval, + maxBlockTimeInterval: maxBlockTimeInterval, + } + for i := range data.chains { + data.chains[i] = &chainStatus{ + ID: uint32(i), + blocks: []*types.Block{}, + nextAck: make([]uint64, chainNum), + } + } + return +} + +func (data *latticeData) sanityCheck(b *types.Block) error { + // Check if the chain id is valid. + if b.Position.ChainID >= uint32(len(data.chains)) { + return ErrInvalidChainID + } + + // TODO(mission): Check if its proposer is in validator set somewhere, + // lattice doesn't have to know about node set. + + // Check if it forks + if bInLattice := data.chains[b.Position.ChainID].getBlockByHeight( + b.Position.Height); bInLattice != nil { + + if b.Hash != bInLattice.Hash { + return ErrForkBlock + } + return ErrAlreadyInLattice + } + // TODO(mission): check if fork by loading blocks from DB if the block + // doesn't exists because forking is serious. + + // Check if it acks older blocks. + acksByChainID := make(map[uint32]struct{}, len(data.chains)) + for _, hash := range b.Acks { + if bAck, exist := data.blockByHash[hash]; exist { + if bAck.Position.Height < + data.chains[bAck.Position.ChainID].nextAck[b.Position.ChainID] { + return ErrDoubleAck + } + // Check if ack two blocks on the same chain. This would need + // to check after we replace map with slice for acks. + if _, acked := acksByChainID[bAck.Position.ChainID]; acked { + return ErrDuplicatedAckOnOneChain + } + acksByChainID[bAck.Position.ChainID] = struct{}{} + } else { + // This error has the same checking effect as areAllAcksInLattice. + return ErrAckingBlockNotExists + } + } + + // Check non-genesis blocks if it acks its parent. + if b.Position.Height > 0 { + if !b.IsAcking(b.ParentHash) { + return ErrNotAckParent + } + bParent := data.blockByHash[b.ParentHash] + if bParent.Position.ChainID != b.Position.ChainID { + return ErrInvalidParentChain + } + if bParent.Position.Height != b.Position.Height-1 { + return ErrInvalidBlockHeight + } + // Check if its timestamp is valid. + if !b.Timestamp.After(bParent.Timestamp) { + return ErrInvalidTimestamp + } + // Check if its timestamp is in expected range. + if b.Timestamp.Before(bParent.Timestamp.Add(data.minBlockTimeInterval)) || + b.Timestamp.After(bParent.Timestamp.Add(data.maxBlockTimeInterval)) { + + return ErrIncorrectBlockTime + } + } + return nil +} + +// areAllAcksReceived checks if all ack blocks of a block are all in lattice, +// we would make sure all blocks not acked by some chain would be kept +// in working set. +func (data *latticeData) areAllAcksInLattice(b *types.Block) bool { + for _, h := range b.Acks { + bAck, exist := data.blockByHash[h] + if !exist { + return false + } + if bAckInLattice := data.chains[bAck.Position.ChainID].getBlockByHeight( + bAck.Position.Height); bAckInLattice != nil { + + if bAckInLattice.Hash != bAck.Hash { + panic("areAllAcksInLattice: latticeData.chains has corrupted") + } + } else { + return false + } + } + return true +} + +// addBlock processes block, it does sanity check, inserts block into +// lattice and deletes blocks which will not be used. +func (data *latticeData) addBlock( + block *types.Block) (deliverable []*types.Block, err error) { + + var ( + bAck *types.Block + updated bool + ) + // TODO(mission): sanity check twice, might hurt performance. + // If a block does not pass sanity check, report error. + if err = data.sanityCheck(block); err != nil { + return + } + if err = data.chains[block.Position.ChainID].addBlock(block); err != nil { + return + } + data.blockByHash[block.Hash] = block + // Update nextAcks. + for _, ack := range block.Acks { + bAck = data.blockByHash[ack] + data.chains[bAck.Position.ChainID].nextAck[block.Position.ChainID] = + bAck.Position.Height + 1 + } + // Extract blocks that deliverable to total ordering. + // A block is deliverable to total ordering iff: + // - All its acking blocks are delivered to total ordering. + for { + updated = false + for _, status := range data.chains { + tip := status.getBlockByHeight(status.nextOutput) + if tip == nil { + continue + } + allAckingBlockDelivered := true + for _, ack := range tip.Acks { + bAck, exists := data.blockByHash[ack] + if !exists { + continue + } + if data.chains[bAck.Position.ChainID].nextOutput > + bAck.Position.Height { + + continue + } + // This acked block exists and not delivered yet. + allAckingBlockDelivered = false + } + if allAckingBlockDelivered { + deliverable = append(deliverable, tip) + status.nextOutput++ + updated = true + } + } + if !updated { + break + } + } + + // Delete old blocks in "chains" and "blocks" to release memory space. + // + // A block is safe to be deleted iff: + // - It's delivered to total ordering + // - All chains (including its proposing chain) acks some block with + // higher height in its proposing chain. + // + // This works because blocks of height below this minimum are not going to be + // acked anymore, the ackings of these blocks are illegal. + for _, status := range data.chains { + for _, h := range status.purge() { + delete(data.blockByHash, h) + } + } + return +} + +// prepareBlock helps to setup fields of block based on its ProposerID, +// including: +// - Set 'Acks' and 'Timestamps' for the highest block of each validator not +// acked by this proposer before. +// - Set 'ParentHash' and 'Height' from parent block, if we can't find a +// parent, these fields would be setup like a genesis block. +func (data *latticeData) prepareBlock(block *types.Block) { + // Reset fields to make sure we got these information from parent block. + block.Position.Height = 0 + block.ParentHash = common.Hash{} + acks := common.Hashes{} + for chainID := range data.chains { + // find height of the latest block for that validator. + var ( + curBlock *types.Block + nextHeight = data.chains[chainID].nextAck[block.Position.ChainID] + ) + for { + tmpBlock := data.chains[chainID].getBlockByHeight(nextHeight) + if tmpBlock == nil { + break + } + curBlock = tmpBlock + nextHeight++ + } + if curBlock == nil { + continue + } + acks = append(acks, curBlock.Hash) + if uint32(chainID) == block.Position.ChainID { + block.ParentHash = curBlock.Hash + block.Position.Height = curBlock.Position.Height + 1 + } + } + block.Acks = common.NewSortedHashes(acks) + return +} + +// TODO(mission): make more abstraction for this method. +// nextHeight returns the next height for the chain. +func (data *latticeData) nextPosition(chainID uint32) types.Position { + return data.chains[chainID].nextPosition() +} + +type chainStatus struct { + // ID keeps the chainID of this chain status. + ID uint32 + + // blocks stores blocks proposed for this chain, sorted by height. + blocks []*types.Block + + // minHeight keeps minimum height in blocks. + minHeight uint64 + + // nextAck stores the height of next height that should be acked, i.e. last + // acked height + 1. Initialized to 0. + // being acked. For example, rb.chains[vid1].nextAck[vid2] - 1 is the last + // acked height by vid2 acking vid1. + nextAck []uint64 + + // nextOutput is the next output height of block, default to 0. + nextOutput uint64 +} + +func (s *chainStatus) getBlockByHeight(height uint64) (b *types.Block) { + if height < s.minHeight { + return + } + idx := int(height - s.minHeight) + if idx >= len(s.blocks) { + return + } + b = s.blocks[idx] + return +} + +func (s *chainStatus) addBlock(b *types.Block) error { + if len(s.blocks) > 0 { + // Make sure the height of incoming block should be + // plus one to current latest blocks if exists. + if s.blocks[len(s.blocks)-1].Position.Height != b.Position.Height-1 { + return ErrChainStatusCorrupt + } + } else { + if b.Position.Height != 0 { + return ErrChainStatusCorrupt + } + } + s.blocks = append(s.blocks, b) + return nil +} + +func (s *chainStatus) calcPurgeHeight() (safe uint64, ok bool) { + // blocks with height less than min(nextOutput, nextAck...) + // are safe to be purged. + safe = s.nextOutput + for _, ackedHeight := range s.nextAck { + if safe > ackedHeight { + safe = ackedHeight + } + } + // Both 'nextOutput' and 'nextAck' represents some block to be + // outputed/acked. To find a block already outputed/acked, the height + // needs to be minus 1. + if safe == 0 { + // Avoid underflow. + return + } + safe-- + if safe < s.minHeight { + return + } + ok = true + return +} + +// purge blocks if they are safe to be deleted from working set. +func (s *chainStatus) purge() (purged common.Hashes) { + safe, ok := s.calcPurgeHeight() + if !ok { + return + } + newMinIndex := safe - s.minHeight + 1 + for _, b := range s.blocks[:newMinIndex] { + purged = append(purged, b.Hash) + } + s.blocks = s.blocks[newMinIndex:] + s.minHeight = safe + 1 + return +} + +// nextPosition returns a valid position for new block in this chain. +func (s *chainStatus) nextPosition() types.Position { + return types.Position{ + ChainID: s.ID, + Height: s.minHeight + uint64(len(s.blocks)), + } +} + +// Lattice represents a unit to produce a global ordering from multiple chains. +type Lattice struct { + lock sync.RWMutex + authModule *Authenticator + chainNum uint32 + app Application + debug Debug + db blockdb.BlockDatabase + pool blockPool + data *latticeData + toModule *totalOrdering + ctModule *consensusTimestamp +} + +// NewLattice constructs an Lattice instance. +func NewLattice( + cfg *types.Config, + authModule *Authenticator, + app Application, + debug Debug, + db blockdb.BlockDatabase) (s *Lattice) { + data := newLatticeData( + cfg.NumChains, + cfg.MinBlockInterval, + cfg.MaxBlockInterval) + s = &Lattice{ + authModule: authModule, + chainNum: cfg.NumChains, + app: app, + debug: debug, + db: db, + pool: newBlockPool(cfg.NumChains), + data: data, + toModule: newTotalOrdering( + uint64(cfg.K), + uint64(float32(cfg.NumChains-1)*cfg.PhiRatio+1), + cfg.NumChains), + ctModule: newConsensusTimestamp(), + } + return +} + +// PrepareBlock setup block's field based on current lattice status. +func (s *Lattice) PrepareBlock( + b *types.Block, proposeTime time.Time) (err error) { + + s.lock.RLock() + defer s.lock.RUnlock() + + s.data.prepareBlock(b) + // TODO(mission): the proposeTime might be earlier than tip block of + // that chain. We should let latticeData suggest the time. + b.Timestamp = proposeTime + b.Payload, b.Witness.Data = s.app.PrepareBlock(b.Position) + if err = s.authModule.SignBlock(b); err != nil { + return + } + return +} + +// SanityCheck check if a block is valid based on current lattice status. +// +// If some acking blocks don't exists, Lattice would help to cache this block +// and retry when lattice updated in Lattice.ProcessBlock. +func (s *Lattice) SanityCheck(b *types.Block) (err error) { + // Check the hash of block. + hash, err := hashBlock(b) + if err != nil || hash != b.Hash { + err = ErrIncorrectHash + return + } + // Check the signer. + pubKey, err := crypto.SigToPub(b.Hash, b.Signature) + if err != nil { + return + } + if !b.ProposerID.Equal(crypto.Keccak256Hash(pubKey.Bytes())) { + err = ErrIncorrectSignature + return + } + s.lock.RLock() + defer s.lock.RUnlock() + if err = s.data.sanityCheck(b); err != nil { + // Add to block pool, once the lattice updated, + // would be checked again. + if err == ErrAckingBlockNotExists { + s.pool.addBlock(b) + } + return + } + return +} + +// ProcessBlock adds a block into lattice, and deliver ordered blocks. +// If any block pass sanity check after this block add into lattice, they +// would be returned, too. +// +// NOTE: assume the block passed sanity check. +func (s *Lattice) ProcessBlock( + input *types.Block) (verified, delivered []*types.Block, err error) { + + var ( + tip, b *types.Block + toDelivered []*types.Block + inLattice []*types.Block + earlyDelivered bool + ) + s.lock.Lock() + defer s.lock.Unlock() + if inLattice, err = s.data.addBlock(input); err != nil { + return + } + if err = s.db.Put(*input); err != nil { + return + } + // TODO(mission): remove this hack, BA related stuffs should not + // be done here. + if s.debug != nil { + s.debug.StronglyAcked(input.Hash) + s.debug.BlockConfirmed(input.Hash) + } + // Purge blocks in pool with the same chainID and lower height. + s.pool.purgeBlocks(input.Position.ChainID, input.Position.Height) + // Replay tips in pool to check their validity. + for i := uint32(0); i < s.chainNum; i++ { + if tip = s.pool.tip(i); tip == nil { + continue + } + err = s.data.sanityCheck(tip) + if err == nil { + verified = append(verified, tip) + } + if err == ErrAckingBlockNotExists { + continue + } + s.pool.removeTip(i) + } + // Perform total ordering for each block added to lattice. + for _, b = range inLattice { + toDelivered, earlyDelivered, err = s.toModule.processBlock(b) + if err != nil { + return + } + if len(toDelivered) == 0 { + continue + } + hashes := make(common.Hashes, len(toDelivered)) + for idx := range toDelivered { + hashes[idx] = toDelivered[idx].Hash + } + if s.debug != nil { + s.debug.TotalOrderingDelivered(hashes, earlyDelivered) + } + // Perform timestamp generation. + if err = s.ctModule.processBlocks(toDelivered); err != nil { + return + } + delivered = append(delivered, toDelivered...) + } + return +} + +// NextPosition returns expected position of incoming block for that chain. +func (s *Lattice) NextPosition(chainID uint32) types.Position { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.data.nextPosition(chainID) +} -- cgit v1.2.3