diff options
Diffstat (limited to 'ethereum.go')
-rw-r--r-- | ethereum.go | 659 |
1 files changed, 0 insertions, 659 deletions
diff --git a/ethereum.go b/ethereum.go deleted file mode 100644 index e8b1a9500..000000000 --- a/ethereum.go +++ /dev/null @@ -1,659 +0,0 @@ -package eth - -import ( - "container/list" - "encoding/json" - "fmt" - "math/big" - "math/rand" - "net" - "path" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/ethutil" - "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/state" - "github.com/ethereum/go-ethereum/wire" -) - -const ( - seedTextFileUri string = "http://www.ethereum.org/servers.poc3.txt" - seedNodeAddress = "poc-7.ethdev.com:30303" -) - -var loggerger = logger.NewLogger("SERV") - -func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) { - // Loop thru the peers and close them (if we had them) - for e := peers.Front(); e != nil; e = e.Next() { - callback(e.Value.(*Peer), e) - } -} - -const ( - processReapingTimeout = 60 // TODO increase -) - -type Ethereum struct { - // Channel for shutting down the ethereum - shutdownChan chan bool - quit chan bool - - // DB interface - db ethutil.Database - // State manager for processing new blocks and managing the over all states - blockManager *core.BlockManager - // The transaction pool. Transaction can be pushed on this pool - // for later including in the blocks - txPool *core.TxPool - // The canonical chain - blockChain *core.ChainManager - // The block pool - blockPool *BlockPool - // Eventer - eventMux event.TypeMux - // Peers - peers *list.List - // Nonce - Nonce uint64 - - Addr net.Addr - Port string - - blacklist [][]byte - - peerMut sync.Mutex - - // Capabilities for outgoing peers - serverCaps Caps - - nat NAT - - // Specifies the desired amount of maximum peers - MaxPeers int - - Mining bool - - listening bool - - RpcServer *rpc.JsonRpcServer - - keyManager *crypto.KeyManager - - clientIdentity wire.ClientIdentity - - isUpToDate bool - - filterMu sync.RWMutex - filterId int - filters map[int]*core.Filter -} - -func New(db ethutil.Database, clientIdentity wire.ClientIdentity, keyManager *crypto.KeyManager, caps Caps, usePnp bool) (*Ethereum, error) { - var err error - var nat NAT - - if usePnp { - nat, err = Discover() - if err != nil { - loggerger.Debugln("UPnP failed", err) - } - } - - bootstrapDb(db) - - ethutil.Config.Db = db - - nonce, _ := ethutil.RandomUint64() - ethereum := &Ethereum{ - shutdownChan: make(chan bool), - quit: make(chan bool), - db: db, - peers: list.New(), - Nonce: nonce, - serverCaps: caps, - nat: nat, - keyManager: keyManager, - clientIdentity: clientIdentity, - isUpToDate: true, - filters: make(map[int]*core.Filter), - } - - ethereum.blockPool = NewBlockPool(ethereum) - ethereum.txPool = core.NewTxPool(ethereum) - ethereum.blockChain = core.NewChainManager(ethereum.EventMux()) - ethereum.blockManager = core.NewBlockManager(ethereum) - ethereum.blockChain.SetProcessor(ethereum.blockManager) - - // Start the tx pool - ethereum.txPool.Start() - - return ethereum, nil -} - -func (s *Ethereum) KeyManager() *crypto.KeyManager { - return s.keyManager -} - -func (s *Ethereum) ClientIdentity() wire.ClientIdentity { - return s.clientIdentity -} - -func (s *Ethereum) ChainManager() *core.ChainManager { - return s.blockChain -} - -func (s *Ethereum) BlockManager() *core.BlockManager { - return s.blockManager -} - -func (s *Ethereum) TxPool() *core.TxPool { - return s.txPool -} -func (s *Ethereum) BlockPool() *BlockPool { - return s.blockPool -} -func (s *Ethereum) EventMux() *event.TypeMux { - return &s.eventMux -} -func (self *Ethereum) Db() ethutil.Database { - return self.db -} - -func (s *Ethereum) ServerCaps() Caps { - return s.serverCaps -} -func (s *Ethereum) IsMining() bool { - return s.Mining -} -func (s *Ethereum) PeerCount() int { - return s.peers.Len() -} -func (s *Ethereum) IsUpToDate() bool { - upToDate := true - eachPeer(s.peers, func(peer *Peer, e *list.Element) { - if atomic.LoadInt32(&peer.connected) == 1 { - if peer.catchingUp == true && peer.versionKnown { - upToDate = false - } - } - }) - return upToDate -} -func (s *Ethereum) PushPeer(peer *Peer) { - s.peers.PushBack(peer) -} -func (s *Ethereum) IsListening() bool { - return s.listening -} - -func (s *Ethereum) HighestTDPeer() (td *big.Int) { - td = big.NewInt(0) - - eachPeer(s.peers, func(p *Peer, v *list.Element) { - if p.td.Cmp(td) > 0 { - td = p.td - } - }) - - return -} - -func (self *Ethereum) BlacklistPeer(peer *Peer) { - self.blacklist = append(self.blacklist, peer.pubkey) -} - -func (s *Ethereum) AddPeer(conn net.Conn) { - peer := NewPeer(conn, s, true) - - if peer != nil { - if s.peers.Len() < s.MaxPeers { - peer.Start() - } else { - loggerger.Debugf("Max connected peers reached. Not adding incoming peer.") - } - } -} - -func (s *Ethereum) ProcessPeerList(addrs []string) { - for _, addr := range addrs { - // TODO Probably requires some sanity checks - s.ConnectToPeer(addr) - } -} - -func (s *Ethereum) ConnectToPeer(addr string) error { - if s.peers.Len() < s.MaxPeers { - var alreadyConnected bool - - ahost, aport, _ := net.SplitHostPort(addr) - var chost string - - ips, err := net.LookupIP(ahost) - - if err != nil { - return err - } else { - // If more then one ip is available try stripping away the ipv6 ones - if len(ips) > 1 { - var ipsv4 []net.IP - // For now remove the ipv6 addresses - for _, ip := range ips { - if strings.Contains(ip.String(), "::") { - continue - } else { - ipsv4 = append(ipsv4, ip) - } - } - if len(ipsv4) == 0 { - return fmt.Errorf("[SERV] No IPV4 addresses available for hostname") - } - - // Pick a random ipv4 address, simulating round-robin DNS. - rand.Seed(time.Now().UTC().UnixNano()) - i := rand.Intn(len(ipsv4)) - chost = ipsv4[i].String() - } else { - if len(ips) == 0 { - return fmt.Errorf("[SERV] No IPs resolved for the given hostname") - return nil - } - chost = ips[0].String() - } - } - - eachPeer(s.peers, func(p *Peer, v *list.Element) { - if p.conn == nil { - return - } - phost, pport, _ := net.SplitHostPort(p.conn.RemoteAddr().String()) - - if phost == chost && pport == aport { - alreadyConnected = true - //loggerger.Debugf("Peer %s already added.\n", chost) - return - } - }) - - if alreadyConnected { - return nil - } - - NewOutboundPeer(addr, s, s.serverCaps) - } - - return nil -} - -func (s *Ethereum) OutboundPeers() []*Peer { - // Create a new peer slice with at least the length of the total peers - outboundPeers := make([]*Peer, s.peers.Len()) - length := 0 - eachPeer(s.peers, func(p *Peer, e *list.Element) { - if !p.inbound && p.conn != nil { - outboundPeers[length] = p - length++ - } - }) - - return outboundPeers[:length] -} - -func (s *Ethereum) InboundPeers() []*Peer { - // Create a new peer slice with at least the length of the total peers - inboundPeers := make([]*Peer, s.peers.Len()) - length := 0 - eachPeer(s.peers, func(p *Peer, e *list.Element) { - if p.inbound { - inboundPeers[length] = p - length++ - } - }) - - return inboundPeers[:length] -} - -func (s *Ethereum) InOutPeers() []*Peer { - // Reap the dead peers first - s.reapPeers() - - // Create a new peer slice with at least the length of the total peers - inboundPeers := make([]*Peer, s.peers.Len()) - length := 0 - eachPeer(s.peers, func(p *Peer, e *list.Element) { - // Only return peers with an actual ip - if len(p.host) > 0 { - inboundPeers[length] = p - length++ - } - }) - - return inboundPeers[:length] -} - -func (s *Ethereum) Broadcast(msgType wire.MsgType, data []interface{}) { - msg := wire.NewMessage(msgType, data) - s.BroadcastMsg(msg) -} - -func (s *Ethereum) BroadcastMsg(msg *wire.Msg) { - eachPeer(s.peers, func(p *Peer, e *list.Element) { - p.QueueMessage(msg) - }) -} - -func (s *Ethereum) Peers() *list.List { - return s.peers -} - -func (s *Ethereum) reapPeers() { - eachPeer(s.peers, func(p *Peer, e *list.Element) { - if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) { - s.removePeerElement(e) - } - }) -} - -func (s *Ethereum) removePeerElement(e *list.Element) { - s.peerMut.Lock() - defer s.peerMut.Unlock() - - s.peers.Remove(e) - - s.eventMux.Post(PeerListEvent{s.peers}) -} - -func (s *Ethereum) RemovePeer(p *Peer) { - eachPeer(s.peers, func(peer *Peer, e *list.Element) { - if peer == p { - s.removePeerElement(e) - } - }) -} - -func (s *Ethereum) reapDeadPeerHandler() { - reapTimer := time.NewTicker(processReapingTimeout * time.Second) - - for { - select { - case <-reapTimer.C: - s.reapPeers() - } - } -} - -// Start the ethereum -func (s *Ethereum) Start(seed bool) { - s.blockPool.Start() - - // Bind to addr and port - ln, err := net.Listen("tcp", ":"+s.Port) - if err != nil { - loggerger.Warnf("Port %s in use. Connection listening disabled. Acting as client", s.Port) - s.listening = false - } else { - s.listening = true - // Starting accepting connections - loggerger.Infoln("Ready and accepting connections") - // Start the peer handler - go s.peerHandler(ln) - } - - if s.nat != nil { - go s.upnpUpdateThread() - } - - // Start the reaping processes - go s.reapDeadPeerHandler() - go s.update() - go s.filterLoop() - - if seed { - s.Seed() - } - s.ConnectToPeer("localhost:40404") - loggerger.Infoln("Server started") -} - -func (s *Ethereum) Seed() { - // Sorry Py person. I must blacklist. you perform badly - s.blacklist = append(s.blacklist, ethutil.Hex2Bytes("64656330303561383532336435376331616537643864663236623336313863373537353163636634333530626263396330346237336262623931383064393031")) - ips := PastPeers() - if len(ips) > 0 { - for _, ip := range ips { - loggerger.Infoln("Connecting to previous peer ", ip) - s.ConnectToPeer(ip) - } - } else { - loggerger.Debugln("Retrieving seed nodes") - - // Eth-Go Bootstrapping - ips, er := net.LookupIP("seed.bysh.me") - if er == nil { - peers := []string{} - for _, ip := range ips { - node := fmt.Sprintf("%s:%d", ip.String(), 30303) - loggerger.Debugln("Found DNS Go Peer:", node) - peers = append(peers, node) - } - s.ProcessPeerList(peers) - } - - // Official DNS Bootstrapping - _, nodes, err := net.LookupSRV("eth", "tcp", "ethereum.org") - if err == nil { - peers := []string{} - // Iterate SRV nodes - for _, n := range nodes { - target := n.Target - port := strconv.Itoa(int(n.Port)) - // Resolve target to ip (Go returns list, so may resolve to multiple ips?) - addr, err := net.LookupHost(target) - if err == nil { - for _, a := range addr { - // Build string out of SRV port and Resolved IP - peer := net.JoinHostPort(a, port) - loggerger.Debugln("Found DNS Bootstrap Peer:", peer) - peers = append(peers, peer) - } - } else { - loggerger.Debugln("Couldn't resolve :", target) - } - } - // Connect to Peer list - s.ProcessPeerList(peers) - } - - s.ConnectToPeer(seedNodeAddress) - } -} - -func (s *Ethereum) peerHandler(listener net.Listener) { - for { - conn, err := listener.Accept() - if err != nil { - loggerger.Debugln(err) - - continue - } - - go s.AddPeer(conn) - } -} - -func (s *Ethereum) Stop() { - // Stop eventMux first, it will close all subscriptions. - s.eventMux.Stop() - - // Close the database - defer s.db.Close() - - var ips []string - eachPeer(s.peers, func(p *Peer, e *list.Element) { - ips = append(ips, p.conn.RemoteAddr().String()) - }) - - if len(ips) > 0 { - d, _ := json.MarshalIndent(ips, "", " ") - ethutil.WriteFile(path.Join(ethutil.Config.ExecPath, "known_peers.json"), d) - } - - eachPeer(s.peers, func(p *Peer, e *list.Element) { - p.Stop() - }) - - close(s.quit) - - if s.RpcServer != nil { - s.RpcServer.Stop() - } - s.txPool.Stop() - s.blockPool.Stop() - - loggerger.Infoln("Server stopped") - close(s.shutdownChan) -} - -// This function will wait for a shutdown and resumes main thread execution -func (s *Ethereum) WaitForShutdown() { - <-s.shutdownChan -} - -func (s *Ethereum) upnpUpdateThread() { - // Go off immediately to prevent code duplication, thereafter we renew - // lease every 15 minutes. - timer := time.NewTimer(5 * time.Minute) - lport, _ := strconv.ParseInt(s.Port, 10, 16) - first := true -out: - for { - select { - case <-timer.C: - var err error - _, err = s.nat.AddPortMapping("TCP", int(lport), int(lport), "eth listen port", 20*60) - if err != nil { - loggerger.Debugln("can't add UPnP port mapping:", err) - break out - } - if first && err == nil { - _, err = s.nat.GetExternalAddress() - if err != nil { - loggerger.Debugln("UPnP can't get external address:", err) - continue out - } - first = false - } - timer.Reset(time.Minute * 15) - case <-s.quit: - break out - } - } - - timer.Stop() - - if err := s.nat.DeletePortMapping("TCP", int(lport), int(lport)); err != nil { - loggerger.Debugln("unable to remove UPnP port mapping:", err) - } else { - loggerger.Debugln("succesfully disestablished UPnP port mapping") - } -} - -func (self *Ethereum) update() { - upToDateTimer := time.NewTicker(1 * time.Second) - -out: - for { - select { - case <-upToDateTimer.C: - if self.IsUpToDate() && !self.isUpToDate { - self.eventMux.Post(ChainSyncEvent{false}) - self.isUpToDate = true - } else if !self.IsUpToDate() && self.isUpToDate { - self.eventMux.Post(ChainSyncEvent{true}) - self.isUpToDate = false - } - case <-self.quit: - break out - } - } -} - -// InstallFilter adds filter for blockchain events. -// The filter's callbacks will run for matching blocks and messages. -// The filter should not be modified after it has been installed. -func (self *Ethereum) InstallFilter(filter *core.Filter) (id int) { - self.filterMu.Lock() - id = self.filterId - self.filters[id] = filter - self.filterId++ - self.filterMu.Unlock() - return id -} - -func (self *Ethereum) UninstallFilter(id int) { - self.filterMu.Lock() - delete(self.filters, id) - self.filterMu.Unlock() -} - -// GetFilter retrieves a filter installed using InstallFilter. -// The filter may not be modified. -func (self *Ethereum) GetFilter(id int) *core.Filter { - self.filterMu.RLock() - defer self.filterMu.RUnlock() - return self.filters[id] -} - -func (self *Ethereum) filterLoop() { - // Subscribe to events - events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Messages(nil)) - for event := range events.Chan() { - switch event := event.(type) { - case core.NewBlockEvent: - self.filterMu.RLock() - for _, filter := range self.filters { - if filter.BlockCallback != nil { - filter.BlockCallback(event.Block) - } - } - self.filterMu.RUnlock() - - case state.Messages: - self.filterMu.RLock() - for _, filter := range self.filters { - if filter.MessageCallback != nil { - msgs := filter.FilterMessages(event) - if len(msgs) > 0 { - filter.MessageCallback(msgs) - } - } - } - self.filterMu.RUnlock() - } - } -} - -func bootstrapDb(db ethutil.Database) { - d, _ := db.Get([]byte("ProtocolVersion")) - protov := ethutil.NewValue(d).Uint() - - if protov == 0 { - db.Put([]byte("ProtocolVersion"), ethutil.NewValue(ProtocolVersion).Bytes()) - } -} - -func PastPeers() []string { - var ips []string - data, _ := ethutil.ReadAllFile(path.Join(ethutil.Config.ExecPath, "known_peers.json")) - json.Unmarshal([]byte(data), &ips) - - return ips -} |