diff options
-rw-r--r-- | ethchain/filter.go | 3 | ||||
-rw-r--r-- | ethereum.go | 59 | ||||
-rw-r--r-- | ethpipe/js_pipe.go | 97 | ||||
-rw-r--r-- | ethutil/list.go | 4 | ||||
-rw-r--r-- | ethutil/value.go | 10 | ||||
-rw-r--r-- | ethutil/value_test.go | 6 | ||||
-rw-r--r-- | ethvm/vm.go | 91 | ||||
-rw-r--r-- | ethwire/messaging.go | 1 | ||||
-rw-r--r-- | peer.go | 2 |
9 files changed, 82 insertions, 191 deletions
diff --git a/ethchain/filter.go b/ethchain/filter.go index 5ed9af977..d9f1796f4 100644 --- a/ethchain/filter.go +++ b/ethchain/filter.go @@ -23,6 +23,9 @@ type Filter struct { max int altered []data + + BlockCallback func(*Block) + MessageCallback func(ethstate.Messages) } // Create a new filter which uses a bloom filter on blocks to figure out whether a particular block diff --git a/ethereum.go b/ethereum.go index 4c5e13b6d..fdfb59b09 100644 --- a/ethereum.go +++ b/ethereum.go @@ -16,6 +16,7 @@ import ( "github.com/ethereum/eth-go/ethlog" "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethrpc" + "github.com/ethereum/eth-go/ethstate" "github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethwire" ) @@ -87,6 +88,8 @@ type Ethereum struct { clientIdentity ethwire.ClientIdentity isUpToDate bool + + filters map[int]*ethchain.Filter } func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager *ethcrypto.KeyManager, caps Caps, usePnp bool) (*Ethereum, error) { @@ -116,6 +119,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager keyManager: keyManager, clientIdentity: clientIdentity, isUpToDate: true, + filters: make(map[int]*ethchain.Filter), } ethereum.reactor = ethreact.New() @@ -386,6 +390,7 @@ func (s *Ethereum) Start(seed bool) { // Start the reaping processes go s.ReapDeadPeerHandler() go s.update() + go s.filterLoop() if seed { s.Seed() @@ -536,6 +541,60 @@ out: } } +var filterId = 0 + +func (self *Ethereum) InstallFilter(object map[string]interface{}) (*ethchain.Filter, int) { + defer func() { filterId++ }() + + filter := ethchain.NewFilterFromMap(object, self) + self.filters[filterId] = filter + + return filter, filterId +} + +func (self *Ethereum) UninstallFilter(id int) { + delete(self.filters, id) +} + +func (self *Ethereum) GetFilter(id int) *ethchain.Filter { + return self.filters[id] +} + +func (self *Ethereum) filterLoop() { + blockChan := make(chan ethreact.Event, 5) + messageChan := make(chan ethreact.Event, 5) + // Subscribe to events + reactor := self.Reactor() + reactor.Subscribe("newBlock", blockChan) + reactor.Subscribe("messages", messageChan) +out: + for { + select { + case <-self.quit: + break out + case block := <-blockChan: + if block, ok := block.Resource.(*ethchain.Block); ok { + for _, filter := range self.filters { + if filter.BlockCallback != nil { + filter.BlockCallback(block) + } + } + } + case msg := <-messageChan: + if messages, ok := msg.Resource.(ethstate.Messages); ok { + for _, filter := range self.filters { + if filter.MessageCallback != nil { + msgs := filter.FilterMessages(messages) + if len(msgs) > 0 { + filter.MessageCallback(msgs) + } + } + } + } + } + } +} + func bootstrapDb(db ethutil.Database) { d, _ := db.Get([]byte("ProtocolVersion")) protov := ethutil.NewValue(d).Uint() diff --git a/ethpipe/js_pipe.go b/ethpipe/js_pipe.go index 47cac2ca2..32212b26a 100644 --- a/ethpipe/js_pipe.go +++ b/ethpipe/js_pipe.go @@ -3,12 +3,10 @@ package ethpipe import ( "bytes" "encoding/json" - "fmt" "sync/atomic" "github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethcrypto" - "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethstate" "github.com/ethereum/eth-go/ethutil" ) @@ -240,102 +238,11 @@ func (self *JSPipe) CompileMutan(code string) string { return ethutil.Bytes2Hex(data) } -func (self *JSPipe) Watch(object map[string]interface{}) *JSFilter { - return NewJSFilterFromMap(object, self.Pipe.obj) - /*} else if str, ok := object.(string); ok { - println("str") - return NewJSFilterFromString(str, self.Pipe.obj) - */ -} - -func (self *JSPipe) Messages(object map[string]interface{}) string { - filter := self.Watch(object) - filter.Uninstall() - - return filter.Messages() - -} - -type JSFilter struct { - eth ethchain.EthManager - *ethchain.Filter - quit chan bool - - BlockCallback func(*ethchain.Block) - MessageCallback func(ethstate.Messages) -} - -func NewJSFilterFromMap(object map[string]interface{}, eth ethchain.EthManager) *JSFilter { - filter := &JSFilter{eth, ethchain.NewFilterFromMap(object, eth), make(chan bool), nil, nil} - - go filter.mainLoop() - - return filter -} - -func NewJSFilterFromString(str string, eth ethchain.EthManager) *JSFilter { - return nil -} - -func (self *JSFilter) MessagesToJson(messages ethstate.Messages) string { +func ToJSMessages(messages ethstate.Messages) *ethutil.List { var msgs []JSMessage for _, m := range messages { msgs = append(msgs, NewJSMessage(m)) } - // Return an empty array instead of "null" - if len(msgs) == 0 { - return "[]" - } - - b, err := json.Marshal(msgs) - if err != nil { - return "{\"error\":" + err.Error() + "}" - } - - return string(b) -} - -func (self *JSFilter) Messages() string { - return self.MessagesToJson(self.Find()) -} - -func (self *JSFilter) mainLoop() { - blockChan := make(chan ethreact.Event, 5) - messageChan := make(chan ethreact.Event, 5) - // Subscribe to events - reactor := self.eth.Reactor() - reactor.Subscribe("newBlock", blockChan) - reactor.Subscribe("messages", messageChan) -out: - for { - select { - case <-self.quit: - break out - case block := <-blockChan: - if block, ok := block.Resource.(*ethchain.Block); ok { - if self.BlockCallback != nil { - self.BlockCallback(block) - } - } - case msg := <-messageChan: - if messages, ok := msg.Resource.(ethstate.Messages); ok { - if self.MessageCallback != nil { - println("messages!") - msgs := self.FilterMessages(messages) - if len(msgs) > 0 { - self.MessageCallback(msgs) - } - } - } - } - } -} - -func (self *JSFilter) Changed(object interface{}) { - fmt.Printf("%T\n", object) -} - -func (self *JSFilter) Uninstall() { - self.quit <- true + return ethutil.NewList(msgs) } diff --git a/ethutil/list.go b/ethutil/list.go index 18bf04792..0aa657a14 100644 --- a/ethutil/list.go +++ b/ethutil/list.go @@ -20,6 +20,10 @@ func NewList(t interface{}) *List { return &List{list, list.Len()} } +func EmptyList() *List { + return NewList([]interface{}{}) +} + // Get N element from the embedded slice. Returns nil if OOB. func (self *List) Get(i int) interface{} { if self.list.Len() > i { diff --git a/ethutil/value.go b/ethutil/value.go index b336345ca..e8148b990 100644 --- a/ethutil/value.go +++ b/ethutil/value.go @@ -1,9 +1,11 @@ package ethutil import ( + "bytes" "fmt" "math/big" "reflect" + "strconv" ) // Data values are returned by the rlp decoder. The data values represents @@ -93,6 +95,9 @@ func (val *Value) Int() int64 { return new(big.Int).SetBytes(Val).Int64() } else if Val, ok := val.Val.(*big.Int); ok { return Val.Int64() + } else if Val, ok := val.Val.(string); ok { + n, _ := strconv.Atoi(Val) + return int64(n) } return 0 @@ -246,10 +251,7 @@ func (val *Value) Cmp(o *Value) bool { } func (self *Value) DeepCmp(o *Value) bool { - a := NewValue(self.BigInt()) - b := NewValue(o.BigInt()) - - return a.Cmp(b) + return bytes.Compare(self.Bytes(), o.Bytes()) == 0 } func (val *Value) Encode() []byte { diff --git a/ethutil/value_test.go b/ethutil/value_test.go index 710cbd887..5452a0790 100644 --- a/ethutil/value_test.go +++ b/ethutil/value_test.go @@ -2,6 +2,7 @@ package ethutil import ( "bytes" + "fmt" "math/big" "testing" ) @@ -78,3 +79,8 @@ func TestMath(t *testing.T) { t.Error("Expected 0, got", a) } } + +func TestString(t *testing.T) { + a := NewValue("10") + fmt.Println("VALUE WITH STRING:", a.Int()) +} diff --git a/ethvm/vm.go b/ethvm/vm.go index 9518540e0..fba8c4a0e 100644 --- a/ethvm/vm.go +++ b/ethvm/vm.go @@ -765,51 +765,7 @@ func (self *Vm) RunClosure(closure *Closure) (ret []byte, err error) { stack.Push(ethutil.BigD(addr)) } - /* - msg := self.env.State().Manifest().AddMessage(ðstate.Message{ - To: addr, From: closure.Address(), - Origin: self.env.Origin(), - Block: self.env.BlockHash(), Timestamp: self.env.Time(), Coinbase: self.env.Coinbase(), Number: self.env.BlockNumber(), - Value: value, - }) - - // Create a new contract - contract := self.env.State().NewStateObject(addr) - if contract.Balance.Cmp(value) >= 0 { - closure.object.SubAmount(value) - contract.AddAmount(value) - - // Set the init script - initCode := mem.Get(offset.Int64(), size.Int64()) - msg.Input = initCode - - // Transfer all remaining gas to the new - // contract so it may run the init script - gas := new(big.Int).Set(closure.Gas) - closure.UseGas(closure.Gas) - - // Create the closure - c := NewClosure(msg, closure, contract, initCode, gas, closure.Price) - // Call the closure and set the return value as - // main script. - contract.Code, _, err = c.Call(self, nil) - } else { - err = fmt.Errorf("Insufficient funds to transfer value. Req %v, has %v", value, closure.object.Balance) - } - - if err != nil { - stack.Push(ethutil.BigFalse) - - // Revert the state as it was before. - self.env.State().Set(snapshot) - - self.Printf("CREATE err %v", err) - } else { - stack.Push(ethutil.BigD(addr)) - msg.Output = contract.Code - } - */ self.Endl() // Debug hook @@ -858,53 +814,6 @@ func (self *Vm) RunClosure(closure *Closure) (ret []byte, err error) { self.Dbg.SetCode(closure.Code) } - /* - msg := self.env.State().Manifest().AddMessage(ðstate.Message{ - To: addr.Bytes(), From: closure.Address(), - Input: args, - Origin: self.env.Origin(), - Block: self.env.BlockHash(), Timestamp: self.env.Time(), Coinbase: self.env.Coinbase(), Number: self.env.BlockNumber(), - Value: value, - }) - - if closure.object.Balance.Cmp(value) < 0 { - vmlogger.Debugf("Insufficient funds to transfer value. Req %v, has %v", value, closure.object.Balance) - - closure.ReturnGas(gas, nil) - - stack.Push(ethutil.BigFalse) - } else { - snapshot := self.env.State().Copy() - - stateObject := self.env.State().GetOrNewStateObject(addr.Bytes()) - - closure.object.SubAmount(value) - stateObject.AddAmount(value) - - // Create a new callable closure - c := NewClosure(msg, closure, stateObject, stateObject.Code, gas, closure.Price) - // Executer the closure and get the return value (if any) - ret, _, err := c.Call(self, args) - if err != nil { - stack.Push(ethutil.BigFalse) - - vmlogger.Debugf("Closure execution failed. %v\n", err) - - self.env.State().Set(snapshot) - } else { - stack.Push(ethutil.BigTrue) - - mem.Set(retOffset.Int64(), retSize.Int64(), ret) - } - - msg.Output = ret - - // Debug hook - if self.Dbg != nil { - self.Dbg.SetCode(closure.Code) - } - } - */ case POST: require(6) diff --git a/ethwire/messaging.go b/ethwire/messaging.go index c93c717a2..67a866f73 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -49,6 +49,7 @@ var msgTypeToString = map[MsgType]string{ MsgPingTy: "Ping", MsgPongTy: "Pong", MsgGetPeersTy: "Get peers", + MsgStatusTy: "Status", MsgPeersTy: "Peers", MsgTxTy: "Transactions", MsgBlockTy: "Blocks", @@ -694,7 +694,7 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) { } // Handle the pub key (validation, uniqueness) - if pub == nil || len(pub) == 0 { + if len(pub) == 0 { peerlogger.Warnln("Pubkey required, not supplied in handshake.") p.Stop() return |