jldb/lib/atomicheader/atomicheader.go

138 lines
2.9 KiB
Go
Raw Permalink Normal View History

2023-10-13 09:43:27 +00:00
package atomicheader
import (
"encoding/binary"
"hash/crc32"
"os"
"sync"
2023-10-16 08:50:19 +00:00
"git.crumpington.com/public/jldb/lib/errs"
2023-10-13 09:43:27 +00:00
)
// Layout of the reserved header region at the start of the file. All values
// are byte counts or byte offsets.
const (
	// PageSize is the size of each page in the reserved region.
	PageSize = 512

	// AvailabePageSize is a page minus its last 4 bytes, which Read and Write
	// use to hold the page's CRC32 checksum.
	AvailabePageSize = PageSize - 4

	// ReservedBytes is the size of the whole reserved region: four pages.
	ReservedBytes = 4 * PageSize

	// The page at offset 0 is not referenced by the code in this file.
	offsetSwitch = PageSize     // Switch page; its first byte is the active page number.
	offset1      = 2 * PageSize // Header page number 1.
	offset2      = 3 * PageSize // Header page number 2.
)
// Handler gives access to a checksummed header page stored in the reserved
// region of a file. Two header pages exist (at offset1 and offset2); the
// switch page records which of them is currently active. Read and Write both
// hold lock for their full duration.
type Handler struct {
lock sync.Mutex // Held by Read and Write while they use the fields below.
switchPage []byte // In-memory copy of the page at offsetSwitch; byte 0 is the active page number.
page []byte // Page buffer is re-used for reading and writing.
currentPage int64 // Active page number: either 1 or 2 (validated in Open).
f *os.File // File that contains the reserved header region.
}
// Init prepares f so that it can later be passed to Open.
//
// The file is truncated to exactly ReservedBytes and a switch page selecting
// header page 2 is written at offsetSwitch, so the first call to Write will
// store its data in page 1. Init does not write a header page itself: on a
// previously empty file page 2 is zero-filled and carries no valid checksum,
// so Read reports a checksum mismatch until Write has been called once.
//
// The changes are flushed with Sync before returning, matching the durability
// guarantees of Write. Any failure is returned wrapped in errs.IO.
func Init(f *os.File) error {
	if err := f.Truncate(ReservedBytes); err != nil {
		return errs.IO.WithErr(err)
	}

	switchPage := make([]byte, PageSize)
	switchPage[0] = 2
	if _, err := f.WriteAt(switchPage, offsetSwitch); err != nil {
		return errs.IO.WithErr(err)
	}

	// Without this flush a crash shortly after Init could leave a file whose
	// size or switch page never reached stable storage, which Open would then
	// reject (or misread) even though Init reported success.
	if err := f.Sync(); err != nil {
		return errs.IO.WithErr(err)
	}

	return nil
}
// Open reads the switch page from f and returns a Handler positioned on the
// header page it names.
//
// A failed read of the switch page is returned wrapped in errs.IO. If the
// first byte of the switch page is neither 1 nor 2, an errs.Corrupt error is
// returned. The header page itself is not read or verified here; that happens
// in Read.
func Open(f *os.File) (*Handler, error) {
	sw := make([]byte, PageSize)
	_, err := f.ReadAt(sw, offsetSwitch)
	if err != nil {
		return nil, errs.IO.WithErr(err)
	}

	active := int64(sw[0])
	switch active {
	case 1, 2:
		return &Handler{
			switchPage:  sw,
			page:        make([]byte, PageSize),
			currentPage: active,
			f:           f,
		}, nil
	default:
		return nil, errs.Corrupt.WithMsg("invalid page id: %d", active)
	}
}
// Read loads the active header page from the file, verifies its checksum and
// hands the page to the read callback.
//
// The slice passed to read is the handler's internal PageSize-byte buffer,
// including the 4 checksum bytes at the end. The buffer is re-used by later
// Read and Write calls, so the callback should copy anything it wants to keep.
// The handler's lock is held while the callback runs.
//
// An I/O failure is returned wrapped in errs.IO, a checksum mismatch as
// errs.Corrupt; otherwise the callback's own result is returned.
func (h *Handler) Read(read func(page []byte) error) error {
	h.lock.Lock()
	defer h.lock.Unlock()

	_, err := h.f.ReadAt(h.page, h.currentOffset())
	if err != nil {
		return errs.IO.WithErr(err)
	}

	// The last 4 bytes hold the little-endian CRC32 of everything before them.
	const crcStart = PageSize - 4
	payload, trailer := h.page[:crcStart], h.page[crcStart:]
	if crc32.ChecksumIEEE(payload) != binary.LittleEndian.Uint32(trailer) {
		return errs.Corrupt.WithMsg("checksum mismatch")
	}

	return read(h.page)
}
// Write stores a new header page and then makes it the active one.
//
// The update callback fills in the handler's internal page buffer. That
// buffer holds whatever the previous Read or Write left in it (all zeros on a
// freshly opened Handler), not necessarily the current on-disk header, so the
// callback may need to zero or overwrite stale bytes. The final 4 bytes are
// always replaced by the page's CRC32 after the callback returns.
//
// If update returns an error it is returned unchanged and nothing is written.
// I/O failures are returned wrapped in errs.IO, and the handler keeps
// pointing at the previously active page.
func (h *Handler) Write(update func(page []byte) error) error {
	h.lock.Lock()
	defer h.lock.Unlock()

	if err := update(h.page); err != nil {
		return err
	}

	// Seal the page: little-endian CRC32 of the payload goes in the trailer.
	payload, trailer := h.page[:PageSize-4], h.page[PageSize-4:]
	binary.LittleEndian.PutUint32(trailer, crc32.ChecksumIEEE(payload))

	// Always write to the page that is NOT active (1 -> 2, 2 -> 1), so the
	// active page stays intact until the switch below.
	target := 1 + h.currentPage%2

	// Step 1: write the new page and flush it before touching the switch.
	_, err := h.f.WriteAt(h.page, h.getOffset(target))
	if err == nil {
		err = h.f.Sync()
	}
	if err != nil {
		return errs.IO.WithErr(err)
	}

	// Step 2: point the switch page at the new page and flush again.
	h.switchPage[0] = byte(target)
	_, err = h.f.WriteAt(h.switchPage, offsetSwitch)
	if err == nil {
		err = h.f.Sync()
	}
	if err != nil {
		return errs.IO.WithErr(err)
	}

	h.currentPage = target
	return nil
}
// ----------------------------------------------------------------------------
// currentOffset returns the file offset of the active header page.
func (h *Handler) currentOffset() int64 {
	active := h.currentPage
	return h.getOffset(active)
}
// getOffset maps a header page number (1 or 2) to its file offset. Any other
// page number panics.
func (h *Handler) getOffset(pageNum int64) int64 {
	if pageNum == 1 {
		return offset1
	}
	if pageNum == 2 {
		return offset2
	}
	panic("Invalid page number.")
}