diff options
author | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-01-06 20:13:16 +0800 |
---|---|---|
committer | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-01-06 20:13:16 +0800 |
commit | 3b8725e0f5622733dcb6c3ec142a83ddbb94bd6c (patch) | |
tree | cf9e8919920cfc3a3ea776bfd1d04442d05efdaf | |
parent | 117f66e82375b752cc6a9ff22aa0d398ac337bb4 (diff) | |
parent | 3caa4ad1baba3019c06733e1a80d78d9a57137bb (diff) | |
download | dexon-3b8725e0f5622733dcb6c3ec142a83ddbb94bd6c.tar dexon-3b8725e0f5622733dcb6c3ec142a83ddbb94bd6c.tar.gz dexon-3b8725e0f5622733dcb6c3ec142a83ddbb94bd6c.tar.bz2 dexon-3b8725e0f5622733dcb6c3ec142a83ddbb94bd6c.tar.lz dexon-3b8725e0f5622733dcb6c3ec142a83ddbb94bd6c.tar.xz dexon-3b8725e0f5622733dcb6c3ec142a83ddbb94bd6c.tar.zst dexon-3b8725e0f5622733dcb6c3ec142a83ddbb94bd6c.zip |
Merge pull request #239 from fjl/grab-bag
Grab bag of fixes
-rw-r--r-- | cmd/evm/main.go | 6 | ||||
-rw-r--r-- | cmd/peerserver/main.go | 34 | ||||
-rw-r--r-- | cmd/rlpdump/main.go | 3 | ||||
-rw-r--r-- | eth/protocol.go | 8 | ||||
-rw-r--r-- | eth/protocol_test.go | 4 | ||||
-rw-r--r-- | p2p/message.go | 20 | ||||
-rw-r--r-- | p2p/message_test.go | 6 | ||||
-rw-r--r-- | p2p/peer.go | 22 | ||||
-rw-r--r-- | p2p/peer_test.go | 4 | ||||
-rw-r--r-- | p2p/protocol.go | 34 | ||||
-rw-r--r-- | p2p/protocol_test.go | 68 |
11 files changed, 116 insertions, 93 deletions
diff --git a/cmd/evm/main.go b/cmd/evm/main.go index 84ab0dc27..f902c99e5 100644 --- a/cmd/evm/main.go +++ b/cmd/evm/main.go @@ -131,6 +131,12 @@ func (self *VMEnv) Value() *big.Int { return self.value } func (self *VMEnv) GasLimit() *big.Int { return big.NewInt(1000000000) } func (self *VMEnv) Depth() int { return 0 } func (self *VMEnv) SetDepth(i int) { self.depth = i } +func (self *VMEnv) GetHash(n uint64) []byte { + if self.block.Number().Cmp(big.NewInt(int64(n))) == 0 { + return self.block.Hash() + } + return nil +} func (self *VMEnv) AddLog(log state.Log) { self.state.AddLog(log) } diff --git a/cmd/peerserver/main.go b/cmd/peerserver/main.go index eb0900f8b..341c4dbb9 100644 --- a/cmd/peerserver/main.go +++ b/cmd/peerserver/main.go @@ -18,9 +18,8 @@ package main import ( "crypto/elliptic" - "fmt" + "flag" "log" - "net" "os" "github.com/ethereum/go-ethereum/crypto" @@ -28,29 +27,32 @@ import ( "github.com/ethereum/go-ethereum/p2p" ) +var ( + natType = flag.String("nat", "", "NAT traversal implementation") + pmpGateway = flag.String("gateway", "", "gateway address for NAT-PMP") + listenAddr = flag.String("addr", ":30301", "listen address") +) + func main() { + flag.Parse() + nat, err := p2p.ParseNAT(*natType, *pmpGateway) + if err != nil { + log.Fatal("invalid nat:", err) + } + logger.AddLogSystem(logger.NewStdLogSystem(os.Stdout, log.LstdFlags, logger.InfoLevel)) key, _ := crypto.GenerateKey() marshaled := elliptic.Marshal(crypto.S256(), key.PublicKey.X, key.PublicKey.Y) srv := p2p.Server{ MaxPeers: 100, - Identity: p2p.NewSimpleClientIdentity("Ethereum(G)", "0.1", "Peer Server Two", string(marshaled)), - ListenAddr: ":30301", - NAT: p2p.UPNP(), + Identity: p2p.NewSimpleClientIdentity("Ethereum(G)", "0.1", "Peer Server Two", marshaled), + ListenAddr: *listenAddr, + NAT: nat, + NoDial: true, } if err := srv.Start(); err != nil { - fmt.Println("could not start server:", err) - os.Exit(1) + log.Fatal("could not start server:", err) } - - // add seed peers - seed, err := net.ResolveTCPAddr("tcp", "poc-8.ethdev.com:30303") - if err != nil { - fmt.Println("couldn't resolve:", err) - } else { - srv.SuggestPeer(seed.IP, seed.Port, nil) - } - select {} } diff --git a/cmd/rlpdump/main.go b/cmd/rlpdump/main.go index 8f1c4a8c2..8567dcff8 100644 --- a/cmd/rlpdump/main.go +++ b/cmd/rlpdump/main.go @@ -110,8 +110,7 @@ func dump(s *rlp.Stream, depth int) error { s.List() defer s.ListEnd() if size == 0 { - fmt.Printf(ws(depth) + "[]") - return nil + fmt.Print(ws(depth) + "[]") } else { fmt.Println(ws(depth) + "[") for i := 0; ; i++ { diff --git a/eth/protocol.go b/eth/protocol.go index b67e5aaea..723ab5502 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -140,7 +140,7 @@ func (self *ethProtocol) handle() error { return self.protoError(ErrDecode, "->msg %v: %v", msg, err) } hashes := self.chainManager.GetBlockHashesFromHash(request.Hash, request.Amount) - return self.rw.EncodeMsg(BlockHashesMsg, ethutil.ByteSliceToInterface(hashes)...) + return p2p.EncodeMsg(self.rw, BlockHashesMsg, ethutil.ByteSliceToInterface(hashes)...) case BlockHashesMsg: // TODO: redo using lazy decode , this way very inefficient on known chains @@ -185,7 +185,7 @@ func (self *ethProtocol) handle() error { break } } - return self.rw.EncodeMsg(BlocksMsg, blocks...) + return p2p.EncodeMsg(self.rw, BlocksMsg, blocks...) case BlocksMsg: msgStream := rlp.NewStream(msg.Payload) @@ -298,12 +298,12 @@ func (self *ethProtocol) handleStatus() error { func (self *ethProtocol) requestBlockHashes(from []byte) error { self.peer.Debugf("fetching hashes (%d) %x...\n", blockHashesBatchSize, from[0:4]) - return self.rw.EncodeMsg(GetBlockHashesMsg, interface{}(from), uint64(blockHashesBatchSize)) + return p2p.EncodeMsg(self.rw, GetBlockHashesMsg, interface{}(from), uint64(blockHashesBatchSize)) } func (self *ethProtocol) requestBlocks(hashes [][]byte) error { self.peer.Debugf("fetching %v blocks", len(hashes)) - return self.rw.EncodeMsg(GetBlocksMsg, ethutil.ByteSliceToInterface(hashes)...) + return p2p.EncodeMsg(self.rw, GetBlocksMsg, ethutil.ByteSliceToInterface(hashes)...) } func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *protocolError) { diff --git a/eth/protocol_test.go b/eth/protocol_test.go index ab2aa289f..224b59abd 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -41,10 +41,6 @@ func (self *testMsgReadWriter) WriteMsg(msg p2p.Msg) error { return nil } -func (self *testMsgReadWriter) EncodeMsg(code uint64, data ...interface{}) error { - return self.WriteMsg(p2p.NewMsg(code, data...)) -} - func (self *testMsgReadWriter) ReadMsg() (p2p.Msg, error) { msg, ok := <-self.in if !ok { diff --git a/p2p/message.go b/p2p/message.go index a6f62ec4c..daf2bf05c 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -71,14 +71,11 @@ type MsgReader interface { } type MsgWriter interface { - // WriteMsg sends an existing message. - // The Payload reader of the message is consumed. + // WriteMsg sends a message. It will block until the message's + // Payload has been consumed by the other end. + // // Note that messages can be sent only once. WriteMsg(Msg) error - - // EncodeMsg writes an RLP-encoded message with the given - // code and data elements. - EncodeMsg(code uint64, data ...interface{}) error } // MsgReadWriter provides reading and writing of encoded messages. @@ -87,6 +84,12 @@ type MsgReadWriter interface { MsgWriter } +// EncodeMsg writes an RLP-encoded message with the given code and +// data elements. +func EncodeMsg(w MsgWriter, code uint64, data ...interface{}) error { + return w.WriteMsg(NewMsg(code, data...)) +} + var magicToken = []byte{34, 64, 8, 145} func writeMsg(w io.Writer, msg Msg) error { @@ -209,11 +212,6 @@ func (p *MsgPipeRW) WriteMsg(msg Msg) error { return ErrPipeClosed } -// EncodeMsg is a convenient shorthand for sending an RLP-encoded message. -func (p *MsgPipeRW) EncodeMsg(code uint64, data ...interface{}) error { - return p.WriteMsg(NewMsg(code, data...)) -} - // ReadMsg returns a message sent on the other end of the pipe. func (p *MsgPipeRW) ReadMsg() (Msg, error) { if atomic.LoadInt32(p.closed) == 0 { diff --git a/p2p/message_test.go b/p2p/message_test.go index 066d2516d..5cde9abf5 100644 --- a/p2p/message_test.go +++ b/p2p/message_test.go @@ -75,8 +75,8 @@ func TestDecodeRealMsg(t *testing.T) { func ExampleMsgPipe() { rw1, rw2 := MsgPipe() go func() { - rw1.EncodeMsg(8, []byte{0, 0}) - rw1.EncodeMsg(5, []byte{1, 1}) + EncodeMsg(rw1, 8, []byte{0, 0}) + EncodeMsg(rw1, 5, []byte{1, 1}) rw1.Close() }() @@ -100,7 +100,7 @@ loop: rw1, rw2 := MsgPipe() done := make(chan struct{}) go func() { - if err := rw1.EncodeMsg(1); err == nil { + if err := EncodeMsg(rw1, 1); err == nil { t.Error("EncodeMsg returned nil error") } else if err != ErrPipeClosed { t.Error("EncodeMsg returned wrong error: got %v, want %v", err, ErrPipeClosed) diff --git a/p2p/peer.go b/p2p/peer.go index 0d7eec9f4..2380a3285 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -460,25 +460,3 @@ func (r *eofSignal) Read(buf []byte) (int, error) { } return n, err } - -func (peer *Peer) PeerList() []interface{} { - peers := peer.otherPeers() - ds := make([]interface{}, 0, len(peers)) - for _, p := range peers { - p.infolock.Lock() - addr := p.listenAddr - p.infolock.Unlock() - // filter out this peer and peers that are not listening or - // have not completed the handshake. - // TODO: track previously sent peers and exclude them as well. - if p == peer || addr == nil { - continue - } - ds = append(ds, addr) - } - ourAddr := peer.ourListenAddr - if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() { - ds = append(ds, ourAddr) - } - return ds -} diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 5b9e9e784..4ee88f112 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -126,10 +126,10 @@ func TestPeerProtoEncodeMsg(t *testing.T) { Name: "a", Length: 2, Run: func(peer *Peer, rw MsgReadWriter) error { - if err := rw.EncodeMsg(2); err == nil { + if err := EncodeMsg(rw, 2); err == nil { t.Error("expected error for out-of-range msg code, got nil") } - if err := rw.EncodeMsg(1, "foo", "bar"); err != nil { + if err := EncodeMsg(rw, 1, "foo", "bar"); err != nil { t.Errorf("write error: %v", err) } return nil diff --git a/p2p/protocol.go b/p2p/protocol.go index dd8cbc4ec..1d121a885 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -119,14 +119,14 @@ func (bp *baseProtocol) loop(quit <-chan error) error { getPeersTick := time.NewTicker(10 * time.Second) defer getPeersTick.Stop() - err := bp.rw.EncodeMsg(getPeersMsg) + err := EncodeMsg(bp.rw, getPeersMsg) for err == nil { select { case err = <-quit: return err case <-getPeersTick.C: - err = bp.rw.EncodeMsg(getPeersMsg) + err = EncodeMsg(bp.rw, getPeersMsg) case event := <-activity.Chan(): ping.Reset(pingTimeout) lastActive = event.(time.Time) @@ -134,7 +134,7 @@ func (bp *baseProtocol) loop(quit <-chan error) error { if lastActive.Add(pingTimeout * 2).Before(t) { err = newPeerError(errPingTimeout, "") } else if lastActive.Add(pingTimeout).Before(t) { - err = bp.rw.EncodeMsg(pingMsg) + err = EncodeMsg(bp.rw, pingMsg) } } } @@ -164,12 +164,12 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { return discRequestedError(reason[0]) case pingMsg: - return bp.rw.EncodeMsg(pongMsg) + return EncodeMsg(bp.rw, pongMsg) case pongMsg: case getPeersMsg: - peers := bp.peer.PeerList() + peers := bp.peerList() // this is dangerous. the spec says that we should _delay_ // sending the response if no new information is available. // this means that would need to send a response later when @@ -177,7 +177,7 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { // // TODO: add event mechanism to notify baseProtocol for new peers if len(peers) > 0 { - return bp.rw.EncodeMsg(peersMsg, peers...) + return EncodeMsg(bp.rw, peersMsg, peers...) } case peersMsg: @@ -264,3 +264,25 @@ func (bp *baseProtocol) handshakeMsg() Msg { bp.peer.ourID.Pubkey()[1:], ) } + +func (bp *baseProtocol) peerList() []interface{} { + peers := bp.peer.otherPeers() + ds := make([]interface{}, 0, len(peers)) + for _, p := range peers { + p.infolock.Lock() + addr := p.listenAddr + p.infolock.Unlock() + // filter out this peer and peers that are not listening or + // have not completed the handshake. + // TODO: track previously sent peers and exclude them as well. + if p == bp.peer || addr == nil { + continue + } + ds = append(ds, addr) + } + ourAddr := bp.peer.ourListenAddr + if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() { + ds = append(ds, ourAddr) + } + return ds +} diff --git a/p2p/protocol_test.go b/p2p/protocol_test.go index ce25b3e1b..b1d10ac53 100644 --- a/p2p/protocol_test.go +++ b/p2p/protocol_test.go @@ -4,6 +4,7 @@ import ( "fmt" "net" "reflect" + "sync" "testing" "github.com/ethereum/go-ethereum/crypto" @@ -36,50 +37,71 @@ func newTestPeer() (peer *Peer) { } func TestBaseProtocolPeers(t *testing.T) { - cannedPeerList := []*peerAddr{ + peerList := []*peerAddr{ {IP: net.ParseIP("1.2.3.4"), Port: 2222, Pubkey: []byte{}}, {IP: net.ParseIP("5.6.7.8"), Port: 3333, Pubkey: []byte{}}, } - var ownAddr *peerAddr = &peerAddr{IP: net.ParseIP("1.3.5.7"), Port: 1111, Pubkey: []byte{}} + listenAddr := &peerAddr{IP: net.ParseIP("1.3.5.7"), Port: 1111, Pubkey: []byte{}} rw1, rw2 := MsgPipe() + defer rw1.Close() + wg := new(sync.WaitGroup) + // run matcher, close pipe when addresses have arrived - addrChan := make(chan *peerAddr, len(cannedPeerList)) + numPeers := len(peerList) + 1 + addrChan := make(chan *peerAddr) + wg.Add(1) go func() { - for _, want := range cannedPeerList { - got := <-addrChan - t.Logf("got peer: %+v", got) + i := 0 + for got := range addrChan { + var want *peerAddr + switch { + case i < len(peerList): + want = peerList[i] + case i == len(peerList): + want = listenAddr // listenAddr should be the last thing sent + } + t.Logf("got peer %d/%d: %v", i+1, numPeers, got) if !reflect.DeepEqual(want, got) { - t.Errorf("mismatch: got %#v, want %#v", got, want) + t.Errorf("mismatch: got %+v, want %+v", got, want) + } + i++ + if i == numPeers { + break } } - close(addrChan) - var own []*peerAddr - var got *peerAddr - for got = range addrChan { - own = append(own, got) - } - if len(own) != 1 || !reflect.DeepEqual(ownAddr, own[0]) { - t.Errorf("mismatch: peers own address is incorrectly or not given, got %v, want %#v", ownAddr) + if i != numPeers { + t.Errorf("wrong number of peers received: got %d, want %d", i, numPeers) } - rw2.Close() + rw1.Close() + wg.Done() }() - // run first peer + + // run first peer (in background) peer1 := newTestPeer() - peer1.ourListenAddr = ownAddr + peer1.ourListenAddr = listenAddr peer1.otherPeers = func() []*Peer { - pl := make([]*Peer, len(cannedPeerList)) - for i, addr := range cannedPeerList { + pl := make([]*Peer, len(peerList)) + for i, addr := range peerList { pl[i] = &Peer{listenAddr: addr} } return pl } - go runBaseProtocol(peer1, rw1) + wg.Add(1) + go func() { + runBaseProtocol(peer1, rw1) + wg.Done() + }() + // run second peer peer2 := newTestPeer() peer2.newPeerAddr = addrChan // feed peer suggestions into matcher if err := runBaseProtocol(peer2, rw2); err != ErrPipeClosed { t.Errorf("peer2 terminated with unexpected error: %v", err) } + + // terminate matcher + close(addrChan) + wg.Wait() } func TestBaseProtocolDisconnect(t *testing.T) { @@ -93,7 +115,7 @@ func TestBaseProtocolDisconnect(t *testing.T) { if err := expectMsg(rw2, handshakeMsg); err != nil { t.Error(err) } - err := rw2.EncodeMsg(handshakeMsg, + err := EncodeMsg(rw2, handshakeMsg, baseProtocolVersion, "", []interface{}{}, @@ -106,7 +128,7 @@ func TestBaseProtocolDisconnect(t *testing.T) { if err := expectMsg(rw2, getPeersMsg); err != nil { t.Error(err) } - if err := rw2.EncodeMsg(discMsg, DiscQuitting); err != nil { + if err := EncodeMsg(rw2, discMsg, DiscQuitting); err != nil { t.Error(err) } |