aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/rawdb/freezer_table.go86
1 files changed, 62 insertions, 24 deletions
diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go
index d1ece414b..1e5c7cd0b 100644
--- a/core/rawdb/freezer_table.go
+++ b/core/rawdb/freezer_table.go
@@ -20,6 +20,7 @@ import (
"encoding/binary"
"errors"
"fmt"
+ "io"
"os"
"path/filepath"
"sync"
@@ -106,6 +107,44 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr
return newCustomTable(path, name, readMeter, writeMeter, 2*1000*1000*1000, disableSnappy)
}
+// openFreezerFileForAppend opens a freezer table file and seeks to the end
+func openFreezerFileForAppend(filename string) (*os.File, error) {
+ // Open the file without the O_APPEND flag
+ // because it has differing behaviour during Truncate operations
+ // on different OS's
+ file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
+ if err != nil {
+ return nil, err
+ }
+ // Seek to end for append
+ if _, err = file.Seek(0, io.SeekEnd); err != nil {
+ return nil, err
+ }
+ return file, nil
+}
+
+// openFreezerFileForReadOnly opens a freezer table file for read only access
+func openFreezerFileForReadOnly(filename string) (*os.File, error) {
+ return os.OpenFile(filename, os.O_RDONLY, 0644)
+}
+
+// openFreezerFileTruncated opens a freezer table making sure it is truncated
+func openFreezerFileTruncated(filename string) (*os.File, error) {
+ return os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
+}
+
+// truncateFreezerFile resizes a freezer table file and seeks to the end
+func truncateFreezerFile(file *os.File, size int64) error {
+ if err := file.Truncate(size); err != nil {
+ return err
+ }
+ // Seek to end for append
+ if _, err := file.Seek(0, io.SeekEnd); err != nil {
+ return err
+ }
+ return nil
+}
+
// newCustomTable opens a freezer table, creating the data and index files if they are
// non existent. Both files are truncated to the shortest common length to ensure
// they don't go out of sync.
@@ -116,13 +155,13 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete
}
var idxName string
if noCompression {
- // raw idx
+ // Raw idx
idxName = fmt.Sprintf("%s.ridx", name)
} else {
- // compressed idx
+ // Compressed idx
idxName = fmt.Sprintf("%s.cidx", name)
}
- offsets, err := os.OpenFile(filepath.Join(path, idxName), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
+ offsets, err := openFreezerFileForAppend(filepath.Join(path, idxName))
if err != nil {
return nil, err
}
@@ -163,7 +202,7 @@ func (t *freezerTable) repair() error {
}
// Ensure the index is a multiple of indexEntrySize bytes
if overflow := stat.Size() % indexEntrySize; overflow != 0 {
- t.index.Truncate(stat.Size() - overflow) // New file can't trigger this path
+ truncateFreezerFile(t.index, stat.Size()-overflow) // New file can't trigger this path
}
// Retrieve the file sizes and prepare for truncation
if stat, err = t.index.Stat(); err != nil {
@@ -188,7 +227,7 @@ func (t *freezerTable) repair() error {
t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
lastIndex.unmarshalBinary(buffer)
- t.head, err = t.openFile(lastIndex.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND)
+ t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForAppend)
if err != nil {
return err
}
@@ -204,7 +243,7 @@ func (t *freezerTable) repair() error {
// Truncate the head file to the last offset pointer
if contentExp < contentSize {
t.logger.Warn("Truncating dangling head", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize))
- if err := t.head.Truncate(contentExp); err != nil {
+ if err := truncateFreezerFile(t.head, contentExp); err != nil {
return err
}
contentSize = contentExp
@@ -212,7 +251,7 @@ func (t *freezerTable) repair() error {
// Truncate the index to point within the head file
if contentExp > contentSize {
t.logger.Warn("Truncating dangling indexes", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize))
- if err := t.index.Truncate(offsetsSize - indexEntrySize); err != nil {
+ if err := truncateFreezerFile(t.index, offsetsSize-indexEntrySize); err != nil {
return err
}
offsetsSize -= indexEntrySize
@@ -221,9 +260,9 @@ func (t *freezerTable) repair() error {
newLastIndex.unmarshalBinary(buffer)
// We might have slipped back into an earlier head-file here
if newLastIndex.filenum != lastIndex.filenum {
- // release earlier opened file
+ // Release earlier opened file
t.releaseFile(lastIndex.filenum)
- t.head, err = t.openFile(newLastIndex.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND)
+ t.head, err = t.openFile(newLastIndex.filenum, openFreezerFileForAppend)
if stat, err = t.head.Stat(); err != nil {
// TODO, anything more we can do here?
// A data file has gone missing...
@@ -264,16 +303,16 @@ func (t *freezerTable) preopen() (err error) {
t.releaseFilesAfter(0, false)
// Open all except head in RDONLY
for i := t.tailId; i < t.headId; i++ {
- if _, err = t.openFile(i, os.O_RDONLY); err != nil {
+ if _, err = t.openFile(i, openFreezerFileForReadOnly); err != nil {
return err
}
}
// Open head in read/write
- t.head, err = t.openFile(t.headId, os.O_RDWR|os.O_CREATE|os.O_APPEND)
+ t.head, err = t.openFile(t.headId, openFreezerFileForAppend)
return err
}
-// truncate discards any recent data above the provided threashold number.
+// truncate discards any recent data above the provided threshold number.
func (t *freezerTable) truncate(items uint64) error {
t.lock.Lock()
defer t.lock.Unlock()
@@ -284,7 +323,7 @@ func (t *freezerTable) truncate(items uint64) error {
}
// Something's out of sync, truncate the table's offset index
t.logger.Warn("Truncating freezer table", "items", t.items, "limit", items)
- if err := t.index.Truncate(int64(items+1) * indexEntrySize); err != nil {
+ if err := truncateFreezerFile(t.index, int64(items+1)*indexEntrySize); err != nil {
return err
}
// Calculate the new expected size of the data file and truncate it
@@ -299,18 +338,18 @@ func (t *freezerTable) truncate(items uint64) error {
if expected.filenum != t.headId {
// If already open for reading, force-reopen for writing
t.releaseFile(expected.filenum)
- newHead, err := t.openFile(expected.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND)
+ newHead, err := t.openFile(expected.filenum, openFreezerFileForAppend)
if err != nil {
return err
}
- // release any files _after the current head -- both the previous head
+ // Release any files _after the current head -- both the previous head
// and any files which may have been opened for reading
t.releaseFilesAfter(expected.filenum, true)
- // set back the historic head
+ // Set back the historic head
t.head = newHead
atomic.StoreUint32(&t.headId, expected.filenum)
}
- if err := t.head.Truncate(int64(expected.offset)); err != nil {
+ if err := truncateFreezerFile(t.head, int64(expected.offset)); err != nil {
return err
}
// All data files truncated, set internal counters and return
@@ -344,7 +383,7 @@ func (t *freezerTable) Close() error {
}
// openFile assumes that the write-lock is held by the caller
-func (t *freezerTable) openFile(num uint32, flag int) (f *os.File, err error) {
+func (t *freezerTable) openFile(num uint32, opener func(string) (*os.File, error)) (f *os.File, err error) {
var exist bool
if f, exist = t.files[num]; !exist {
var name string
@@ -353,7 +392,7 @@ func (t *freezerTable) openFile(num uint32, flag int) (f *os.File, err error) {
} else {
name = fmt.Sprintf("%s.%04d.cdat", t.name, num)
}
- f, err = os.OpenFile(filepath.Join(t.path, name), flag, 0644)
+ f, err = opener(filepath.Join(t.path, name))
if err != nil {
return nil, err
}
@@ -413,28 +452,27 @@ func (t *freezerTable) Append(item uint64, blob []byte) error {
// we need a new file, writing would overflow
t.lock.RUnlock()
t.lock.Lock()
- nextId := atomic.LoadUint32(&t.headId) + 1
+ nextID := atomic.LoadUint32(&t.headId) + 1
// We open the next file in truncated mode -- if this file already
// exists, we need to start over from scratch on it
- newHead, err := t.openFile(nextId, os.O_RDWR|os.O_CREATE|os.O_TRUNC)
+ newHead, err := t.openFile(nextID, openFreezerFileTruncated)
if err != nil {
t.lock.Unlock()
return err
}
// Close old file, and reopen in RDONLY mode
t.releaseFile(t.headId)
- t.openFile(t.headId, os.O_RDONLY)
+ t.openFile(t.headId, openFreezerFileForReadOnly)
// Swap out the current head
t.head = newHead
atomic.StoreUint32(&t.headBytes, 0)
- atomic.StoreUint32(&t.headId, nextId)
+ atomic.StoreUint32(&t.headId, nextID)
t.lock.Unlock()
t.lock.RLock()
}
defer t.lock.RUnlock()
-
if _, err := t.head.Write(blob); err != nil {
return err
}