Browse Source

WIP

master
jdl 2 years ago
parent
commit
e48ad57c4f
13 changed files with 790 additions and 2 deletions
  1. +1
    -0
      .gitignore
  2. +20
    -2
      README.md
  3. +9
    -0
      fsstorage/error.go
  4. +9
    -0
      fsstorage/fsstorage.go
  5. +123
    -0
      fsstorage/object.go
  6. +145
    -0
      fsstorage/object_test.go
  7. +266
    -0
      fsstorage/segment.go
  8. +78
    -0
      fsstorage/segment_test.go
  9. +26
    -0
      fsstorage/util_test.go
  10. +7
    -0
      go.mod
  11. +6
    -0
      go.sum
  12. +99
    -0
      interfaces.go
  13. +1
    -0
      manager.go

+ 1
- 0
.gitignore View File

@ -15,3 +15,4 @@
# Dependency directories (remove the comment below to include it)
# vendor/
test-files/

+ 20
- 2
README.md View File

@ -1,3 +1,21 @@
# replication
# `replication`
Streaming replication framework
Streaming replication framework for golang.
## Replication
Each node has one role: `leader` or `follower`.
A follower exposes methods:
* GetMaxAppliedLogID
* Reset
* ApplyState - not synced
* ApplyLog - not synced
* ApplyMod - synchronous replication
If a node has a follower, it runs in one state:
* `SyncingState`
* `SyncingLog`
* `SyncingSync`

+ 9
- 0
fsstorage/error.go View File

@ -0,0 +1,9 @@
package fsstorage
import "errors"
var (
ErrInvalidData = errors.New("Invalid data")
ErrSegmentFull = errors.New("Segment is full")
ErrInvalidWALID = errors.New("Invalid WAL ID")
)

+ 9
- 0
fsstorage/fsstorage.go View File

@ -0,0 +1,9 @@
package fsstorage
type FSStorage struct {
addr stringObject
role uint64Object
leaderID uint64Object
followerAddr stringObject
maxAppliedWAL uint64Object
}

+ 123
- 0
fsstorage/object.go View File

@ -0,0 +1,123 @@
package fsstorage
import (
"encoding/binary"
"io/ioutil"
"sync"
"git.crumpington.com/public/toolbox/fsutil"
)
// An object is a file containing data that is read and written atomically.
type object struct {
path string
tmpPath string
data []byte
}
func newObject(path string) object {
return object{
path: path,
tmpPath: path + ".tmp",
}
}
func (o *object) Set(data []byte) error {
o.data = data
return fsutil.WriteFileAtomic(o.data, o.tmpPath, o.path)
}
func (o *object) Load() error {
data, err := ioutil.ReadFile(o.path)
if err != nil {
return err
}
o.data = data
return nil
}
// ----------------------------------------------------------------------------
type stringObject struct {
lock *sync.Mutex
object
value string
}
func newStringObject(path string) stringObject {
return stringObject{
lock: &sync.Mutex{},
object: newObject(path),
}
}
func (o *stringObject) Set(value string) error {
o.lock.Lock()
defer o.lock.Unlock()
o.value = value
return o.object.Set([]byte(value))
}
func (o stringObject) Get() string {
o.lock.Lock()
defer o.lock.Unlock()
return o.value
}
func (o *stringObject) Load() error {
o.lock.Lock()
defer o.lock.Unlock()
if err := o.object.Load(); err != nil {
return err
}
o.value = string(o.data)
return nil
}
// ----------------------------------------------------------------------------
type uint64Object struct {
lock *sync.Mutex
object
value uint64
}
func newUint64Object(path string) uint64Object {
return uint64Object{
lock: &sync.Mutex{},
object: newObject(path),
}
}
func (o *uint64Object) Set(value uint64) error {
o.lock.Lock()
defer o.lock.Unlock()
o.value = value
b := make([]byte, 8)
binary.LittleEndian.PutUint64(b, value)
return o.object.Set(b)
}
func (o uint64Object) Get() uint64 {
o.lock.Lock()
defer o.lock.Unlock()
return o.value
}
func (o *uint64Object) Load() error {
o.lock.Lock()
defer o.lock.Unlock()
if err := o.object.Load(); err != nil {
return err
}
if len(o.data) != 8 {
return ErrInvalidData
}
o.value = binary.LittleEndian.Uint64(o.data)
return nil
}

