package mdb

import (
	"log"
	"net"
	"os"

	"git.crumpington.com/public/jldb/lib/errs"
	"git.crumpington.com/public/jldb/lib/wal"
	"git.crumpington.com/public/jldb/mdb/change"
	"git.crumpington.com/public/jldb/mdb/pfile"
)

// repSendState opens the page file under db.rootDir and sends it over conn,
// using db.conf.NetTimeout as the timeout passed to the page file's Send.
// The page file handle opened here is closed before returning.
func (db *Database) repSendState(conn net.Conn) error {
	pf, err := pfile.Open(pageFilePath(db.rootDir))
	if err != nil {
		return err
	}
	defer pf.Close()
	return pf.Send(conn, db.conf.NetTimeout)
}

// repRecvState receives a page file from conn into a temporary ".dl" file
// next to the final page file path, then renames it into place.
//
// An error from pfile.Recv is returned unchanged; a rename failure is wrapped
// in errs.Unexpected. On either failure the temporary file is removed.
func (db *Database) repRecvState(conn net.Conn) error {
	finalPath := pageFilePath(db.rootDir)
	tmpPath := finalPath + ".dl"

	if err := pfile.Recv(conn, tmpPath, db.conf.NetTimeout); err != nil {
		// Best-effort cleanup: Recv may have left a partial file behind.
		// The Remove error is ignored because the Recv error is the one
		// that matters to the caller (and the file may not exist at all).
		_ = os.Remove(tmpPath)
		return err
	}

	if err := os.Rename(tmpPath, finalPath); err != nil {
		// Best-effort cleanup so the downloaded copy is not left on disk.
		_ = os.Remove(tmpPath)
		return errs.Unexpected.WithErr(err)
	}
	return nil
}

// repInitStorage opens the page file under db.rootDir and stores the handle
// in db.pf.
func (db *Database) repInitStorage() (err error) {
	db.pf, err = pfile.Open(pageFilePath(db.rootDir))
	return err
}

// repReplay decodes the changes carried by rec and applies them to the page
// file only. The db.changes slice is truncated and reused as the decode
// buffer.
func (db *Database) repReplay(rec wal.Record) (err error) {
	db.changes, err = change.Read(db.changes[:0], rec.Reader)
	if err != nil {
		return err
	}
	return db.pf.ApplyChanges(db.changes)
}

// repLoadFromStorage builds db.idx from the page file and then passes every
// allocated item reported by pfile.IterateAllocated to loadItem, using the
// currently stored snapshot. The snapshot is stored back once iteration
// completes without error.
func (db *Database) repLoadFromStorage() (err error) {
	db.idx, err = pfile.NewIndex(db.pf)
	if err != nil {
		return err
	}

	// NOTE(review): the loaded snapshot is passed to loadItem directly (no
	// begin() as in repApply) — presumably safe because this runs during
	// start-up; confirm against callers.
	tx := db.snapshot.Load()
	err = pfile.IterateAllocated(db.pf, db.idx, func(cID, iID uint64, data []byte) error {
		return db.loadItem(tx, cID, iID, data)
	})
	if err != nil {
		return err
	}

	db.snapshot.Store(tx)
	return nil
}

// loadItem inserts one item read from the page file into the collection
// identified by cID. An item whose collection is not registered in
// db.collections is logged and skipped rather than treated as an error.
func (db *Database) loadItem(tx *Snapshot, cID, iID uint64, data []byte) error {
	c, ok := db.collections[cID]
	if !ok {
		log.Printf("Failed to find collection %d for item in page file.", cID)
		return nil
	}
	return c.insertItem(tx, iID, data)
}

// repApply decodes the changes carried by rec and applies them to the page
// file. When db.rep.Primary() reports true nothing else is done; otherwise
// the changes are also applied to a snapshot derived from the current one,
// which is stamped with the record's sequence number and timestamp and then
// stored.
func (db *Database) repApply(rec wal.Record) (err error) {
	db.changes, err = change.Read(db.changes[:0], rec.Reader)
	if err != nil {
		return err
	}

	if err = db.pf.ApplyChanges(db.changes); err != nil {
		return err
	}

	if db.rep.Primary() {
		return nil
	}

	// For secondary, we need to also apply changes to memory.
	tx := db.snapshot.Load().begin()
	for _, ch := range db.changes {
		if err = db.applyChange(tx, ch); err != nil {
			return err
		}
	}

	// The new snapshot is only stored after every change applied cleanly.
	tx.seqNum = rec.SeqNum
	tx.timestampMS = rec.TimestampMS
	db.snapshot.Store(tx)
	return nil
}

// applyChange applies a single change to tx: an upsert when ch.Store is set,
// otherwise a delete. Changes that reference a collection not present in
// db.collections are silently ignored.
func (db *Database) applyChange(tx *Snapshot, ch change.Change) error {
	c, ok := db.collections[ch.CollectionID]
	if !ok {
		return nil
	}

	if ch.Store {
		return c.upsertItem(tx, ch.ItemID, ch.Data)
	}

	// The only error this could return is NotFound. We'll ignore that error here.
	c.deleteItem(tx, ch.ItemID)
	return nil
}