package pfile

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"encoding/binary"
	"io"
	"net"
	"os"
	"sync"
	"time"

	"git.crumpington.com/public/jldb/lib/errs"
	"git.crumpington.com/public/jldb/mdb/change"
)

// File is a handle on a page file: a file addressed as an array of
// pageSize-byte pages, where the page with a given ID lives at byte offset
// id*pageSize.
type File struct {
	// lock is write-held while changes are applied and while closing, and
	// read-held by iterate and Send.
	lock sync.RWMutex
	f    *os.File
	// page is a scratch page reused by the read and write helpers.
	page dataPage
}

// Open opens the page file at path for reading and writing, creating it with
// mode 0600 if it doesn't exist.
func Open(path string) (*File, error) {
	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0600)
	if err != nil {
		return nil, errs.IO.WithErr(err)
	}

	pf := &File{f: f}
	pf.page = newDataPage()

	return pf, nil
}

// Close closes the underlying file. It takes the write lock first.
func (pf *File) Close() error {
	pf.lock.Lock()
	defer pf.lock.Unlock()

	if err := pf.f.Close(); err != nil {
		return errs.IO.WithErr(err)
	}
	return nil
}

// ----------------------------------------------------------------------------
// Writing
// ----------------------------------------------------------------------------

// ApplyChanges writes the given changes to the file and syncs it, holding the
// write lock for the duration.
func (pf *File) ApplyChanges(changes []change.Change) error {
	pf.lock.Lock()
	defer pf.lock.Unlock()

	return pf.applyChanges(changes)
}

// applyChanges writes each change's data to its WritePageIDs, overwrites each
// of its ClearPageIDs with emptyPage, and finally syncs the file. It doesn't
// take the lock itself.
func (pf *File) applyChanges(changes []change.Change) error {
	// The loop variable is deliberately not called "change": that would shadow
	// the imported package of the same name.
	for _, ch := range changes {
		if len(ch.WritePageIDs) > 0 {
			if err := pf.writeChangePages(ch); err != nil {
				return err
			}
		}

		for _, id := range ch.ClearPageIDs {
			if err := pf.writePage(emptyPage, id); err != nil {
				return err
			}
		}
	}

	if err := pf.f.Sync(); err != nil {
		return errs.IO.WithErr(err)
	}

	return nil
}

// writeChangePages writes change.Data across the pages listed in
// change.WritePageIDs, in order, linking each page to the next via the
// header's NextPage field (0 marks the last page). The first page is written
// with pageTypeHead and every following page with pageTypeData.
//
// An Unexpected error is returned if the number of page IDs doesn't match the
// number of pages needed to hold the data. Pages written before the mismatch
// is detected are not rolled back.
func (pf *File) writeChangePages(change change.Change) error {
	page := pf.page

	header := page.Header()

	header.PageType = pageTypeHead
	header.CollectionID = change.CollectionID
	header.ItemID = change.ItemID
	header.DataSize = uint64(len(change.Data))

	pageIDs := change.WritePageIDs
	data := change.Data

	// Loop on the *remaining* data. Testing len(change.Data) here (which never
	// shrinks) would consume every surplus page ID and leave the "too many
	// pages" check below unreachable.
	for len(data) > 0 && len(pageIDs) > 0 {
		pageID := pageIDs[0]
		pageIDs = pageIDs[1:]

		if len(pageIDs) > 0 {
			header.NextPage = pageIDs[0]
		} else {
			header.NextPage = 0
		}

		n := page.Write(data)
		data = data[n:]

		// The CRC is computed last, once the header and data are final.
		page.Header().CRC = page.ComputeCRC()

		if err := pf.writePage(page, pageID); err != nil {
			return err
		}

		// All but first page has pageTypeData.
		header.PageType = pageTypeData
	}

	if len(pageIDs) > 0 {
		return errs.Unexpected.WithMsg("Too many pages provided for given data.")
	}

	if len(data) > 0 {
		return errs.Unexpected.WithMsg("Not enough pages for given data.")
	}

	return nil
}

