??
This commit is contained in:
parent
0518c5dcca
commit
125e090cb7
11
lib/wal/main_test.go
Normal file
11
lib/wal/main_test.go
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
package wal
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"go.uber.org/goleak"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
goleak.VerifyTestMain(m)
|
||||||
|
}
|
67
mdb/db-primary.go
Normal file
67
mdb/db-primary.go
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
package mdb
|
||||||
|
|
||||||
|
/*
|
||||||
|
func (db *Database) openPrimary() (err error) {
|
||||||
|
wal, err := cwal.Open(db.walRootDir, cwal.Config{
|
||||||
|
SegMinCount: db.conf.WALSegMinCount,
|
||||||
|
SegMaxAgeSec: db.conf.WALSegMaxAgeSec,
|
||||||
|
})
|
||||||
|
|
||||||
|
pFile, err := pfile.Open(db.pageFilePath,
|
||||||
|
|
||||||
|
pFile, err := openPageFileAndReplayWAL(db.rootDir)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer pFile.Close()
|
||||||
|
|
||||||
|
pfHeader, err := pFile.ReadHeader()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tx := db.Snapshot()
|
||||||
|
tx.seqNum = pfHeader.SeqNum
|
||||||
|
tx.updatedAt = pfHeader.UpdatedAt
|
||||||
|
|
||||||
|
pIndex, err := pagefile.NewIndex(pFile)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = pFile.IterateAllocated(pIndex, func(cID, iID uint64, data []byte) error {
|
||||||
|
return db.loadItem(tx, cID, iID, data)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
w, err := cwal.OpenWriter(db.walRootDir, &cwal.WriterConfig{
|
||||||
|
SegMinCount: db.conf.WALSegMinCount,
|
||||||
|
SegMaxAgeSec: db.conf.WALSegMaxAgeSec,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
db.done.Add(1)
|
||||||
|
go txAggregator{
|
||||||
|
Stop: db.stop,
|
||||||
|
Done: db.done,
|
||||||
|
ModChan: db.modChan,
|
||||||
|
W: w,
|
||||||
|
Index: pIndex,
|
||||||
|
Snapshot: db.snapshot,
|
||||||
|
}.Run()
|
||||||
|
|
||||||
|
db.done.Add(1)
|
||||||
|
go (&fileWriter{
|
||||||
|
Stop: db.stop,
|
||||||
|
Done: db.done,
|
||||||
|
PageFilePath: db.pageFilePath,
|
||||||
|
WALRootDir: db.walRootDir,
|
||||||
|
}).Run()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
*/
|
129
mdb/db-secondary.go
Normal file
129
mdb/db-secondary.go
Normal file
@ -0,0 +1,129 @@
|
|||||||
|
package mdb
|
||||||
|
|
||||||
|
/*
|
||||||
|
func (db *Database) openSecondary() (err error) {
|
||||||
|
if db.shouldLoadFromPrimary() {
|
||||||
|
if err := db.loadFromPrimary(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Opening page-file...")
|
||||||
|
|
||||||
|
pFile, err := openPageFileAndReplayWAL(db.rootDir)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer pFile.Close()
|
||||||
|
|
||||||
|
pfHeader, err := pFile.ReadHeader()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Building page-file index...")
|
||||||
|
|
||||||
|
pIndex, err := pagefile.NewIndex(pFile)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tx := db.Snapshot()
|
||||||
|
tx.seqNum = pfHeader.SeqNum
|
||||||
|
tx.updatedAt = pfHeader.UpdatedAt
|
||||||
|
|
||||||
|
log.Printf("Loading data into memory...")
|
||||||
|
|
||||||
|
err = pFile.IterateAllocated(pIndex, func(cID, iID uint64, data []byte) error {
|
||||||
|
return db.loadItem(tx, cID, iID, data)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Creating writer...")
|
||||||
|
|
||||||
|
w, err := cswal.OpenWriter(db.walRootDir, &cswal.WriterConfig{
|
||||||
|
SegMinCount: db.conf.WALSegMinCount,
|
||||||
|
SegMaxAgeSec: db.conf.WALSegMaxAgeSec,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
db.done.Add(1)
|
||||||
|
go (&walFollower{
|
||||||
|
Stop: db.stop,
|
||||||
|
Done: db.done,
|
||||||
|
W: w,
|
||||||
|
Client: NewClient(db.conf.PrimaryURL, db.conf.ReplicationPSK, db.conf.NetTimeout),
|
||||||
|
}).Run()
|
||||||
|
|
||||||
|
db.done.Add(1)
|
||||||
|
go (&follower{
|
||||||
|
Stop: db.stop,
|
||||||
|
Done: db.done,
|
||||||
|
WALRootDir: db.walRootDir,
|
||||||
|
SeqNum: pfHeader.SeqNum,
|
||||||
|
ApplyChanges: db.applyChanges,
|
||||||
|
}).Run()
|
||||||
|
|
||||||
|
db.done.Add(1)
|
||||||
|
go (&fileWriter{
|
||||||
|
Stop: db.stop,
|
||||||
|
Done: db.done,
|
||||||
|
PageFilePath: db.pageFilePath,
|
||||||
|
WALRootDir: db.walRootDir,
|
||||||
|
}).Run()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) shouldLoadFromPrimary() bool {
|
||||||
|
if _, err := os.Stat(db.walRootDir); os.IsNotExist(err) {
|
||||||
|
log.Printf("WAL doesn't exist.")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if _, err := os.Stat(db.pageFilePath); os.IsNotExist(err) {
|
||||||
|
log.Printf("Page-file doesn't exist.")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) loadFromPrimary() error {
|
||||||
|
client := NewClient(db.conf.PrimaryURL, db.conf.ReplicationPSK, db.conf.NetTimeout)
|
||||||
|
defer client.Disconnect()
|
||||||
|
|
||||||
|
log.Printf("Loading data from primary...")
|
||||||
|
|
||||||
|
if err := os.RemoveAll(db.pageFilePath); err != nil {
|
||||||
|
log.Printf("Failed to remove page-file: %s", err)
|
||||||
|
return errs.IO.WithErr(err) // Caller can retry.
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.RemoveAll(db.walRootDir); err != nil {
|
||||||
|
log.Printf("Failed to remove WAL: %s", err)
|
||||||
|
return errs.IO.WithErr(err) // Caller can retry.
|
||||||
|
}
|
||||||
|
|
||||||
|
err := client.DownloadPageFile(db.pageFilePath+".tmp", db.pageFilePath)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to get page-file from primary: %s", err)
|
||||||
|
return err // Caller can retry.
|
||||||
|
}
|
||||||
|
|
||||||
|
pfHeader, err := pagefile.ReadHeader(db.pageFilePath)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to read page-file sequence number: %s", err)
|
||||||
|
return err // Caller can retry.
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = cswal.CreateEx(db.walRootDir, pfHeader.SeqNum+1); err != nil {
|
||||||
|
log.Printf("Failed to initialize WAL: %s", err)
|
||||||
|
return err // Caller can retry.
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
*/
|
138
mdb/db-testlist_test.go
Normal file
138
mdb/db-testlist_test.go
Normal file
@ -0,0 +1,138 @@
|
|||||||
|
package mdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDBList(t *testing.T) {
|
||||||
|
db := NewTestDBPrimary(t, t.TempDir())
|
||||||
|
|
||||||
|
var (
|
||||||
|
user1 = User{
|
||||||
|
ID: NewID(),
|
||||||
|
Name: "User1",
|
||||||
|
Email: "user1@gmail.com",
|
||||||
|
}
|
||||||
|
|
||||||
|
user2 = User{
|
||||||
|
ID: NewID(),
|
||||||
|
Name: "User2",
|
||||||
|
Email: "user2@gmail.com",
|
||||||
|
}
|
||||||
|
|
||||||
|
user3 = User{
|
||||||
|
ID: NewID(),
|
||||||
|
Name: "User3",
|
||||||
|
Email: "user3@gmail.com",
|
||||||
|
}
|
||||||
|
user1Data = make([]UserDataItem, 10)
|
||||||
|
user2Data = make([]UserDataItem, 4)
|
||||||
|
user3Data = make([]UserDataItem, 8)
|
||||||
|
)
|
||||||
|
|
||||||
|
err := db.Update(func(tx *Snapshot) error {
|
||||||
|
if err := db.Users.Insert(tx, &user1); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := db.Users.Insert(tx, &user2); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range user1Data {
|
||||||
|
user1Data[i] = UserDataItem{
|
||||||
|
ID: NewID(),
|
||||||
|
UserID: user1.ID,
|
||||||
|
Name: fmt.Sprintf("Name1: %d", i),
|
||||||
|
Data: fmt.Sprintf("Data: %d", i),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := db.UserData.Insert(tx, &user1Data[i]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range user2Data {
|
||||||
|
user2Data[i] = UserDataItem{
|
||||||
|
ID: NewID(),
|
||||||
|
UserID: user2.ID,
|
||||||
|
Name: fmt.Sprintf("Name2: %d", i),
|
||||||
|
Data: fmt.Sprintf("Data: %d", i),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := db.UserData.Insert(tx, &user2Data[i]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range user3Data {
|
||||||
|
user3Data[i] = UserDataItem{
|
||||||
|
ID: NewID(),
|
||||||
|
UserID: user3.ID,
|
||||||
|
Name: fmt.Sprintf("Name3: %d", i),
|
||||||
|
Data: fmt.Sprintf("Data: %d", i),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := db.UserData.Insert(tx, &user3Data[i]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
type TestCase struct {
|
||||||
|
Name string
|
||||||
|
Args ListArgs[UserDataItem]
|
||||||
|
Expected []UserDataItem
|
||||||
|
}
|
||||||
|
|
||||||
|
cases := []TestCase{
|
||||||
|
{
|
||||||
|
Name: "User1 all",
|
||||||
|
Args: ListArgs[UserDataItem]{
|
||||||
|
After: &UserDataItem{
|
||||||
|
UserID: user1.ID,
|
||||||
|
},
|
||||||
|
While: func(item *UserDataItem) bool {
|
||||||
|
return item.UserID == user1.ID
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Expected: user1Data,
|
||||||
|
}, {
|
||||||
|
Name: "User1 limited",
|
||||||
|
Args: ListArgs[UserDataItem]{
|
||||||
|
After: &UserDataItem{
|
||||||
|
UserID: user1.ID,
|
||||||
|
},
|
||||||
|
While: func(item *UserDataItem) bool {
|
||||||
|
return item.UserID == user1.ID
|
||||||
|
},
|
||||||
|
Limit: 4,
|
||||||
|
},
|
||||||
|
Expected: user1Data[:4],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range cases {
|
||||||
|
t.Run(tc.Name, func(t *testing.T) {
|
||||||
|
tx := db.Snapshot()
|
||||||
|
l := db.UserData.ByName.List(tx, tc.Args, nil)
|
||||||
|
if len(l) != len(tc.Expected) {
|
||||||
|
t.Fatal(tc.Name, l)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range l {
|
||||||
|
if !reflect.DeepEqual(*l[i], tc.Expected[i]) {
|
||||||
|
t.Fatal(tc.Name, l)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
11
mdb/main_test.go
Normal file
11
mdb/main_test.go
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
package mdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"go.uber.org/goleak"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
goleak.VerifyTestMain(m)
|
||||||
|
}
|
92
mdb/txaggregator.go
Normal file
92
mdb/txaggregator.go
Normal file
@ -0,0 +1,92 @@
|
|||||||
|
package mdb
|
||||||
|
|
||||||
|
/*
|
||||||
|
type txAggregator struct {
|
||||||
|
Stop chan struct{}
|
||||||
|
Done *sync.WaitGroup
|
||||||
|
ModChan chan txMod
|
||||||
|
W *cswal.Writer
|
||||||
|
Index *pagefile.Index
|
||||||
|
Snapshot *atomic.Pointer[Snapshot]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p txAggregator) Run() {
|
||||||
|
defer p.Done.Done()
|
||||||
|
defer p.W.Close()
|
||||||
|
|
||||||
|
var (
|
||||||
|
tx *Snapshot
|
||||||
|
mod txMod
|
||||||
|
rec cswal.Record
|
||||||
|
err error
|
||||||
|
toNotify = make([]chan error, 0, 1024)
|
||||||
|
)
|
||||||
|
|
||||||
|
READ_FIRST:
|
||||||
|
|
||||||
|
toNotify = toNotify[:0]
|
||||||
|
|
||||||
|
select {
|
||||||
|
case mod = <-p.ModChan:
|
||||||
|
goto BEGIN
|
||||||
|
case <-p.Stop:
|
||||||
|
goto END
|
||||||
|
}
|
||||||
|
|
||||||
|
BEGIN:
|
||||||
|
|
||||||
|
tx = p.Snapshot.Load().begin()
|
||||||
|
goto APPLY_MOD
|
||||||
|
|
||||||
|
CLONE:
|
||||||
|
|
||||||
|
tx = tx.clone()
|
||||||
|
goto APPLY_MOD
|
||||||
|
|
||||||
|
APPLY_MOD:
|
||||||
|
|
||||||
|
if err = mod.Update(tx); err != nil {
|
||||||
|
mod.Resp <- err
|
||||||
|
goto ROLLBACK
|
||||||
|
}
|
||||||
|
|
||||||
|
toNotify = append(toNotify, mod.Resp)
|
||||||
|
goto NEXT
|
||||||
|
|
||||||
|
ROLLBACK:
|
||||||
|
|
||||||
|
if len(toNotify) == 0 {
|
||||||
|
goto READ_FIRST
|
||||||
|
}
|
||||||
|
|
||||||
|
tx = tx.rollback()
|
||||||
|
goto NEXT
|
||||||
|
|
||||||
|
NEXT:
|
||||||
|
|
||||||
|
select {
|
||||||
|
case mod = <-p.ModChan:
|
||||||
|
goto CLONE
|
||||||
|
default:
|
||||||
|
goto WRITE
|
||||||
|
}
|
||||||
|
|
||||||
|
WRITE:
|
||||||
|
|
||||||
|
rec, err = writeChangesToWAL(tx.changes, p.Index, p.W)
|
||||||
|
if err == nil {
|
||||||
|
tx.seqNum = rec.SeqNum
|
||||||
|
tx.updatedAt = rec.CreatedAt
|
||||||
|
tx.setReadOnly()
|
||||||
|
p.Snapshot.Store(tx)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range toNotify {
|
||||||
|
toNotify[i] <- err
|
||||||
|
}
|
||||||
|
|
||||||
|
goto READ_FIRST
|
||||||
|
|
||||||
|
END:
|
||||||
|
}
|
||||||
|
*/
|
Loading…
Reference in New Issue
Block a user