aboutsummaryrefslogtreecommitdiffstats
path: root/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go
diff options
context:
space:
mode:
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go')
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go115
1 files changed, 61 insertions, 54 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go
index b522c76e6..6519ec660 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/journal/journal.go
@@ -79,10 +79,10 @@ package journal
import (
"encoding/binary"
- "errors"
"fmt"
"io"
+ "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/util"
)
@@ -103,18 +103,18 @@ type flusher interface {
Flush() error
}
-// DroppedError is the error type that passed to Dropper.Drop method.
-type DroppedError struct {
+// ErrCorrupted is the error type that generated by corrupted block or chunk.
+type ErrCorrupted struct {
Size int
Reason string
}
-func (e DroppedError) Error() string {
- return fmt.Sprintf("leveldb/journal: dropped %d bytes: %s", e.Size, e.Reason)
+func (e *ErrCorrupted) Error() string {
+ return fmt.Sprintf("leveldb/journal: block/chunk corrupted: %s (%d bytes)", e.Reason, e.Size)
}
// Dropper is the interface that wrap simple Drop method. The Drop
-// method will be called when the journal reader dropping a chunk.
+// method will be called when the journal reader dropping a block or chunk.
type Dropper interface {
Drop(err error)
}
@@ -158,76 +158,78 @@ func NewReader(r io.Reader, dropper Dropper, strict, checksum bool) *Reader {
}
}
+var errSkip = errors.New("leveldb/journal: skipped")
+
+func (r *Reader) corrupt(n int, reason string, skip bool) error {
+ if r.dropper != nil {
+ r.dropper.Drop(&ErrCorrupted{n, reason})
+ }
+ if r.strict && !skip {
+ r.err = errors.NewErrCorrupted(nil, &ErrCorrupted{n, reason})
+ return r.err
+ }
+ return errSkip
+}
+
// nextChunk sets r.buf[r.i:r.j] to hold the next chunk's payload, reading the
// next block into the buffer if necessary.
-func (r *Reader) nextChunk(wantFirst, skip bool) error {
+func (r *Reader) nextChunk(first bool) error {
for {
if r.j+headerSize <= r.n {
checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4])
length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6])
chunkType := r.buf[r.j+6]
- var err error
if checksum == 0 && length == 0 && chunkType == 0 {
// Drop entire block.
- err = DroppedError{r.n - r.j, "zero header"}
+ m := r.n - r.j
r.i = r.n
r.j = r.n
+ return r.corrupt(m, "zero header", false)
} else {
m := r.n - r.j
r.i = r.j + headerSize
r.j = r.j + headerSize + int(length)
if r.j > r.n {
// Drop entire block.
- err = DroppedError{m, "chunk length overflows block"}
r.i = r.n
r.j = r.n
+ return r.corrupt(m, "chunk length overflows block", false)
} else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() {
// Drop entire block.
- err = DroppedError{m, "checksum mismatch"}
r.i = r.n
r.j = r.n
+ return r.corrupt(m, "checksum mismatch", false)
}
}
- if wantFirst && err == nil && chunkType != fullChunkType && chunkType != firstChunkType {
- if skip {
- // The chunk are intentionally skipped.
- if chunkType == lastChunkType {
- skip = false
- }
- continue
- } else {
- // Drop the chunk.
- err = DroppedError{r.j - r.i + headerSize, "orphan chunk"}
- }
- }
- if err == nil {
- r.last = chunkType == fullChunkType || chunkType == lastChunkType
- } else {
- if r.dropper != nil {
- r.dropper.Drop(err)
- }
- if r.strict {
- r.err = err
- }
+ if first && chunkType != fullChunkType && chunkType != firstChunkType {
+ m := r.j - r.i
+ r.i = r.j
+ // Report the error, but skip it.
+ return r.corrupt(m+headerSize, "orphan chunk", true)
}
- return err
+ r.last = chunkType == fullChunkType || chunkType == lastChunkType
+ return nil
}
+
+ // The last block.
if r.n < blockSize && r.n > 0 {
- // This is the last block.
- if r.j != r.n {
- r.err = io.ErrUnexpectedEOF
- } else {
- r.err = io.EOF
+ if !first {
+ return r.corrupt(0, "missing chunk part", false)
}
+ r.err = io.EOF
return r.err
}
+
+ // Read block.
n, err := io.ReadFull(r.r, r.buf[:])
- if err != nil && err != io.ErrUnexpectedEOF {
- r.err = err
- return r.err
+ if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
+ return err
}
if n == 0 {
+ if !first {
+ return r.corrupt(0, "missing chunk part", false)
+ }
r.err = io.EOF
return r.err
}
@@ -237,29 +239,26 @@ func (r *Reader) nextChunk(wantFirst, skip bool) error {
// Next returns a reader for the next journal. It returns io.EOF if there are no
// more journals. The reader returned becomes stale after the next Next call,
-// and should no longer be used.
+// and should no longer be used. If strict is false, the reader will returns
+// io.ErrUnexpectedEOF error when found corrupted journal.
func (r *Reader) Next() (io.Reader, error) {
r.seq++
if r.err != nil {
return nil, r.err
}
- skip := !r.last
+ r.i = r.j
for {
- r.i = r.j
- if r.nextChunk(true, skip) != nil {
- // So that 'orphan chunk' drop will be reported.
- skip = false
- } else {
+ if err := r.nextChunk(true); err == nil {
break
- }
- if r.err != nil {
- return nil, r.err
+ } else if err != errSkip {
+ return nil, err
}
}
return &singleReader{r, r.seq, nil}, nil
}
-// Reset resets the journal reader, allows reuse of the journal reader.
+// Reset resets the journal reader, allows reuse of the journal reader. Reset returns
+// last accumulated error.
func (r *Reader) Reset(reader io.Reader, dropper Dropper, strict, checksum bool) error {
r.seq++
err := r.err
@@ -296,7 +295,11 @@ func (x *singleReader) Read(p []byte) (int, error) {
if r.last {
return 0, io.EOF
}
- if x.err = r.nextChunk(false, false); x.err != nil {
+ x.err = r.nextChunk(false)
+ if x.err != nil {
+ if x.err == errSkip {
+ x.err = io.ErrUnexpectedEOF
+ }
return 0, x.err
}
}
@@ -320,7 +323,11 @@ func (x *singleReader) ReadByte() (byte, error) {
if r.last {
return 0, io.EOF
}
- if x.err = r.nextChunk(false, false); x.err != nil {
+ x.err = r.nextChunk(false)
+ if x.err != nil {
+ if x.err == errSkip {
+ x.err = io.ErrUnexpectedEOF
+ }
return 0, x.err
}
}