Browse Source

WIP

master
jdl 1 year ago
parent
commit
421bd41215
18 changed files with 132 additions and 107 deletions
  1. +8
    -0
      fsstorage/cursor.go
  2. +8
    -0
      fsstorage/emptycursor.go
  3. +0
    -2
      fsstorage/error.go
  4. +13
    -13
      fsstorage/record.go
  5. +4
    -4
      fsstorage/record_test.go
  6. +4
    -4
      fsstorage/seggenerator.go
  7. +1
    -1
      fsstorage/seggenerator_test.go
  8. +6
    -6
      fsstorage/segment.go
  9. +2
    -6
      fsstorage/segment_test.go
  10. +4
    -4
      fsstorage/segmentcursor.go
  11. +9
    -10
      fsstorage/segmenttx.go
  12. +16
    -12
      fsstorage/util_test.go
  13. +14
    -23
      fsstorage/wal.go
  14. +5
    -6
      fsstorage/wal_test.go
  15. +14
    -14
      fsstorage/walcursor.go
  16. +1
    -1
      fsstorage/walutil.go
  17. +9
    -1
      go.mod
  18. +14
    -0
      go.sum

+ 8
- 0
fsstorage/cursor.go View File

@ -0,0 +1,8 @@
package fsstorage
type Cursor interface {
Find(walID int64) *Record
Head() *Record
Next(rec *Record) *Record
Tail() *Record
}

+ 8
- 0
fsstorage/emptycursor.go View File

@ -0,0 +1,8 @@
package fsstorage
type emptyCursor struct{}
func (c emptyCursor) Find(walID int64) *Record { return nil }
func (c emptyCursor) Head() *Record { return nil }
func (c emptyCursor) Tail() *Record { return nil }
func (c emptyCursor) Next(*Record) *Record { return nil }

+ 0
- 2
fsstorage/error.go View File

@ -6,6 +6,4 @@ var (
ErrInvalidData = errors.New("invalid data")
ErrInsufficientSpace = errors.New("insufficient space")
ErrDataTooLarge = errors.New("data too large")
ErrInvalidWALID = errors.New("invalid WAL ID")
ErrNoData = errors.New("no data")
)

fsstorage/walrecord.go → fsstorage/record.go View File


fsstorage/walrecord_test.go → fsstorage/record_test.go View File


+ 4
- 4
fsstorage/seggenerator.go View File

