2023-10-13 09:43:27 +00:00
|
|
|
package mdb
|
|
|
|
|
|
|
|
import (
|
2023-10-16 08:50:19 +00:00
|
|
|
"log"
|
|
|
|
"net"
|
|
|
|
"os"
|
|
|
|
|
2023-10-13 09:43:27 +00:00
|
|
|
"git.crumpington.com/public/jldb/lib/errs"
|
|
|
|
"git.crumpington.com/public/jldb/lib/wal"
|
|
|
|
"git.crumpington.com/public/jldb/mdb/change"
|
|
|
|
"git.crumpington.com/public/jldb/mdb/pfile"
|
|
|
|
)
|
|
|
|
|
|
|
|
func (db *Database) repSendState(conn net.Conn) error {
|
|
|
|
pf, err := pfile.Open(pageFilePath(db.rootDir))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
defer pf.Close()
|
|
|
|
return pf.Send(conn, db.conf.NetTimeout)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *Database) repRecvState(conn net.Conn) error {
|
|
|
|
finalPath := pageFilePath(db.rootDir)
|
|
|
|
tmpPath := finalPath + ".dl"
|
|
|
|
if err := pfile.Recv(conn, tmpPath, db.conf.NetTimeout); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := os.Rename(tmpPath, finalPath); err != nil {
|
|
|
|
return errs.Unexpected.WithErr(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *Database) repInitStorage() (err error) {
|
|
|
|
db.pf, err = pfile.Open(pageFilePath(db.rootDir))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *Database) repReplay(rec wal.Record) (err error) {
|
|
|
|
db.changes, err = change.Read(db.changes[:0], rec.Reader)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return db.pf.ApplyChanges(db.changes)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *Database) repLoadFromStorage() (err error) {
|
|
|
|
db.idx, err = pfile.NewIndex(db.pf)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
tx := db.snapshot.Load()
|
|
|
|
err = pfile.IterateAllocated(db.pf, db.idx, func(cID, iID uint64, data []byte) error {
|
|
|
|
return db.loadItem(tx, cID, iID, data)
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
db.snapshot.Store(tx)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *Database) loadItem(tx *Snapshot, cID, iID uint64, data []byte) error {
|
|
|
|
c, ok := db.collections[cID]
|
|
|
|
if !ok {
|
|
|
|
log.Printf("Failed to find collection %d for item in page file.", cID)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return c.insertItem(tx, iID, data)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *Database) repApply(rec wal.Record) (err error) {
|
|
|
|
db.changes, err = change.Read(db.changes[:0], rec.Reader)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := db.pf.ApplyChanges(db.changes); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if db.rep.Primary() {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// For secondary, we need to also apply changes to memory.
|
|
|
|
|
|
|
|
tx := db.snapshot.Load().begin()
|
|
|
|
for _, change := range db.changes {
|
|
|
|
if err = db.applyChange(tx, change); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
tx.seqNum = rec.SeqNum
|
|
|
|
tx.timestampMS = rec.TimestampMS
|
|
|
|
db.snapshot.Store(tx)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *Database) applyChange(tx *Snapshot, change change.Change) error {
|
|
|
|
c, ok := db.collections[change.CollectionID]
|
|
|
|
if !ok {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
if change.Store {
|
|
|
|
return c.upsertItem(tx, change.ItemID, change.Data)
|
|
|
|
}
|
|
|
|
|
|
|
|
// The only error this could return is NotFound. We'll ignore that error here.
|
|
|
|
c.deleteItem(tx, change.ItemID)
|
|
|
|
return nil
|
|
|
|
}
|