jldb/fstore/store-rep.go
2023-10-16 08:50:19 +00:00

173 lines
3.3 KiB
Go

package fstore
import (
"encoding/binary"
"errors"
"io"
"io/fs"
"net"
"os"
"path/filepath"
"time"
"git.crumpington.com/public/jldb/lib/errs"
"git.crumpington.com/public/jldb/lib/wal"
)
// repSendState streams every non-directory entry found under s.filesRoot to
// conn.
//
// Wire format, repeated once per file (all integers little-endian):
//
//	int32  length in bytes of the path relative to s.filesRoot
//	bytes  the relative path
//	int64  file size in bytes
//	bytes  file contents
//
// After the last file a single int32(0) is written to mark the end of the
// stream. Files that disappear while the walk is in progress are skipped.
// Any other failure is returned wrapped in errs.IO.
func (s *Store) repSendState(conn net.Conn) error {
	err := filepath.Walk(s.filesRoot, func(path string, info fs.FileInfo, err error) error {
		if err != nil {
			// Skip files deleted before the walk could stat them.
			if os.IsNotExist(err) {
				return nil
			}
			return err
		}
		if info.IsDir() {
			return nil
		}

		f, err := os.Open(path)
		if err != nil {
			// The file can also vanish between the walk's stat and this open;
			// treat that exactly like a deletion noticed by the walk itself.
			if os.IsNotExist(err) {
				return nil
			}
			return err
		}
		defer f.Close()

		// Take the size from the handle that is actually read. The walk's
		// info was captured earlier (and without following symlinks), so it
		// may no longer describe the bytes behind f.
		stat, err := f.Stat()
		if err != nil {
			return err
		}
		size := stat.Size()

		relPath, err := filepath.Rel(s.filesRoot, path)
		if err != nil {
			return err
		}

		// Header: path length, path, content length.
		conn.SetWriteDeadline(time.Now().Add(s.conf.NetTimeout))
		if err := binary.Write(conn, binary.LittleEndian, int32(len(relPath))); err != nil {
			return err
		}
		if _, err := conn.Write([]byte(relPath)); err != nil {
			return err
		}
		if err := binary.Write(conn, binary.LittleEndian, size); err != nil {
			return err
		}

		// Body: exactly size bytes, under a fresh deadline.
		conn.SetWriteDeadline(time.Now().Add(s.conf.NetTimeout))
		if _, err := io.CopyN(conn, f, size); err != nil {
			return err
		}
		return nil
	})
	if err != nil {
		return errs.IO.WithErr(err)
	}

	// A zero path length tells the receiver that no more files follow.
	conn.SetWriteDeadline(time.Now().Add(s.conf.NetTimeout))
	if err := binary.Write(conn, binary.LittleEndian, int32(0)); err != nil {
		return errs.IO.WithErr(err)
	}
	return nil
}
// repRecvState reads a sequence of files from conn and writes each one
// beneath s.filesRoot.
//
// Expected wire format, repeated once per file (all integers little-endian):
//
//	int32  length in bytes of the relative path (0 terminates the stream)
//	bytes  the relative path
//	int64  file size in bytes
//	bytes  file contents
//
// Parent directories are created with mode 0700 and every file is synced
// before the next one is read. The stream is rejected if a length prefix is
// negative or implausibly large, or if a path would resolve outside
// s.filesRoot. Any failure is returned wrapped in errs.IO.
func (s *Store) repRecvState(conn net.Conn) error {
	// Sanity cap on a single path so a corrupt or hostile length prefix
	// cannot force an arbitrarily large allocation.
	const maxPathLen = 1 << 16

	var (
		pathLen  = int32(0)
		fileSize = int64(0)
		pathBuf  = make([]byte, 1024)
	)

	// Prefix of a relative path that climbs out of its base directory.
	parentPrefix := ".." + string(filepath.Separator)

	for {
		// Each file is handled in its own function so the deferred Close
		// runs as soon as that file is finished rather than at return.
		done, err := func() (bool, error) {
			conn.SetReadDeadline(time.Now().Add(s.conf.NetTimeout))
			if err := binary.Read(conn, binary.LittleEndian, &pathLen); err != nil {
				return false, err
			}
			if pathLen == 0 {
				return true, nil
			}
			// A negative length would panic when slicing pathBuf below.
			if pathLen < 0 || pathLen > maxPathLen {
				return false, errors.New("invalid path length in replication stream")
			}

			if cap(pathBuf) < int(pathLen) {
				pathBuf = make([]byte, pathLen)
			}
			pathBuf = pathBuf[:pathLen]
			if _, err := io.ReadFull(conn, pathBuf); err != nil {
				return false, err
			}

			if err := binary.Read(conn, binary.LittleEndian, &fileSize); err != nil {
				return false, err
			}
			if fileSize < 0 {
				return false, errors.New("invalid file size in replication stream")
			}

			// Refuse paths that would land on or outside the store root
			// (for example via ".." components).
			fullPath := filepath.Join(s.filesRoot, string(pathBuf))
			rel, err := filepath.Rel(s.filesRoot, fullPath)
			if err != nil {
				return false, err
			}
			if rel == "." || rel == ".." ||
				(len(rel) >= len(parentPrefix) && rel[:len(parentPrefix)] == parentPrefix) {
				return false, errors.New("path escapes files root in replication stream")
			}

			if err := os.MkdirAll(filepath.Dir(fullPath), 0700); err != nil {
				return false, err
			}

			f, err := os.Create(fullPath)
			if err != nil {
				return false, err
			}
			defer f.Close()

			// Fresh deadline for the file body.
			conn.SetReadDeadline(time.Now().Add(s.conf.NetTimeout))
			if _, err := io.CopyN(f, conn, fileSize); err != nil {
				return false, err
			}
			return false, f.Sync()
		}()
		if err != nil {
			return errs.IO.WithErr(err)
		}
		if done {
			return nil
		}
	}
}
// repInitStorage makes sure the store's on-disk directories exist.
//
// It creates s.filesRoot and then s.tmpDir, including any missing parents,
// with mode 0700. The first failure is returned wrapped in errs.IO; a nil
// result means both calls succeeded.
func (s *Store) repInitStorage() (err error) {
	for _, dir := range []string{s.filesRoot, s.tmpDir} {
		if mkErr := os.MkdirAll(dir, 0700); mkErr != nil {
			return errs.IO.WithErr(mkErr)
		}
	}
	return nil
}
// repReplay decodes a command from rec.Reader and applies it to the store.
//
// Commands flagged as Store are applied with applyStoreFromReader; all other
// commands are applied with applyRemove. A decoding failure is returned
// unchanged.
func (s *Store) repReplay(rec wal.Record) (err error) {
	var cmd command
	if readErr := cmd.ReadFrom(rec.Reader); readErr != nil {
		return readErr
	}

	if !cmd.Store {
		return s.applyRemove(cmd)
	}
	return s.applyStoreFromReader(cmd)
}
// repLoadFromStorage performs no work and always returns nil.
//
// NOTE(review): presumably this exists to satisfy a hook expected by the
// replication layer — confirm against the caller.
func (s *Store) repLoadFromStorage() (err error) {
// Nothing to do.
return nil
}
// repApply decodes a command from rec.Reader and applies it to the store.
//
// Non-Store commands are applied with applyRemove. Store commands are applied
// with applyStoreFromTempID when s.conf.Primary is set and with
// applyStoreFromReader otherwise. A decoding failure is returned unchanged.
func (s *Store) repApply(rec wal.Record) (err error) {
	var cmd command
	if readErr := cmd.ReadFrom(rec.Reader); readErr != nil {
		return readErr
	}

	if !cmd.Store {
		return s.applyRemove(cmd)
	}
	if !s.conf.Primary {
		return s.applyStoreFromReader(cmd)
	}
	return s.applyStoreFromTempID(cmd)
}