Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
125e090cb7 | ||
|
0518c5dcca |
1
go.mod
1
go.mod
@ -4,6 +4,7 @@ go 1.22
|
||||
|
||||
require (
|
||||
github.com/google/btree v1.1.2
|
||||
go.uber.org/goleak v1.3.0
|
||||
golang.org/x/net v0.15.0
|
||||
golang.org/x/sys v0.12.0
|
||||
)
|
||||
|
10
go.sum
10
go.sum
@ -1,6 +1,16 @@
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU=
|
||||
github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8=
|
||||
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
|
||||
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
|
||||
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
@ -12,7 +12,6 @@ const (
|
||||
pathStreamWAL = "stream-wal"
|
||||
)
|
||||
|
||||
// TODO: Remove this!
|
||||
func (rep *Replicator) Handle(w http.ResponseWriter, r *http.Request) {
|
||||
// We'll handle two types of requests: HTTP GET requests for JSON, or
|
||||
// streaming requets for state or wall.
|
||||
|
11
lib/rep/main_test.go
Normal file
11
lib/rep/main_test.go
Normal file
@ -0,0 +1,11 @@
|
||||
package rep
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"go.uber.org/goleak"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
goleak.VerifyTestMain(m)
|
||||
}
|
@ -36,8 +36,8 @@ type App struct {
|
||||
// SendState: The primary may need to send storage state to a secondary node.
|
||||
SendState func(conn net.Conn) error
|
||||
|
||||
// (1) RecvState: Secondary nodes may need to load state from the primary if the
|
||||
// WAL is too far behind.
|
||||
// (1) RecvState: Secondary nodes may need to load state from the primary if
|
||||
// the WAL is too far behind.
|
||||
RecvState func(conn net.Conn) error
|
||||
|
||||
// (2) InitStorage: Prepare application storage for possible calls to
|
||||
|
@ -56,6 +56,7 @@ func (h TestAppHarness) Run(t *testing.T) {
|
||||
WALSegMaxAgeSec: 1,
|
||||
WALSegGCAgeSec: 1,
|
||||
})
|
||||
defer app2.Close()
|
||||
|
||||
val.MethodByName(method.Name).Call([]reflect.Value{
|
||||
reflect.ValueOf(t),
|
||||
|
11
lib/wal/main_test.go
Normal file
11
lib/wal/main_test.go
Normal file
@ -0,0 +1,11 @@
|
||||
package wal
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"go.uber.org/goleak"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
goleak.VerifyTestMain(m)
|
||||
}
|
@ -1,7 +1,6 @@
|
||||
package mdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"hash/crc64"
|
||||
@ -25,8 +24,6 @@ type Collection[T any] struct {
|
||||
uniqueIndices []*Index[T]
|
||||
|
||||
ByID *Index[T]
|
||||
|
||||
buf *bytes.Buffer
|
||||
}
|
||||
|
||||
type CollectionConfig[T any] struct {
|
||||
@ -67,7 +64,6 @@ func NewCollection[T any](db *Database, name string, conf *CollectionConfig[T])
|
||||
validate: conf.Validate,
|
||||
indices: []*Index[T]{},
|
||||
uniqueIndices: []*Index[T]{},
|
||||
buf: &bytes.Buffer{},
|
||||
}
|
||||
|
||||
db.addCollection(c.collectionID, c, &collectionState[T]{
|
||||
|
67
mdb/db-primary.go
Normal file
67
mdb/db-primary.go
Normal file
@ -0,0 +1,67 @@
|
||||
package mdb
|
||||
|
||||
/*
|
||||
func (db *Database) openPrimary() (err error) {
|
||||
wal, err := cwal.Open(db.walRootDir, cwal.Config{
|
||||
SegMinCount: db.conf.WALSegMinCount,
|
||||
SegMaxAgeSec: db.conf.WALSegMaxAgeSec,
|
||||
})
|
||||
|
||||
pFile, err := pfile.Open(db.pageFilePath,
|
||||
|
||||
pFile, err := openPageFileAndReplayWAL(db.rootDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer pFile.Close()
|
||||
|
||||
pfHeader, err := pFile.ReadHeader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tx := db.Snapshot()
|
||||
tx.seqNum = pfHeader.SeqNum
|
||||
tx.updatedAt = pfHeader.UpdatedAt
|
||||
|
||||
pIndex, err := pagefile.NewIndex(pFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = pFile.IterateAllocated(pIndex, func(cID, iID uint64, data []byte) error {
|
||||
return db.loadItem(tx, cID, iID, data)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w, err := cwal.OpenWriter(db.walRootDir, &cwal.WriterConfig{
|
||||
SegMinCount: db.conf.WALSegMinCount,
|
||||
SegMaxAgeSec: db.conf.WALSegMaxAgeSec,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
db.done.Add(1)
|
||||
go txAggregator{
|
||||
Stop: db.stop,
|
||||
Done: db.done,
|
||||
ModChan: db.modChan,
|
||||
W: w,
|
||||
Index: pIndex,
|
||||
Snapshot: db.snapshot,
|
||||
}.Run()
|
||||
|
||||
db.done.Add(1)
|
||||
go (&fileWriter{
|
||||
Stop: db.stop,
|
||||
Done: db.done,
|
||||
PageFilePath: db.pageFilePath,
|
||||
WALRootDir: db.walRootDir,
|
||||
}).Run()
|
||||
|
||||
return nil
|
||||
}
|
||||
*/
|
@ -99,6 +99,7 @@ func (db *Database) repApply(rec wal.Record) (err error) {
|
||||
}
|
||||
tx.seqNum = rec.SeqNum
|
||||
tx.timestampMS = rec.TimestampMS
|
||||
tx.setReadOnly()
|
||||
db.snapshot.Store(tx)
|
||||
return nil
|
||||
}
|
||||
|
129
mdb/db-secondary.go
Normal file
129
mdb/db-secondary.go
Normal file
@ -0,0 +1,129 @@
|
||||
package mdb
|
||||
|
||||
/*
|
||||
func (db *Database) openSecondary() (err error) {
|
||||
if db.shouldLoadFromPrimary() {
|
||||
if err := db.loadFromPrimary(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("Opening page-file...")
|
||||
|
||||
pFile, err := openPageFileAndReplayWAL(db.rootDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer pFile.Close()
|
||||
|
||||
pfHeader, err := pFile.ReadHeader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("Building page-file index...")
|
||||
|
||||
pIndex, err := pagefile.NewIndex(pFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tx := db.Snapshot()
|
||||
tx.seqNum = pfHeader.SeqNum
|
||||
tx.updatedAt = pfHeader.UpdatedAt
|
||||
|
||||
log.Printf("Loading data into memory...")
|
||||
|
||||
err = pFile.IterateAllocated(pIndex, func(cID, iID uint64, data []byte) error {
|
||||
return db.loadItem(tx, cID, iID, data)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("Creating writer...")
|
||||
|
||||
w, err := cswal.OpenWriter(db.walRootDir, &cswal.WriterConfig{
|
||||
SegMinCount: db.conf.WALSegMinCount,
|
||||
SegMaxAgeSec: db.conf.WALSegMaxAgeSec,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
db.done.Add(1)
|
||||
go (&walFollower{
|
||||
Stop: db.stop,
|
||||
Done: db.done,
|
||||
W: w,
|
||||
Client: NewClient(db.conf.PrimaryURL, db.conf.ReplicationPSK, db.conf.NetTimeout),
|
||||
}).Run()
|
||||
|
||||
db.done.Add(1)
|
||||
go (&follower{
|
||||
Stop: db.stop,
|
||||
Done: db.done,
|
||||
WALRootDir: db.walRootDir,
|
||||
SeqNum: pfHeader.SeqNum,
|
||||
ApplyChanges: db.applyChanges,
|
||||
}).Run()
|
||||
|
||||
db.done.Add(1)
|
||||
go (&fileWriter{
|
||||
Stop: db.stop,
|
||||
Done: db.done,
|
||||
PageFilePath: db.pageFilePath,
|
||||
WALRootDir: db.walRootDir,
|
||||
}).Run()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *Database) shouldLoadFromPrimary() bool {
|
||||
if _, err := os.Stat(db.walRootDir); os.IsNotExist(err) {
|
||||
log.Printf("WAL doesn't exist.")
|
||||
return true
|
||||
}
|
||||
if _, err := os.Stat(db.pageFilePath); os.IsNotExist(err) {
|
||||
log.Printf("Page-file doesn't exist.")
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (db *Database) loadFromPrimary() error {
|
||||
client := NewClient(db.conf.PrimaryURL, db.conf.ReplicationPSK, db.conf.NetTimeout)
|
||||
defer client.Disconnect()
|
||||
|
||||
log.Printf("Loading data from primary...")
|
||||
|
||||
if err := os.RemoveAll(db.pageFilePath); err != nil {
|
||||
log.Printf("Failed to remove page-file: %s", err)
|
||||
return errs.IO.WithErr(err) // Caller can retry.
|
||||
}
|
||||
|
||||
if err := os.RemoveAll(db.walRootDir); err != nil {
|
||||
log.Printf("Failed to remove WAL: %s", err)
|
||||
return errs.IO.WithErr(err) // Caller can retry.
|
||||
}
|
||||
|
||||
err := client.DownloadPageFile(db.pageFilePath+".tmp", db.pageFilePath)
|
||||
if err != nil {
|
||||
log.Printf("Failed to get page-file from primary: %s", err)
|
||||
return err // Caller can retry.
|
||||
}
|
||||
|
||||
pfHeader, err := pagefile.ReadHeader(db.pageFilePath)
|
||||
if err != nil {
|
||||
log.Printf("Failed to read page-file sequence number: %s", err)
|
||||
return err // Caller can retry.
|
||||
}
|
||||
|
||||
if err = cswal.CreateEx(db.walRootDir, pfHeader.SeqNum+1); err != nil {
|
||||
log.Printf("Failed to initialize WAL: %s", err)
|
||||
return err // Caller can retry.
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
*/
|
138
mdb/db-testlist_test.go
Normal file
138
mdb/db-testlist_test.go
Normal file
@ -0,0 +1,138 @@
|
||||
package mdb
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestDBList(t *testing.T) {
|
||||
db := NewTestDBPrimary(t, t.TempDir())
|
||||
|
||||
var (
|
||||
user1 = User{
|
||||
ID: NewID(),
|
||||
Name: "User1",
|
||||
Email: "user1@gmail.com",
|
||||
}
|
||||
|
||||
user2 = User{
|
||||
ID: NewID(),
|
||||
Name: "User2",
|
||||
Email: "user2@gmail.com",
|
||||
}
|
||||
|
||||
user3 = User{
|
||||
ID: NewID(),
|
||||
Name: "User3",
|
||||
Email: "user3@gmail.com",
|
||||
}
|
||||
user1Data = make([]UserDataItem, 10)
|
||||
user2Data = make([]UserDataItem, 4)
|
||||
user3Data = make([]UserDataItem, 8)
|
||||
)
|
||||
|
||||
err := db.Update(func(tx *Snapshot) error {
|
||||
if err := db.Users.Insert(tx, &user1); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := db.Users.Insert(tx, &user2); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := range user1Data {
|
||||
user1Data[i] = UserDataItem{
|
||||
ID: NewID(),
|
||||
UserID: user1.ID,
|
||||
Name: fmt.Sprintf("Name1: %d", i),
|
||||
Data: fmt.Sprintf("Data: %d", i),
|
||||
}
|
||||
|
||||
if err := db.UserData.Insert(tx, &user1Data[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for i := range user2Data {
|
||||
user2Data[i] = UserDataItem{
|
||||
ID: NewID(),
|
||||
UserID: user2.ID,
|
||||
Name: fmt.Sprintf("Name2: %d", i),
|
||||
Data: fmt.Sprintf("Data: %d", i),
|
||||
}
|
||||
|
||||
if err := db.UserData.Insert(tx, &user2Data[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for i := range user3Data {
|
||||
user3Data[i] = UserDataItem{
|
||||
ID: NewID(),
|
||||
UserID: user3.ID,
|
||||
Name: fmt.Sprintf("Name3: %d", i),
|
||||
Data: fmt.Sprintf("Data: %d", i),
|
||||
}
|
||||
|
||||
if err := db.UserData.Insert(tx, &user3Data[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
type TestCase struct {
|
||||
Name string
|
||||
Args ListArgs[UserDataItem]
|
||||
Expected []UserDataItem
|
||||
}
|
||||
|
||||
cases := []TestCase{
|
||||
{
|
||||
Name: "User1 all",
|
||||
Args: ListArgs[UserDataItem]{
|
||||
After: &UserDataItem{
|
||||
UserID: user1.ID,
|
||||
},
|
||||
While: func(item *UserDataItem) bool {
|
||||
return item.UserID == user1.ID
|
||||
},
|
||||
},
|
||||
Expected: user1Data,
|
||||
}, {
|
||||
Name: "User1 limited",
|
||||
Args: ListArgs[UserDataItem]{
|
||||
After: &UserDataItem{
|
||||
UserID: user1.ID,
|
||||
},
|
||||
While: func(item *UserDataItem) bool {
|
||||
return item.UserID == user1.ID
|
||||
},
|
||||
Limit: 4,
|
||||
},
|
||||
Expected: user1Data[:4],
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
tx := db.Snapshot()
|
||||
l := db.UserData.ByName.List(tx, tc.Args, nil)
|
||||
if len(l) != len(tc.Expected) {
|
||||
t.Fatal(tc.Name, l)
|
||||
}
|
||||
|
||||
for i := range l {
|
||||
if !reflect.DeepEqual(*l[i], tc.Expected[i]) {
|
||||
t.Fatal(tc.Name, l)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -72,7 +72,7 @@ func testRunner_testCase(t *testing.T, testCase DBTestCase) {
|
||||
}
|
||||
|
||||
// TODO: Why is this necessary?
|
||||
time.Sleep(time.Second)
|
||||
//time.Sleep(time.Second)
|
||||
finalStep := testCase.Steps[len(testCase.Steps)-1]
|
||||
|
||||
secondarySnapshot := db2.Snapshot()
|
||||
|
11
mdb/main_test.go
Normal file
11
mdb/main_test.go
Normal file
@ -0,0 +1,11 @@
|
||||
package mdb
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"go.uber.org/goleak"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
goleak.VerifyTestMain(m)
|
||||
}
|
Loading…
Reference in New Issue
Block a user