aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ethchain/filter.go3
-rw-r--r--ethereum.go59
-rw-r--r--ethpipe/js_pipe.go97
-rw-r--r--ethutil/list.go4
-rw-r--r--ethutil/value.go10
-rw-r--r--ethutil/value_test.go6
-rw-r--r--ethvm/vm.go91
-rw-r--r--ethwire/messaging.go1
-rw-r--r--peer.go2
9 files changed, 82 insertions, 191 deletions
diff --git a/ethchain/filter.go b/ethchain/filter.go
index 5ed9af977..d9f1796f4 100644
--- a/ethchain/filter.go
+++ b/ethchain/filter.go
@@ -23,6 +23,9 @@ type Filter struct {
max int
altered []data
+
+ BlockCallback func(*Block)
+ MessageCallback func(ethstate.Messages)
}
// Create a new filter which uses a bloom filter on blocks to figure out whether a particular block
diff --git a/ethereum.go b/ethereum.go
index 4c5e13b6d..fdfb59b09 100644
--- a/ethereum.go
+++ b/ethereum.go
@@ -16,6 +16,7 @@ import (
"github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethrpc"
+ "github.com/ethereum/eth-go/ethstate"
"github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethwire"
)
@@ -87,6 +88,8 @@ type Ethereum struct {
clientIdentity ethwire.ClientIdentity
isUpToDate bool
+
+ filters map[int]*ethchain.Filter
}
func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager *ethcrypto.KeyManager, caps Caps, usePnp bool) (*Ethereum, error) {
@@ -116,6 +119,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
keyManager: keyManager,
clientIdentity: clientIdentity,
isUpToDate: true,
+ filters: make(map[int]*ethchain.Filter),
}
ethereum.reactor = ethreact.New()
@@ -386,6 +390,7 @@ func (s *Ethereum) Start(seed bool) {
// Start the reaping processes
go s.ReapDeadPeerHandler()
go s.update()
+ go s.filterLoop()
if seed {
s.Seed()
@@ -536,6 +541,60 @@ out:
}
}
+var filterId = 0
+
+func (self *Ethereum) InstallFilter(object map[string]interface{}) (*ethchain.Filter, int) {
+ defer func() { filterId++ }()
+
+ filter := ethchain.NewFilterFromMap(object, self)
+ self.filters[filterId] = filter
+
+ return filter, filterId
+}
+
+func (self *Ethereum) UninstallFilter(id int) {
+ delete(self.filters, id)
+}
+
+func (self *Ethereum) GetFilter(id int) *ethchain.Filter {
+ return self.filters[id]
+}
+
+func (self *Ethereum) filterLoop() {
+ blockChan := make(chan ethreact.Event, 5)
+ messageChan := make(chan ethreact.Event, 5)
+ // Subscribe to events
+ reactor := self.Reactor()
+ reactor.Subscribe("newBlock", blockChan)
+ reactor.Subscribe("messages", messageChan)
+out:
+ for {
+ select {
+ case <-self.quit:
+ break out
+ case block := <-blockChan:
+ if block, ok := block.Resource.(*ethchain.Block); ok {
+ for _, filter := range self.filters {
+ if filter.BlockCallback != nil {
+ filter.BlockCallback(block)
+ }
+ }
+ }
+ case msg := <-messageChan:
+ if messages, ok := msg.Resource.(ethstate.Messages); ok {
+ for _, filter := range self.filters {
+ if filter.MessageCallback != nil {
+ msgs := filter.FilterMessages(messages)
+ if len(msgs) > 0 {
+ filter.MessageCallback(msgs)
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
func bootstrapDb(db ethutil.Database) {
d, _ := db.Get([]byte("ProtocolVersion"))
protov := ethutil.NewValue(d).Uint()
diff --git a/ethpipe/js_pipe.go b/ethpipe/js_pipe.go
index 47cac2ca2..32212b26a 100644
--- a/ethpipe/js_pipe.go
+++ b/ethpipe/js_pipe.go
@@ -3,12 +3,10 @@ package ethpipe
import (
"bytes"
"encoding/json"
- "fmt"
"sync/atomic"
"github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethcrypto"
- "github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethstate"
"github.com/ethereum/eth-go/ethutil"
)
@@ -240,102 +238,11 @@ func (self *JSPipe) CompileMutan(code string) string {
return ethutil.Bytes2Hex(data)
}
-func (self *JSPipe) Watch(object map[string]interface{}) *JSFilter {
- return NewJSFilterFromMap(object, self.Pipe.obj)
- /*} else if str, ok := object.(string); ok {
- println("str")
- return NewJSFilterFromString(str, self.Pipe.obj)
- */
-}
-
-func (self *JSPipe) Messages(object map[string]interface{}) string {
- filter := self.Watch(object)
- filter.Uninstall()
-
- return filter.Messages()
-
-}
-
-type JSFilter struct {
- eth ethchain.EthManager
- *ethchain.Filter
- quit chan bool
-
- BlockCallback func(*ethchain.Block)
- MessageCallback func(ethstate.Messages)
-}
-
-func NewJSFilterFromMap(object map[string]interface{}, eth ethchain.EthManager) *JSFilter {
- filter := &JSFilter{eth, ethchain.NewFilterFromMap(object, eth), make(chan bool), nil, nil}
-
- go filter.mainLoop()
-
- return filter
-}
-
-func NewJSFilterFromString(str string, eth ethchain.EthManager) *JSFilter {
- return nil
-}
-
-func (self *JSFilter) MessagesToJson(messages ethstate.Messages) string {
+func ToJSMessages(messages ethstate.Messages) *ethutil.List {
var msgs []JSMessage
for _, m := range messages {
msgs = append(msgs, NewJSMessage(m))
}
- // Return an empty array instead of "null"
- if len(msgs) == 0 {
- return "[]"
- }
-
- b, err := json.Marshal(msgs)
- if err != nil {
- return "{\"error\":" + err.Error() + "}"
- }
-
- return string(b)
-}
-
-func (self *JSFilter) Messages() string {
- return self.MessagesToJson(self.Find())
-}
-
-func (self *JSFilter) mainLoop() {
- blockChan := make(chan ethreact.Event, 5)
- messageChan := make(chan ethreact.Event, 5)
- // Subscribe to events
- reactor := self.eth.Reactor()
- reactor.Subscribe("newBlock", blockChan)
- reactor.Subscribe("messages", messageChan)
-out:
- for {
- select {
- case <-self.quit:
- break out
- case block := <-blockChan:
- if block, ok := block.Resource.(*ethchain.Block); ok {
- if self.BlockCallback != nil {
- self.BlockCallback(block)
- }
- }
- case msg := <-messageChan:
- if messages, ok := msg.Resource.(ethstate.Messages); ok {
- if self.MessageCallback != nil {
- println("messages!")
- msgs := self.FilterMessages(messages)
- if len(msgs) > 0 {
- self.MessageCallback(msgs)
- }
- }
- }
- }
- }
-}
-
-func (self *JSFilter) Changed(object interface{}) {
- fmt.Printf("%T\n", object)
-}
-
-func (self *JSFilter) Uninstall() {
- self.quit <- true
+ return ethutil.NewList(msgs)
}
diff --git a/ethutil/list.go b/ethutil/list.go
index 18bf04792..0aa657a14 100644
--- a/ethutil/list.go
+++ b/ethutil/list.go
@@ -20,6 +20,10 @@ func NewList(t interface{}) *List {
return &List{list, list.Len()}
}
+func EmptyList() *List {
+ return NewList([]interface{}{})
+}
+
// Get N element from the embedded slice. Returns nil if OOB.
func (self *List) Get(i int) interface{} {
if self.list.Len() > i {
diff --git a/ethutil/value.go b/ethutil/value.go
index b336345ca..e8148b990 100644
--- a/ethutil/value.go
+++ b/ethutil/value.go
@@ -1,9 +1,11 @@
package ethutil
import (
+ "bytes"
"fmt"
"math/big"
"reflect"
+ "strconv"
)
// Data values are returned by the rlp decoder. The data values represents
@@ -93,6 +95,9 @@ func (val *Value) Int() int64 {
return new(big.Int).SetBytes(Val).Int64()
} else if Val, ok := val.Val.(*big.Int); ok {
return Val.Int64()
+ } else if Val, ok := val.Val.(string); ok {
+ n, _ := strconv.Atoi(Val)
+ return int64(n)
}
return 0
@@ -246,10 +251,7 @@ func (val *Value) Cmp(o *Value) bool {
}
func (self *Value) DeepCmp(o *Value) bool {
- a := NewValue(self.BigInt())
- b := NewValue(o.BigInt())
-
- return a.Cmp(b)
+ return bytes.Compare(self.Bytes(), o.Bytes()) == 0
}
func (val *Value) Encode() []byte {
diff --git a/ethutil/value_test.go b/ethutil/value_test.go
index 710cbd887..5452a0790 100644
--- a/ethutil/value_test.go
+++ b/ethutil/value_test.go
@@ -2,6 +2,7 @@ package ethutil
import (
"bytes"
+ "fmt"
"math/big"
"testing"
)
@@ -78,3 +79,8 @@ func TestMath(t *testing.T) {
t.Error("Expected 0, got", a)
}
}
+
+func TestString(t *testing.T) {
+ a := NewValue("10")
+ fmt.Println("VALUE WITH STRING:", a.Int())
+}
diff --git a/ethvm/vm.go b/ethvm/vm.go
index 9518540e0..fba8c4a0e 100644
--- a/ethvm/vm.go
+++ b/ethvm/vm.go
@@ -765,51 +765,7 @@ func (self *Vm) RunClosure(closure *Closure) (ret []byte, err error) {
stack.Push(ethutil.BigD(addr))
}
- /*
- msg := self.env.State().Manifest().AddMessage(&ethstate.Message{
- To: addr, From: closure.Address(),
- Origin: self.env.Origin(),
- Block: self.env.BlockHash(), Timestamp: self.env.Time(), Coinbase: self.env.Coinbase(), Number: self.env.BlockNumber(),
- Value: value,
- })
-
- // Create a new contract
- contract := self.env.State().NewStateObject(addr)
- if contract.Balance.Cmp(value) >= 0 {
- closure.object.SubAmount(value)
- contract.AddAmount(value)
-
- // Set the init script
- initCode := mem.Get(offset.Int64(), size.Int64())
- msg.Input = initCode
-
- // Transfer all remaining gas to the new
- // contract so it may run the init script
- gas := new(big.Int).Set(closure.Gas)
- closure.UseGas(closure.Gas)
-
- // Create the closure
- c := NewClosure(msg, closure, contract, initCode, gas, closure.Price)
- // Call the closure and set the return value as
- // main script.
- contract.Code, _, err = c.Call(self, nil)
- } else {
- err = fmt.Errorf("Insufficient funds to transfer value. Req %v, has %v", value, closure.object.Balance)
- }
-
- if err != nil {
- stack.Push(ethutil.BigFalse)
-
- // Revert the state as it was before.
- self.env.State().Set(snapshot)
-
- self.Printf("CREATE err %v", err)
- } else {
- stack.Push(ethutil.BigD(addr))
- msg.Output = contract.Code
- }
- */
self.Endl()
// Debug hook
@@ -858,53 +814,6 @@ func (self *Vm) RunClosure(closure *Closure) (ret []byte, err error) {
self.Dbg.SetCode(closure.Code)
}
- /*
- msg := self.env.State().Manifest().AddMessage(&ethstate.Message{
- To: addr.Bytes(), From: closure.Address(),
- Input: args,
- Origin: self.env.Origin(),
- Block: self.env.BlockHash(), Timestamp: self.env.Time(), Coinbase: self.env.Coinbase(), Number: self.env.BlockNumber(),
- Value: value,
- })
-
- if closure.object.Balance.Cmp(value) < 0 {
- vmlogger.Debugf("Insufficient funds to transfer value. Req %v, has %v", value, closure.object.Balance)
-
- closure.ReturnGas(gas, nil)
-
- stack.Push(ethutil.BigFalse)
- } else {
- snapshot := self.env.State().Copy()
-
- stateObject := self.env.State().GetOrNewStateObject(addr.Bytes())
-
- closure.object.SubAmount(value)
- stateObject.AddAmount(value)
-
- // Create a new callable closure
- c := NewClosure(msg, closure, stateObject, stateObject.Code, gas, closure.Price)
- // Executer the closure and get the return value (if any)
- ret, _, err := c.Call(self, args)
- if err != nil {
- stack.Push(ethutil.BigFalse)
-
- vmlogger.Debugf("Closure execution failed. %v\n", err)
-
- self.env.State().Set(snapshot)
- } else {
- stack.Push(ethutil.BigTrue)
-
- mem.Set(retOffset.Int64(), retSize.Int64(), ret)
- }
-
- msg.Output = ret
-
- // Debug hook
- if self.Dbg != nil {
- self.Dbg.SetCode(closure.Code)
- }
- }
- */
case POST:
require(6)
diff --git a/ethwire/messaging.go b/ethwire/messaging.go
index c93c717a2..67a866f73 100644
--- a/ethwire/messaging.go
+++ b/ethwire/messaging.go
@@ -49,6 +49,7 @@ var msgTypeToString = map[MsgType]string{
MsgPingTy: "Ping",
MsgPongTy: "Pong",
MsgGetPeersTy: "Get peers",
+ MsgStatusTy: "Status",
MsgPeersTy: "Peers",
MsgTxTy: "Transactions",
MsgBlockTy: "Blocks",
diff --git a/peer.go b/peer.go
index a7259d712..5ca3ed641 100644
--- a/peer.go
+++ b/peer.go
@@ -694,7 +694,7 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
}
// Handle the pub key (validation, uniqueness)
- if pub == nil || len(pub) == 0 {
+ if len(pub) == 0 {
peerlogger.Warnln("Pubkey required, not supplied in handshake.")
p.Stop()
return