diff options
Diffstat (limited to 'eth')
-rw-r--r-- | eth/backend.go | 6 | ||||
-rw-r--r-- | eth/downloader/downloader.go | 6 | ||||
-rw-r--r-- | eth/filters/filter_system.go | 88 | ||||
-rw-r--r-- | eth/sync.go | 7 |
4 files changed, 44 insertions, 63 deletions
diff --git a/eth/backend.go b/eth/backend.go index 4bd0eb371..0683705df 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -92,7 +92,6 @@ type Config struct { Name string NetworkId int - GenesisNonce int GenesisFile string GenesisBlock *types.Block // used by block tests FastSync bool @@ -105,7 +104,6 @@ type Config struct { DataDir string LogFile string Verbosity int - LogJSON string VmDebug bool NatSpec bool DocRoot string @@ -274,11 +272,7 @@ type Ethereum struct { } func New(config *Config) (*Ethereum, error) { - // Bootstrap database logger.New(config.DataDir, config.LogFile, config.Verbosity) - if len(config.LogJSON) > 0 { - logger.NewJSONsystem(config.DataDir, config.LogJSON) - } // Let the database take 3/4 of the max open files (TODO figure out a way to get the actual limit of the open files) const dbCount = 3 diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 4bcbd8557..153427ee4 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -248,10 +248,11 @@ func (d *Downloader) UnregisterPeer(id string) error { // Synchronise tries to sync up our local block chain with a remote peer, both // adding various sanity checks as well as wrapping it with various log entries. -func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) { +func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error { glog.V(logger.Detail).Infof("Attempting synchronisation: %v, head [%x…], TD %v", id, head[:4], td) - switch err := d.synchronise(id, head, td, mode); err { + err := d.synchronise(id, head, td, mode) + switch err { case nil: glog.V(logger.Detail).Infof("Synchronisation completed") @@ -268,6 +269,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode default: glog.V(logger.Warn).Infof("Synchronisation failed: %v", err) } + return err } // synchronise will select the peer and use it for synchronising. If an empty string is given diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index ae6093525..df3ce90c6 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -31,30 +31,32 @@ import ( // block, transaction and log events. The Filtering system can be used to listen // for specific LOG events fired by the EVM (Ethereum Virtual Machine). type FilterSystem struct { - eventMux *event.TypeMux - filterMu sync.RWMutex filterId int filters map[int]*Filter created map[int]time.Time - - quit chan struct{} + sub event.Subscription } // NewFilterSystem returns a newly allocated filter manager func NewFilterSystem(mux *event.TypeMux) *FilterSystem { fs := &FilterSystem{ - eventMux: mux, - filters: make(map[int]*Filter), - created: make(map[int]time.Time), + filters: make(map[int]*Filter), + created: make(map[int]time.Time), } + fs.sub = mux.Subscribe( + //core.PendingBlockEvent{}, + core.ChainEvent{}, + core.TxPreEvent{}, + vm.Logs(nil), + ) go fs.filterLoop() return fs } // Stop quits the filter loop required for polling events func (fs *FilterSystem) Stop() { - close(fs.quit) + fs.sub.Unsubscribe() } // Add adds a filter to the filter manager @@ -89,57 +91,37 @@ func (fs *FilterSystem) Get(id int) *Filter { // filterLoop waits for specific events from ethereum and fires their handlers // when the filter matches the requirements. func (fs *FilterSystem) filterLoop() { - // Subscribe to events - eventCh := fs.eventMux.Subscribe( - //core.PendingBlockEvent{}, - core.ChainEvent{}, - core.TxPreEvent{}, - vm.Logs(nil), - ).Chan() - -out: - for { - select { - case <-fs.quit: - break out - case event, ok := <-eventCh: - if !ok { - // Event subscription closed, set the channel to nil to stop spinning - eventCh = nil - continue - } - // A real event arrived, notify the registered filters - switch ev := event.Data.(type) { - case core.ChainEvent: - fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.BlockCallback != nil && fs.created[id].Before(event.Time) { - filter.BlockCallback(ev.Block, ev.Logs) - } + for event := range fs.sub.Chan() { + switch ev := event.Data.(type) { + case core.ChainEvent: + fs.filterMu.RLock() + for id, filter := range fs.filters { + if filter.BlockCallback != nil && fs.created[id].Before(event.Time) { + filter.BlockCallback(ev.Block, ev.Logs) } - fs.filterMu.RUnlock() + } + fs.filterMu.RUnlock() - case core.TxPreEvent: - fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) { - filter.TransactionCallback(ev.Tx) - } + case core.TxPreEvent: + fs.filterMu.RLock() + for id, filter := range fs.filters { + if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) { + filter.TransactionCallback(ev.Tx) } - fs.filterMu.RUnlock() - - case vm.Logs: - fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.LogsCallback != nil && fs.created[id].Before(event.Time) { - msgs := filter.FilterLogs(ev) - if len(msgs) > 0 { - filter.LogsCallback(msgs) - } + } + fs.filterMu.RUnlock() + + case vm.Logs: + fs.filterMu.RLock() + for id, filter := range fs.filters { + if filter.LogsCallback != nil && fs.created[id].Before(event.Time) { + msgs := filter.FilterLogs(ev) + if len(msgs) > 0 { + filter.LogsCallback(msgs) } } - fs.filterMu.RUnlock() } + fs.filterMu.RUnlock() } } } diff --git a/eth/sync.go b/eth/sync.go index b69a24556..bbf2abc04 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -170,13 +170,16 @@ func (pm *ProtocolManager) synchronise(peer *peer) { if pm.fastSync { mode = downloader.FastSync } - pm.downloader.Synchronise(peer.id, peer.Head(), peer.Td(), mode) - + if err := pm.downloader.Synchronise(peer.id, peer.Head(), peer.Td(), mode); err != nil { + return + } // If fast sync was enabled, and we synced up, disable it if pm.fastSync { + // Wait until all pending imports finish processing for pm.downloader.Synchronising() { time.Sleep(100 * time.Millisecond) } + // Disable fast sync if we indeed have something in our chain if pm.blockchain.CurrentBlock().NumberU64() > 0 { glog.V(logger.Info).Infof("fast sync complete, auto disabling") pm.fastSync = false |