// jldb/mdb/pfile/pagefile.go
//
// 324 lines
// 6.3 KiB
// Go

package pfile
import (
"bufio"
"bytes"
"compress/gzip"
"encoding/binary"
"io"
"net"
"os"
"sync"
"time"
"git.crumpington.com/public/jldb/lib/errs"
"git.crumpington.com/public/jldb/mdb/change"
)
// File is a handle to an on-disk page file opened with Open.
type File struct {
// lock guards the file: Close and ApplyChanges take it exclusively, while
// iterate and Send take it shared.
lock sync.RWMutex
// f is the underlying OS file; pages are addressed at offset id*pageSize.
f *os.File
// page is a page buffer allocated once in Open and reused as scratch space
// by the page read/write helpers.
page dataPage
}
// Open opens the page file at path for reading and writing, creating it with
// mode 0600 when it does not exist yet. An OS-level failure is returned
// wrapped in errs.IO.
func Open(path string) (*File, error) {
	const flags = os.O_RDWR | os.O_CREATE

	handle, err := os.OpenFile(path, flags, 0600)
	if err != nil {
		return nil, errs.IO.WithErr(err)
	}

	// The scratch page is allocated once here and reused by later operations.
	return &File{
		f:    handle,
		page: newDataPage(),
	}, nil
}
// Close closes the underlying file while holding the exclusive lock, so it
// waits for in-flight readers and writers that hold the lock. A close failure
// is returned wrapped in errs.IO.
func (pf *File) Close() error {
	pf.lock.Lock()
	defer pf.lock.Unlock()

	err := pf.f.Close()
	if err == nil {
		return nil
	}
	return errs.IO.WithErr(err)
}
// ----------------------------------------------------------------------------
// Writing
// ----------------------------------------------------------------------------
// ApplyChanges writes the given changes to the file while holding the
// exclusive lock. It is the locking entry point around applyChanges and
// returns whatever error that helper reports.
func (pf *File) ApplyChanges(changes []change.Change) error {
	pf.lock.Lock()
	defer pf.lock.Unlock()

	err := pf.applyChanges(changes)
	return err
}
// applyChanges applies each change in order: it first writes the change's
// data pages (when WritePageIDs is non-empty), then overwrites every page in
// ClearPageIDs with emptyPage. After all changes have been written the file
// is synced once. The first error aborts processing and is returned; in that
// case no sync is performed.
//
// This method does no locking of its own.
func (pf *File) applyChanges(changes []change.Change) error {
	for i := range changes {
		ch := changes[i]

		if len(ch.WritePageIDs) != 0 {
			if err := pf.writeChangePages(ch); err != nil {
				return err
			}
		}

		for _, pageID := range ch.ClearPageIDs {
			if err := pf.writePage(emptyPage, pageID); err != nil {
				return err
			}
		}
	}

	err := pf.f.Sync()
	if err == nil {
		return nil
	}
	return errs.IO.WithErr(err)
}
// writeChangePages writes change.Data into the pages listed in
// change.WritePageIDs, in order, using pf.page as the scratch buffer.
//
// The first page written is marked pageTypeHead and every following page
// pageTypeData. Each page's header carries the collection ID, item ID, total
// data size, and the ID of the next page in the chain (0 on the last page).
// The CRC is recomputed before each page is written.
//
// It returns an errs.Unexpected error when the number of page IDs does not
// match the amount of data: either page IDs are left over once all data has
// been written, or data is left over once all page IDs have been used. Pages
// written before the mismatch is detected are not rolled back.
func (pf *File) writeChangePages(change change.Change) error {
	page := pf.page
	header := page.Header()

	header.PageType = pageTypeHead
	header.CollectionID = change.CollectionID
	header.ItemID = change.ItemID
	header.DataSize = uint64(len(change.Data))

	pageIDs := change.WritePageIDs
	data := change.Data

	// Loop on the *unwritten remainder* (data), not on change.Data: the latter
	// never shrinks, which would keep writing empty pages for every surplus
	// page ID and make the "too many pages" check below unreachable.
	for len(data) > 0 && len(pageIDs) > 0 {
		pageID := pageIDs[0]
		pageIDs = pageIDs[1:]

		// Link to the following page, or terminate the chain with 0.
		if len(pageIDs) > 0 {
			header.NextPage = pageIDs[0]
		} else {
			header.NextPage = 0
		}

		n := page.Write(data)
		data = data[n:]

		page.Header().CRC = page.ComputeCRC()

		if err := pf.writePage(page, pageID); err != nil {
			return err
		}

		// Every page after the head page is a plain data page.
		header.PageType = pageTypeData
	}

	if len(pageIDs) > 0 {
		return errs.Unexpected.WithMsg("Too many pages provided for given data.")
	}

	if len(data) > 0 {
		return errs.Unexpected.WithMsg("Not enough pages for given data.")
	}

	return nil
}
// writePage writes page to the file at the byte offset id*pageSize. A write
// failure is returned wrapped in errs.IO.
func (pf *File) writePage(page dataPage, id uint64) error {
	offset := int64(id * pageSize)

	_, err := pf.f.WriteAt(page, offset)
	if err != nil {
		return errs.IO.WithErr(err)
	}
	return nil
}
// ----------------------------------------------------------------------------
// Reading
// ----------------------------------------------------------------------------
// pageCount returns the number of whole pages in the file, derived from the
// file size reported by Stat. It returns an errs.IO error when Stat fails and
// an errs.Corrupt error when the size is not a multiple of pageSize.
func (pf *File) pageCount() (uint64, error) {
	info, err := pf.f.Stat()
	if err != nil {
		return 0, errs.IO.WithErr(err)
	}

	size := info.Size()
	if size%pageSize == 0 {
		return uint64(size / pageSize), nil
	}
	return 0, errs.Corrupt.WithMsg("File size isn't a multiple of page size.")
}
// iterate reads every page of the file in order, validates it, and passes it
// to each together with its page ID. It holds the shared (read) lock for the
// whole pass.
//
// The page handed to each is a buffer that is overwritten on the next
// iteration, so callbacks must copy anything they need to keep.
//
// It returns an errs.IO error on read failure, an errs.Corrupt error when the
// file size is not a multiple of pageSize, the error from page.Validate for
// an invalid page, or the first error returned by each.
func (pf *File) iterate(each func(pageID uint64, page dataPage) error) error {
	pf.lock.RLock()
	defer pf.lock.RUnlock()

	maxPage, err := pf.pageCount()
	if err != nil {
		return err
	}

	// Only the shared lock is held here, so nothing shared may be mutated:
	// use a private page buffer rather than pf.page, and a SectionReader
	// (which reads via ReadAt) rather than seeking the shared file offset.
	page := make(dataPage, len(pf.page))
	src := io.NewSectionReader(pf.f, 0, int64(maxPage*pageSize))
	r := bufio.NewReaderSize(src, 1024*1024)

	for pageID := uint64(0); pageID < maxPage; pageID++ {
		// bufio.Reader.Read may return fewer bytes than requested; ReadFull
		// guarantees the whole page is filled before it is validated.
		if _, err := io.ReadFull(r, page); err != nil {
			return errs.IO.WithErr(err)
		}

		if err := page.Validate(); err != nil {
			return err
		}

		if err := each(pageID, page); err != nil {
			return err
		}
	}

	return nil
}
// readData appends to buf the data of the item whose head page is id. It
// reads the head page, takes the expected total size from its header, and
// then follows the NextPage links until a page with NextPage == 0 is reached,
// appending at most the still-expected number of bytes from each page.
//
// Errors from readPage are returned as is. If the chain ends and the number
// of bytes collected differs from the header's DataSize, an errs.Corrupt
// error is returned.
//
// NOTE(review): this uses pf.page as its buffer and takes no lock itself;
// presumably the caller holds pf.lock — confirm.
func (pf *File) readData(id uint64, buf *bytes.Buffer) error {
	page := pf.page

	// Start with the head page; its header holds the total data size.
	if err := pf.readPage(page, id); err != nil {
		return err
	}

	left := int(page.Header().DataSize)

	for {
		chunk := page.Data()
		if left < len(chunk) {
			chunk = chunk[:left]
		}
		buf.Write(chunk)
		left -= len(chunk)

		next := page.Header().NextPage
		if next == 0 {
			break
		}

		if err := pf.readPage(page, next); err != nil {
			return err
		}
	}

	if left == 0 {
		return nil
	}
	return errs.Corrupt.WithMsg("Incorrect data size. %d remaining.", left)
}
// readPage fills p from the file at byte offset id*pageSize and then
// validates it. A read failure is returned wrapped in errs.IO; otherwise the
// result of p.Validate is returned.
func (pf *File) readPage(p dataPage, id uint64) error {
	offset := int64(id * pageSize)

	_, err := pf.f.ReadAt(p, offset)
	if err != nil {
		return errs.IO.WithErr(err)
	}
	return p.Validate()
}
// ----------------------------------------------------------------------------
// Send / Recv
// ----------------------------------------------------------------------------
// Send streams the whole file to conn while holding the shared (read) lock.
//
// Wire format: the file size as a little-endian int64, followed by a single
// gzip stream (compression level 3) containing the file contents. The
// compressed stream is flushed after every chunk, and the write deadline on
// conn is pushed forward by timeout before each write.
//
// Returns an errs.IO error for file or network failures and an
// errs.Unexpected error if the gzip writer cannot be created. The gzip
// trailer is only written on success, so a failed transfer never looks like a
// complete stream to the receiver. Send does not close conn.
func (pf *File) Send(conn net.Conn, timeout time.Duration) error {
	pf.lock.RLock()
	defer pf.lock.RUnlock()

	fi, err := pf.f.Stat()
	if err != nil {
		return errs.IO.WithErr(err)
	}
	size := fi.Size()

	if err := conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
		return errs.IO.WithErr(err)
	}
	if err := binary.Write(conn, binary.LittleEndian, size); err != nil {
		return errs.IO.WithErr(err)
	}

	w, err := gzip.NewWriterLevel(conn, 3)
	if err != nil {
		return errs.Unexpected.WithErr(err)
	}

	buf := make([]byte, 1024*1024)

	// Read with ReadAt at an explicit offset: only the shared lock is held, so
	// the file's seek position (shared by every user of pf.f) must not move.
	for offset := int64(0); offset < size; {
		chunk := buf
		if left := size - offset; left < int64(len(chunk)) {
			chunk = chunk[:left]
		}

		// ReadAt reports an error whenever it fills less than the full chunk.
		if _, err := pf.f.ReadAt(chunk, offset); err != nil {
			return errs.IO.WithErr(err)
		}

		if err := conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
			return errs.IO.WithErr(err)
		}
		if _, err := w.Write(chunk); err != nil {
			return errs.IO.WithErr(err)
		}
		if err := w.Flush(); err != nil {
			return errs.IO.WithErr(err)
		}

		offset += int64(len(chunk))
	}

	// Close writes the final block and the gzip trailer; a failure here means
	// the receiver did not get a complete stream, so it must be reported.
	if err := conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
		return errs.IO.WithErr(err)
	}
	if err := w.Close(); err != nil {
		return errs.IO.WithErr(err)
	}

	return nil
}
// Recv receives a file in the format produced by Send and stores it at
// filePath, which is created or truncated. It always closes conn before
// returning.
//
// It reads the little-endian 64-bit size prefix, then exactly that many bytes
// from the gzip stream that follows, and finally reads to the end of the gzip
// stream so that its CRC/size trailer is verified. The read deadline on conn
// is pushed forward by timeout before every read, including the size prefix
// and the gzip header. The file is synced before a successful return.
//
// Returns an errs.IO error for file, network, or stream failures (including a
// short or corrupt stream) and an errs.Unexpected error if the gzip header is
// invalid or the stream carries more data than announced. On failure the
// partially written file is left in place.
func Recv(conn net.Conn, filePath string, timeout time.Duration) error {
	defer conn.Close()

	f, err := os.Create(filePath)
	if err != nil {
		return errs.IO.WithErr(err)
	}
	defer f.Close()

	// Bound the wait for the size prefix and gzip header as well, so a silent
	// peer cannot block this call forever.
	if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
		return errs.IO.WithErr(err)
	}

	var remaining uint64
	if err := binary.Read(conn, binary.LittleEndian, &remaining); err != nil {
		return errs.IO.WithErr(err)
	}

	r, err := gzip.NewReader(conn)
	if err != nil {
		return errs.Unexpected.WithErr(err)
	}
	defer r.Close()

	// Exactly one gzip member is expected. In the default multistream mode the
	// reader would try to read another gzip header after the trailer and block
	// on the connection until the peer closes it or the deadline expires.
	r.Multistream(false)

	buf := make([]byte, 1024*1024)

	for remaining > 0 {
		// Never read past the announced size: that would underflow the
		// unsigned counter and write surplus bytes to the file.
		chunk := buf
		if remaining < uint64(len(chunk)) {
			chunk = chunk[:remaining]
		}

		if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
			return errs.IO.WithErr(err)
		}
		if _, err := io.ReadFull(r, chunk); err != nil {
			return errs.IO.WithErr(err)
		}
		if _, err := f.Write(chunk); err != nil {
			return errs.IO.WithErr(err)
		}

		remaining -= uint64(len(chunk))
	}

	// Data returned by a gzip reader is only confirmed once io.EOF is seen,
	// because the checksum is verified at the end of the stream.
	if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
		return errs.IO.WithErr(err)
	}
	switch _, err := io.ReadFull(r, buf[:1]); err {
	case io.EOF:
		// Clean end of stream; trailer verified.
	case nil:
		return errs.Unexpected.WithMsg("Received more data than the announced size.")
	default:
		return errs.IO.WithErr(err)
	}

	if err := f.Sync(); err != nil {
		return errs.IO.WithErr(err)
	}

	return nil
}