diff options
-rw-r--r-- | block_pool.go | 351 | ||||
-rwxr-xr-x | ethereum | bin | 0 -> 16025932 bytes | |||
-rw-r--r-- | ethereum.go | 659 | ||||
-rw-r--r-- | events.go | 11 | ||||
-rw-r--r-- | nat.go | 12 | ||||
-rw-r--r-- | natpmp.go | 55 | ||||
-rw-r--r-- | natupnp.go | 338 | ||||
-rw-r--r-- | peer.go | 881 |
8 files changed, 0 insertions, 2307 deletions
diff --git a/block_pool.go b/block_pool.go deleted file mode 100644 index 803927f21..000000000 --- a/block_pool.go +++ /dev/null @@ -1,351 +0,0 @@ -package eth - -import ( - "bytes" - "container/list" - "fmt" - "math" - "math/big" - "sync" - "time" - - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethutil" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/wire" -) - -var poollogger = logger.NewLogger("BPOOL") - -type block struct { - from *Peer - peer *Peer - block *types.Block - reqAt time.Time - requested int -} - -type BlockPool struct { - mut sync.Mutex - - eth *Ethereum - - hashes [][]byte - pool map[string]*block - - td *big.Int - quit chan bool - - fetchingHashes bool - downloadStartedAt time.Time - - ChainLength, BlocksProcessed int - - peer *Peer -} - -func NewBlockPool(eth *Ethereum) *BlockPool { - return &BlockPool{ - eth: eth, - pool: make(map[string]*block), - td: ethutil.Big0, - quit: make(chan bool), - } -} - -func (self *BlockPool) Len() int { - return len(self.hashes) -} - -func (self *BlockPool) Reset() { - self.pool = make(map[string]*block) - self.hashes = nil -} - -func (self *BlockPool) HasLatestHash() bool { - self.mut.Lock() - defer self.mut.Unlock() - - return self.pool[string(self.eth.ChainManager().CurrentBlock.Hash())] != nil -} - -func (self *BlockPool) HasCommonHash(hash []byte) bool { - return self.eth.ChainManager().GetBlock(hash) != nil -} - -func (self *BlockPool) Blocks() (blocks types.Blocks) { - for _, item := range self.pool { - if item.block != nil { - blocks = append(blocks, item.block) - } - } - - return -} - -func (self *BlockPool) FetchHashes(peer *Peer) bool { - highestTd := self.eth.HighestTDPeer() - - if (self.peer == nil && peer.td.Cmp(highestTd) >= 0) || (self.peer != nil && peer.td.Cmp(self.peer.td) > 0) || self.peer == peer { - if self.peer != peer { - poollogger.Infof("Found better suitable peer (%v vs %v)\n", self.td, peer.td) - - if self.peer != nil { - self.peer.doneFetchingHashes = true - } - } - - self.peer = peer - self.td = peer.td - - if !self.HasLatestHash() { - self.fetchHashes() - } - - return true - } - - return false -} - -func (self *BlockPool) fetchHashes() { - peer := self.peer - - peer.doneFetchingHashes = false - - const amount = 256 - peerlogger.Debugf("Fetching hashes (%d) %x...\n", amount, peer.lastReceivedHash[0:4]) - peer.QueueMessage(wire.NewMessage(wire.MsgGetBlockHashesTy, []interface{}{peer.lastReceivedHash, uint32(amount)})) -} - -func (self *BlockPool) AddHash(hash []byte, peer *Peer) { - self.mut.Lock() - defer self.mut.Unlock() - - if self.pool[string(hash)] == nil { - self.pool[string(hash)] = &block{peer, nil, nil, time.Now(), 0} - - self.hashes = append([][]byte{hash}, self.hashes...) - } -} - -func (self *BlockPool) Add(b *types.Block, peer *Peer) { - self.addBlock(b, peer, false) -} - -func (self *BlockPool) AddNew(b *types.Block, peer *Peer) { - self.addBlock(b, peer, true) -} - -func (self *BlockPool) addBlock(b *types.Block, peer *Peer, newBlock bool) { - self.mut.Lock() - defer self.mut.Unlock() - - hash := string(b.Hash()) - - if self.pool[hash] == nil && !self.eth.ChainManager().HasBlock(b.Hash()) { - poollogger.Infof("Got unrequested block (%x...)\n", hash[0:4]) - - self.hashes = append(self.hashes, b.Hash()) - self.pool[hash] = &block{peer, peer, b, time.Now(), 0} - - // The following is only performed on an unrequested new block - if newBlock { - fmt.Println("1.", !self.eth.ChainManager().HasBlock(b.PrevHash), ethutil.Bytes2Hex(b.Hash()[0:4]), ethutil.Bytes2Hex(b.PrevHash[0:4])) - fmt.Println("2.", self.pool[string(b.PrevHash)] == nil) - fmt.Println("3.", !self.fetchingHashes) - if !self.eth.ChainManager().HasBlock(b.PrevHash) /*&& self.pool[string(b.PrevHash)] == nil*/ && !self.fetchingHashes { - poollogger.Infof("Unknown chain, requesting (%x...)\n", b.PrevHash[0:4]) - peer.QueueMessage(wire.NewMessage(wire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)})) - } - } - } else if self.pool[hash] != nil { - self.pool[hash].block = b - } - - self.BlocksProcessed++ -} - -func (self *BlockPool) Remove(hash []byte) { - self.mut.Lock() - defer self.mut.Unlock() - - self.hashes = ethutil.DeleteFromByteSlice(self.hashes, hash) - delete(self.pool, string(hash)) -} - -func (self *BlockPool) DistributeHashes() { - self.mut.Lock() - defer self.mut.Unlock() - - var ( - peerLen = self.eth.peers.Len() - amount = 256 * peerLen - dist = make(map[*Peer][][]byte) - ) - - num := int(math.Min(float64(amount), float64(len(self.pool)))) - for i, j := 0, 0; i < len(self.hashes) && j < num; i++ { - hash := self.hashes[i] - item := self.pool[string(hash)] - - if item != nil && item.block == nil { - var peer *Peer - lastFetchFailed := time.Since(item.reqAt) > 5*time.Second - - // Handle failed requests - if lastFetchFailed && item.requested > 5 && item.peer != nil { - if item.requested < 100 { - // Select peer the hash was retrieved off - peer = item.from - } else { - // Remove it - self.hashes = ethutil.DeleteFromByteSlice(self.hashes, hash) - delete(self.pool, string(hash)) - } - } else if lastFetchFailed || item.peer == nil { - // Find a suitable, available peer - eachPeer(self.eth.peers, func(p *Peer, v *list.Element) { - if peer == nil && len(dist[p]) < amount/peerLen && p.statusKnown { - peer = p - } - }) - } - - if peer != nil { - item.reqAt = time.Now() - item.peer = peer - item.requested++ - - dist[peer] = append(dist[peer], hash) - } - } - } - - for peer, hashes := range dist { - peer.FetchBlocks(hashes) - } - - if len(dist) > 0 { - self.downloadStartedAt = time.Now() - } -} - -func (self *BlockPool) Start() { - go self.downloadThread() - go self.chainThread() -} - -func (self *BlockPool) Stop() { - close(self.quit) -} - -func (self *BlockPool) downloadThread() { - serviceTimer := time.NewTicker(100 * time.Millisecond) -out: - for { - select { - case <-self.quit: - break out - case <-serviceTimer.C: - // Check if we're catching up. If not distribute the hashes to - // the peers and download the blockchain - self.fetchingHashes = false - eachPeer(self.eth.peers, func(p *Peer, v *list.Element) { - if p.statusKnown && p.FetchingHashes() { - self.fetchingHashes = true - } - }) - - if len(self.hashes) > 0 { - self.DistributeHashes() - } - - if self.ChainLength < len(self.hashes) { - self.ChainLength = len(self.hashes) - } - - if self.peer != nil && - !self.peer.doneFetchingHashes && - time.Since(self.peer.lastHashAt) > 10*time.Second && - time.Since(self.peer.lastHashRequestedAt) > 5*time.Second { - self.fetchHashes() - } - - /* - if !self.fetchingHashes { - blocks := self.Blocks() - chain.BlockBy(chain.Number).Sort(blocks) - - if len(blocks) > 0 { - if !self.eth.ChainManager().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes { - } - } - } - */ - } - } -} - -func (self *BlockPool) chainThread() { - procTimer := time.NewTicker(500 * time.Millisecond) -out: - for { - select { - case <-self.quit: - break out - case <-procTimer.C: - blocks := self.Blocks() - types.BlockBy(types.Number).Sort(blocks) - - // Find common block - for i, block := range blocks { - if self.eth.ChainManager().HasBlock(block.PrevHash) { - blocks = blocks[i:] - break - } - } - - if len(blocks) > 0 { - if self.eth.ChainManager().HasBlock(blocks[0].PrevHash) { - for i, block := range blocks[1:] { - // NOTE: The Ith element in this loop refers to the previous block in - // outer "blocks" - if bytes.Compare(block.PrevHash, blocks[i].Hash()) != 0 { - blocks = blocks[:i] - - break - } - } - } else { - blocks = nil - } - } - - if len(blocks) > 0 { - chainman := self.eth.ChainManager() - - err := chainman.InsertChain(blocks) - if err != nil { - poollogger.Debugln(err) - - self.Reset() - - if self.peer != nil && self.peer.conn != nil { - poollogger.Debugf("Punishing peer for supplying bad chain (%v)\n", self.peer.conn.RemoteAddr()) - } - - // This peer gave us bad hashes and made us fetch a bad chain, therefor he shall be punished. - self.eth.BlacklistPeer(self.peer) - self.peer.StopWithReason(DiscBadPeer) - self.td = ethutil.Big0 - self.peer = nil - } - - for _, block := range blocks { - self.Remove(block.Hash()) - } - } - } - } -} diff --git a/ethereum b/ethereum Binary files differnew file mode 100755 index 000000000..7e17d95a4 --- /dev/null +++ b/ethereum diff --git a/ethereum.go b/ethereum.go deleted file mode 100644 index e8b1a9500..000000000 --- a/ethereum.go +++ /dev/null @@ -1,659 +0,0 @@ -package eth - -import ( - "container/list" - "encoding/json" - "fmt" - "math/big" - "math/rand" - "net" - "path" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/ethutil" - "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/state" - "github.com/ethereum/go-ethereum/wire" -) - -const ( - seedTextFileUri string = "http://www.ethereum.org/servers.poc3.txt" - seedNodeAddress = "poc-7.ethdev.com:30303" -) - -var loggerger = logger.NewLogger("SERV") - -func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) { - // Loop thru the peers and close them (if we had them) - for e := peers.Front(); e != nil; e = e.Next() { - callback(e.Value.(*Peer), e) - } -} - -const ( - processReapingTimeout = 60 // TODO increase -) - -type Ethereum struct { - // Channel for shutting down the ethereum - shutdownChan chan bool - quit chan bool - - // DB interface - db ethutil.Database - // State manager for processing new blocks and managing the over all states - blockManager *core.BlockManager - // The transaction pool. Transaction can be pushed on this pool - // for later including in the blocks - txPool *core.TxPool - // The canonical chain - blockChain *core.ChainManager - // The block pool - blockPool *BlockPool - // Eventer - eventMux event.TypeMux - // Peers - peers *list.List - // Nonce - Nonce uint64 - - Addr net.Addr - Port string - - blacklist [][]byte - - peerMut sync.Mutex - - // Capabilities for outgoing peers - serverCaps Caps - - nat NAT - - // Specifies the desired amount of maximum peers - MaxPeers int - - Mining bool - - listening bool - - RpcServer *rpc.JsonRpcServer - - keyManager *crypto.KeyManager - - clientIdentity wire.ClientIdentity - - isUpToDate bool - - filterMu sync.RWMutex - filterId int - filters map[int]*core.Filter -} - -func New(db ethutil.Database, clientIdentity wire.ClientIdentity, keyManager *crypto.KeyManager, caps Caps, usePnp bool) (*Ethereum, error) { - var err error - var nat NAT - - if usePnp { - nat, err = Discover() - if err != nil { - loggerger.Debugln("UPnP failed", err) - } - } - - bootstrapDb(db) - - ethutil.Config.Db = db - - nonce, _ := ethutil.RandomUint64() - ethereum := &Ethereum{ - shutdownChan: make(chan bool), - quit: make(chan bool), - db: db, - peers: list.New(), - Nonce: nonce, - serverCaps: caps, - nat: nat, - keyManager: keyManager, - clientIdentity: clientIdentity, - isUpToDate: true, - filters: make(map[int]*core.Filter), - } - - ethereum.blockPool = NewBlockPool(ethereum) - ethereum.txPool = core.NewTxPool(ethereum) - ethereum.blockChain = core.NewChainManager(ethereum.EventMux()) - ethereum.blockManager = core.NewBlockManager(ethereum) - ethereum.blockChain.SetProcessor(ethereum.blockManager) - - // Start the tx pool - ethereum.txPool.Start() - - return ethereum, nil -} - -func (s *Ethereum) KeyManager() *crypto.KeyManager { - return s.keyManager -} - -func (s *Ethereum) ClientIdentity() wire.ClientIdentity { - return s.clientIdentity -} - -func (s *Ethereum) ChainManager() *core.ChainManager { - return s.blockChain -} - -func (s *Ethereum) BlockManager() *core.BlockManager { - return s.blockManager -} - -func (s *Ethereum) TxPool() *core.TxPool { - return s.txPool -} -func (s *Ethereum) BlockPool() *BlockPool { - return s.blockPool -} -func (s *Ethereum) EventMux() *event.TypeMux { - return &s.eventMux -} -func (self *Ethereum) Db() ethutil.Database { - return self.db -} - -func (s *Ethereum) ServerCaps() Caps { - return s.serverCaps -} -func (s *Ethereum) IsMining() bool { - return s.Mining -} -func (s *Ethereum) PeerCount() int { - return s.peers.Len() -} -func (s *Ethereum) IsUpToDate() bool { - upToDate := true - eachPeer(s.peers, func(peer *Peer, e *list.Element) { - if atomic.LoadInt32(&peer.connected) == 1 { - if peer.catchingUp == true && peer.versionKnown { - upToDate = false - } - } - }) - return upToDate -} -func (s *Ethereum) PushPeer(peer *Peer) { - s.peers.PushBack(peer) -} -func (s *Ethereum) IsListening() bool { - return s.listening -} - -func (s *Ethereum) HighestTDPeer() (td *big.Int) { - td = big.NewInt(0) - - eachPeer(s.peers, func(p *Peer, v *list.Element) { - if p.td.Cmp(td) > 0 { - td = p.td - } - }) - - return -} - -func (self *Ethereum) BlacklistPeer(peer *Peer) { - self.blacklist = append(self.blacklist, peer.pubkey) -} - -func (s *Ethereum) AddPeer(conn net.Conn) { - peer := NewPeer(conn, s, true) - - if peer != nil { - if s.peers.Len() < s.MaxPeers { - peer.Start() - } else { - loggerger.Debugf("Max connected peers reached. Not adding incoming peer.") - } - } -} - -func (s *Ethereum) ProcessPeerList(addrs []string) { - for _, addr := range addrs { - // TODO Probably requires some sanity checks - s.ConnectToPeer(addr) - } -} - -func (s *Ethereum) ConnectToPeer(addr string) error { - if s.peers.Len() < s.MaxPeers { - var alreadyConnected bool - - ahost, aport, _ := net.SplitHostPort(addr) - var chost string - - ips, err := net.LookupIP(ahost) - - if err != nil { - return err - } else { - // If more then one ip is available try stripping away the ipv6 ones - if len(ips) > 1 { - var ipsv4 []net.IP - // For now remove the ipv6 addresses - for _, ip := range ips { - if strings.Contains(ip.String(), "::") { - continue - } else { - ipsv4 = append(ipsv4, ip) - } - } - if len(ipsv4) == 0 { - return fmt.Errorf("[SERV] No IPV4 addresses available for hostname") - } - - // Pick a random ipv4 address, simulating round-robin DNS. - rand.Seed(time.Now().UTC().UnixNano()) - i := rand.Intn(len(ipsv4)) - chost = ipsv4[i].String() - } else { - if len(ips) == 0 { - return fmt.Errorf("[SERV] No IPs resolved for the given hostname") - return nil - } - chost = ips[0].String() - } - } - - eachPeer(s.peers, func(p *Peer, v *list.Element) { - if p.conn == nil { - return - } - phost, pport, _ := net.SplitHostPort(p.conn.RemoteAddr().String()) - - if phost == chost && pport == aport { - alreadyConnected = true - //loggerger.Debugf("Peer %s already added.\n", chost) - return - } - }) - - if alreadyConnected { - return nil - } - - NewOutboundPeer(addr, s, s.serverCaps) - } - - return nil -} - -func (s *Ethereum) OutboundPeers() []*Peer { - // Create a new peer slice with at least the length of the total peers - outboundPeers := make([]*Peer, s.peers.Len()) - length := 0 - eachPeer(s.peers, func(p *Peer, e *list.Element) { - if !p.inbound && p.conn != nil { - outboundPeers[length] = p - length++ - } - }) - - return outboundPeers[:length] -} - -func (s *Ethereum) InboundPeers() []*Peer { - // Create a new peer slice with at least the length of the total peers - inboundPeers := make([]*Peer, s.peers.Len()) - length := 0 - eachPeer(s.peers, func(p *Peer, e *list.Element) { - if p.inbound { - inboundPeers[length] = p - length++ - } - }) - - return inboundPeers[:length] -} - -func (s *Ethereum) InOutPeers() []*Peer { - // Reap the dead peers first - s.reapPeers() - - // Create a new peer slice with at least the length of the total peers - inboundPeers := make([]*Peer, s.peers.Len()) - length := 0 - eachPeer(s.peers, func(p *Peer, e *list.Element) { - // Only return peers with an actual ip - if len(p.host) > 0 { - inboundPeers[length] = p - length++ - } - }) - - return inboundPeers[:length] -} - -func (s *Ethereum) Broadcast(msgType wire.MsgType, data []interface{}) { - msg := wire.NewMessage(msgType, data) - s.BroadcastMsg(msg) -} - -func (s *Ethereum) BroadcastMsg(msg *wire.Msg) { - eachPeer(s.peers, func(p *Peer, e *list.Element) { - p.QueueMessage(msg) - }) -} - -func (s *Ethereum) Peers() *list.List { - return s.peers -} - -func (s *Ethereum) reapPeers() { - eachPeer(s.peers, func(p *Peer, e *list.Element) { - if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) { - s.removePeerElement(e) - } - }) -} - -func (s *Ethereum) removePeerElement(e *list.Element) { - s.peerMut.Lock() - defer s.peerMut.Unlock() - - s.peers.Remove(e) - - s.eventMux.Post(PeerListEvent{s.peers}) -} - -func (s *Ethereum) RemovePeer(p *Peer) { - eachPeer(s.peers, func(peer *Peer, e *list.Element) { - if peer == p { - s.removePeerElement(e) - } - }) -} - -func (s *Ethereum) reapDeadPeerHandler() { - reapTimer := time.NewTicker(processReapingTimeout * time.Second) - - for { - select { - case <-reapTimer.C: - s.reapPeers() - } - } -} - -// Start the ethereum -func (s *Ethereum) Start(seed bool) { - s.blockPool.Start() - - // Bind to addr and port - ln, err := net.Listen("tcp", ":"+s.Port) - if err != nil { - loggerger.Warnf("Port %s in use. Connection listening disabled. Acting as client", s.Port) - s.listening = false - } else { - s.listening = true - // Starting accepting connections - loggerger.Infoln("Ready and accepting connections") - // Start the peer handler - go s.peerHandler(ln) - } - - if s.nat != nil { - go s.upnpUpdateThread() - } - - // Start the reaping processes - go s.reapDeadPeerHandler() - go s.update() - go s.filterLoop() - - if seed { - s.Seed() - } - s.ConnectToPeer("localhost:40404") - loggerger.Infoln("Server started") -} - -func (s *Ethereum) Seed() { - // Sorry Py person. I must blacklist. you perform badly - s.blacklist = append(s.blacklist, ethutil.Hex2Bytes("64656330303561383532336435376331616537643864663236623336313863373537353163636634333530626263396330346237336262623931383064393031")) - ips := PastPeers() - if len(ips) > 0 { - for _, ip := range ips { - loggerger.Infoln("Connecting to previous peer ", ip) - s.ConnectToPeer(ip) - } - } else { - loggerger.Debugln("Retrieving seed nodes") - - // Eth-Go Bootstrapping - ips, er := net.LookupIP("seed.bysh.me") - if er == nil { - peers := []string{} - for _, ip := range ips { - node := fmt.Sprintf("%s:%d", ip.String(), 30303) - loggerger.Debugln("Found DNS Go Peer:", node) - peers = append(peers, node) - } - s.ProcessPeerList(peers) - } - - // Official DNS Bootstrapping - _, nodes, err := net.LookupSRV("eth", "tcp", "ethereum.org") - if err == nil { - peers := []string{} - // Iterate SRV nodes - for _, n := range nodes { - target := n.Target - port := strconv.Itoa(int(n.Port)) - // Resolve target to ip (Go returns list, so may resolve to multiple ips?) - addr, err := net.LookupHost(target) - if err == nil { - for _, a := range addr { - // Build string out of SRV port and Resolved IP - peer := net.JoinHostPort(a, port) - loggerger.Debugln("Found DNS Bootstrap Peer:", peer) - peers = append(peers, peer) - } - } else { - loggerger.Debugln("Couldn't resolve :", target) - } - } - // Connect to Peer list - s.ProcessPeerList(peers) - } - - s.ConnectToPeer(seedNodeAddress) - } -} - -func (s *Ethereum) peerHandler(listener net.Listener) { - for { - conn, err := listener.Accept() - if err != nil { - loggerger.Debugln(err) - - continue - } - - go s.AddPeer(conn) - } -} - -func (s *Ethereum) Stop() { - // Stop eventMux first, it will close all subscriptions. - s.eventMux.Stop() - - // Close the database - defer s.db.Close() - - var ips []string - eachPeer(s.peers, func(p *Peer, e *list.Element) { - ips = append(ips, p.conn.RemoteAddr().String()) - }) - - if len(ips) > 0 { - d, _ := json.MarshalIndent(ips, "", " ") - ethutil.WriteFile(path.Join(ethutil.Config.ExecPath, "known_peers.json"), d) - } - - eachPeer(s.peers, func(p *Peer, e *list.Element) { - p.Stop() - }) - - close(s.quit) - - if s.RpcServer != nil { - s.RpcServer.Stop() - } - s.txPool.Stop() - s.blockPool.Stop() - - loggerger.Infoln("Server stopped") - close(s.shutdownChan) -} - -// This function will wait for a shutdown and resumes main thread execution -func (s *Ethereum) WaitForShutdown() { - <-s.shutdownChan -} - -func (s *Ethereum) upnpUpdateThread() { - // Go off immediately to prevent code duplication, thereafter we renew - // lease every 15 minutes. - timer := time.NewTimer(5 * time.Minute) - lport, _ := strconv.ParseInt(s.Port, 10, 16) - first := true -out: - for { - select { - case <-timer.C: - var err error - _, err = s.nat.AddPortMapping("TCP", int(lport), int(lport), "eth listen port", 20*60) - if err != nil { - loggerger.Debugln("can't add UPnP port mapping:", err) - break out - } - if first && err == nil { - _, err = s.nat.GetExternalAddress() - if err != nil { - loggerger.Debugln("UPnP can't get external address:", err) - continue out - } - first = false - } - timer.Reset(time.Minute * 15) - case <-s.quit: - break out - } - } - - timer.Stop() - - if err := s.nat.DeletePortMapping("TCP", int(lport), int(lport)); err != nil { - loggerger.Debugln("unable to remove UPnP port mapping:", err) - } else { - loggerger.Debugln("succesfully disestablished UPnP port mapping") - } -} - -func (self *Ethereum) update() { - upToDateTimer := time.NewTicker(1 * time.Second) - -out: - for { - select { - case <-upToDateTimer.C: - if self.IsUpToDate() && !self.isUpToDate { - self.eventMux.Post(ChainSyncEvent{false}) - self.isUpToDate = true - } else if !self.IsUpToDate() && self.isUpToDate { - self.eventMux.Post(ChainSyncEvent{true}) - self.isUpToDate = false - } - case <-self.quit: - break out - } - } -} - -// InstallFilter adds filter for blockchain events. -// The filter's callbacks will run for matching blocks and messages. -// The filter should not be modified after it has been installed. -func (self *Ethereum) InstallFilter(filter *core.Filter) (id int) { - self.filterMu.Lock() - id = self.filterId - self.filters[id] = filter - self.filterId++ - self.filterMu.Unlock() - return id -} - -func (self *Ethereum) UninstallFilter(id int) { - self.filterMu.Lock() - delete(self.filters, id) - self.filterMu.Unlock() -} - -// GetFilter retrieves a filter installed using InstallFilter. -// The filter may not be modified. -func (self *Ethereum) GetFilter(id int) *core.Filter { - self.filterMu.RLock() - defer self.filterMu.RUnlock() - return self.filters[id] -} - -func (self *Ethereum) filterLoop() { - // Subscribe to events - events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Messages(nil)) - for event := range events.Chan() { - switch event := event.(type) { - case core.NewBlockEvent: - self.filterMu.RLock() - for _, filter := range self.filters { - if filter.BlockCallback != nil { - filter.BlockCallback(event.Block) - } - } - self.filterMu.RUnlock() - - case state.Messages: - self.filterMu.RLock() - for _, filter := range self.filters { - if filter.MessageCallback != nil { - msgs := filter.FilterMessages(event) - if len(msgs) > 0 { - filter.MessageCallback(msgs) - } - } - } - self.filterMu.RUnlock() - } - } -} - -func bootstrapDb(db ethutil.Database) { - d, _ := db.Get([]byte("ProtocolVersion")) - protov := ethutil.NewValue(d).Uint() - - if protov == 0 { - db.Put([]byte("ProtocolVersion"), ethutil.NewValue(ProtocolVersion).Bytes()) - } -} - -func PastPeers() []string { - var ips []string - data, _ := ethutil.ReadAllFile(path.Join(ethutil.Config.ExecPath, "known_peers.json")) - json.Unmarshal([]byte(data), &ips) - - return ips -} diff --git a/events.go b/events.go deleted file mode 100644 index 5fff1d831..000000000 --- a/events.go +++ /dev/null @@ -1,11 +0,0 @@ -package eth - -import "container/list" - -type PeerListEvent struct { - Peers *list.List -} - -type ChainSyncEvent struct { - InSync bool -} diff --git a/nat.go b/nat.go deleted file mode 100644 index 999308eb2..000000000 --- a/nat.go +++ /dev/null @@ -1,12 +0,0 @@ -package eth - -import ( - "net" -) - -// protocol is either "udp" or "tcp" -type NAT interface { - GetExternalAddress() (addr net.IP, err error) - AddPortMapping(protocol string, externalPort, internalPort int, description string, timeout int) (mappedExternalPort int, err error) - DeletePortMapping(protocol string, externalPort, internalPort int) (err error) -} diff --git a/natpmp.go b/natpmp.go deleted file mode 100644 index 489342a4b..000000000 --- a/natpmp.go +++ /dev/null @@ -1,55 +0,0 @@ -package eth - -import ( - "fmt" - "net" - - natpmp "github.com/jackpal/go-nat-pmp" -) - -// Adapt the NAT-PMP protocol to the NAT interface - -// TODO: -// + Register for changes to the external address. -// + Re-register port mapping when router reboots. -// + A mechanism for keeping a port mapping registered. - -type natPMPClient struct { - client *natpmp.Client -} - -func NewNatPMP(gateway net.IP) (nat NAT) { - return &natPMPClient{natpmp.NewClient(gateway)} -} - -func (n *natPMPClient) GetExternalAddress() (addr net.IP, err error) { - response, err := n.client.GetExternalAddress() - if err != nil { - return - } - ip := response.ExternalIPAddress - addr = net.IPv4(ip[0], ip[1], ip[2], ip[3]) - return -} - -func (n *natPMPClient) AddPortMapping(protocol string, externalPort, internalPort int, - description string, timeout int) (mappedExternalPort int, err error) { - if timeout <= 0 { - err = fmt.Errorf("timeout must not be <= 0") - return - } - // Note order of port arguments is switched between our AddPortMapping and the client's AddPortMapping. - response, err := n.client.AddPortMapping(protocol, internalPort, externalPort, timeout) - if err != nil { - return - } - mappedExternalPort = int(response.MappedExternalPort) - return -} - -func (n *natPMPClient) DeletePortMapping(protocol string, externalPort, internalPort int) (err error) { - // To destroy a mapping, send an add-port with - // an internalPort of the internal port to destroy, an external port of zero and a time of zero. - _, err = n.client.AddPortMapping(protocol, internalPort, 0, 0) - return -} diff --git a/natupnp.go b/natupnp.go deleted file mode 100644 index c7f9eeb62..000000000 --- a/natupnp.go +++ /dev/null @@ -1,338 +0,0 @@ -package eth - -// Just enough UPnP to be able to forward ports -// - -import ( - "bytes" - "encoding/xml" - "errors" - "net" - "net/http" - "os" - "strconv" - "strings" - "time" -) - -type upnpNAT struct { - serviceURL string - ourIP string -} - -func Discover() (nat NAT, err error) { - ssdp, err := net.ResolveUDPAddr("udp4", "239.255.255.250:1900") - if err != nil { - return - } - conn, err := net.ListenPacket("udp4", ":0") - if err != nil { - return - } - socket := conn.(*net.UDPConn) - defer socket.Close() - - err = socket.SetDeadline(time.Now().Add(10 * time.Second)) - if err != nil { - return - } - - st := "ST: urn:schemas-upnp-org:device:InternetGatewayDevice:1\r\n" - buf := bytes.NewBufferString( - "M-SEARCH * HTTP/1.1\r\n" + - "HOST: 239.255.255.250:1900\r\n" + - st + - "MAN: \"ssdp:discover\"\r\n" + - "MX: 2\r\n\r\n") - message := buf.Bytes() - answerBytes := make([]byte, 1024) - for i := 0; i < 3; i++ { - _, err = socket.WriteToUDP(message, ssdp) - if err != nil { - return - } - var n int - n, _, err = socket.ReadFromUDP(answerBytes) - if err != nil { - continue - // socket.Close() - // return - } - answer := string(answerBytes[0:n]) - if strings.Index(answer, "\r\n"+st) < 0 { - continue - } - // HTTP header field names are case-insensitive. - // http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2 - locString := "\r\nlocation: " - answer = strings.ToLower(answer) - locIndex := strings.Index(answer, locString) - if locIndex < 0 { - continue - } - loc := answer[locIndex+len(locString):] - endIndex := strings.Index(loc, "\r\n") - if endIndex < 0 { - continue - } - locURL := loc[0:endIndex] - var serviceURL string - serviceURL, err = getServiceURL(locURL) - if err != nil { - return - } - var ourIP string - ourIP, err = getOurIP() - if err != nil { - return - } - nat = &upnpNAT{serviceURL: serviceURL, ourIP: ourIP} - return - } - err = errors.New("UPnP port discovery failed.") - return -} - -// service represents the Service type in an UPnP xml description. -// Only the parts we care about are present and thus the xml may have more -// fields than present in the structure. -type service struct { - ServiceType string `xml:"serviceType"` - ControlURL string `xml:"controlURL"` -} - -// deviceList represents the deviceList type in an UPnP xml description. -// Only the parts we care about are present and thus the xml may have more -// fields than present in the structure. -type deviceList struct { - XMLName xml.Name `xml:"deviceList"` - Device []device `xml:"device"` -} - -// serviceList represents the serviceList type in an UPnP xml description. -// Only the parts we care about are present and thus the xml may have more -// fields than present in the structure. -type serviceList struct { - XMLName xml.Name `xml:"serviceList"` - Service []service `xml:"service"` -} - -// device represents the device type in an UPnP xml description. -// Only the parts we care about are present and thus the xml may have more -// fields than present in the structure. -type device struct { - XMLName xml.Name `xml:"device"` - DeviceType string `xml:"deviceType"` - DeviceList deviceList `xml:"deviceList"` - ServiceList serviceList `xml:"serviceList"` -} - -// specVersion represents the specVersion in a UPnP xml description. -// Only the parts we care about are present and thus the xml may have more -// fields than present in the structure. -type specVersion struct { - XMLName xml.Name `xml:"specVersion"` - Major int `xml:"major"` - Minor int `xml:"minor"` -} - -// root represents the Root document for a UPnP xml description. -// Only the parts we care about are present and thus the xml may have more -// fields than present in the structure. -type root struct { - XMLName xml.Name `xml:"root"` - SpecVersion specVersion - Device device -} - -func getChildDevice(d *device, deviceType string) *device { - dl := d.DeviceList.Device - for i := 0; i < len(dl); i++ { - if dl[i].DeviceType == deviceType { - return &dl[i] - } - } - return nil -} - -func getChildService(d *device, serviceType string) *service { - sl := d.ServiceList.Service - for i := 0; i < len(sl); i++ { - if sl[i].ServiceType == serviceType { - return &sl[i] - } - } - return nil -} - -func getOurIP() (ip string, err error) { - hostname, err := os.Hostname() - if err != nil { - return - } - p, err := net.LookupIP(hostname) - if err != nil && len(p) > 0 { - return - } - return p[0].String(), nil -} - -func getServiceURL(rootURL string) (url string, err error) { - r, err := http.Get(rootURL) - if err != nil { - return - } - defer r.Body.Close() - if r.StatusCode >= 400 { - err = errors.New(string(r.StatusCode)) - return - } - var root root - err = xml.NewDecoder(r.Body).Decode(&root) - - if err != nil { - return - } - a := &root.Device - if a.DeviceType != "urn:schemas-upnp-org:device:InternetGatewayDevice:1" { - err = errors.New("No InternetGatewayDevice") - return - } - b := getChildDevice(a, "urn:schemas-upnp-org:device:WANDevice:1") - if b == nil { - err = errors.New("No WANDevice") - return - } - c := getChildDevice(b, "urn:schemas-upnp-org:device:WANConnectionDevice:1") - if c == nil { - err = errors.New("No WANConnectionDevice") - return - } - d := getChildService(c, "urn:schemas-upnp-org:service:WANIPConnection:1") - if d == nil { - err = errors.New("No WANIPConnection") - return - } - url = combineURL(rootURL, d.ControlURL) - return -} - -func combineURL(rootURL, subURL string) string { - protocolEnd := "://" - protoEndIndex := strings.Index(rootURL, protocolEnd) - a := rootURL[protoEndIndex+len(protocolEnd):] - rootIndex := strings.Index(a, "/") - return rootURL[0:protoEndIndex+len(protocolEnd)+rootIndex] + subURL -} - -func soapRequest(url, function, message string) (r *http.Response, err error) { - fullMessage := "<?xml version=\"1.0\" ?>" + - "<s:Envelope xmlns:s=\"http://schemas.xmlsoap.org/soap/envelope/\" s:encodingStyle=\"http://schemas.xmlsoap.org/soap/encoding/\">\r\n" + - "<s:Body>" + message + "</s:Body></s:Envelope>" - - req, err := http.NewRequest("POST", url, strings.NewReader(fullMessage)) - if err != nil { - return nil, err - } - req.Header.Set("Content-Type", "text/xml ; charset=\"utf-8\"") - req.Header.Set("User-Agent", "Darwin/10.0.0, UPnP/1.0, MiniUPnPc/1.3") - //req.Header.Set("Transfer-Encoding", "chunked") - req.Header.Set("SOAPAction", "\"urn:schemas-upnp-org:service:WANIPConnection:1#"+function+"\"") - req.Header.Set("Connection", "Close") - req.Header.Set("Cache-Control", "no-cache") - req.Header.Set("Pragma", "no-cache") - - // log.Stderr("soapRequest ", req) - //fmt.Println(fullMessage) - - r, err = http.DefaultClient.Do(req) - if err != nil { - return - } - - if r.Body != nil { - defer r.Body.Close() - } - - if r.StatusCode >= 400 { - // log.Stderr(function, r.StatusCode) - err = errors.New("Error " + strconv.Itoa(r.StatusCode) + " for " + function) - r = nil - return - } - return -} - -type statusInfo struct { - externalIpAddress string -} - -func (n *upnpNAT) getStatusInfo() (info statusInfo, err error) { - - message := "<u:GetStatusInfo xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\r\n" + - "</u:GetStatusInfo>" - - var response *http.Response - response, err = soapRequest(n.serviceURL, "GetStatusInfo", message) - if err != nil { - return - } - - // TODO: Write a soap reply parser. It has to eat the Body and envelope tags... - - response.Body.Close() - return -} - -func (n *upnpNAT) GetExternalAddress() (addr net.IP, err error) { - info, err := n.getStatusInfo() - if err != nil { - return - } - addr = net.ParseIP(info.externalIpAddress) - return -} - -func (n *upnpNAT) AddPortMapping(protocol string, externalPort, internalPort int, description string, timeout int) (mappedExternalPort int, err error) { - // A single concatenation would break ARM compilation. - message := "<u:AddPortMapping xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\r\n" + - "<NewRemoteHost></NewRemoteHost><NewExternalPort>" + strconv.Itoa(externalPort) - message += "</NewExternalPort><NewProtocol>" + protocol + "</NewProtocol>" - message += "<NewInternalPort>" + strconv.Itoa(internalPort) + "</NewInternalPort>" + - "<NewInternalClient>" + n.ourIP + "</NewInternalClient>" + - "<NewEnabled>1</NewEnabled><NewPortMappingDescription>" - message += description + - "</NewPortMappingDescription><NewLeaseDuration>" + strconv.Itoa(timeout) + - "</NewLeaseDuration></u:AddPortMapping>" - - var response *http.Response - response, err = soapRequest(n.serviceURL, "AddPortMapping", message) - if err != nil { - return - } - - // TODO: check response to see if the port was forwarded - // log.Println(message, response) - mappedExternalPort = externalPort - _ = response - return -} - -func (n *upnpNAT) DeletePortMapping(protocol string, externalPort, internalPort int) (err error) { - - message := "<u:DeletePortMapping xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\r\n" + - "<NewRemoteHost></NewRemoteHost><NewExternalPort>" + strconv.Itoa(externalPort) + - "</NewExternalPort><NewProtocol>" + protocol + "</NewProtocol>" + - "</u:DeletePortMapping>" - - var response *http.Response - response, err = soapRequest(n.serviceURL, "DeletePortMapping", message) - if err != nil { - return - } - - // TODO: check response to see if the port was deleted - // log.Println(message, response) - _ = response - return -} diff --git a/peer.go b/peer.go deleted file mode 100644 index 331e9de37..000000000 --- a/peer.go +++ /dev/null @@ -1,881 +0,0 @@ -package eth - -import ( - "bytes" - "container/list" - "fmt" - "math" - "math/big" - "net" - "strconv" - "strings" - "sync/atomic" - "time" - - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethutil" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/wire" -) - -var peerlogger = logger.NewLogger("PEER") - -const ( - // The size of the output buffer for writing messages - outputBufferSize = 50 - // Current protocol version - ProtocolVersion = 49 - // Current P2P version - P2PVersion = 2 - // Ethereum network version - NetVersion = 0 - // Interval for ping/pong message - pingPongTimer = 2 * time.Second -) - -type DiscReason byte - -const ( - // Values are given explicitly instead of by iota because these values are - // defined by the wire protocol spec; it is easier for humans to ensure - // correctness when values are explicit. - DiscRequested DiscReason = iota - DiscReTcpSysErr - DiscBadProto - DiscBadPeer - DiscTooManyPeers - DiscConnDup - DiscGenesisErr - DiscProtoErr - DiscQuitting -) - -var discReasonToString = []string{ - "requested", - "TCP sys error", - "bad protocol", - "useless peer", - "too many peers", - "already connected", - "wrong genesis block", - "incompatible network", - "quitting", -} - -func (d DiscReason) String() string { - if len(discReasonToString) < int(d) { - return "Unknown" - } - - return discReasonToString[d] -} - -// Peer capabilities -type Caps byte - -const ( - CapPeerDiscTy Caps = 1 << iota - CapTxTy - CapChainTy - - CapDefault = CapChainTy | CapTxTy | CapPeerDiscTy -) - -var capsToString = map[Caps]string{ - CapPeerDiscTy: "Peer discovery", - CapTxTy: "Transaction relaying", - CapChainTy: "Block chain relaying", -} - -func (c Caps) IsCap(cap Caps) bool { - return c&cap > 0 -} - -func (c Caps) String() string { - var caps []string - if c.IsCap(CapPeerDiscTy) { - caps = append(caps, capsToString[CapPeerDiscTy]) - } - if c.IsCap(CapChainTy) { - caps = append(caps, capsToString[CapChainTy]) - } - if c.IsCap(CapTxTy) { - caps = append(caps, capsToString[CapTxTy]) - } - - return strings.Join(caps, " | ") -} - -type Peer struct { - // Ethereum interface - ethereum *Ethereum - // Net connection - conn net.Conn - // Output queue which is used to communicate and handle messages - outputQueue chan *wire.Msg - // Quit channel - quit chan bool - // Determines whether it's an inbound or outbound peer - inbound bool - // Flag for checking the peer's connectivity state - connected int32 - disconnect int32 - // Last known message send - lastSend time.Time - // Indicated whether a verack has been send or not - // This flag is used by writeMessage to check if messages are allowed - // to be send or not. If no version is known all messages are ignored. - versionKnown bool - statusKnown bool - - // Last received pong message - lastPong int64 - lastBlockReceived time.Time - doneFetchingHashes bool - lastHashAt time.Time - lastHashRequestedAt time.Time - - host []byte - port uint16 - caps Caps - td *big.Int - bestHash []byte - lastReceivedHash []byte - requestedHashes [][]byte - - // This peer's public key - pubkey []byte - - // Indicated whether the node is catching up or not - catchingUp bool - diverted bool - blocksRequested int - - version string - - // We use this to give some kind of pingtime to a node, not very accurate, could be improved. - pingTime time.Duration - pingStartTime time.Time - - lastRequestedBlock *types.Block - - protocolCaps *ethutil.Value -} - -func NewPeer(conn net.Conn, ethereum *Ethereum, inbound bool) *Peer { - pubkey := ethereum.KeyManager().PublicKey()[1:] - - return &Peer{ - outputQueue: make(chan *wire.Msg, outputBufferSize), - quit: make(chan bool), - ethereum: ethereum, - conn: conn, - inbound: inbound, - disconnect: 0, - connected: 1, - port: 30303, - pubkey: pubkey, - blocksRequested: 10, - caps: ethereum.ServerCaps(), - version: ethereum.ClientIdentity().String(), - protocolCaps: ethutil.NewValue(nil), - td: big.NewInt(0), - doneFetchingHashes: true, - } -} - -func NewOutboundPeer(addr string, ethereum *Ethereum, caps Caps) *Peer { - p := &Peer{ - outputQueue: make(chan *wire.Msg, outputBufferSize), - quit: make(chan bool), - ethereum: ethereum, - inbound: false, - connected: 0, - disconnect: 0, - port: 30303, - caps: caps, - version: ethereum.ClientIdentity().String(), - protocolCaps: ethutil.NewValue(nil), - td: big.NewInt(0), - doneFetchingHashes: true, - } - - // Set up the connection in another goroutine so we don't block the main thread - go func() { - conn, err := p.Connect(addr) - if err != nil { - //peerlogger.Debugln("Connection to peer failed. Giving up.", err) - p.Stop() - return - } - p.conn = conn - - // Atomically set the connection state - atomic.StoreInt32(&p.connected, 1) - atomic.StoreInt32(&p.disconnect, 0) - - p.Start() - }() - - return p -} - -func (self *Peer) Connect(addr string) (conn net.Conn, err error) { - const maxTries = 3 - for attempts := 0; attempts < maxTries; attempts++ { - conn, err = net.DialTimeout("tcp", addr, 10*time.Second) - if err != nil { - time.Sleep(time.Duration(attempts*20) * time.Second) - continue - } - - // Success - return - } - - return -} - -// Getters -func (p *Peer) PingTime() string { - return p.pingTime.String() -} -func (p *Peer) Inbound() bool { - return p.inbound -} -func (p *Peer) LastSend() time.Time { - return p.lastSend -} -func (p *Peer) LastPong() int64 { - return p.lastPong -} -func (p *Peer) Host() []byte { - return p.host -} -func (p *Peer) Port() uint16 { - return p.port -} -func (p *Peer) Version() string { - return p.version -} -func (p *Peer) Connected() *int32 { - return &p.connected -} - -// Setters -func (p *Peer) SetVersion(version string) { - p.version = version -} - -// Outputs any RLP encoded data to the peer -func (p *Peer) QueueMessage(msg *wire.Msg) { - if atomic.LoadInt32(&p.connected) != 1 { - return - } - p.outputQueue <- msg -} - -func (p *Peer) writeMessage(msg *wire.Msg) { - // Ignore the write if we're not connected - if atomic.LoadInt32(&p.connected) != 1 { - return - } - - if !p.versionKnown { - switch msg.Type { - case wire.MsgHandshakeTy: // Ok - default: // Anything but ack is allowed - return - } - } else { - /* - if !p.statusKnown { - switch msg.Type { - case wire.MsgStatusTy: // Ok - default: // Anything but ack is allowed - return - } - } - */ - } - - peerlogger.DebugDetailf("(%v) <= %v\n", p.conn.RemoteAddr(), formatMessage(msg)) - - err := wire.WriteMessage(p.conn, msg) - if err != nil { - peerlogger.Debugln(" Can't send message:", err) - // Stop the client if there was an error writing to it - p.Stop() - return - } -} - -// Outbound message handler. Outbound messages are handled here -func (p *Peer) HandleOutbound() { - // The ping timer. Makes sure that every 2 minutes a ping is send to the peer - pingTimer := time.NewTicker(pingPongTimer) - serviceTimer := time.NewTicker(10 * time.Second) - -out: - for { - skip: - select { - // Main message queue. All outbound messages are processed through here - case msg := <-p.outputQueue: - if !p.statusKnown { - switch msg.Type { - case wire.MsgTxTy, wire.MsgGetBlockHashesTy, wire.MsgBlockHashesTy, wire.MsgGetBlocksTy, wire.MsgBlockTy: - break skip - } - } - - switch msg.Type { - case wire.MsgGetBlockHashesTy: - p.lastHashRequestedAt = time.Now() - } - - p.writeMessage(msg) - p.lastSend = time.Now() - - // Ping timer - case <-pingTimer.C: - p.writeMessage(wire.NewMessage(wire.MsgPingTy, "")) - p.pingStartTime = time.Now() - - // Service timer takes care of peer broadcasting, transaction - // posting or block posting - case <-serviceTimer.C: - p.QueueMessage(wire.NewMessage(wire.MsgGetPeersTy, "")) - - case <-p.quit: - // Break out of the for loop if a quit message is posted - break out - } - } - -clean: - // This loop is for draining the output queue and anybody waiting for us - for { - select { - case <-p.outputQueue: - // TODO - default: - break clean - } - } -} - -func formatMessage(msg *wire.Msg) (ret string) { - ret = fmt.Sprintf("%v %v", msg.Type, msg.Data) - - /* - XXX Commented out because I need the log level here to determine - if i should or shouldn't generate this message - */ - /* - switch msg.Type { - case wire.MsgPeersTy: - ret += fmt.Sprintf("(%d entries)", msg.Data.Len()) - case wire.MsgBlockTy: - b1, b2 := chain.NewBlockFromRlpValue(msg.Data.Get(0)), ethchain.NewBlockFromRlpValue(msg.Data.Get(msg.Data.Len()-1)) - ret += fmt.Sprintf("(%d entries) %x - %x", msg.Data.Len(), b1.Hash()[0:4], b2.Hash()[0:4]) - case wire.MsgBlockHashesTy: - h1, h2 := msg.Data.Get(0).Bytes(), msg.Data.Get(msg.Data.Len()-1).Bytes() - ret += fmt.Sprintf("(%d entries) %x - %x", msg.Data.Len(), h1, h2) - } - */ - - return -} - -// Inbound handler. Inbound messages are received here and passed to the appropriate methods -func (p *Peer) HandleInbound() { - for atomic.LoadInt32(&p.disconnect) == 0 { - - // HMM? - time.Sleep(50 * time.Millisecond) - // Wait for a message from the peer - msgs, err := wire.ReadMessages(p.conn) - if err != nil { - peerlogger.Debugln(err) - } - for _, msg := range msgs { - peerlogger.DebugDetailf("(%v) => %v\n", p.conn.RemoteAddr(), formatMessage(msg)) - - switch msg.Type { - case wire.MsgHandshakeTy: - // Version message - p.handleHandshake(msg) - - //if p.caps.IsCap(CapPeerDiscTy) { - p.QueueMessage(wire.NewMessage(wire.MsgGetPeersTy, "")) - //} - - case wire.MsgDiscTy: - p.Stop() - peerlogger.Infoln("Disconnect peer: ", DiscReason(msg.Data.Get(0).Uint())) - case wire.MsgPingTy: - // Respond back with pong - p.QueueMessage(wire.NewMessage(wire.MsgPongTy, "")) - case wire.MsgPongTy: - // If we received a pong back from a peer we set the - // last pong so the peer handler knows this peer is still - // active. - p.lastPong = time.Now().Unix() - p.pingTime = time.Since(p.pingStartTime) - case wire.MsgTxTy: - // If the message was a transaction queue the transaction - // in the TxPool where it will undergo validation and - // processing when a new block is found - for i := 0; i < msg.Data.Len(); i++ { - tx := types.NewTransactionFromValue(msg.Data.Get(i)) - err := p.ethereum.TxPool().Add(tx) - if err != nil { - peerlogger.Infoln(err) - } else { - peerlogger.Infof("tx OK (%x)\n", tx.Hash()[0:4]) - } - } - case wire.MsgGetPeersTy: - // Peer asked for list of connected peers - //p.pushPeers() - case wire.MsgPeersTy: - // Received a list of peers (probably because MsgGetPeersTy was send) - data := msg.Data - // Create new list of possible peers for the ethereum to process - peers := make([]string, data.Len()) - // Parse each possible peer - for i := 0; i < data.Len(); i++ { - value := data.Get(i) - peers[i] = unpackAddr(value.Get(0), value.Get(1).Uint()) - } - - // Connect to the list of peers - p.ethereum.ProcessPeerList(peers) - - case wire.MsgStatusTy: - // Handle peer's status msg - p.handleStatus(msg) - } - - // TMP - if p.statusKnown { - switch msg.Type { - - case wire.MsgGetBlockHashesTy: - if msg.Data.Len() < 2 { - peerlogger.Debugln("err: argument length invalid ", msg.Data.Len()) - } - - hash := msg.Data.Get(0).Bytes() - amount := msg.Data.Get(1).Uint() - - hashes := p.ethereum.ChainManager().GetChainHashesFromHash(hash, amount) - - p.QueueMessage(wire.NewMessage(wire.MsgBlockHashesTy, ethutil.ByteSliceToInterface(hashes))) - - case wire.MsgGetBlocksTy: - // Limit to max 300 blocks - max := int(math.Min(float64(msg.Data.Len()), 300.0)) - var blocks []interface{} - - for i := 0; i < max; i++ { - hash := msg.Data.Get(i).Bytes() - block := p.ethereum.ChainManager().GetBlock(hash) - if block != nil { - blocks = append(blocks, block.Value().Raw()) - } - } - - p.QueueMessage(wire.NewMessage(wire.MsgBlockTy, blocks)) - - case wire.MsgBlockHashesTy: - p.catchingUp = true - - blockPool := p.ethereum.blockPool - - foundCommonHash := false - p.lastHashAt = time.Now() - - it := msg.Data.NewIterator() - for it.Next() { - hash := it.Value().Bytes() - p.lastReceivedHash = hash - - if blockPool.HasCommonHash(hash) { - foundCommonHash = true - - break - } - - blockPool.AddHash(hash, p) - } - - if !foundCommonHash { - p.FetchHashes() - } else { - peerlogger.Infof("Found common hash (%x...)\n", p.lastReceivedHash[0:4]) - p.doneFetchingHashes = true - } - - case wire.MsgBlockTy: - p.catchingUp = true - - blockPool := p.ethereum.blockPool - - it := msg.Data.NewIterator() - for it.Next() { - block := types.NewBlockFromRlpValue(it.Value()) - blockPool.Add(block, p) - - p.lastBlockReceived = time.Now() - } - case wire.MsgNewBlockTy: - var ( - blockPool = p.ethereum.blockPool - block = types.NewBlockFromRlpValue(msg.Data.Get(0)) - td = msg.Data.Get(1).BigInt() - ) - - if td.Cmp(blockPool.td) > 0 { - p.ethereum.blockPool.AddNew(block, p) - } - } - - } - } - } - - p.Stop() -} - -func (self *Peer) FetchBlocks(hashes [][]byte) { - if len(hashes) > 0 { - peerlogger.Debugf("Fetching blocks (%d)\n", len(hashes)) - - self.QueueMessage(wire.NewMessage(wire.MsgGetBlocksTy, ethutil.ByteSliceToInterface(hashes))) - } -} - -func (self *Peer) FetchHashes() bool { - blockPool := self.ethereum.blockPool - - return blockPool.FetchHashes(self) -} - -func (self *Peer) FetchingHashes() bool { - return !self.doneFetchingHashes -} - -// General update method -func (self *Peer) update() { - serviceTimer := time.NewTicker(100 * time.Millisecond) - -out: - for { - select { - case <-serviceTimer.C: - if self.IsCap("eth") { - var ( - sinceBlock = time.Since(self.lastBlockReceived) - ) - - if sinceBlock > 5*time.Second { - self.catchingUp = false - } - } - case <-self.quit: - break out - } - } - - serviceTimer.Stop() -} - -func (p *Peer) Start() { - peerHost, peerPort, _ := net.SplitHostPort(p.conn.LocalAddr().String()) - servHost, servPort, _ := net.SplitHostPort(p.conn.RemoteAddr().String()) - - if p.inbound { - p.host, p.port = packAddr(peerHost, peerPort) - } else { - p.host, p.port = packAddr(servHost, servPort) - } - - err := p.pushHandshake() - if err != nil { - peerlogger.Debugln("Peer can't send outbound version ack", err) - - p.Stop() - - return - } - - go p.HandleOutbound() - // Run the inbound handler in a new goroutine - go p.HandleInbound() - // Run the general update handler - go p.update() - - // Wait a few seconds for startup and then ask for an initial ping - time.Sleep(2 * time.Second) - p.writeMessage(wire.NewMessage(wire.MsgPingTy, "")) - p.pingStartTime = time.Now() - -} - -func (p *Peer) Stop() { - p.StopWithReason(DiscRequested) -} - -func (p *Peer) StopWithReason(reason DiscReason) { - if atomic.AddInt32(&p.disconnect, 1) != 1 { - return - } - - // Pre-emptively remove the peer; don't wait for reaping. We already know it's dead if we are here - p.ethereum.RemovePeer(p) - - close(p.quit) - if atomic.LoadInt32(&p.connected) != 0 { - p.writeMessage(wire.NewMessage(wire.MsgDiscTy, reason)) - p.conn.Close() - } -} - -func (p *Peer) peersMessage() *wire.Msg { - outPeers := make([]interface{}, len(p.ethereum.InOutPeers())) - // Serialise each peer - for i, peer := range p.ethereum.InOutPeers() { - // Don't return localhost as valid peer - if !net.ParseIP(peer.conn.RemoteAddr().String()).IsLoopback() { - outPeers[i] = peer.RlpData() - } - } - - // Return the message to the peer with the known list of connected clients - return wire.NewMessage(wire.MsgPeersTy, outPeers) -} - -// Pushes the list of outbound peers to the client when requested -func (p *Peer) pushPeers() { - p.QueueMessage(p.peersMessage()) -} - -func (self *Peer) pushStatus() { - msg := wire.NewMessage(wire.MsgStatusTy, []interface{}{ - uint32(ProtocolVersion), - uint32(NetVersion), - self.ethereum.ChainManager().TD, - self.ethereum.ChainManager().CurrentBlock.Hash(), - self.ethereum.ChainManager().Genesis().Hash(), - }) - - self.QueueMessage(msg) -} - -func (self *Peer) handleStatus(msg *wire.Msg) { - c := msg.Data - - var ( - //protoVersion = c.Get(0).Uint() - netVersion = c.Get(1).Uint() - td = c.Get(2).BigInt() - bestHash = c.Get(3).Bytes() - genesis = c.Get(4).Bytes() - ) - - if bytes.Compare(self.ethereum.ChainManager().Genesis().Hash(), genesis) != 0 { - loggerger.Warnf("Invalid genisis hash %x. Disabling [eth]\n", genesis) - return - } - - if netVersion != NetVersion { - loggerger.Warnf("Invalid network version %d. Disabling [eth]\n", netVersion) - return - } - - /* - if protoVersion != ProtocolVersion { - loggerger.Warnf("Invalid protocol version %d. Disabling [eth]\n", protoVersion) - return - } - */ - - // Get the td and last hash - self.td = td - self.bestHash = bestHash - self.lastReceivedHash = bestHash - - self.statusKnown = true - - // Compare the total TD with the blockchain TD. If remote is higher - // fetch hashes from highest TD node. - self.FetchHashes() - - loggerger.Infof("Peer is [eth] capable. (TD = %v ~ %x)", self.td, self.bestHash) - -} - -func (p *Peer) pushHandshake() error { - pubkey := p.ethereum.KeyManager().PublicKey() - msg := wire.NewMessage(wire.MsgHandshakeTy, []interface{}{ - P2PVersion, []byte(p.version), []interface{}{[]interface{}{"eth", ProtocolVersion}}, p.port, pubkey[1:], - }) - - p.QueueMessage(msg) - - return nil -} - -func (p *Peer) handleHandshake(msg *wire.Msg) { - c := msg.Data - - var ( - p2pVersion = c.Get(0).Uint() - clientId = c.Get(1).Str() - caps = c.Get(2) - port = c.Get(3).Uint() - pub = c.Get(4).Bytes() - ) - - // Check correctness of p2p protocol version - if p2pVersion != P2PVersion { - peerlogger.Debugf("Invalid P2P version. Require protocol %d, received %d\n", P2PVersion, p2pVersion) - p.Stop() - return - } - - // Handle the pub key (validation, uniqueness) - if len(pub) == 0 { - peerlogger.Warnln("Pubkey required, not supplied in handshake.") - p.Stop() - return - } - - // Self connect detection - pubkey := p.ethereum.KeyManager().PublicKey() - if bytes.Compare(pubkey[1:], pub) == 0 { - p.Stop() - - return - } - - // Check for blacklisting - for _, pk := range p.ethereum.blacklist { - if bytes.Compare(pk, pub) == 0 { - peerlogger.Debugf("Blacklisted peer tried to connect (%x...)\n", pubkey[0:4]) - p.StopWithReason(DiscBadPeer) - - return - } - } - - usedPub := 0 - // This peer is already added to the peerlist so we expect to find a double pubkey at least once - eachPeer(p.ethereum.Peers(), func(peer *Peer, e *list.Element) { - if bytes.Compare(pub, peer.pubkey) == 0 { - usedPub++ - } - }) - - if usedPub > 0 { - peerlogger.Debugf("Pubkey %x found more then once. Already connected to client.", p.pubkey) - p.Stop() - return - } - p.pubkey = pub - - // If this is an inbound connection send an ack back - if p.inbound { - p.port = uint16(port) - } - - p.SetVersion(clientId) - - p.versionKnown = true - - p.ethereum.PushPeer(p) - p.ethereum.eventMux.Post(PeerListEvent{p.ethereum.Peers()}) - - p.protocolCaps = caps - - it := caps.NewIterator() - var capsStrs []string - for it.Next() { - cap := it.Value().Get(0).Str() - ver := it.Value().Get(1).Uint() - switch cap { - case "eth": - if ver != ProtocolVersion { - loggerger.Warnf("Invalid protocol version %d. Disabling [eth]\n", ver) - continue - } - p.pushStatus() - } - - capsStrs = append(capsStrs, fmt.Sprintf("%s/%d", cap, ver)) - } - - peerlogger.Infof("Added peer (%s) %d / %d (%v)\n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers, capsStrs) - - peerlogger.Debugln(p) -} - -func (self *Peer) IsCap(cap string) bool { - capsIt := self.protocolCaps.NewIterator() - for capsIt.Next() { - if capsIt.Value().Str() == cap { - return true - } - } - - return false -} - -func (self *Peer) Caps() *ethutil.Value { - return self.protocolCaps -} - -func (p *Peer) String() string { - var strBoundType string - if p.inbound { - strBoundType = "inbound" - } else { - strBoundType = "outbound" - } - var strConnectType string - if atomic.LoadInt32(&p.disconnect) == 0 { - strConnectType = "connected" - } else { - strConnectType = "disconnected" - } - - return fmt.Sprintf("[%s] (%s) %v %s", strConnectType, strBoundType, p.conn.RemoteAddr(), p.version) - -} - -func (p *Peer) RlpData() []interface{} { - return []interface{}{p.host, p.port, p.pubkey} -} - -func packAddr(address, _port string) (host []byte, port uint16) { - p, _ := strconv.Atoi(_port) - port = uint16(p) - - h := net.ParseIP(address) - if ip := h.To4(); ip != nil { - host = []byte(ip) - } else { - host = []byte(h) - } - - return -} - -func unpackAddr(value *ethutil.Value, p uint64) string { - host, _ := net.IP(value.Bytes()).MarshalText() - prt := strconv.Itoa(int(p)) - - return net.JoinHostPort(string(host), prt) -} |