From e98b53bbef8cdbeed54546c75d856d53810e424c Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 25 Feb 2014 11:22:27 +0100 Subject: WIP Observing pattern --- ethchain/transaction_pool.go | 41 +++++++++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 8 deletions(-) (limited to 'ethchain') diff --git a/ethchain/transaction_pool.go b/ethchain/transaction_pool.go index 1278cc4dc..cd09bf02e 100644 --- a/ethchain/transaction_pool.go +++ b/ethchain/transaction_pool.go @@ -17,6 +17,17 @@ const ( ) type TxPoolHook chan *Transaction +type TxMsgTy byte + +const ( + TxPre = iota + TxPost +) + +type TxMsg struct { + Tx *Transaction + Type TxMsgTy +} func FindTx(pool *list.List, finder func(*Transaction, *list.Element) bool) *Transaction { for e := pool.Front(); e != nil; e = e.Next() { @@ -59,6 +70,8 @@ type TxPool struct { BlockManager *BlockManager SecondaryProcessor TxProcessor + + subscribers []chan TxMsg } func NewTxPool() *TxPool { @@ -73,21 +86,17 @@ func NewTxPool() *TxPool { // Blocking function. Don't use directly. Use QueueTransaction instead func (pool *TxPool) addTransaction(tx *Transaction) { - log.Println("Adding tx to pool") pool.mutex.Lock() pool.pool.PushBack(tx) pool.mutex.Unlock() // Broadcast the transaction to the rest of the peers pool.Speaker.Broadcast(ethwire.MsgTxTy, []interface{}{tx.RlpData()}) - log.Println("broadcasting it") } // Process transaction validates the Tx and processes funds from the // sender to the recipient. func (pool *TxPool) ProcessTransaction(tx *Transaction, block *Block) (err error) { - log.Printf("[TXPL] Processing Tx %x\n", tx.Hash()) - defer func() { if r := recover(); r != nil { log.Println(r) @@ -132,6 +141,11 @@ func (pool *TxPool) ProcessTransaction(tx *Transaction, block *Block) (err error block.UpdateAddr(tx.Sender(), sender) + log.Printf("[TXPL] Processed Tx %x\n", tx.Hash()) + + // Notify the subscribers + pool.notifySubscribers(TxPost, tx) + return } @@ -145,7 +159,8 @@ func (pool *TxPool) ValidateTransaction(tx *Transaction) error { } // Get the sender - sender := block.GetAddr(tx.Sender()) + accountState := pool.BlockManager.GetAddrState(tx.Sender()) + sender := accountState.Account totAmount := new(big.Int).Add(tx.Value, new(big.Int).Mul(TxFee, TxFeeRat)) // Make sure there's enough in the sender's account. Having insufficient @@ -185,9 +200,8 @@ out: // doesn't matter since this is a goroutine pool.addTransaction(tx) - if pool.SecondaryProcessor != nil { - pool.SecondaryProcessor.ProcessTransaction(tx) - } + // Notify the subscribers + pool.notifySubscribers(TxPre, tx) } case <-pool.quit: break out @@ -231,3 +245,14 @@ func (pool *TxPool) Stop() { pool.Flush() } + +func (pool *TxPool) Subscribe(channel chan TxMsg) { + pool.subscribers = append(pool.subscribers, channel) +} + +func (pool *TxPool) notifySubscribers(ty TxMsgTy, tx *Transaction) { + msg := TxMsg{Type: ty, Tx: tx} + for _, subscriber := range pool.subscribers { + subscriber <- msg + } +} -- cgit v1.2.3