Browse Source

WIP

master
jdl 2 years ago
parent
commit
cb710046d4
6 changed files with 161 additions and 78 deletions
  1. +72
    -0
      fsstorage/entry.go
  2. +46
    -0
      fsstorage/entry_test.go
  3. +1
    -1
      fsstorage/error.go
  4. +38
    -66
      fsstorage/segment.go
  5. +4
    -10
      fsstorage/segment_test.go
  6. +0
    -1
      interfaces.go

+ 72
- 0
fsstorage/entry.go View File

@ -0,0 +1,72 @@
package fsstorage
import (
"time"
"unsafe"
)
type walData []byte
const (
offsetPrevOffset = -28
offsetWALID = -20
offsetTS = -12
offsetDataLen = -4
offsetData = 0
walHeaderSize = 28
)
func readWALData(buf []byte) walData {
entry := walData(buf[walHeaderSize:])
return entry[:entry.DataLen()]
}
func writeWALData(
buf []byte, // Write walData to start of buffer.
walID uint64,
data []byte,
prevOffset uint64,
) walData {
e := walData(buf[walHeaderSize:])
copy(e, data)
ePtr := unsafe.Pointer(&e[0])
*(*uint64)(unsafe.Add(ePtr, offsetPrevOffset)) = prevOffset
*(*uint64)(unsafe.Add(ePtr, offsetWALID)) = walID
*(*uint64)(unsafe.Add(ePtr, offsetTS)) = uint64(time.Now().Unix())
*(*uint32)(unsafe.Add(ePtr, offsetDataLen)) = uint32(len(data))
return readWALData(buf)
}
func (e walData) Offset(buf []byte) uint64 {
return uint64(
uintptr(unsafe.Pointer(&e[0])) -
walHeaderSize -
uintptr(unsafe.Pointer(&buf[0])))
}
func (e walData) NextOffset(buf []byte) uint64 {
return e.Offset(buf) + walHeaderSize + uint64(len(e))
}
func (e walData) PrevOffset() uint64 {
return *(*uint64)(unsafe.Add(unsafe.Pointer(&e[0]), offsetPrevOffset))
}
func (e walData) WALID() uint64 {
return *(*uint64)(unsafe.Add(unsafe.Pointer(&e[0]), offsetWALID))
}
func (e walData) TS() uint64 {
return *(*uint64)(unsafe.Add(unsafe.Pointer(&e[0]), offsetTS))
}
func (e walData) DataLen() uint32 {
return *(*uint32)(unsafe.Add(unsafe.Pointer(&e[0]), offsetDataLen))
}
func (e walData) Data() []byte {
return e
}

+ 46
- 0
fsstorage/entry_test.go View File

@ -0,0 +1,46 @@
package fsstorage
import (
"log"
"testing"
)
func TestWALData(t *testing.T) {
type entry struct {
walID uint64
data []byte
}
buf := make([]byte, 1024*1024)
offset := uint64(1)
walID := uint64(1)
d1 := writeWALData(buf[offset:], walID, []byte{1, 2, 3}, 0)
if d1.Offset(buf) != offset {
t.Fatal(d1.Offset(buf), offset)
}
if d1.WALID() != walID {
t.Fatal(d1.WALID(), walID)
}
prevOffset := offset
offset = d1.NextOffset(buf)
walID++
d2 := writeWALData(buf[offset:], walID, []byte{2, 3, 4}, prevOffset)
if d2.Offset(buf) != offset {
t.Fatal(d2.Offset(buf), offset)
}
if d2.WALID() != walID {
t.Fatal(d2.WALID(), walID)
}
if d1.NextOffset(buf) != d2.Offset(buf) {
t.Fatal(d1.NextOffset(buf), d2.Offset(buf))
}
if d2.PrevOffset() != d1.Offset(buf) {
t.Fatal(d2.PrevOffset(), d1.Offset(buf))
}
log.Printf("%d", d2.Offset(buf))
}

+ 1
- 1
fsstorage/error.go View File

