5 Commits

Author SHA1 Message Date
jdl
875957f662 Fixed WAL gc age bug 2023-12-05 09:54:41 +01:00
jdl
b251368b09 Cleanup, no logic changes. 2023-12-04 20:25:37 +01:00
jdl
c2a1a7f247 Fix data corruption bug (overwrite data) 2023-12-04 20:05:15 +01:00
jdl
9785637b3b Code cleanup / beta stuff 2023-12-03 20:38:45 +01:00
jdl
728b34b684 Fix streaming handlers for https. 2023-12-03 20:26:27 +01:00
7 changed files with 32 additions and 9 deletions

View File

@@ -4,6 +4,7 @@ Replicated in-memory database and file store.
## TODO
* [ ] mdb: Tests for using `nil` snapshots ?
* [ ] mdb: tests for sanitize and validate functions
* [ ] Test: lib/wal iterator w/ corrupt file (random corruptions)
* [ ] Test: lib/wal io.go

View File

@@ -12,12 +12,6 @@ const (
pathStreamWAL = "stream-wal"
)
func (rep *Replicator) RegisterHandlers(mux *http.ServeMux, rootPath string) {
mux.HandleFunc(path.Join(rootPath, pathGetInfo), rep.handleGetInfo)
mux.HandleFunc(path.Join(rootPath, pathSendState), rep.handleSendState)
mux.HandleFunc(path.Join(rootPath, pathStreamWAL), rep.handleStreamWAL)
}
// TODO: Remove this!
func (rep *Replicator) Handle(w http.ResponseWriter, r *http.Request) {
// We'll handle two types of requests: HTTP GET requests for JSON, or

View File

@@ -15,7 +15,7 @@ func (rep *Replicator) runWALGC() {
select {
case <-ticker.C:
state := rep.getState()
before := time.Now().Unix() - rep.conf.WALSegMaxAgeSec
before := time.Now().Unix() - rep.conf.WALSegGCAgeSec
if err := rep.wal.DeleteBefore(before, state.SeqNum); err != nil {
log.Printf("[WAL-GC] failed to delete wal segments: %v", err)
}

View File

@@ -73,6 +73,10 @@ type Database struct {
}
func New(conf Config) *Database {
if conf.NetTimeout <= 0 {
conf.NetTimeout = time.Minute
}
if conf.MaxConcurrentUpdates <= 0 {
conf.MaxConcurrentUpdates = 32
}

View File

@@ -51,6 +51,10 @@ func (f *freeList) Push(pages ...uint64) {
}
}
func (f *freeList) SetNextPage(nextPage uint64) {
f.nextPage = nextPage
}
func (f *freeList) Pop(count int, out []uint64) []uint64 {
out = out[:0]

View File

@@ -13,14 +13,19 @@ type Index struct {
}
func NewIndex(f *File) (*Index, error) {
firstPage, err := f.pageCount()
if err != nil {
return nil, err
}
idx := &Index{
fList: newFreeList(0),
fList: newFreeList(firstPage),
aList: *newAllocList(),
seen: map[[2]uint64]struct{}{},
mask: []bool{},
}
err := f.iterate(func(pageID uint64, page dataPage) error {
err = f.iterate(func(pageID uint64, page dataPage) error {
header := page.Header()
switch header.PageType {
case pageTypeHead:

View File

@@ -134,6 +134,21 @@ func (pf *File) writePage(page dataPage, id uint64) error {
// Reading
// ----------------------------------------------------------------------------
func (pf *File) pageCount() (uint64, error) {
fi, err := pf.f.Stat()
if err != nil {
return 0, errs.IO.WithErr(err)
}
fileSize := fi.Size()
if fileSize%pageSize != 0 {
return 0, errs.Corrupt.WithMsg("File size isn't a multiple of page size.")
}
maxPage := uint64(fileSize / pageSize)
return maxPage, nil
}
func (pf *File) iterate(each func(pageID uint64, page dataPage) error) error {
pf.lock.RLock()
defer pf.lock.RUnlock()