+ 145
- 0
fsstorage/object_test.go View File

@ -0,0 +1,145 @@
package fsstorage
import (
"bytes"
"encoding/hex"
"fmt"
"math/rand"
"os"
"path/filepath"
"testing"
)
func TestObject(t *testing.T) {
for i := 0; i < 100; i++ {
// Create random data.
buf := make([]byte, 1+rand.Int63n(31))
if _, err := rand.Read(buf); err != nil {
t.Fatal(err)
}
path := newTestPath()
obj := newObject(path)
if err := obj.Set(buf); err != nil {
t.Fatal(err)
}
if !bytes.Equal(obj.data, buf) {
t.Fatal(obj.data, buf)
}
obj2 := newObject(path)
if err := obj2.Load(); err != nil {
t.Fatal(err)
}
if !bytes.Equal(obj.data, obj2.data) {
t.Fatal(obj, obj2)
}
}
}
func TestObjectLoadNotFound(t *testing.T) {
obj := newObject("./doesnt-exist.data")
if err := obj.Load(); !os.IsNotExist(err) {
t.Fatal(err)
}
}
func TestStringObject(t *testing.T) {
if err := os.MkdirAll("test-files", 0700); err != nil {
t.Fatal(err)
}
for i := 0; i < 100; i++ {
// Create random data.
buf := make([]byte, 1+rand.Int63n(31))
if _, err := rand.Read(buf); err != nil {
t.Fatal(err)
}
value := hex.EncodeToString(buf)
path := filepath.Join("test-files", fmt.Sprintf("file.%d", i))
obj := newStringObject(path)
if err := obj.Set(value); err != nil {
t.Fatal(err)
}
if obj.Get() != value {
t.Fatal(obj.value, value)
}
obj2 := newStringObject(path)
if err := obj2.Load(); err != nil {
t.Fatal(err)
}
if obj.Get() != obj2.Get() {
t.Fatal(obj, obj2)
}
}
}
func TestStringObjectLoadNotFound(t *testing.T) {
obj := newStringObject("./doesnt-exist.data")
if err := obj.Load(); !os.IsNotExist(err) {
t.Fatal(err)
}
}
func TestUint64Object(t *testing.T) {
if err := os.MkdirAll("test-files", 0700); err != nil {
t.Fatal(err)
}
for i := 0; i < 100; i++ {
// Create random data.
value := rand.Uint64()
path := filepath.Join("test-files", fmt.Sprintf("file.%d", i))
obj := newUint64Object(path)
if err := obj.Set(value); err != nil {
t.Fatal(err)
}
if obj.Get() != value {
t.Fatal(obj.value, value)
}
obj2 := newUint64Object(path)
if err := obj2.Load(); err != nil {
t.Fatal(err)
}
if obj.Get() != obj2.Get() {
t.Fatal(obj, obj2)
}
}
}
func TestUint64ObjectLoadNotFound(t *testing.T) {
obj := newUint64Object("./doesnt-exist.data")
if err := obj.Load(); !os.IsNotExist(err) {
t.Fatal(err)
}
}
func TestUint64ObjectInvalidData(t *testing.T) {
path := newTestPath()
f, err := os.Create(path)
if err != nil {
t.Fatal(err)
}
_, err = f.Write([]byte{0, 1, 2, 3, 4, 5, 6})
if err != nil {
t.Fatal(err)
}
if err := f.Close(); err != nil {
t.Fatal(err)
}
obj := newUint64Object(path)
if err := obj.Load(); err != ErrInvalidData {
t.Fatal(err)
}
}

+ 266
- 0
fsstorage/segment.go View File