@ -4,6 +4,6 @@ import "errors"
var (
ErrInvalidData = errors.New("invalid data")
ErrSegmentFull = errors.New("segment is full")
ErrSegmentFull = errors.New("segment is full") // TODO: Rename InsuficientSpace
ErrInvalidWALID = errors.New("invalid WAL ID")
)

+ 38
- 66
fsstorage/segment.go View File

@ -1,32 +1,6 @@
package fsstorage
import (
"encoding/binary"
"io/ioutil"
"os"
"path/filepath"
"sync"
"syscall"
"time"
"unsafe"
"git.crumpington.com/public/toolbox/fsutil"
"git.crumpington.com/public/toolbox/mmap"
)
const fixedDataSize = 32
type walEntry struct {
offset uint64
prevOffset uint64
cmdDataSize uint32
WALID uint64
TS uint64 // Unix timestamp in seconds.
CmdID uint32
CmdData []byte
}
/*
type segment struct {
dataPath string
tailOffsetPath string
@ -36,23 +10,22 @@ type segment struct {
nextWALID uint64
firstWALID uint64
lock sync.Mutex // Protects tailOffset.
tailOffset uint64
f *os.File
mapped []byte
// TODO: Append lock?
}
func (seg *segment) getTailOffset() uint64 {
seg.lock.Lock()
defer seg.lock.Unlock()
// TODO
return seg.tailOffset
}
func (seg *segment) setTailOffset(offset uint64) {
seg.lock.Lock()
// TODO
seg.tailOffset = offset
seg.lock.Unlock()
}
// Given a directory and file size in bytes, create a new segment. The actual
@ -165,11 +138,13 @@ func openSegment(dir string) (*segment, error) {
// Set insertion point.
seg.nextWALID = seg.firstWALID
seg.insertAt = 1
if tail := seg.Tail(); tail != nil {
seg.nextWALID = tail.WALID + 1
seg.insertAt = int(tail.offset + fixedDataSize + uint64(len(tail.CmdData)))
}
seg.insertAt = 1
if walID, tail := seg.Tail(); tail != nil {
seg.nextWALID = walID + 1
seg.insertAt = int(tail.offset + offsetData + uint64(len(tail.CmdData)))
}
return seg, nil
}
@ -204,40 +179,36 @@ func (seg *segment) Close() error {
return nil
}
func (seg *segment) Head() *walEntry {
func (seg *segment) Head() (WALEntry, bool) {
if seg.getTailOffset() == 0 {
return nil
return WALEntry{}, false
}
return seg.loadEntry(1)
return seg.loadEntry(1), true
}
func (seg *segment) Tail() *walEntry {
func (seg *segment) Tail() (walID uint64, data []byte) {
offset := seg.getTailOffset()
if offset == 0 {
return nil
return 0, nil
}
return seg.loadEntry(offset)
}
func (seg *segment) loadEntry(offset uint64) *walEntry {
entry := &walEntry{
offset: offset,
prevOffset: *(*uint64)(unsafe.Pointer(&seg.mapped[offset])),
WALID: *(*uint64)(unsafe.Pointer(&seg.mapped[offset+8])),
TS: *(*uint64)(unsafe.Pointer(&seg.mapped[offset+16])),
CmdID: *(*uint32)(unsafe.Pointer(&seg.mapped[offset+24])),
cmdDataSize: *(*uint32)(unsafe.Pointer(&seg.mapped[offset+28])),
}
func (seg *segment) loadEntry(offset uint64) WALEntry {
return loadWALEntry(seg.mapped[offset:])
dataStart := offset + fixedDataSize
dataEnd := dataStart + uint64(entry.cmdDataSize)
entry.CmdData = seg.mapped[dataStart:dataEnd]
return entry
}
walID = *(*uint64)(unsafe.Pointer(&seg.mapped[offset+offsetWALID]))
size := *(*uint32)(unsafe.Pointer(&seg.mapped[offset+offsetDataLen]))
func (seg *segment) Next(entry *walEntry) *walEntry {
offset := entry.offset + fixedDataSize + uint64(entry.cmdDataSize)
dataStart := offset + offsetData
dataEnd := dataStart + uint64(size)
data = seg.mapped[dataStart:dataEnd]
return
}
func (seg *segment) Next(data []byte) (walID uint64, data []byte) {
offset := *(*uint64)(unsafe.Add(unsafe.Pointer(&data[0]), len(data)))
if offset > seg.getTailOffset() {
return nil
}
@ -245,19 +216,20 @@ func (seg *segment) Next(entry *walEntry) *walEntry {
return seg.loadEntry(offset)
}
func (seg *segment) Prev(entry *walEntry) *walEntry {
func (seg *segment) Prev(entry *walEntry) (walID uint64, data []byte) {
prevOffset := *(*uint64)(unsafe.Add(unsafe.Pointer(&data[0]), -offsetData))
if entry.prevOffset == 0 {
return nil
}
return seg.loadEntry(entry.prevOffset)
}
func (seg *segment) Append(walID uint64, cmdID uint32, data []byte) error {
func (seg *segment) Append(walID uint64, data []byte) error {
if walID != seg.nextWALID {
return ErrInvalidWALID
}
needed := fixedDataSize + len(data)
needed := offsetData + len(data)
if len(seg.mapped)-seg.insertAt < needed {
return ErrSegmentFull
}
@ -265,15 +237,15 @@ func (seg *segment) Append(walID uint64, cmdID uint32, data []byte) error {
*(*uint64)(unsafe.Pointer(&seg.mapped[seg.insertAt])) = seg.getTailOffset()
seg.setTailOffset(uint64(seg.insertAt))
*(*uint64)(unsafe.Pointer(&seg.mapped[seg.insertAt+8])) = walID
*(*uint64)(unsafe.Pointer(&seg.mapped[seg.insertAt+16])) = uint64(time.Now().Unix())
*(*uint32)(unsafe.Pointer(&seg.mapped[seg.insertAt+24])) = cmdID
*(*uint32)(unsafe.Pointer(&seg.mapped[seg.insertAt+28])) = uint32(len(data))
*(*uint64)(unsafe.Pointer(&seg.mapped[seg.insertAt+offsetWALID])) = walID
*(*uint64)(unsafe.Pointer(&seg.mapped[seg.insertAt+offsetTS])) = uint64(time.Now().Unix())
*(*uint32)(unsafe.Pointer(&seg.mapped[seg.insertAt+offsetDataLen])) = uint32(len(data))
seg.insertAt += fixedDataSize
seg.insertAt += offsetData
seg.insertAt += copy(seg.mapped[seg.insertAt:], data)
seg.nextWALID++
return nil
}
*/

+ 4
- 10
fsstorage/segment_test.go View File

@ -1,11 +1,6 @@
package fsstorage
import (
"math/rand"
"reflect"
"testing"
)
/*
func TestSegmentInitSizing(t *testing.T) {
type TestCase struct {
inB uint64
@ -62,10 +57,9 @@ func TestSegmentAppend(t *testing.T) {
}
walID := uint64(0)
cmdID := uint32(12345)
data := []byte{9, 8, 7, 6, 5, 4, 3, 2, 1, 0}
if err := seg.Append(walID, cmdID, data); err != nil {
if err := seg.Append(walID, data); err != nil {
t.Fatal(err)
}
@ -76,9 +70,9 @@ func TestSegmentAppend(t *testing.T) {
}
walID = uint64(1)
cmdID = uint32(5432)
data = []byte{0, 1, 2, 3}
if err := seg.Append(walID, cmdID, data); err != nil {
if err := seg.Append(walID, data); err != nil {
t.Fatal(err)
}
}
*/

+ 0
- 1
interfaces.go View File

@ -4,7 +4,6 @@ import "time"
type WALEntry struct {
WALID uint64
CmdID uint32 // Application specific.
Data []byte
}


Loading…
Cancel
Save