3 Commits

Author SHA1 Message Date
jdl
875957f662 Fixed WAL gc age bug 2023-12-05 09:54:41 +01:00
jdl
b251368b09 Cleanup, no logic changes. 2023-12-04 20:25:37 +01:00
jdl
c2a1a7f247 Fix data corruption bug (overwrite data) 2023-12-04 20:05:15 +01:00
5 changed files with 28 additions and 3 deletions

View File

@@ -4,6 +4,7 @@ Replicated in-memory database and file store.
## TODO ## TODO
* [ ] mdb: Tests for using `nil` snapshots ?
* [ ] mdb: tests for sanitize and validate functions * [ ] mdb: tests for sanitize and validate functions
* [ ] Test: lib/wal iterator w/ corrupt file (random corruptions) * [ ] Test: lib/wal iterator w/ corrupt file (random corruptions)
* [ ] Test: lib/wal io.go * [ ] Test: lib/wal io.go

View File

@@ -15,7 +15,7 @@ func (rep *Replicator) runWALGC() {
select { select {
case <-ticker.C: case <-ticker.C:
state := rep.getState() state := rep.getState()
before := time.Now().Unix() - rep.conf.WALSegMaxAgeSec before := time.Now().Unix() - rep.conf.WALSegGCAgeSec
if err := rep.wal.DeleteBefore(before, state.SeqNum); err != nil { if err := rep.wal.DeleteBefore(before, state.SeqNum); err != nil {
log.Printf("[WAL-GC] failed to delete wal segments: %v", err) log.Printf("[WAL-GC] failed to delete wal segments: %v", err)
} }

View File

@@ -51,6 +51,10 @@ func (f *freeList) Push(pages ...uint64) {
} }
} }
func (f *freeList) SetNextPage(nextPage uint64) {
f.nextPage = nextPage
}
func (f *freeList) Pop(count int, out []uint64) []uint64 { func (f *freeList) Pop(count int, out []uint64) []uint64 {
out = out[:0] out = out[:0]

View File

@@ -13,14 +13,19 @@ type Index struct {
} }
func NewIndex(f *File) (*Index, error) { func NewIndex(f *File) (*Index, error) {
firstPage, err := f.pageCount()
if err != nil {
return nil, err
}
idx := &Index{ idx := &Index{
fList: newFreeList(0), fList: newFreeList(firstPage),
aList: *newAllocList(), aList: *newAllocList(),
seen: map[[2]uint64]struct{}{}, seen: map[[2]uint64]struct{}{},
mask: []bool{}, mask: []bool{},
} }
err := f.iterate(func(pageID uint64, page dataPage) error { err = f.iterate(func(pageID uint64, page dataPage) error {
header := page.Header() header := page.Header()
switch header.PageType { switch header.PageType {
case pageTypeHead: case pageTypeHead:

View File

@@ -134,6 +134,21 @@ func (pf *File) writePage(page dataPage, id uint64) error {
// Reading // Reading
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
func (pf *File) pageCount() (uint64, error) {
fi, err := pf.f.Stat()
if err != nil {
return 0, errs.IO.WithErr(err)
}
fileSize := fi.Size()
if fileSize%pageSize != 0 {
return 0, errs.Corrupt.WithMsg("File size isn't a multiple of page size.")
}
maxPage := uint64(fileSize / pageSize)
return maxPage, nil
}
func (pf *File) iterate(each func(pageID uint64, page dataPage) error) error { func (pf *File) iterate(each func(pageID uint64, page dataPage) error) error {
pf.lock.RLock() pf.lock.RLock()
defer pf.lock.RUnlock() defer pf.lock.RUnlock()