aboutsummaryrefslogtreecommitdiffstats
path: root/eth
diff options
context:
space:
mode:
Diffstat (limited to 'eth')
-rw-r--r--eth/backend.go6
-rw-r--r--eth/downloader/downloader.go6
-rw-r--r--eth/filters/filter_system.go88
-rw-r--r--eth/sync.go7
4 files changed, 44 insertions, 63 deletions
diff --git a/eth/backend.go b/eth/backend.go
index 4bd0eb371..0683705df 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -92,7 +92,6 @@ type Config struct {
Name string
NetworkId int
- GenesisNonce int
GenesisFile string
GenesisBlock *types.Block // used by block tests
FastSync bool
@@ -105,7 +104,6 @@ type Config struct {
DataDir string
LogFile string
Verbosity int
- LogJSON string
VmDebug bool
NatSpec bool
DocRoot string
@@ -274,11 +272,7 @@ type Ethereum struct {
}
func New(config *Config) (*Ethereum, error) {
- // Bootstrap database
logger.New(config.DataDir, config.LogFile, config.Verbosity)
- if len(config.LogJSON) > 0 {
- logger.NewJSONsystem(config.DataDir, config.LogJSON)
- }
// Let the database take 3/4 of the max open files (TODO figure out a way to get the actual limit of the open files)
const dbCount = 3
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 4bcbd8557..153427ee4 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -248,10 +248,11 @@ func (d *Downloader) UnregisterPeer(id string) error {
// Synchronise tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
-func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) {
+func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
glog.V(logger.Detail).Infof("Attempting synchronisation: %v, head [%x…], TD %v", id, head[:4], td)
- switch err := d.synchronise(id, head, td, mode); err {
+ err := d.synchronise(id, head, td, mode)
+ switch err {
case nil:
glog.V(logger.Detail).Infof("Synchronisation completed")
@@ -268,6 +269,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
default:
glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
}
+ return err
}
// synchronise will select the peer and use it for synchronising. If an empty string is given
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index ae6093525..df3ce90c6 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -31,30 +31,32 @@ import (
// block, transaction and log events. The Filtering system can be used to listen
// for specific LOG events fired by the EVM (Ethereum Virtual Machine).
type FilterSystem struct {
- eventMux *event.TypeMux
-
filterMu sync.RWMutex
filterId int
filters map[int]*Filter
created map[int]time.Time
-
- quit chan struct{}
+ sub event.Subscription
}
// NewFilterSystem returns a newly allocated filter manager
func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
fs := &FilterSystem{
- eventMux: mux,
- filters: make(map[int]*Filter),
- created: make(map[int]time.Time),
+ filters: make(map[int]*Filter),
+ created: make(map[int]time.Time),
}
+ fs.sub = mux.Subscribe(
+ //core.PendingBlockEvent{},
+ core.ChainEvent{},
+ core.TxPreEvent{},
+ vm.Logs(nil),
+ )
go fs.filterLoop()
return fs
}
// Stop quits the filter loop required for polling events
func (fs *FilterSystem) Stop() {
- close(fs.quit)
+ fs.sub.Unsubscribe()
}
// Add adds a filter to the filter manager
@@ -89,57 +91,37 @@ func (fs *FilterSystem) Get(id int) *Filter {
// filterLoop waits for specific events from ethereum and fires their handlers
// when the filter matches the requirements.
func (fs *FilterSystem) filterLoop() {
- // Subscribe to events
- eventCh := fs.eventMux.Subscribe(
- //core.PendingBlockEvent{},
- core.ChainEvent{},
- core.TxPreEvent{},
- vm.Logs(nil),
- ).Chan()
-
-out:
- for {
- select {
- case <-fs.quit:
- break out
- case event, ok := <-eventCh:
- if !ok {
- // Event subscription closed, set the channel to nil to stop spinning
- eventCh = nil
- continue
- }
- // A real event arrived, notify the registered filters
- switch ev := event.Data.(type) {
- case core.ChainEvent:
- fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.BlockCallback != nil && fs.created[id].Before(event.Time) {
- filter.BlockCallback(ev.Block, ev.Logs)
- }
+ for event := range fs.sub.Chan() {
+ switch ev := event.Data.(type) {
+ case core.ChainEvent:
+ fs.filterMu.RLock()
+ for id, filter := range fs.filters {
+ if filter.BlockCallback != nil && fs.created[id].Before(event.Time) {
+ filter.BlockCallback(ev.Block, ev.Logs)
}
- fs.filterMu.RUnlock()
+ }
+ fs.filterMu.RUnlock()
- case core.TxPreEvent:
- fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) {
- filter.TransactionCallback(ev.Tx)
- }
+ case core.TxPreEvent:
+ fs.filterMu.RLock()
+ for id, filter := range fs.filters {
+ if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) {
+ filter.TransactionCallback(ev.Tx)
}
- fs.filterMu.RUnlock()
-
- case vm.Logs:
- fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.LogsCallback != nil && fs.created[id].Before(event.Time) {
- msgs := filter.FilterLogs(ev)
- if len(msgs) > 0 {
- filter.LogsCallback(msgs)
- }
+ }
+ fs.filterMu.RUnlock()
+
+ case vm.Logs:
+ fs.filterMu.RLock()
+ for id, filter := range fs.filters {
+ if filter.LogsCallback != nil && fs.created[id].Before(event.Time) {
+ msgs := filter.FilterLogs(ev)
+ if len(msgs) > 0 {
+ filter.LogsCallback(msgs)
}
}
- fs.filterMu.RUnlock()
}
+ fs.filterMu.RUnlock()
}
}
}
diff --git a/eth/sync.go b/eth/sync.go
index b69a24556..bbf2abc04 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -170,13 +170,16 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
if pm.fastSync {
mode = downloader.FastSync
}
- pm.downloader.Synchronise(peer.id, peer.Head(), peer.Td(), mode)
-
+ if err := pm.downloader.Synchronise(peer.id, peer.Head(), peer.Td(), mode); err != nil {
+ return
+ }
// If fast sync was enabled, and we synced up, disable it
if pm.fastSync {
+ // Wait until all pending imports finish processing
for pm.downloader.Synchronising() {
time.Sleep(100 * time.Millisecond)
}
+ // Disable fast sync if we indeed have something in our chain
if pm.blockchain.CurrentBlock().NumberU64() > 0 {
glog.V(logger.Info).Infof("fast sync complete, auto disabling")
pm.fastSync = false