aboutsummaryrefslogtreecommitdiffstats
path: root/eth/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/handler.go')
-rw-r--r--eth/handler.go48
1 files changed, 25 insertions, 23 deletions
diff --git a/eth/handler.go b/eth/handler.go
index 7e9ec593a..acc16812a 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -18,15 +18,6 @@ import (
"github.com/ethereum/go-ethereum/rlp"
)
-const (
- forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
- blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process
- notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching
- notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
- minDesiredPeerCount = 5 // Amount of peers desired to start syncing
- blockProcAmount = 256
-)
-
func errResp(code errCode, format string, v ...interface{}) error {
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
}
@@ -57,9 +48,11 @@ type ProtocolManager struct {
txSub event.Subscription
minedBlockSub event.Subscription
- newPeerCh chan *peer
- newHashCh chan []*blockAnnounce
- quitSync chan struct{}
+ newPeerCh chan *peer
+ newHashCh chan []*blockAnnounce
+ newBlockCh chan chan []*types.Block
+ quitSync chan struct{}
+
// wait group is used for graceful shutdowns during downloading
// and processing
wg sync.WaitGroup
@@ -77,6 +70,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
peers: newPeerSet(),
newPeerCh: make(chan *peer, 1),
newHashCh: make(chan []*blockAnnounce, 1),
+ newBlockCh: make(chan chan []*types.Block),
quitSync: make(chan struct{}),
}
@@ -274,21 +268,26 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
return p.sendBlocks(blocks)
case BlocksMsg:
- var blocks []*types.Block
-
+ // Decode the arrived block message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+
+ var blocks []*types.Block
if err := msgStream.Decode(&blocks); err != nil {
glog.V(logger.Detail).Infoln("Decode error", err)
blocks = nil
}
-
- // Either deliver to the downloader or the importer
- if self.downloader.Synchronising() {
- self.downloader.DeliverBlocks(p.id, blocks)
- } else {
- for _, block := range blocks {
- if err := self.importBlock(p, block, nil); err != nil {
- return err
+ // Filter out any explicitly requested blocks (cascading select to get blocking back to peer)
+ filter := make(chan []*types.Block)
+ select {
+ case <-self.quitSync:
+ case self.newBlockCh <- filter:
+ select {
+ case <-self.quitSync:
+ case filter <- blocks:
+ select {
+ case <-self.quitSync:
+ case blocks := <-filter:
+ self.downloader.DeliverBlocks(p.id, blocks)
}
}
}
@@ -322,7 +321,10 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
}
}
if len(announces) > 0 {
- self.newHashCh <- announces
+ select {
+ case self.newHashCh <- announces:
+ case <-self.quitSync:
+ }
}
case NewBlockMsg: