package dex

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	dexCore "github.com/dexon-foundation/dexon-consensus/core"
	coreEcdsa "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa"
	"github.com/dexon-foundation/dexon-consensus/core/syncer"
	coreTypes "github.com/dexon-foundation/dexon-consensus/core/types"

	"github.com/dexon-foundation/dexon/core"
	"github.com/dexon-foundation/dexon/dex/db"
	"github.com/dexon-foundation/dexon/log"
	"github.com/dexon-foundation/dexon/rlp"
)

// blockProposer owns the lifecycle of the consensus core for a Dexon node:
// Start builds (or syncs) a consensus instance in a background goroutine and
// runs it until Stop is called.
type blockProposer struct {
	// mu serializes Start and Stop.
	mu        sync.Mutex
	// running is 1 while the goroutine launched by Start is alive.
	// Accessed atomically.
	running   int32
	// syncing is 1 for the duration of syncConsensus. Accessed atomically.
	syncing   int32
	// proposing is set to 1 by run once the consensus core has been launched
	// and reset to 0 by Stop. Accessed atomically.
	proposing int32
	// dex is the node whose chain database, blockchain, governance, network,
	// and protocol manager are used by the proposer.
	dex       *Dexon
	// watchCat is started and fed block positions while syncing; a receive
	// on its Meow channel forces the sync phase to end.
	watchCat  *syncer.WatchCat
	// dMoment is passed to the consensus constructors; Start also compares
	// it with the current time to choose between initializing and syncing.
	dMoment   time.Time

	// wg tracks the goroutine launched by Start so Stop can wait for it.
	wg     sync.WaitGroup
	// stopCh is created by Start and closed by Stop to signal shutdown.
	stopCh chan struct{}
}

// NewBlockProposer returns a block proposer bound to the given node,
// watchCat, and dMoment. The returned proposer is idle until Start is called.
func NewBlockProposer(dex *Dexon, watchCat *syncer.WatchCat, dMoment time.Time) *blockProposer {
	proposer := new(blockProposer)
	proposer.dex = dex
	proposer.watchCat = watchCat
	proposer.dMoment = dMoment
	return proposer
}

// Start launches the block proposer in a background goroutine and returns
// immediately. It returns an error if the proposer is already running.
//
// The goroutine obtains a consensus instance, freshly initialized when
// dMoment is still in the future and synced from the local chain otherwise,
// and then runs it until Stop is called. If syncing fails the goroutine logs
// the error and exits without running consensus.
func (b *blockProposer) Start() error {
	b.mu.Lock()
	defer b.mu.Unlock()

	alreadyRunning := !atomic.CompareAndSwapInt32(&b.running, 0, 1)
	if alreadyRunning {
		return fmt.Errorf("block proposer is already running")
	}
	log.Info("Block proposer started")

	b.stopCh = make(chan struct{})
	b.wg.Add(1)
	go func() {
		// Deferred calls run in reverse order: running is cleared before the
		// wait group is released, so Stop never observes a stale flag after
		// its Wait returns.
		defer b.wg.Done()
		defer atomic.StoreInt32(&b.running, 0)

		var (
			consensus *dexCore.Consensus
			err       error
		)
		if time.Now().Before(b.dMoment) {
			consensus = b.initConsensus()
		} else if consensus, err = b.syncConsensus(); err != nil {
			log.Error("Block proposer stopped, before start running", "err", err)
			return
		}

		b.run(consensus)
		log.Info("Block proposer successfully stopped")
	}()
	return nil
}

// run launches the given consensus core in its own goroutine, marks the
// proposer as proposing, and then blocks until stopCh is closed, at which
// point it stops the consensus core and returns.
func (b *blockProposer) run(c *dexCore.Consensus) {
	log.Info("Start running consensus core")
	go c.Run()
	atomic.StoreInt32(&b.proposing, 1)
	// Block here until Stop closes the channel.
	<-b.stopCh
	log.Debug("Block proposer receive stop signal")
	c.Stop()
}

// Stop shuts the block proposer down and waits for its goroutine to exit.
// It is a no-op when the proposer is not running.
func (b *blockProposer) Stop() {
	b.mu.Lock()
	defer b.mu.Unlock()

	if atomic.LoadInt32(&b.running) != 1 {
		return
	}

	// Clear the proposer flag on the protocol manager before signalling
	// the goroutine to stop.
	b.dex.protocolManager.isBlockProposer = false
	close(b.stopCh)
	b.wg.Wait()
	atomic.StoreInt32(&b.proposing, 0)
}

// IsCoreSyncing reports whether the proposer is currently inside
// syncConsensus, i.e. catching the consensus core up with the local chain.
func (b *blockProposer) IsCoreSyncing() bool {
	flag := atomic.LoadInt32(&b.syncing)
	return flag == 1
}

// IsProposing reports whether the consensus core has been launched by run
// and the proposer has not since been stopped via Stop.
func (b *blockProposer) IsProposing() bool {
	flag := atomic.LoadInt32(&b.proposing)
	return flag == 1
}

// initConsensus constructs a new consensus core from the node's chain
// database, application, governance, network, and private key, using
// dMoment and the root logger.
func (b *blockProposer) initConsensus() *dexCore.Consensus {
	node := b.dex
	coreDB := db.NewDatabase(node.chainDb)
	prvKey := coreEcdsa.NewPrivateKeyFromECDSA(node.config.PrivateKey)
	return dexCore.NewConsensus(
		b.dMoment,
		node.app,
		node.governance,
		coreDB,
		node.network,
		prvKey,
		log.Root(),
	)
}