// writePage writes page at the file offset belonging to page id.
func (pf *File) writePage(page dataPage, id uint64) error {
	if _, err := pf.f.WriteAt(page, int64(id*pageSize)); err != nil {
		return errs.IO.WithErr(err)
	}
	return nil
}

// ----------------------------------------------------------------------------
// Reading
// ----------------------------------------------------------------------------

// iterate reads every page of the file in order, validates it, and passes it
// to each together with its page ID. Iteration stops at the first error, which
// is returned. The read lock is held throughout.
//
// The page handed to each is a scratch page that is overwritten on the next
// iteration.
//
// NOTE(review): the scratch page is pf.page, which is shared by everything
// using this File; concurrent callers holding only the read lock would race on
// it. Confirm that callers serialize.
func (pf *File) iterate(each func(pageID uint64, page dataPage) error) error {
	pf.lock.RLock()
	defer pf.lock.RUnlock()

	page := pf.page

	fi, err := pf.f.Stat()
	if err != nil {
		return errs.IO.WithErr(err)
	}

	fileSize := fi.Size()
	if fileSize%pageSize != 0 {
		return errs.Corrupt.WithMsg("File size isn't a multiple of page size.")
	}

	maxPage := uint64(fileSize / pageSize)

	// A SectionReader reads via ReadAt, so the file's shared offset is never
	// touched while only the read lock is held.
	r := bufio.NewReaderSize(io.NewSectionReader(pf.f, 0, fileSize), 1024*1024)

	for pageID := uint64(0); pageID < maxPage; pageID++ {
		// bufio.Reader.Read may return fewer bytes than requested; ReadFull
		// guarantees a complete page or an error.
		if _, err := io.ReadFull(r, page); err != nil {
			return errs.IO.WithErr(err)
		}

		if err := page.Validate(); err != nil {
			return err
		}
		if err := each(pageID, page); err != nil {
			return err
		}
	}

	return nil
}

// readData appends to buf the data stored in the chain of pages starting at
// page id. The first page's header DataSize gives the total number of bytes;
// NextPage links are followed until one is 0. A Corrupt error is returned if
// the chain ends before DataSize bytes have been collected.
//
// NOTE(review): this takes no lock and reads into the shared pf.page;
// presumably the caller is responsible for synchronization - confirm.
func (pf *File) readData(id uint64, buf *bytes.Buffer) error {
	page := pf.page

	// The head page.
	if err := pf.readPage(page, id); err != nil {
		return err
	}

	remaining := int(page.Header().DataSize)

	for {
		data := page.Data()
		// Only part of the chain's final page may belong to the item.
		if len(data) > remaining {
			data = data[:remaining]
		}

		buf.Write(data)
		remaining -= len(data)

		if page.Header().NextPage == 0 {
			break
		}

		if err := pf.readPage(page, page.Header().NextPage); err != nil {
			return err
		}
	}

	if remaining != 0 {
		return errs.Corrupt.WithMsg("Incorrect data size. %d remaining.", remaining)
	}

	return nil
}

// readPage reads the page with the given id into p and validates it.
func (pf *File) readPage(p dataPage, id uint64) error {
	if _, err := pf.f.ReadAt(p, int64(id*pageSize)); err != nil {
		return errs.IO.WithErr(err)
	}
	return p.Validate()
}

// ----------------------------------------------------------------------------
// Send / Recv
// ----------------------------------------------------------------------------

