jldb/mdb/filewriter.go
2023-10-13 11:43:27 +02:00

101 lines
1.9 KiB
Go

package mdb
/*
// fileWriter applies changes recorded in the WAL to the data (page) file. It
// is run by the primary and, for the primary, is the only way the page file
// is modified.
type fileWriter struct {
	// Stop signals shutdown: Run and runOnce return once a receive on this
	// channel succeeds.
	Stop chan struct{}

	// Done is marked done (Done.Done()) when Run returns.
	Done *sync.WaitGroup

	// PageFilePath is the path handed to pagefile.Open on every pass.
	PageFilePath string

	// WALRootDir is the directory handed to cswal.NewIterator.
	WALRootDir string
}
// Run repeatedly calls runOnce until Stop is signalled, pausing one second
// between passes so that a persistent failure (for example, the page file
// cannot be opened) does not turn into a busy loop. Done.Done() is called
// when Run returns.
//
// The pause is interruptible: a stop request that arrives while waiting is
// honoured immediately rather than after the remainder of the second.
//
// NOTE(review): runOnce also receives from Stop, so this presumably relies on
// Stop being closed rather than sent to; verify against the caller.
func (w *fileWriter) Run() {
	defer w.Done.Done()

	for {
		w.runOnce()

		// Wait for either the retry delay or a stop request, whichever
		// comes first.
		pause := time.NewTimer(time.Second)
		select {
		case <-w.Stop:
			pause.Stop()
			return
		case <-pause.C:
		}
	}
}
// runOnce performs a single catch-up pass over the WAL. It opens the page
// file, reads its header, and then applies WAL records starting at the
// sequence number following the one stored in the header.
//
// The pass ends when Stop is signalled or when any step fails; failures are
// logged through logf and are not returned to the caller.
func (w *fileWriter) runOnce() {
	pf, err := pagefile.Open(w.PageFilePath)
	if err != nil {
		w.logf("Failed to open page file: %v", err)
		return
	}
	defer pf.Close()

	hdr, err := w.readHeader(pf)
	if err != nil {
		w.logf("Failed to get header from page file: %v", err)
		return
	}

	// Resume with the record right after the last one written to the header.
	walIter, err := cswal.NewIterator(w.WALRootDir, hdr.SeqNum+1)
	if err != nil {
		w.logf("Failed to get WAL iterator: %v", err)
		return
	}
	defer walIter.Close()

	for {
		// The argument is presumably a wait timeout; confirm against cswal.
		advanced := walIter.Next(time.Second)

		// Stop is checked after every Next call, whether or not a record
		// became available.
		select {
		case <-w.Stop:
			return
		default:
		}

		switch {
		case advanced:
			if err := w.applyChanges(pf, walIter.Record()); err != nil {
				w.logf("Failed to apply changes: %v", err)
				return
			}
		case walIter.Error() != nil:
			w.logf("Iteration error: %v", walIter.Error())
			return
		}
		// No record and no error: poll again.
	}
}
// readHeader returns the result of f.ReadHeader, called between f.RLock()
// and the release function that RLock returns (presumably a read lock;
// confirm against pagefile).
func (w *fileWriter) readHeader(f *pagefile.File) (pagefile.Header, error) {
	release := f.RLock()
	defer release()

	return f.ReadHeader()
}
// applyChanges applies the changes of one WAL record to the page file and
// then writes a header carrying the record's sequence number and creation
// time. Both steps run between f.WLock() and the release function it returns
// (presumably a write lock; confirm against pagefile).
//
// Any error is logged through logf and returned; the header is not written
// when applying the changes fails.
func (w *fileWriter) applyChanges(f *pagefile.File, rec *cswal.Record) error {
	release := f.WLock()
	defer release()

	applyErr := f.ApplyChanges(rec.Changes)
	if applyErr != nil {
		w.logf("Failed to apply changes to page file: %v", applyErr)
		return applyErr
	}

	// Record how far the page file has caught up with the WAL.
	writeErr := f.WriteHeader(pagefile.Header{
		SeqNum:    rec.SeqNum,
		UpdatedAt: rec.CreatedAt,
	})
	if writeErr != nil {
		w.logf("Failed to write page file header: %v", writeErr)
		return writeErr
	}

	return nil
}
// logf writes a formatted message through the standard logger, prefixing the
// format string with the file-writer tag.
func (w *fileWriter) logf(pattern string, args ...interface{}) {
	const tag = "[FILE-WRITER] "
	log.Printf(tag+pattern, args...)
}
*/