jldb/mdb/db-rep.go

121 lines
2.5 KiB
Go

package mdb
import (
"log"
"net"
"os"
"git.crumpington.com/public/jldb/lib/errs"
"git.crumpington.com/public/jldb/lib/wal"
"git.crumpington.com/public/jldb/mdb/change"
"git.crumpington.com/public/jldb/mdb/pfile"
)
// repSendState sends the page file found at pageFilePath(db.rootDir) over
// conn.
//
// The file is opened here via pfile.Open (db.pf is not used) and closed when
// the function returns. db.conf.NetTimeout is handed to the Send call. Errors
// from opening or sending are returned as-is.
func (db *Database) repSendState(conn net.Conn) error {
	src, err := pfile.Open(pageFilePath(db.rootDir))
	if err != nil {
		return err
	}
	defer src.Close()

	timeout := db.conf.NetTimeout
	return src.Send(conn, timeout)
}
// repRecvState receives a page file from conn and moves it into place at
// pageFilePath(db.rootDir).
//
// pfile.Recv is pointed at a staging path (the final path plus ".dl"); only
// after it reports success is the staging file renamed over the final path.
// An error from pfile.Recv is returned unchanged, while a rename failure is
// wrapped with errs.Unexpected.
func (db *Database) repRecvState(conn net.Conn) error {
	dst := pageFilePath(db.rootDir)
	staging := dst + ".dl"

	recvErr := pfile.Recv(conn, staging, db.conf.NetTimeout)
	if recvErr != nil {
		return recvErr
	}

	if mvErr := os.Rename(staging, dst); mvErr != nil {
		return errs.Unexpected.WithErr(mvErr)
	}
	return nil
}
// repInitStorage opens the page file at pageFilePath(db.rootDir) and keeps
// the handle in db.pf.
//
// db.pf is overwritten with whatever pfile.Open returned, including when the
// open fails; the open error (if any) is returned to the caller.
func (db *Database) repInitStorage() (err error) {
	path := pageFilePath(db.rootDir)

	pf, openErr := pfile.Open(path)
	db.pf = pf
	return openErr
}
// repReplay decodes the changes carried by rec and applies them to the page
// file (db.pf). Unlike repApply, it does not touch the in-memory snapshot.
//
// The decoded changes are kept in db.changes; db.changes[:0] is passed to
// change.Read as the destination (presumably so its storage can be reused
// between records).
func (db *Database) repReplay(rec wal.Record) (err error) {
	scratch := db.changes[:0]
	if db.changes, err = change.Read(scratch, rec.Reader); err != nil {
		return err
	}
	return db.pf.ApplyChanges(db.changes)
}
// repLoadFromStorage rebuilds in-memory state from the page file.
//
// It creates db.idx from db.pf, then walks every allocated item reported by
// pfile.IterateAllocated and hands each one to loadItem together with the
// currently loaded snapshot. If the walk completes without error, that same
// snapshot is stored back into db.snapshot.
func (db *Database) repLoadFromStorage() (err error) {
	if db.idx, err = pfile.NewIndex(db.pf); err != nil {
		return err
	}

	snap := db.snapshot.Load()

	// Per-item callback: route the item to its collection.
	insert := func(cID, iID uint64, data []byte) error {
		return db.loadItem(snap, cID, iID, data)
	}
	if err = pfile.IterateAllocated(db.pf, db.idx, insert); err != nil {
		return err
	}

	db.snapshot.Store(snap)
	return nil
}
// loadItem inserts one item read from the page file into collection cID
// within tx.
//
// An item whose collection ID is not present in db.collections is not treated
// as an error: a message is logged and nil is returned, so the item is
// skipped.
func (db *Database) loadItem(tx *Snapshot, cID, iID uint64, data []byte) error {
	if col, found := db.collections[cID]; found {
		return col.insertItem(tx, iID, data)
	}

	log.Printf("Failed to find collection %d for item in page file.", cID)
	return nil
}
// repApply decodes the changes carried by rec and applies them to the page
// file; on a non-primary node it also applies them to the in-memory snapshot.
//
// When db.rep.Primary() reports true, the function returns as soon as the
// page file has been updated. Otherwise a new transaction is begun from the
// current snapshot, every change is applied to it, it is stamped with the
// record's SeqNum and TimestampMS, marked read-only, and stored as the new
// current snapshot. If any change fails to apply, the error is returned and
// the snapshot is left as it was.
func (db *Database) repApply(rec wal.Record) (err error) {
	if db.changes, err = change.Read(db.changes[:0], rec.Reader); err != nil {
		return err
	}

	if err = db.pf.ApplyChanges(db.changes); err != nil {
		return err
	}

	if db.rep.Primary() {
		return nil
	}

	// Secondary: mirror the same changes into memory.
	next := db.snapshot.Load().begin()
	for _, ch := range db.changes {
		if err = db.applyChange(next, ch); err != nil {
			return err
		}
	}

	next.seqNum, next.timestampMS = rec.SeqNum, rec.TimestampMS
	next.setReadOnly()
	db.snapshot.Store(next)
	return nil
}
// applyChange applies a single change to tx.
//
// A change whose CollectionID is not in db.collections is ignored. When
// change.Store is set the item is upserted and the upsert's error is
// returned; otherwise the item is deleted and nil is always returned.
func (db *Database) applyChange(tx *Snapshot, change change.Change) error {
	col, known := db.collections[change.CollectionID]

	switch {
	case !known:
		return nil
	case change.Store:
		return col.upsertItem(tx, change.ItemID, change.Data)
	default:
		// Deletion is expected to fail only with NotFound, which is
		// deliberately ignored here.
		col.deleteItem(tx, change.ItemID)
		return nil
	}
}