// syncConsensus catches the consensus core up with the local blockchain and
// returns the consensus instance produced by the syncer.
//
// It works in two phases:
//
//  1. Replay every block between the compaction chain tip recorded in the
//     core database and the current local chain head.
//  2. Set the protocol manager's isBlockProposer flag, subscribe to chain
//     head events, and keep feeding newly arrived blocks to the syncer until
//     it reports that it is synced, or until the watchCat's Meow channel
//     fires, in which case ForceSync is invoked.
//
// The syncing flag is set for the duration of the call, and the watchCat
// started here is stopped on every return path.
//
// An error is returned when SyncBlocks fails, when the chain head
// subscription reports an error ("system stop"), or when stopCh is closed
// before syncing completes ("early stop"). The function panics if the
// DexconMeta of a local block cannot be RLP-decoded.
//
// NOTE(review): isBlockProposer is left set when an error is returned after
// phase 2 has begun; confirm whether callers expect it to be reset.
func (b *blockProposer) syncConsensus() (*dexCore.Consensus, error) {
	atomic.StoreInt32(&b.syncing, 1)
	defer atomic.StoreInt32(&b.syncing, 0)

	coreDB := db.NewDatabase(b.dex.chainDb)
	privkey := coreEcdsa.NewPrivateKeyFromECDSA(b.dex.config.PrivateKey)
	consensusSync := syncer.NewConsensus(b.dMoment, b.dex.app, b.dex.governance,
		coreDB, b.dex.network, privkey, log.Root())

	// Start the watchCat and make sure it is stopped however we leave,
	// including the early-return error paths below.
	log.Info("Starting sync watchCat ...")
	b.watchCat.Start()
	defer b.watchCat.Stop()

	// Feed the current block we have in local blockchain.
	cb := b.dex.blockchain.CurrentBlock()
	var tip coreTypes.Block
	if err := rlp.DecodeBytes(cb.Header().DexconMeta, &tip); err != nil {
		panic(err)
	}
	b.watchCat.Feed(tip.Position)

	// blocksToSync decodes the consensus blocks embedded in the local chain
	// blocks numbered coreHeight+1 through height (inclusive). It returns an
	// empty slice when coreHeight >= height.
	blocksToSync := func(coreHeight, height uint64) []*coreTypes.Block {
		var blocks []*coreTypes.Block
		for coreHeight < height {
			var block coreTypes.Block
			chainBlock := b.dex.blockchain.GetBlockByNumber(coreHeight + 1)
			if err := rlp.DecodeBytes(chainBlock.Header().DexconMeta, &block); err != nil {
				panic(err)
			}
			blocks = append(blocks, &block)
			coreHeight++
		}
		return blocks
	}

	// Sync all blocks in compaction chain to core.
	_, coreHeight := coreDB.GetCompactionChainTipInfo()

Loop:
	for {
		currentBlock := b.dex.blockchain.CurrentBlock()
		log.Debug("Syncing compaction chain", "core height", coreHeight,
			"height", currentBlock.NumberU64())
		blocks := blocksToSync(coreHeight, currentBlock.NumberU64())

		if len(blocks) == 0 {
			break Loop
		}
		b.watchCat.Feed(blocks[len(blocks)-1].Position)

		log.Debug("Filling compaction chain", "num", len(blocks),
			"first", blocks[0].Finalization.Height,
			"last", blocks[len(blocks)-1].Finalization.Height)
		if _, err := consensusSync.SyncBlocks(blocks, false); err != nil {
			return nil, err
		}
		coreHeight = blocks[len(blocks)-1].Finalization.Height

		// Non-blocking check so a Stop request interrupts a long replay.
		select {
		case <-b.stopCh:
			return nil, errors.New("early stop")
		default:
		}
	}

	// Enable isBlockProposer flag to start receiving msg.
	b.dex.protocolManager.isBlockProposer = true

	ch := make(chan core.ChainHeadEvent)
	sub := b.dex.blockchain.SubscribeChainHeadEvent(ch)
	defer sub.Unsubscribe()

	// Listen chain head event until synced.
ListenLoop:
	for {
		select {
		case ev := <-ch:
			blocks := blocksToSync(coreHeight, ev.Block.NumberU64())

			// A chain head at or below coreHeight yields no blocks; there is
			// nothing to feed or sync, so only touch the slice when it is
			// non-empty.
			if len(blocks) > 0 {
				b.watchCat.Feed(blocks[len(blocks)-1].Position)

				log.Debug("Filling compaction chain", "num", len(blocks),
					"first", blocks[0].Finalization.Height,
					"last", blocks[len(blocks)-1].Finalization.Height)
				synced, err := consensusSync.SyncBlocks(blocks, true)
				if err != nil {
					log.Error("SyncBlocks fail", "err", err)
					return nil, err
				}
				if synced {
					log.Debug("Consensus core synced")
					break ListenLoop
				}
				coreHeight = blocks[len(blocks)-1].Finalization.Height
			}
		case <-sub.Err():
			log.Debug("System stopped when syncing consensus core")
			return nil, errors.New("system stop")
		case <-b.stopCh:
			log.Debug("Early stop, before consensus core can run")
			return nil, errors.New("early stop")
		case <-b.watchCat.Meow():
			log.Info("WatchCat signaled to stop syncing")
			consensusSync.ForceSync(true)
			break ListenLoop
		}
	}

	return consensusSync.GetSyncedConsensus()
}