Compare commits

...

2 Commits

Author SHA1 Message Date
jdl
125e090cb7 ?? 2024-10-25 14:56:59 +02:00
jdl
0518c5dcca Fix memory leak in secondary, cleanup code. 2024-01-06 20:50:20 +01:00
14 changed files with 383 additions and 8 deletions

1
go.mod
View File

@ -4,6 +4,7 @@ go 1.22
require ( require (
github.com/google/btree v1.1.2 github.com/google/btree v1.1.2
go.uber.org/goleak v1.3.0
golang.org/x/net v0.15.0 golang.org/x/net v0.15.0
golang.org/x/sys v0.12.0 golang.org/x/sys v0.12.0
) )

10
go.sum
View File

@ -1,6 +1,16 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU=
github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8= golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -12,7 +12,6 @@ const (
pathStreamWAL = "stream-wal" pathStreamWAL = "stream-wal"
) )
// TODO: Remove this!
func (rep *Replicator) Handle(w http.ResponseWriter, r *http.Request) { func (rep *Replicator) Handle(w http.ResponseWriter, r *http.Request) {
// We'll handle two types of requests: HTTP GET requests for JSON, or // We'll handle two types of requests: HTTP GET requests for JSON, or
// streaming requets for state or wall. // streaming requets for state or wall.

11
lib/rep/main_test.go Normal file
View File

@ -0,0 +1,11 @@
package rep
import (
"testing"
"go.uber.org/goleak"
)
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}

View File

@ -36,8 +36,8 @@ type App struct {
// SendState: The primary may need to send storage state to a secondary node. // SendState: The primary may need to send storage state to a secondary node.
SendState func(conn net.Conn) error SendState func(conn net.Conn) error
// (1) RecvState: Secondary nodes may need to load state from the primary if the // (1) RecvState: Secondary nodes may need to load state from the primary if
// WAL is too far behind. // the WAL is too far behind.
RecvState func(conn net.Conn) error RecvState func(conn net.Conn) error
// (2) InitStorage: Prepare application storage for possible calls to // (2) InitStorage: Prepare application storage for possible calls to

View File

@ -56,6 +56,7 @@ func (h TestAppHarness) Run(t *testing.T) {
WALSegMaxAgeSec: 1, WALSegMaxAgeSec: 1,
WALSegGCAgeSec: 1, WALSegGCAgeSec: 1,
}) })
defer app2.Close()
val.MethodByName(method.Name).Call([]reflect.Value{ val.MethodByName(method.Name).Call([]reflect.Value{
reflect.ValueOf(t), reflect.ValueOf(t),

11
lib/wal/main_test.go Normal file
View File

@ -0,0 +1,11 @@
package wal
import (
"testing"
"go.uber.org/goleak"
)
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}

View File

@ -1,7 +1,6 @@
package mdb package mdb
import ( import (
"bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"hash/crc64" "hash/crc64"
@ -25,8 +24,6 @@ type Collection[T any] struct {
uniqueIndices []*Index[T] uniqueIndices []*Index[T]
ByID *Index[T] ByID *Index[T]
buf *bytes.Buffer
} }
type CollectionConfig[T any] struct { type CollectionConfig[T any] struct {
@ -67,7 +64,6 @@ func NewCollection[T any](db *Database, name string, conf *CollectionConfig[T])
validate: conf.Validate, validate: conf.Validate,
indices: []*Index[T]{}, indices: []*Index[T]{},
uniqueIndices: []*Index[T]{}, uniqueIndices: []*Index[T]{},
buf: &bytes.Buffer{},
} }
db.addCollection(c.collectionID, c, &collectionState[T]{ db.addCollection(c.collectionID, c, &collectionState[T]{

67
mdb/db-primary.go Normal file
View File

@ -0,0 +1,67 @@
package mdb
/*
func (db *Database) openPrimary() (err error) {
wal, err := cwal.Open(db.walRootDir, cwal.Config{
SegMinCount: db.conf.WALSegMinCount,
SegMaxAgeSec: db.conf.WALSegMaxAgeSec,
})
pFile, err := pfile.Open(db.pageFilePath,
pFile, err := openPageFileAndReplayWAL(db.rootDir)
if err != nil {
return err
}
defer pFile.Close()
pfHeader, err := pFile.ReadHeader()
if err != nil {
return err
}
tx := db.Snapshot()
tx.seqNum = pfHeader.SeqNum
tx.updatedAt = pfHeader.UpdatedAt
pIndex, err := pagefile.NewIndex(pFile)
if err != nil {
return err
}
err = pFile.IterateAllocated(pIndex, func(cID, iID uint64, data []byte) error {
return db.loadItem(tx, cID, iID, data)
})
if err != nil {
return err
}
w, err := cwal.OpenWriter(db.walRootDir, &cwal.WriterConfig{
SegMinCount: db.conf.WALSegMinCount,
SegMaxAgeSec: db.conf.WALSegMaxAgeSec,
})
if err != nil {
return err
}
db.done.Add(1)
go txAggregator{
Stop: db.stop,
Done: db.done,
ModChan: db.modChan,
W: w,
Index: pIndex,
Snapshot: db.snapshot,
}.Run()
db.done.Add(1)
go (&fileWriter{
Stop: db.stop,
Done: db.done,
PageFilePath: db.pageFilePath,
WALRootDir: db.walRootDir,
}).Run()
return nil
}
*/

View File

@ -99,6 +99,7 @@ func (db *Database) repApply(rec wal.Record) (err error) {
} }
tx.seqNum = rec.SeqNum tx.seqNum = rec.SeqNum
tx.timestampMS = rec.TimestampMS tx.timestampMS = rec.TimestampMS
tx.setReadOnly()
db.snapshot.Store(tx) db.snapshot.Store(tx)
return nil return nil
} }

129
mdb/db-secondary.go Normal file
View File

@ -0,0 +1,129 @@
package mdb
/*
func (db *Database) openSecondary() (err error) {
if db.shouldLoadFromPrimary() {
if err := db.loadFromPrimary(); err != nil {
return err
}
}
log.Printf("Opening page-file...")
pFile, err := openPageFileAndReplayWAL(db.rootDir)
if err != nil {
return err
}
defer pFile.Close()
pfHeader, err := pFile.ReadHeader()
if err != nil {
return err
}
log.Printf("Building page-file index...")
pIndex, err := pagefile.NewIndex(pFile)
if err != nil {
return err
}
tx := db.Snapshot()
tx.seqNum = pfHeader.SeqNum
tx.updatedAt = pfHeader.UpdatedAt
log.Printf("Loading data into memory...")
err = pFile.IterateAllocated(pIndex, func(cID, iID uint64, data []byte) error {
return db.loadItem(tx, cID, iID, data)
})
if err != nil {
return err
}
log.Printf("Creating writer...")
w, err := cswal.OpenWriter(db.walRootDir, &cswal.WriterConfig{
SegMinCount: db.conf.WALSegMinCount,
SegMaxAgeSec: db.conf.WALSegMaxAgeSec,
})
if err != nil {
return err
}
db.done.Add(1)
go (&walFollower{
Stop: db.stop,
Done: db.done,
W: w,
Client: NewClient(db.conf.PrimaryURL, db.conf.ReplicationPSK, db.conf.NetTimeout),
}).Run()
db.done.Add(1)
go (&follower{
Stop: db.stop,
Done: db.done,
WALRootDir: db.walRootDir,
SeqNum: pfHeader.SeqNum,
ApplyChanges: db.applyChanges,
}).Run()
db.done.Add(1)
go (&fileWriter{
Stop: db.stop,
Done: db.done,
PageFilePath: db.pageFilePath,
WALRootDir: db.walRootDir,
}).Run()
return nil
}
func (db *Database) shouldLoadFromPrimary() bool {
if _, err := os.Stat(db.walRootDir); os.IsNotExist(err) {
log.Printf("WAL doesn't exist.")
return true
}
if _, err := os.Stat(db.pageFilePath); os.IsNotExist(err) {
log.Printf("Page-file doesn't exist.")
return true
}
return false
}
func (db *Database) loadFromPrimary() error {
client := NewClient(db.conf.PrimaryURL, db.conf.ReplicationPSK, db.conf.NetTimeout)
defer client.Disconnect()
log.Printf("Loading data from primary...")
if err := os.RemoveAll(db.pageFilePath); err != nil {
log.Printf("Failed to remove page-file: %s", err)
return errs.IO.WithErr(err) // Caller can retry.
}
if err := os.RemoveAll(db.walRootDir); err != nil {
log.Printf("Failed to remove WAL: %s", err)
return errs.IO.WithErr(err) // Caller can retry.
}
err := client.DownloadPageFile(db.pageFilePath+".tmp", db.pageFilePath)
if err != nil {
log.Printf("Failed to get page-file from primary: %s", err)
return err // Caller can retry.
}
pfHeader, err := pagefile.ReadHeader(db.pageFilePath)
if err != nil {
log.Printf("Failed to read page-file sequence number: %s", err)
return err // Caller can retry.
}
if err = cswal.CreateEx(db.walRootDir, pfHeader.SeqNum+1); err != nil {
log.Printf("Failed to initialize WAL: %s", err)
return err // Caller can retry.
}
return nil
}
*/

138
mdb/db-testlist_test.go Normal file
View File

@ -0,0 +1,138 @@
package mdb
import (
"fmt"
"reflect"
"testing"
)
func TestDBList(t *testing.T) {
db := NewTestDBPrimary(t, t.TempDir())
var (
user1 = User{
ID: NewID(),
Name: "User1",
Email: "user1@gmail.com",
}
user2 = User{
ID: NewID(),
Name: "User2",
Email: "user2@gmail.com",
}
user3 = User{
ID: NewID(),
Name: "User3",
Email: "user3@gmail.com",
}
user1Data = make([]UserDataItem, 10)
user2Data = make([]UserDataItem, 4)
user3Data = make([]UserDataItem, 8)
)
err := db.Update(func(tx *Snapshot) error {
if err := db.Users.Insert(tx, &user1); err != nil {
return err
}
if err := db.Users.Insert(tx, &user2); err != nil {
return err
}
for i := range user1Data {
user1Data[i] = UserDataItem{
ID: NewID(),
UserID: user1.ID,
Name: fmt.Sprintf("Name1: %d", i),
Data: fmt.Sprintf("Data: %d", i),
}
if err := db.UserData.Insert(tx, &user1Data[i]); err != nil {
return err
}
}
for i := range user2Data {
user2Data[i] = UserDataItem{
ID: NewID(),
UserID: user2.ID,
Name: fmt.Sprintf("Name2: %d", i),
Data: fmt.Sprintf("Data: %d", i),
}
if err := db.UserData.Insert(tx, &user2Data[i]); err != nil {
return err
}
}
for i := range user3Data {
user3Data[i] = UserDataItem{
ID: NewID(),
UserID: user3.ID,
Name: fmt.Sprintf("Name3: %d", i),
Data: fmt.Sprintf("Data: %d", i),
}
if err := db.UserData.Insert(tx, &user3Data[i]); err != nil {
return err
}
}
return nil
})
if err != nil {
t.Fatal(err)
}
type TestCase struct {
Name string
Args ListArgs[UserDataItem]
Expected []UserDataItem
}
cases := []TestCase{
{
Name: "User1 all",
Args: ListArgs[UserDataItem]{
After: &UserDataItem{
UserID: user1.ID,
},
While: func(item *UserDataItem) bool {
return item.UserID == user1.ID
},
},
Expected: user1Data,
}, {
Name: "User1 limited",
Args: ListArgs[UserDataItem]{
After: &UserDataItem{
UserID: user1.ID,
},
While: func(item *UserDataItem) bool {
return item.UserID == user1.ID
},
Limit: 4,
},
Expected: user1Data[:4],
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
tx := db.Snapshot()
l := db.UserData.ByName.List(tx, tc.Args, nil)
if len(l) != len(tc.Expected) {
t.Fatal(tc.Name, l)
}
for i := range l {
if !reflect.DeepEqual(*l[i], tc.Expected[i]) {
t.Fatal(tc.Name, l)
}
}
})
}
}

View File

@ -72,7 +72,7 @@ func testRunner_testCase(t *testing.T, testCase DBTestCase) {
} }
// TODO: Why is this necessary? // TODO: Why is this necessary?
time.Sleep(time.Second) //time.Sleep(time.Second)
finalStep := testCase.Steps[len(testCase.Steps)-1] finalStep := testCase.Steps[len(testCase.Steps)-1]
secondarySnapshot := db2.Snapshot() secondarySnapshot := db2.Snapshot()

11
mdb/main_test.go Normal file
View File

@ -0,0 +1,11 @@
package mdb
import (
"testing"
"go.uber.org/goleak"
)
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}