101 lines
1.9 KiB
Go
101 lines
1.9 KiB
Go
|
package mdb
|
||
|
|
||
|
/*
|
||
|
// fileWriter applies the changes recorded in the WAL to the data file. It is
// run by the primary, and, for the primary, it is the only way the pagefile is
// modified.
type fileWriter struct {
	// Stop is polled by Run and runOnce; once a receive on it succeeds the
	// writer returns.
	Stop chan struct{}

	// Done has its Done method called when Run returns.
	Done *sync.WaitGroup

	// PageFilePath is opened with pagefile.Open at the start of every pass;
	// WALRootDir is the directory handed to cswal.NewIterator.
	PageFilePath, WALRootDir string
}
|
||
|
|
||
|
func (w *fileWriter) Run() {
|
||
|
defer w.Done.Done()
|
||
|
for {
|
||
|
w.runOnce()
|
||
|
select {
|
||
|
case <-w.Stop:
|
||
|
return
|
||
|
default:
|
||
|
time.Sleep(time.Second)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (w *fileWriter) runOnce() {
|
||
|
f, err := pagefile.Open(w.PageFilePath)
|
||
|
if err != nil {
|
||
|
w.logf("Failed to open page file: %v", err)
|
||
|
return
|
||
|
}
|
||
|
defer f.Close()
|
||
|
|
||
|
header, err := w.readHeader(f)
|
||
|
if err != nil {
|
||
|
w.logf("Failed to get header from page file: %v", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
it, err := cswal.NewIterator(w.WALRootDir, header.SeqNum+1)
|
||
|
if err != nil {
|
||
|
w.logf("Failed to get WAL iterator: %v", err)
|
||
|
return
|
||
|
}
|
||
|
defer it.Close()
|
||
|
|
||
|
for {
|
||
|
hasNext := it.Next(time.Second)
|
||
|
|
||
|
select {
|
||
|
case <-w.Stop:
|
||
|
return
|
||
|
default:
|
||
|
}
|
||
|
|
||
|
if !hasNext {
|
||
|
if it.Error() != nil {
|
||
|
w.logf("Iteration error: %v", it.Error())
|
||
|
return
|
||
|
}
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
rec := it.Record()
|
||
|
if err := w.applyChanges(f, rec); err != nil {
|
||
|
w.logf("Failed to apply changes: %v", err)
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (w *fileWriter) readHeader(f *pagefile.File) (pagefile.Header, error) {
|
||
|
defer f.RLock()()
|
||
|
return f.ReadHeader()
|
||
|
}
|
||
|
|
||
|
func (w *fileWriter) applyChanges(f *pagefile.File, rec *cswal.Record) error {
|
||
|
defer f.WLock()()
|
||
|
|
||
|
if err := f.ApplyChanges(rec.Changes); err != nil {
|
||
|
w.logf("Failed to apply changes to page file: %v", err)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
header := pagefile.Header{
|
||
|
SeqNum: rec.SeqNum,
|
||
|
UpdatedAt: rec.CreatedAt,
|
||
|
}
|
||
|
|
||
|
if err := f.WriteHeader(header); err != nil {
|
||
|
w.logf("Failed to write page file header: %v", err)
|
||
|
return err
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (w *fileWriter) logf(pattern string, args ...interface{}) {
|
||
|
log.Printf("[FILE-WRITER] "+pattern, args...)
|
||
|
}
|
||
|
*/
|