@ -0,0 +1,266 @@
package fsstorage
import (
"encoding/binary"
"io/ioutil"
"os"
"path/filepath"
"syscall"
"time"
"unsafe"
"git.crumpington.com/public/toolbox/fsutil"
"git.crumpington.com/public/toolbox/mmap"
)
func init() {
if unsafe.Sizeof(walFixedData{}) != fixedDataSize {
panic(unsafe.Sizeof(walFixedData{}))
}
}
type walFixedData struct {
prevOffset uint64
TS uint64 // Unix timestamp (seconds).
WALID uint64
CmdID uint32
cmdDataSize uint32
}
const fixedDataSize = 32
type walEntry struct {
*walFixedData
offset uint64
CmdData []byte
}
type segment struct {
dataPath string
tailOffsetPath string
tailOffsetStagingPath string
insertAt int
nextWALID uint64
firstWALID uint64
tailOffset uint64
f *os.File
mapped []byte
}
// Given a directory and file size in bytes, create a new segment. The actual
// size will be rounded up to a multiple of 1024*1024.
func initSegment(dir string, firstWALID, size uint64) error {
// Size will be at least 1M, and a multiple of 1M.
M := uint64(1024 * 1024)
if size%M != 0 {
size += M
size -= size % M
}
if size < M {
size = M
}
// Create directory.
if err := os.MkdirAll(dir, 0700); err != nil {
return err
}
// Create the file.
f, err := os.Create(filepath.Join(dir, "data"))
if err != nil {
return err
}
// Write first WAL ID.
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, firstWALID)
if _, err := f.Write(buf); err != nil {
f.Close()
return err
}
// Truncate to size.
if err := f.Truncate(int64(size)); err != nil {
f.Close()
return err
}
// Sync and close file.
if err := f.Sync(); err != nil {
f.Close()
return err
}
if err := f.Close(); err != nil {
return err
}
// Write tail offset.
for i := range buf {
buf[i] = 0
}
return fsutil.WriteFileAtomic(
buf,
filepath.Join(dir, "tail-offset.staging"),
filepath.Join(dir, "tail-offset"))
}
func openSegment(dir string) (*segment, error) {
var err error
seg := &segment{
dataPath: filepath.Join(dir, "data"),
tailOffsetPath: filepath.Join(dir, "tail-offset"),
tailOffsetStagingPath: filepath.Join(dir, "tail-offset.staged"),
}
// Open the file.
seg.f, err = os.OpenFile(seg.dataPath, os.O_RDWR, 0600)
if err != nil {
return nil, err
}
// Get the mapping size.
fi, err := seg.f.Stat()
if err != nil {
seg.Close()
return nil, err
}
// Map the file data.
seg.mapped, err = mmap.Map(
syscall.PROT_READ|syscall.PROT_WRITE,
syscall.MAP_SHARED,
fi.Size(),
seg.f)
if err != nil {
seg.Close()
return nil, err
}
seg.mapped = seg.mapped[:cap(seg.mapped)]
// Read the first WAL ID.
if len(seg.mapped) < 8 {
seg.Close()
return nil, ErrInvalidData
}
seg.firstWALID = binary.LittleEndian.Uint64(seg.mapped[:8])
// Load the tail offset.
buf, err := ioutil.ReadFile(seg.tailOffsetPath)
if err != nil || len(buf) != 8 {
seg.Close()
return nil, err
}
seg.tailOffset = binary.LittleEndian.Uint64(buf)
// Set insertion point.
seg.nextWALID = seg.firstWALID
seg.insertAt = 1
if tail := seg.Tail(); tail != nil {
seg.nextWALID = tail.WALID + 1
seg.insertAt = int(tail.offset + fixedDataSize + uint64(len(tail.CmdData)))
}
return seg, nil
}
func (seg *segment) Sync() error {
if err := mmap.Sync(seg.mapped); err != nil {
return err
}
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, seg.tailOffset)
return fsutil.WriteFileAtomic(buf, seg.tailOffsetStagingPath, seg.tailOffsetPath)
}
func (seg *segment) Close() error {
if seg.mapped != nil {
if err := mmap.Unmap(seg.mapped); err != nil {
return err
}
seg.mapped = nil
}
if seg.f != nil {
if err := seg.f.Close(); err != nil {
return err
}
seg.f = nil
}
seg.firstWALID = 0
seg.tailOffset = 0
return nil
}
func (seg *segment) Head() *walEntry {
if seg.tailOffset == 0 {
return nil
}
return seg.loadEntry(1)
}
func (seg *segment) Tail() *walEntry {
if seg.tailOffset == 0 {
return nil
}
return seg.loadEntry(seg.tailOffset)
}
func (seg *segment) loadEntry(offset uint64) *walEntry {
fixed := (*walFixedData)(unsafe.Pointer(&seg.mapped[offset]))
entry := &walEntry{
walFixedData: fixed,
offset: offset,
}
dataStart := offset + fixedDataSize
dataEnd := dataStart + uint64(entry.cmdDataSize)
entry.CmdData = seg.mapped[dataStart:dataEnd]
return entry
}
func (seg *segment) Next(entry *walEntry) *walEntry {
offset := entry.offset + fixedDataSize + uint64(entry.cmdDataSize)
if offset > seg.tailOffset {
return nil
}
return seg.loadEntry(offset)
}
func (seg *segment) Prev(entry *walEntry) *walEntry {
if entry.prevOffset == 0 {
return nil
}
return seg.loadEntry(entry.prevOffset)
}
func (seg *segment) Append(walID uint64, cmdID uint32, data []byte) error {
if walID != seg.nextWALID {
return ErrInvalidWALID
}
needed := fixedDataSize + len(data)
if len(seg.mapped)-seg.insertAt < needed {
return ErrSegmentFull
}
fixed := (*walFixedData)(unsafe.Pointer(&seg.mapped[seg.insertAt]))
fixed.prevOffset = seg.tailOffset
fixed.TS = uint64(time.Now().Unix())
fixed.WALID = walID
fixed.CmdID = cmdID
fixed.cmdDataSize = uint32(len(data))
seg.tailOffset = uint64(seg.insertAt)
seg.insertAt += fixedDataSize
seg.insertAt += copy(seg.mapped[seg.insertAt:], data)
seg.nextWALID++
return nil
}