// Send streams the whole file to conn: first the file size as a little-endian
// 64-bit integer, then the file's content as a gzip stream. The read lock is
// held for the duration, so the content can't change mid-transfer.
//
// timeout bounds each individual write to conn rather than the transfer as a
// whole. Send doesn't close conn.
func (pf *File) Send(conn net.Conn, timeout time.Duration) error {
	pf.lock.RLock()
	defer pf.lock.RUnlock()

	fi, err := pf.f.Stat()
	if err != nil {
		return errs.IO.WithErr(err)
	}

	remaining := fi.Size()

	// Read through a SectionReader (i.e. ReadAt) instead of Seek+Read: the
	// file offset is shared, and several senders may hold the read lock at
	// once.
	src := io.NewSectionReader(pf.f, 0, remaining)

	// Deadline errors are deliberately ignored here and below; a broken conn
	// surfaces on the write that follows.
	_ = conn.SetWriteDeadline(time.Now().Add(timeout))
	if err := binary.Write(conn, binary.LittleEndian, remaining); err != nil {
		return errs.IO.WithErr(err)
	}

	buf := make([]byte, 1024*1024)

	// The gzip writer holds no OS resources, so it is closed only on the
	// success path, where the error from writing the trailer matters.
	w, err := gzip.NewWriterLevel(conn, 3)
	if err != nil {
		return errs.Unexpected.WithErr(err)
	}

	for remaining > 0 {
		chunk := buf
		if remaining < int64(len(chunk)) {
			chunk = chunk[:remaining]
		}

		if _, err := io.ReadFull(src, chunk); err != nil {
			return errs.IO.WithErr(err)
		}

		_ = conn.SetWriteDeadline(time.Now().Add(timeout))
		if _, err := w.Write(chunk); err != nil {
			return errs.IO.WithErr(err)
		}

		// Flush per chunk so the receiver makes progress within its own
		// per-read deadline.
		if err := w.Flush(); err != nil {
			return errs.IO.WithErr(err)
		}

		remaining -= int64(len(chunk))
	}

	// Close writes the gzip trailer (checksum and length); without it the
	// receiver can't verify the stream.
	_ = conn.SetWriteDeadline(time.Now().Add(timeout))
	if err := w.Close(); err != nil {
		return errs.IO.WithErr(err)
	}

	return nil
}

// Recv receives a file sent by (*File).Send over conn and writes it to
// filePath, creating or truncating that file. It always closes conn before
// returning.
//
// timeout bounds each individual read from conn rather than the transfer as a
// whole. On error a partially written file may be left at filePath.
func Recv(conn net.Conn, filePath string, timeout time.Duration) error {
	defer conn.Close()

	f, err := os.Create(filePath)
	if err != nil {
		return errs.IO.WithErr(err)
	}
	defer f.Close()

	// Bound the wait for the size prefix and the gzip header too, so a silent
	// peer can't block forever. Deadline errors are deliberately ignored; a
	// broken conn surfaces on the read that follows.
	_ = conn.SetReadDeadline(time.Now().Add(timeout))

	remaining := uint64(0)
	if err := binary.Read(conn, binary.LittleEndian, &remaining); err != nil {
		return errs.IO.WithErr(err)
	}

	r, err := gzip.NewReader(conn)
	if err != nil {
		return errs.Unexpected.WithErr(err)
	}
	defer r.Close()

	// Exactly one gzip stream is expected. Without this the reader would,
	// after the trailer, block waiting for another gzip header.
	r.Multistream(false)

	buf := make([]byte, 1024*1024)

	for remaining > 0 {
		// Never ask for more than is still owed; this keeps the unsigned
		// counter from underflowing if the peer sends too much.
		chunk := buf
		if remaining < uint64(len(chunk)) {
			chunk = chunk[:remaining]
		}

		_ = conn.SetReadDeadline(time.Now().Add(timeout))
		if _, err := io.ReadFull(r, chunk); err != nil {
			return errs.IO.WithErr(err)
		}

		if _, err := f.Write(chunk); err != nil {
			return errs.IO.WithErr(err)
		}

		remaining -= uint64(len(chunk))
	}

	// Read to the end of the stream so the gzip trailer (checksum and length)
	// is verified, and so surplus data is detected.
	_ = conn.SetReadDeadline(time.Now().Add(timeout))
	n, err := r.Read(buf[:1])
	if n != 0 {
		return errs.Corrupt.WithMsg("Received more data than expected.")
	}
	if err != nil && err != io.EOF {
		return errs.IO.WithErr(err)
	}

	if err := f.Sync(); err != nil {
		return errs.IO.WithErr(err)
	}

	return nil
}