aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorobscuren <geffobscura@gmail.com>2014-01-22 06:27:08 +0800
committerobscuren <geffobscura@gmail.com>2014-01-22 06:27:08 +0800
commit3616080db46931202003157bacf10748008bebc0 (patch)
treea2004c7c4c8f6c91999b734b75a57ac31b04cb97
parente47230f82da93ef0110faa76211b9b6f13b1060b (diff)
downloaddexon-3616080db46931202003157bacf10748008bebc0.tar
dexon-3616080db46931202003157bacf10748008bebc0.tar.gz
dexon-3616080db46931202003157bacf10748008bebc0.tar.bz2
dexon-3616080db46931202003157bacf10748008bebc0.tar.lz
dexon-3616080db46931202003157bacf10748008bebc0.tar.xz
dexon-3616080db46931202003157bacf10748008bebc0.tar.zst
dexon-3616080db46931202003157bacf10748008bebc0.zip
Added synchronisation of transactions across remote pools
-rw-r--r--dev_console.go16
-rw-r--r--ethereum.go63
-rw-r--r--peer.go6
-rw-r--r--server.go34
-rw-r--r--transaction_pool.go26
5 files changed, 96 insertions, 49 deletions
diff --git a/dev_console.go b/dev_console.go
index 5340a5f46..d14f019e5 100644
--- a/dev_console.go
+++ b/dev_console.go
@@ -12,15 +12,16 @@ import (
)
type Console struct {
- db *ethdb.MemDatabase
- trie *ethutil.Trie
+ db *ethdb.MemDatabase
+ trie *ethutil.Trie
+ server *Server
}
-func NewConsole() *Console {
+func NewConsole(s *Server) *Console {
db, _ := ethdb.NewMemDatabase()
trie := ethutil.NewTrie(db, "")
- return &Console{db: db, trie: trie}
+ return &Console{db: db, trie: trie, server: s}
}
func (i *Console) ValidateInput(action string, argumentLength int) error {
@@ -43,6 +44,9 @@ func (i *Console) ValidateInput(action string, argumentLength int) error {
case action == "encode" && argumentLength != 1:
err = true
expArgCount = 1
+ case action == "tx" && argumentLength != 2:
+ err = true
+ expArgCount = 2
}
if err {
@@ -105,6 +109,10 @@ func (i *Console) ParseInput(input string) bool {
fmt.Printf("%q\n", d)
case "encode":
fmt.Printf("%q\n", ethutil.Encode(tokens[1]))
+ case "tx":
+ tx := ethutil.NewTransaction(tokens[1], ethutil.Big(tokens[2]), []string{""})
+
+ i.server.txPool.QueueTransaction(tx)
case "exit", "quit", "q":
return false
case "help":
diff --git a/ethereum.go b/ethereum.go
index d74cb4ff2..b7f059a02 100644
--- a/ethereum.go
+++ b/ethereum.go
@@ -7,6 +7,7 @@ import (
"log"
"os"
"os/signal"
+ "path"
"runtime"
)
@@ -44,36 +45,54 @@ func main() {
Init()
+ ethutil.ReadConfig()
+
+ server, err := NewServer()
+
+ if err != nil {
+ log.Println(err)
+ return
+ }
+
if StartConsole {
- console := NewConsole()
- console.Start()
- } else {
- log.Println("Starting Ethereum")
- server, err := NewServer()
+ err := os.Mkdir(ethutil.Config.ExecPath, os.ModePerm)
+ // Error is OK if the error is ErrExist
+ if err != nil && !os.IsExist(err) {
+ log.Panic("Unable to create EXECPATH. Exiting")
+ }
+ // TODO The logger will eventually be a non blocking logger. Logging is a expensive task
+ // Log to file only
+ file, err := os.OpenFile(path.Join(ethutil.Config.ExecPath, "debug.log"), os.O_RDWR|os.O_CREATE, os.ModePerm)
if err != nil {
- log.Println(err)
- return
+ log.Panic("Unable to set proper logger", err)
}
- RegisterInterupts(server)
+ ethutil.Config.Log = log.New(file, "", 0)
- if StartMining {
- log.Println("Mining started")
- dagger := &Dagger{}
+ console := NewConsole(server)
+ go console.Start()
+ }
- go func() {
- for {
- res := dagger.Search(ethutil.Big("01001"), ethutil.BigPow(2, 36))
- log.Println("Res dagger", res)
- //server.Broadcast("blockmine", ethutil.Encode(res.String()))
- }
- }()
- }
+ log.Println("Starting Ethereum")
+
+ RegisterInterupts(server)
- server.Start()
+ if StartMining {
+ log.Println("Mining started")
+ dagger := &Dagger{}
- // Wait for shutdown
- server.WaitForShutdown()
+ go func() {
+ for {
+ res := dagger.Search(ethutil.Big("01001"), ethutil.BigPow(2, 36))
+ log.Println("Res dagger", res)
+ //server.Broadcast("blockmine", ethutil.Encode(res.String()))
+ }
+ }()
}
+
+ server.Start()
+
+ // Wait for shutdown
+ server.WaitForShutdown()
}
diff --git a/peer.go b/peer.go
index 0f3422826..207f9e59f 100644
--- a/peer.go
+++ b/peer.go
@@ -62,7 +62,7 @@ func NewOutboundPeer(addr string, server *Server) *Peer {
server: server,
inbound: false,
connected: 0,
- disconnect: 1,
+ disconnect: 0,
}
// Set up the connection in another goroutine so we don't block the main thread
@@ -169,12 +169,12 @@ out:
// Version message
p.handleHandshake(msg)
case ethwire.MsgBlockTy:
- err := p.server.blockManager.ProcessBlock(ethutil.NewBlock(ethutil.Encode(msg.Data)))
+ err := p.server.blockManager.ProcessBlock(ethutil.NewBlock(msg.Data))
if err != nil {
log.Println(err)
}
case ethwire.MsgTxTy:
- p.server.txPool.QueueTransaction(ethutil.NewTransactionFromData(ethutil.Encode(msg.Data)))
+ p.server.txPool.QueueTransaction(ethutil.NewTransactionFromData(msg.Data))
case ethwire.MsgInvTy:
case ethwire.MsgGetPeersTy:
p.requestedPeerList = true
diff --git a/server.go b/server.go
index 3a35a43a2..2927f023a 100644
--- a/server.go
+++ b/server.go
@@ -48,7 +48,7 @@ func NewServer() (*Server, error) {
return nil, err
}
- ethutil.SetConfig(db)
+ ethutil.Config.Db = db
nonce, _ := ethutil.RandomUint64()
server := &Server{
@@ -152,28 +152,30 @@ func (s *Server) Start() {
s.Stop()
}
-
- return
} else {
log.Fatal(err)
}
+ } else {
+ // Starting accepting connections
+ go func() {
+ for {
+ conn, err := ln.Accept()
+ if err != nil {
+ log.Println(err)
+
+ continue
+ }
+
+ go s.AddPeer(conn)
+ }
+ }()
}
// Start the reaping processes
go s.ReapDeadPeers()
- go func() {
- for {
- conn, err := ln.Accept()
- if err != nil {
- log.Println(err)
-
- continue
- }
-
- go s.AddPeer(conn)
- }
- }()
+ // Start the tx pool
+ s.txPool.Start()
// TMP
/*
@@ -196,6 +198,8 @@ func (s *Server) Stop() {
})
s.shutdownChan <- true
+
+ s.txPool.Stop()
}
// This function will wait for a shutdown and resumes main thread execution
diff --git a/transaction_pool.go b/transaction_pool.go
index f645afd06..b302931de 100644
--- a/transaction_pool.go
+++ b/transaction_pool.go
@@ -1,9 +1,11 @@
package main
import (
+ "bytes"
"container/list"
"errors"
"github.com/ethereum/ethutil-go"
+ "github.com/ethereum/ethwire-go"
"log"
"math/big"
"sync"
@@ -56,9 +58,11 @@ func NewTxPool(s *Server) *TxPool {
// Blocking function. Don't use directly. Use QueueTransaction instead
func (pool *TxPool) addTransaction(tx *ethutil.Transaction) {
pool.mutex.Lock()
- defer pool.mutex.Unlock()
-
pool.pool.PushBack(tx)
+ pool.mutex.Unlock()
+
+ // Broadcast the transaction to the rest of the peers
+ pool.server.Broadcast(ethwire.MsgTxTy, tx.RlpEncode())
}
// Process transaction validates the Tx and processes funds from the
@@ -89,7 +93,12 @@ func (pool *TxPool) processTransaction(tx *ethutil.Transaction) error {
// Make sure there's enough in the sender's account. Having insufficient
// funds won't invalidate this transaction but simple ignores it.
if sender.Amount.Cmp(tx.Value) < 0 {
- return errors.New("Insufficient amount in sender's account")
+ if Debug {
+ log.Println("Insufficient amount in sender's account. Adding 1 ETH for debug")
+ sender.Amount = ethutil.BigPow(10, 18)
+ } else {
+ return errors.New("Insufficient amount in sender's account")
+ }
}
// Subtract the amount from the senders account
@@ -121,6 +130,15 @@ out:
for {
select {
case tx := <-pool.queueChan:
+ hash := tx.Hash()
+ foundTx := FindTx(pool.pool, func(tx *ethutil.Transaction, e *list.Element) bool {
+ return bytes.Compare(tx.Hash(), hash) == 0
+ })
+
+ if foundTx != nil {
+ break
+ }
+
// Process the transaction
err := pool.processTransaction(tx)
if err != nil {
@@ -144,8 +162,6 @@ func (pool *TxPool) Flush() {
pool.mutex.Lock()
defer pool.mutex.Unlock()
-
- pool.mutex.Unlock()
}
func (pool *TxPool) Start() {