@ -10,13 +10,13 @@ import (
// The segGenerator generates empty segments in the background.
type segGenerator struct {
conf WALConfig
conf Config
nextSegID int64
nextSeg chan *segment
stop chan bool
}
func newSegGenerator(conf WALConfig, nextSegID int64) *segGenerator {
func newSegGenerator(conf Config, nextSegID int64) *segGenerator {
return &segGenerator{
conf: conf,
nextSegID: nextSegID,
@ -66,9 +66,9 @@ FOR:
select {
case <-time.After(8 * time.Second):
continue
continue FOR
case <-f.stop:
break
break FOR
}
}


+ 1
- 1
fsstorage/seggenerator_test.go View File

@ -3,7 +3,7 @@ package fsstorage
import "testing"
func TestSegGenerator(t *testing.T) {
conf := WALConfig{Dir: newTestPath(), SegmentSize: 1024}
conf := Config{Dir: newTestPath(), SegmentSize: 1024}
sg := newSegGenerator(conf, 32)
go sg.Run()


+ 6
- 6
fsstorage/segment.go View File

@ -20,7 +20,7 @@ type segment struct {
lock sync.Mutex
insertAt int
nextWALID int64
nextID int64
tailOffset *atomicInt64
archivedAt *atomicInt64
@ -89,7 +89,7 @@ func openSegment(dir string) (*segment, error) {
dataPath: filepath.Join(dir, "data"),
tailOffsetPath: filepath.Join(dir, "tail-offset"),
archivedAtPath: filepath.Join(dir, "archivedAt"),
nextWALID: -1,
nextID: -1,
tailOffset: newAtomicInt64(),
archivedAt: newAtomicInt64(),
}
@ -136,7 +136,7 @@ func openSegment(dir string) (*segment, error) {
// Update insertion point if we have a tail pointer .
seg.insertAt = 0
if tail := seg.Cursor().Tail(); tail != nil {
seg.nextWALID = tail.WALID() + 1
seg.nextID = tail.ID() + 1
seg.insertAt = int(tail.nextOffset())
}
@ -182,7 +182,7 @@ func (seg *segment) Close() error {
}
seg.insertAt = 0
seg.nextWALID = -1
seg.nextID = -1
seg.tailOffset.Set(0)
seg.archivedAt.Set(0)
@ -219,7 +219,7 @@ func (seg *segment) Update(fn func(tx *segmentTX) error) error {
tx := &segmentTX{
seg: seg,
nextWALID: seg.nextWALID,
nextID: seg.nextID,
tailOffset: seg.tailOffset.Get(),
insertAt: seg.insertAt,
}
@ -231,7 +231,7 @@ func (seg *segment) Update(fn func(tx *segmentTX) error) error {
}
seg.tailOffset.Set(tx.tailOffset)
seg.nextWALID = tx.nextWALID
seg.nextID = tx.nextID
seg.insertAt = tx.insertAt
return nil
}


+ 2
- 6
fsstorage/segment_test.go View File

@ -38,9 +38,7 @@ func TestSegment_Append_multiple(t *testing.T) {
_ = seg.Update(func(tx *segmentTX) error {
for _, tr := range testRecs {
if _, err := tx.Append(tr.WALID, tr.Data); err != nil {
t.Fatal(err)
}
tx.Append(tr.ID, tr.Data)
}
return nil
})
@ -62,9 +60,7 @@ func TestSegment_Append_Close_Open(t *testing.T) {
err = seg.Update(func(tx *segmentTX) error {
for _, tr := range testRecs {
if _, err := tx.Append(tr.WALID, tr.Data); err != nil {
t.Fatal(err)
}
tx.Append(tr.ID, tr.Data)
}
return nil
})


+ 4
- 4
fsstorage/segmentcursor.go View File

@ -10,27 +10,27 @@ func (cur segmentCursor) Loaded() bool {
return cur.mapped != nil
}
func (cur segmentCursor) loadRecord(offset int64, rec *WALRecord) *WALRecord {
func (cur segmentCursor) loadRecord(offset int64, rec *Record) *Record {
rec = readWALRecord(cur.mapped, offset, rec)
rec.segID = cur.segID
return rec
}
func (cur segmentCursor) Head() *WALRecord {
func (cur segmentCursor) Head() *Record {
if cur.tailOffset == -1 {
return nil
}
return cur.loadRecord(0, nil)
}
func (cur segmentCursor) Tail() *WALRecord {
func (cur segmentCursor) Tail() *Record {
if cur.tailOffset == -1 {
return nil
}
return cur.loadRecord(cur.tailOffset, nil)
}
func (cur segmentCursor) Next(rec *WALRecord) *WALRecord {
func (cur segmentCursor) Next(rec *Record) *Record {
offset := rec.nextOffset()
if offset > cur.tailOffset {
return nil


+ 9
- 10
fsstorage/segmenttx.go View File

@ -1,26 +1,25 @@
package fsstorage
import "fmt"
type segmentTX struct {
seg *segment
nextWALID int64
nextID int64
tailOffset int64
insertAt int
}
func (tx *segmentTX) Append(walID int64, data []byte) (*WALRecord, error) {
if walID != tx.nextWALID && tx.nextWALID != -1 {
return nil, ErrInvalidWALID
func (tx *segmentTX) Append(walID int64, data []byte) *Record {
if walID != tx.nextID && tx.nextID != -1 {
panic(fmt.Sprintf("Invalid WAL ID. Expected %d but got %d",
tx.nextID, walID))
}
tx.nextWALID = walID + 1
tx.nextID = walID + 1
rec := writeWALRecord(tx.seg.mapped, int64(tx.insertAt), walID, data)
tx.tailOffset = rec.offset
tx.insertAt = int(rec.nextOffset())
rec.segID = tx.seg.ID()
return rec, nil
}
func (tx *segmentTX) ReminaingSpace() int {
return len(tx.seg.mapped) - tx.insertAt
return rec
}

+ 16
- 12
fsstorage/util_test.go View File

@ -29,7 +29,7 @@ func newTestPath() string {
type TestRec struct {
Offset int64
NextOffset int64
WALID int64
ID int64
Data []byte
}
@ -46,7 +46,7 @@ func generateTestRecords(count int) (recs []TestRec, totalSize int64) {
recs[i] = TestRec{
Offset: offset,
NextOffset: offset + int64(recOffsetData+len(data)),
WALID: walID,
ID: walID,
Data: data,
}
offset += recOffsetData + int64(len(data))
@ -56,7 +56,7 @@ func generateTestRecords(count int) (recs []TestRec, totalSize int64) {
return
}
func (tr TestRec) Equals(rhs *WALRecord) error {
func (tr TestRec) Equals(rhs *Record) error {
if rhs == nil {
return fmt.Errorf("nil record")
}
@ -69,8 +69,8 @@ func (tr TestRec) Equals(rhs *WALRecord) error {
return fmt.Errorf("NextOffset %d != %d", rhs.nextOffset(), tr.NextOffset)
}
if rhs.WALID() != tr.WALID {
return fmt.Errorf("WALID %d != %d", rhs.WALID(), tr.WALID)
if rhs.ID() != tr.ID {
return fmt.Errorf("WALID %d != %d", rhs.ID(), tr.ID)
}
if rhs.dataLen() != uint32(len(tr.Data)) {
@ -106,6 +106,7 @@ func segmentContains(seg *segment, testRecs []TestRec) error {
}
func walDataForID(wal *WAL, id int64) []byte {
segSize := wal.conf.SegmentSize
switch id % 100 {
@ -125,8 +126,11 @@ func walDataForID(wal *WAL, id int64) []byte {
func insertWALData(wal *WAL, count int) error {
id := int64(1)
wal.Read(func(cur WALCursor) {
id = cur.Tail().WALID() + 1
wal.Read(func(cur Cursor) {
tail := cur.Tail()
if tail != nil {
id = tail.ID() + 1
}
})
remaining := count
@ -154,16 +158,16 @@ func insertWALData(wal *WAL, count int) error {
func verifyWALData(wal *WAL, count int) (err error) {
wal.Read(func(cur WALCursor) {
wal.Read(func(cur Cursor) {
// Check each entry and count.
n := 0
rec := cur.Head()
for rec != nil {
n++
expected := walDataForID(wal, rec.WALID())
expected := walDataForID(wal, rec.ID())
if !bytes.Equal(rec.Data(), expected) {
err = fmt.Errorf("%d: %s != %s", rec.WALID(), rec.Data(), expected)
err = fmt.Errorf("%d: %s != %s", rec.ID(), rec.Data(), expected)
return
}
rec = cur.Next(rec)
@ -175,8 +179,8 @@ func verifyWALData(wal *WAL, count int) (err error) {
}
// Try some random IDs.
minID := cur.Head().WALID()
maxID := cur.Tail().WALID()
minID := cur.Head().ID()
maxID := cur.Tail().ID()
for i := 0; i < 1024; i++ {
randID := minID + rand.Int63n(maxID-minID+1)


+ 14
- 23
fsstorage/wal.go View File

@ -11,15 +11,15 @@ import (
type WAL struct {
// TODO: Add lock file in WAL dir.
conf WALConfig
conf Config
lock sync.Mutex
numReaders int64
segs []*segment // Ordered list of segments.
current *segment
nextWALID int64
current *segment
nextID int64
maxAppliedIDPath string
maxAppliedID int64
@ -29,18 +29,18 @@ type WAL struct {
segGenerator *segGenerator
}
type WALConfig struct {
type Config struct {
Dir string
SegmentSize int64
}
func OpenWAL(conf WALConfig) (*WAL, error) {
func OpenWAL(conf Config) (*WAL, error) {
var err error
wal := &WAL{
conf: conf,
numReaders: 0,
nextWALID: 1,
nextID: 1,
maxAppliedIDPath: filepath.Join(conf.Dir, "max-applied-id"),
maxAppliedID: 0,
}
@ -53,7 +53,7 @@ func OpenWAL(conf WALConfig) (*WAL, error) {
// Set current, nextWALID.
if len(wal.segs) != 0 {
wal.current = wal.segs[len(wal.segs)-1]
wal.nextWALID = wal.current.Cursor().Tail().WALID() + 1
wal.nextID = wal.current.Cursor().Tail().ID() + 1
}
// Set max data size.
@ -93,30 +93,24 @@ func OpenWAL(conf WALConfig) (*WAL, error) {
// ----------------------------------------------------------------------------
// May return ErrNoData if the WAL is empty.
func (wal *WAL) Read(fn func(cur WALCursor)) error {
func (wal *WAL) Read(fn func(cur Cursor)) {
wal.lock.Lock()
wal.numReaders++
segs := wal.segs
wal.lock.Unlock()
cur, err := newWALCursor(segs)
if err == nil {
fn(cur)
}
fn(newWALCursor(segs))
wal.lock.Lock()
wal.numReaders--
wal.lock.Unlock()
return err
}
// ----------------------------------------------------------------------------
func (wal *WAL) Append(l [][]byte) (recs []*WALRecord, err error) {
func (wal *WAL) Append(l [][]byte) (recs []*Record, err error) {
var batch [][]byte
recs = make([]*WALRecord, 0, len(batch))
recs = make([]*Record, 0, len(batch))
for len(l) > 0 {
// Get the next batch to append.
@ -131,15 +125,12 @@ func (wal *WAL) Append(l [][]byte) (recs []*WALRecord, err error) {
continue
}
nextWALID := wal.nextWALID
nextWALID := wal.nextID
updatedRecs := recs
err = wal.current.Update(func(tx *segmentTX) error {
for _, data := range batch {
rec, err := tx.Append(nextWALID, data)
if err != nil {
return err
}
rec := tx.Append(nextWALID, data)
updatedRecs = append(updatedRecs, rec)
nextWALID++
}
@ -150,7 +141,7 @@ func (wal *WAL) Append(l [][]byte) (recs []*WALRecord, err error) {
return recs, err
}
wal.nextWALID = nextWALID
wal.nextID = nextWALID
recs = updatedRecs
}


+ 5
- 6
fsstorage/wal_test.go View File

@ -12,7 +12,7 @@ func TestWAL_oneRecord(t *testing.T) {
t.Fatal(err)
}
wal, err := OpenWAL(WALConfig{Dir: dir, SegmentSize: 256})
wal, err := OpenWAL(Config{Dir: dir, SegmentSize: 256})
if err != nil {
t.Fatal(err)
}
@ -26,15 +26,15 @@ func TestWAL_oneRecord(t *testing.T) {
if len(recs) != 1 {
t.Fatal(recs)
}
if recs[0].WALID() != 1 {
t.Fatal(recs[0].WALID())
if recs[0].ID() != 1 {
t.Fatal(recs[0].ID())
}
if !bytes.Equal(recs[0].Data(), l[0]) {
t.Fatal(recs[0].Data())
}
wal.Read(func(cur WALCursor) {
wal.Read(func(cur Cursor) {
rec := cur.Head()
if rec == nil {
t.Fatal(rec)
@ -53,7 +53,7 @@ func TestWAL_basic(t *testing.T) {
t.Fatal(err)
}
conf := WALConfig{
conf := Config{
Dir: dir,
SegmentSize: 1024,
}
@ -89,5 +89,4 @@ func TestWAL_basic(t *testing.T) {
if err := wal.Close(); err != nil {
t.Fatal(err)
}
}

+ 14
- 14
fsstorage/walcursor.go View File

@ -1,12 +1,12 @@
package fsstorage
type WALCursor struct {
type walCursor struct {
firstSegID int64
segs []*segment
cursors []segmentCursor
}
func newWALCursor(segs []*segment) (WALCursor, error) {
func newWALCursor(segs []*segment) Cursor {
iOut := 0
for _, seg := range segs {
if seg.Cursor().Head() != nil {
@ -17,21 +17,21 @@ func newWALCursor(segs []*segment) (WALCursor, error) {
segs = segs[:iOut]
if len(segs) == 0 {
return WALCursor{}, ErrNoData
return emptyCursor{}
}
return WALCursor{
return walCursor{
firstSegID: segs[0].ID(),
segs: segs,
cursors: make([]segmentCursor, len(segs)),
}, nil
}
}
func (cur WALCursor) getSegIndex(id int64) int {
func (cur walCursor) getSegIndex(id int64) int {
return int(id - cur.firstSegID)
}
func (c WALCursor) getCursor(segID int64) (segmentCursor, bool) {
func (c walCursor) getCursor(segID int64) (segmentCursor, bool) {
idx := c.getSegIndex(segID)
if idx < 0 || idx >= len(c.segs) {
return segmentCursor{}, false
@ -47,17 +47,17 @@ func (c WALCursor) getCursor(segID int64) (segmentCursor, bool) {
return cur, true
}
func (c WALCursor) Head() *WALRecord {
func (c walCursor) Head() *Record {
cur, _ := c.getCursor(c.segs[0].ID())
return cur.Head()
}
func (c WALCursor) Tail() *WALRecord {
func (c walCursor) Tail() *Record {
cur, _ := c.getCursor(c.segs[len(c.segs)-1].ID())
return cur.Tail()
}
func (c WALCursor) Next(rec *WALRecord) *WALRecord {
func (c walCursor) Next(rec *Record) *Record {
cur, _ := c.getCursor(rec.segID)
if next := cur.Next(rec); next != nil {
return next
@ -68,19 +68,19 @@ func (c WALCursor) Next(rec *WALRecord) *WALRecord {
return nil
}
func (c WALCursor) Find(walID int64) *WALRecord {
func (c walCursor) Find(walID int64) *Record {
for _, seg := range c.segs {
cur := seg.Cursor()
if cur.Head().WALID() > walID {
if cur.Head().ID() > walID {
break
}
if cur.Tail().WALID() < walID {
if cur.Tail().ID() < walID {
continue
}
rec := cur.Head()
for rec != nil && rec.WALID() != walID {
for rec != nil && rec.ID() != walID {
rec = cur.Next(rec)
}
return rec


+ 1
- 1
fsstorage/walutil.go View File

@ -47,7 +47,7 @@ func loadSegs(dir string) ([]*segment, error) {
log.Printf("Segment ID: %d - %d != 1", right.ID(), left.ID())
return nil, ErrInvalidData
}
if right.Cursor().Head().WALID()-left.Cursor().Tail().WALID() != 1 {
if right.Cursor().Head().ID()-left.Cursor().Tail().ID() != 1 {
return nil, ErrInvalidData
}


+ 9
- 1
go.mod View File

@ -4,4 +4,12 @@ go 1.17
require git.crumpington.com/public/toolbox v1.3.1
require golang.org/x/sys v0.0.0-20210910150752-751e447fb3d0 // indirect
require (
github.com/yuin/goldmark v1.4.1 // indirect
golang.org/x/mod v0.5.0 // indirect
golang.org/x/net v0.0.0-20210916014120-12bc252f5db8 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20210915083310-ed5796bab164 // indirect
golang.org/x/tools v0.1.5 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
)

+ 14
- 0
go.sum View File

@ -1,6 +1,20 @@
git.crumpington.com/public/toolbox v1.3.1 h1:UQEtMD5o1Lcbh3YGvYCyrGDD9ZvIAt2bF22t4A+eCFI=
git.crumpington.com/public/toolbox v1.3.1/go.mod h1:3cNzhpDPL6y9CYm8gJ2Rfvyd4lvjNzSVfS4/Yp7nGA8=
github.com/yuin/goldmark v1.4.1 h1:/vn0k+RBvwlxEmP5E7SZMqNxPhfMVFEJiykr15/0XKM=
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
golang.org/x/mod v0.5.0 h1:UG21uOlmZabA4fW5i7ZX6bjw1xELEGg/ZLgZq9auk/Q=
golang.org/x/mod v0.5.0/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
golang.org/x/net v0.0.0-20210916014120-12bc252f5db8 h1:/6y1LfuqNuQdHAm0jjtPtgRcxIxjVZgm5OTu8/QhZvk=
golang.org/x/net v0.0.0-20210916014120-12bc252f5db8/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210910150752-751e447fb3d0 h1:xrCZDmdtoloIiooiA9q0OQb9r8HejIHYoHGhGCe1pGg=
golang.org/x/sys v0.0.0-20210910150752-751e447fb3d0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210915083310-ed5796bab164 h1:7ZDGnxgHAMw7thfC5bEos0RDAccZKxioiWBhfIe+tvw=
golang.org/x/sys v0.0.0-20210915083310-ed5796bab164/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

Loading…
Cancel
Save