+ 78
- 0
fsstorage/segment_test.go View File

@ -0,0 +1,78 @@
package fsstorage
import (
"math/rand"
"reflect"
"testing"
)
func TestSegmentInitSizing(t *testing.T) {
type TestCase struct {
inB uint64
outMB int
}
cases := []TestCase{
{inB: 0, outMB: 1},
{inB: 1, outMB: 1},
{inB: 1024*1024 - 1, outMB: 1},
{inB: 1024 * 1024, outMB: 1},
{inB: 1024*1024 + 1, outMB: 2},
{inB: 2*1024*1024 - 1, outMB: 2},
{inB: 2 * 1024 * 1024, outMB: 2},
}
for _, tc := range cases {
dir := newTestPath()
firstWALID := uint64(rand.Int63())
if err := initSegment(dir, firstWALID, tc.inB); err != nil {
t.Fatal(err)
}
seg, err := openSegment(dir)
if err != nil {
panic(err)
t.Fatal(err)
}
if seg.firstWALID != firstWALID {
t.Fatal(seg.firstWALID, firstWALID)
}
if len(seg.mapped) != tc.outMB*1024*1024 {
t.Fatal(len(seg.mapped), tc.outMB)
}
if seg.insertAt != 1 {
t.Fatal(seg.insertAt)
}
}
}
func TestSegmentAppend(t *testing.T) {
dir := newTestPath()
err := initSegment(dir, 0, 0)
if err != nil {
t.Fatal(err)
}
seg, err := openSegment(dir)
if err != nil {
t.Fatal(err)
}
walID := uint64(0)
cmdID := uint32(12345)
data := []byte{9, 8, 7, 6, 5, 4, 3, 2, 1, 0}
if err := seg.Append(walID, cmdID, data); err != nil {
t.Fatal(err)
}
head := seg.Head()
tail := seg.Tail()
if !reflect.DeepEqual(head, tail) {
t.Fatal(*head, *tail)
}
}

+ 26
- 0
fsstorage/util_test.go View File

