diff options
Diffstat (limited to 'ethereum.go')
-rw-r--r-- | ethereum.go | 213 |
1 files changed, 213 insertions, 0 deletions
diff --git a/ethereum.go b/ethereum.go new file mode 100644 index 000000000..b1b675c88 --- /dev/null +++ b/ethereum.go @@ -0,0 +1,213 @@ +package eth + +import ( + "container/list" + "github.com/ethereum/ethchain-go" + "github.com/ethereum/ethdb-go" + "github.com/ethereum/ethutil-go" + "github.com/ethereum/ethwire-go" + "log" + "net" + "sync/atomic" + "time" +) + +func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) { + // Loop thru the peers and close them (if we had them) + for e := peers.Front(); e != nil; e = e.Next() { + if peer, ok := e.Value.(*Peer); ok { + callback(peer, e) + } + } +} + +const ( + processReapingTimeout = 60 // TODO increase +) + +type Ethereum struct { + // Channel for shutting down the ethereum + shutdownChan chan bool + // DB interface + //db *ethdb.LDBDatabase + db *ethdb.MemDatabase + // Block manager for processing new blocks and managing the block chain + BlockManager *ethchain.BlockManager + // The transaction pool. Transaction can be pushed on this pool + // for later including in the blocks + TxPool *ethchain.TxPool + // Peers (NYI) + peers *list.List + // Nonce + Nonce uint64 +} + +func New() (*Ethereum, error) { + //db, err := ethdb.NewLDBDatabase() + db, err := ethdb.NewMemDatabase() + if err != nil { + return nil, err + } + + ethutil.Config.Db = db + + nonce, _ := ethutil.RandomUint64() + ethereum := &Ethereum{ + shutdownChan: make(chan bool), + db: db, + peers: list.New(), + Nonce: nonce, + } + ethereum.TxPool = ethchain.NewTxPool() + ethereum.TxPool.Speaker = ethereum + ethereum.BlockManager = ethchain.NewBlockManager() + + ethereum.TxPool.BlockManager = ethereum.BlockManager + ethereum.BlockManager.TransactionPool = ethereum.TxPool + + return ethereum, nil +} + +func (s *Ethereum) AddPeer(conn net.Conn) { + peer := NewPeer(conn, s, true) + + if peer != nil { + s.peers.PushBack(peer) + peer.Start() + + log.Println("Peer connected ::", conn.RemoteAddr()) + } +} + +func (s *Ethereum) ProcessPeerList(addrs []string) { + for _, addr := range addrs { + // TODO Probably requires some sanity checks + s.ConnectToPeer(addr) + } +} + +func (s *Ethereum) ConnectToPeer(addr string) error { + peer := NewOutboundPeer(addr, s) + + s.peers.PushBack(peer) + + return nil +} + +func (s *Ethereum) OutboundPeers() []*Peer { + // Create a new peer slice with at least the length of the total peers + outboundPeers := make([]*Peer, s.peers.Len()) + length := 0 + eachPeer(s.peers, func(p *Peer, e *list.Element) { + if !p.inbound { + outboundPeers[length] = p + length++ + } + }) + + return outboundPeers[:length] +} + +func (s *Ethereum) InboundPeers() []*Peer { + // Create a new peer slice with at least the length of the total peers + inboundPeers := make([]*Peer, s.peers.Len()) + length := 0 + eachPeer(s.peers, func(p *Peer, e *list.Element) { + if p.inbound { + inboundPeers[length] = p + length++ + } + }) + + return inboundPeers[:length] +} + +func (s *Ethereum) Broadcast(msgType ethwire.MsgType, data []byte) { + eachPeer(s.peers, func(p *Peer, e *list.Element) { + p.QueueMessage(ethwire.NewMessage(msgType, data)) + }) +} + +func (s *Ethereum) ReapDeadPeers() { + for { + eachPeer(s.peers, func(p *Peer, e *list.Element) { + if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) { + log.Println("Dead peer found .. reaping") + + s.peers.Remove(e) + } + }) + + time.Sleep(processReapingTimeout * time.Second) + } +} + +// Start the ethereum +func (s *Ethereum) Start() { + // For now this function just blocks the main thread + ln, err := net.Listen("tcp", ":12345") + if err != nil { + // This is mainly for testing to create a "network" + if ethutil.Config.Debug { + log.Println("Connection listening disabled. Acting as client") + + err = s.ConnectToPeer("localhost:12345") + if err != nil { + log.Println("Error starting ethereum", err) + + s.Stop() + } + } else { + log.Fatal(err) + } + } else { + // Starting accepting connections + go func() { + for { + conn, err := ln.Accept() + if err != nil { + log.Println(err) + + continue + } + + go s.AddPeer(conn) + } + }() + } + + // Start the reaping processes + go s.ReapDeadPeers() + + // Start the tx pool + s.TxPool.Start() + + // TMP + /* + go func() { + for { + s.Broadcast("block", s.blockManager.bc.GenesisBlock().RlpEncode()) + + time.Sleep(1000 * time.Millisecond) + } + }() + */ +} + +func (s *Ethereum) Stop() { + // Close the database + defer s.db.Close() + + eachPeer(s.peers, func(p *Peer, e *list.Element) { + p.Stop() + }) + + s.shutdownChan <- true + + s.TxPool.Stop() +} + +// This function will wait for a shutdown and resumes main thread execution +func (s *Ethereum) WaitForShutdown() { + <-s.shutdownChan +} |