@ -0,0 +1,26 @@
package fsstorage
import (
"encoding/hex"
"fmt"
"math/rand"
"os"
"path/filepath"
"time"
)
func newTestPath() string {
dir := "test-files"
if err := os.MkdirAll(dir, 0700); err != nil {
panic(err)
}
buf := make([]byte, 16)
if _, err := rand.Read(buf); err != nil {
panic(err)
}
name := fmt.Sprintf("%d.%x",
time.Now().UnixNano(), hex.EncodeToString(buf))
return filepath.Join(dir, name)
}

+ 7
- 0
go.mod View File

@ -0,0 +1,7 @@
module git.crumpington.com/public/replication
go 1.17
require git.crumpington.com/public/toolbox v1.3.1
require golang.org/x/sys v0.0.0-20210910150752-751e447fb3d0 // indirect

+ 6
- 0
go.sum View File

@ -0,0 +1,6 @@
git.crumpington.com/public/toolbox v1.3.1 h1:UQEtMD5o1Lcbh3YGvYCyrGDD9ZvIAt2bF22t4A+eCFI=
git.crumpington.com/public/toolbox v1.3.1/go.mod h1:3cNzhpDPL6y9CYm8gJ2Rfvyd4lvjNzSVfS4/Yp7nGA8=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210910150752-751e447fb3d0 h1:xrCZDmdtoloIiooiA9q0OQb9r8HejIHYoHGhGCe1pGg=
golang.org/x/sys v0.0.0-20210910150752-751e447fb3d0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=

+ 99
- 0
interfaces.go View File

@ -0,0 +1,99 @@
package replication
import "time"
type WALEntry struct {
WALID uint64
CmdID uint16 // Application specific.
Data []byte
}
type App interface {
EncodeCmd(cmd interface{}, entry WALEntry) WALEntry
DecodeCmd(entry WALEntry) (cmd interface{})
Apply(cmds []interface{}) (responseCodes []uint16)
ClearState() error
IterState() (cmdChan chan interface{})
}
type Manager interface {
Apply(cmd interface{}) (responseCode uint16, err error)
ReportFailed(format string, args ...interface{})
}
type Storage interface {
Address() (string, error)
SetAddress(addr string) error
Role() uint64
SetRole(role uint64) error
LeaderID() uint64
SetLeaderID(id uint64) error
FollowerAddress() (string, error)
SetFollowerAddress(addr string) error
// WriteWAL returns the new max wal ID.
WriteWAL(entries []WALEntry) error
// Update the max applied WAL ID.
SetMaxAppliedWAL(id uint64) error
MaxAppliedWAL() (uint64, error)
// The max stored WAL ID.
MaxWAL() (uint64, error)
// The min stored WAL ID.
MinWAL() (uint64, error)
// Remove WAL entries.
ClearWAL() error
TruncateWALBeforeID(id uint64) error
TruncateWALBeforeTime(time.Time) error
}
type Follower interface {
Connect(leaderID uint64, address string) error
MaxAppliedWAL() (id int64, err error)
ClearState(leaderID uint64) (err error)
ApplyState(leaderID uint64, entries []WALEntry) (err error)
ApplyWAL(leaderID uint64, entries []WALEntry) (respCodes []uint16, err error)
}
const (
NodeRoleUnknown = 0
NodeRoleLeader = 1
NodeRoleFollower = 2
NodeRoleForwarder = 3
)
const (
FollowerStateError = -1
FollowerStateNone = 0 // No follower.
FollowerStateNotConnected = 1
FollowerStateSyncingState = 2
FollowerStateSyncingLog = 3
FollowerStateSynchronous = 4
)
type NodeStatus struct {
Address string
Role int8
LeaderID uint64
MaxWAL uint64
MinWAL uint64
MaxAppliedWAL uint64
Follower string
FolowerState int8
}
// Node as seen by the supervisor.
type Node interface {
Status() (NodeStatus, error)
SetRole(nodeRole int8, leaderID uint64) error
SetLeader(address string) error // Empty address => no leader.
SetFollower(address string) error // Empty address => no follower.
Stop() error
}

+ 1
- 0
manager.go View File

@ -0,0 +1 @@
package replication

Loading…
Cancel
Save