From 125e090cb771f27253e350abe17aa677a2ef03d4 Mon Sep 17 00:00:00 2001
From: jdl
Date: Fri, 25 Oct 2024 14:56:59 +0200
Subject: [PATCH] ??

---
 lib/wal/main_test.go    |  11 ++++
 mdb/db-primary.go       |  60 +++++++++++++++++
 mdb/db-secondary.go     | 129 +++++++++++++++++++++++++++++++++++++
 mdb/db-testlist_test.go | 138 ++++++++++++++++++++++++++++++++++++++++
 mdb/main_test.go        |  11 ++++
 mdb/txaggregator.go     | 107 ++++++++++++++++++++++++++++++
 6 files changed, 456 insertions(+)
 create mode 100644 lib/wal/main_test.go
 create mode 100644 mdb/db-primary.go
 create mode 100644 mdb/db-secondary.go
 create mode 100644 mdb/db-testlist_test.go
 create mode 100644 mdb/main_test.go
 create mode 100644 mdb/txaggregator.go

diff --git a/lib/wal/main_test.go b/lib/wal/main_test.go
new file mode 100644
index 0000000..2e11436
--- /dev/null
+++ b/lib/wal/main_test.go
@@ -0,0 +1,11 @@
+package wal
+
+import (
+	"testing"
+
+	"go.uber.org/goleak"
+)
+
+func TestMain(m *testing.M) {
+	goleak.VerifyTestMain(m)
+}
diff --git a/mdb/db-primary.go b/mdb/db-primary.go
new file mode 100644
index 0000000..df6ba5b
--- /dev/null
+++ b/mdb/db-primary.go
@@ -0,0 +1,60 @@
+package mdb
+
+/*
+func (db *Database) openPrimary() (err error) {
+	pFile, err := openPageFileAndReplayWAL(db.rootDir)
+	if err != nil {
+		return err
+	}
+	defer pFile.Close()
+
+	pfHeader, err := pFile.ReadHeader()
+	if err != nil {
+		return err
+	}
+
+	tx := db.Snapshot()
+	tx.seqNum = pfHeader.SeqNum
+	tx.updatedAt = pfHeader.UpdatedAt
+
+	pIndex, err := pagefile.NewIndex(pFile)
+	if err != nil {
+		return err
+	}
+
+	err = pFile.IterateAllocated(pIndex, func(cID, iID uint64, data []byte) error {
+		return db.loadItem(tx, cID, iID, data)
+	})
+	if err != nil {
+		return err
+	}
+
+	w, err := cwal.OpenWriter(db.walRootDir, &cwal.WriterConfig{
+		SegMinCount:  db.conf.WALSegMinCount,
+		SegMaxAgeSec: db.conf.WALSegMaxAgeSec,
+	})
+	if err != nil {
+		return err
+	}
+
+	db.done.Add(1)
+	go txAggregator{
+		Stop:     db.stop,
+		Done:     db.done,
+		ModChan:  db.modChan,
+		W:        w,
+		Index:    pIndex,
+		Snapshot: db.snapshot,
+	}.Run()
+
+	db.done.Add(1)
+	go (&fileWriter{
+		Stop:         db.stop,
+		Done:         db.done,
+		PageFilePath: db.pageFilePath,
+		WALRootDir:   db.walRootDir,
+	}).Run()
+
+	return nil
+}
+*/
diff --git a/mdb/db-secondary.go b/mdb/db-secondary.go
new file mode 100644
index 0000000..f03b01f
--- /dev/null
+++ b/mdb/db-secondary.go
@@ -0,0 +1,129 @@
+package mdb
+
+/*
+func (db *Database) openSecondary() (err error) {
+	if db.shouldLoadFromPrimary() {
+		if err := db.loadFromPrimary(); err != nil {
+			return err
+		}
+	}
+
+	log.Printf("Opening page-file...")
+
+	pFile, err := openPageFileAndReplayWAL(db.rootDir)
+	if err != nil {
+		return err
+	}
+	defer pFile.Close()
+
+	pfHeader, err := pFile.ReadHeader()
+	if err != nil {
+		return err
+	}
+
+	log.Printf("Building page-file index...")
+
+	pIndex, err := pagefile.NewIndex(pFile)
+	if err != nil {
+		return err
+	}
+
+	tx := db.Snapshot()
+	tx.seqNum = pfHeader.SeqNum
+	tx.updatedAt = pfHeader.UpdatedAt
+
+	log.Printf("Loading data into memory...")
+
+	err = pFile.IterateAllocated(pIndex, func(cID, iID uint64, data []byte) error {
+		return db.loadItem(tx, cID, iID, data)
+	})
+	if err != nil {
+		return err
+	}
+
+	log.Printf("Creating writer...")
+
+	w, err := cswal.OpenWriter(db.walRootDir, &cswal.WriterConfig{
+		SegMinCount:  db.conf.WALSegMinCount,
+		SegMaxAgeSec: db.conf.WALSegMaxAgeSec,
+	})
+	if err != nil {
+		return err
+	}
+
+	db.done.Add(1)
+	go (&walFollower{
+		Stop:   db.stop,
+		Done:   db.done,
+		W:      w,
+		Client: NewClient(db.conf.PrimaryURL, db.conf.ReplicationPSK, db.conf.NetTimeout),
+	}).Run()
+
+	db.done.Add(1)
+	go (&follower{
+		Stop:         db.stop,
+		Done:         db.done,
+		WALRootDir:   db.walRootDir,
+		SeqNum:       pfHeader.SeqNum,
+		ApplyChanges: db.applyChanges,
+	}).Run()
+
+	db.done.Add(1)
+	go (&fileWriter{
+		Stop:         db.stop,
+		Done:         db.done,
+		PageFilePath: db.pageFilePath,
+		WALRootDir:   db.walRootDir,
+	}).Run()
+
+	return nil
+}
+
+func (db *Database) shouldLoadFromPrimary() bool {
+	if _, err := os.Stat(db.walRootDir); os.IsNotExist(err) {
+		log.Printf("WAL doesn't exist.")
+		return true
+	}
+	if _, err := os.Stat(db.pageFilePath); os.IsNotExist(err) {
+		log.Printf("Page-file doesn't exist.")
+		return true
+	}
+	return false
+}
+
+func (db *Database) loadFromPrimary() error {
+	client := NewClient(db.conf.PrimaryURL, db.conf.ReplicationPSK, db.conf.NetTimeout)
+	defer client.Disconnect()
+
+	log.Printf("Loading data from primary...")
+
+	if err := os.RemoveAll(db.pageFilePath); err != nil {
+		log.Printf("Failed to remove page-file: %s", err)
+		return errs.IO.WithErr(err) // Caller can retry.
+	}
+
+	if err := os.RemoveAll(db.walRootDir); err != nil {
+		log.Printf("Failed to remove WAL: %s", err)
+		return errs.IO.WithErr(err) // Caller can retry.
+	}
+
+	err := client.DownloadPageFile(db.pageFilePath+".tmp", db.pageFilePath)
+	if err != nil {
+		log.Printf("Failed to get page-file from primary: %s", err)
+		return err // Caller can retry.
+	}
+
+	pfHeader, err := pagefile.ReadHeader(db.pageFilePath)
+	if err != nil {
+		log.Printf("Failed to read page-file sequence number: %s", err)
+		return err // Caller can retry.
+	}
+
+	if err = cswal.CreateEx(db.walRootDir, pfHeader.SeqNum+1); err != nil {
+		log.Printf("Failed to initialize WAL: %s", err)
+		return err // Caller can retry.
+	}
+
+	return nil
+}
+*/
diff --git a/mdb/db-testlist_test.go b/mdb/db-testlist_test.go
new file mode 100644
index 0000000..008093a
--- /dev/null
+++ b/mdb/db-testlist_test.go
@@ -0,0 +1,138 @@
+package mdb
+
+import (
+	"fmt"
+	"reflect"
+	"testing"
+)
+
+func TestDBList(t *testing.T) {
+	db := NewTestDBPrimary(t, t.TempDir())
+
+	var (
+		user1 = User{
+			ID:    NewID(),
+			Name:  "User1",
+			Email: "user1@gmail.com",
+		}
+
+		user2 = User{
+			ID:    NewID(),
+			Name:  "User2",
+			Email: "user2@gmail.com",
+		}
+
+		user3 = User{
+			ID:    NewID(),
+			Name:  "User3",
+			Email: "user3@gmail.com",
+		}
+		user1Data = make([]UserDataItem, 10)
+		user2Data = make([]UserDataItem, 4)
+		user3Data = make([]UserDataItem, 8)
+	)
+
+	err := db.Update(func(tx *Snapshot) error {
+		if err := db.Users.Insert(tx, &user1); err != nil {
+			return err
+		}
+
+		if err := db.Users.Insert(tx, &user2); err != nil {
+			return err
+		}
+
+		for i := range user1Data {
+			user1Data[i] = UserDataItem{
+				ID:     NewID(),
+				UserID: user1.ID,
+				Name:   fmt.Sprintf("Name1: %d", i),
+				Data:   fmt.Sprintf("Data: %d", i),
+			}
+
+			if err := db.UserData.Insert(tx, &user1Data[i]); err != nil {
+				return err
+			}
+		}
+
+		for i := range user2Data {
+			user2Data[i] = UserDataItem{
+				ID:     NewID(),
+				UserID: user2.ID,
+				Name:   fmt.Sprintf("Name2: %d", i),
+				Data:   fmt.Sprintf("Data: %d", i),
+			}
+
+			if err := db.UserData.Insert(tx, &user2Data[i]); err != nil {
+				return err
+			}
+		}
+
+		for i := range user3Data {
+			user3Data[i] = UserDataItem{
+				ID:     NewID(),
+				UserID: user3.ID,
+				Name:   fmt.Sprintf("Name3: %d", i),
+				Data:   fmt.Sprintf("Data: %d", i),
+			}
+
+			if err := db.UserData.Insert(tx, &user3Data[i]); err != nil {
+				return err
+			}
+		}
+
+		return nil
+	})
+
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	type TestCase struct {
+		Name     string
+		Args     ListArgs[UserDataItem]
+		Expected []UserDataItem
+	}
+
+	cases := []TestCase{
+		{
+			Name: "User1 all",
+			Args: ListArgs[UserDataItem]{
+				After: &UserDataItem{
+					UserID: user1.ID,
+				},
+				While: func(item *UserDataItem) bool {
+					return item.UserID == user1.ID
+				},
+			},
+			Expected: user1Data,
+		}, {
+			Name: "User1 limited",
+			Args: ListArgs[UserDataItem]{
+				After: &UserDataItem{
+					UserID: user1.ID,
+				},
+				While: func(item *UserDataItem) bool {
+					return item.UserID == user1.ID
+				},
+				Limit: 4,
+			},
+			Expected: user1Data[:4],
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.Name, func(t *testing.T) {
+			tx := db.Snapshot()
+			l := db.UserData.ByName.List(tx, tc.Args, nil)
+			if len(l) != len(tc.Expected) {
+				t.Fatal(tc.Name, l)
+			}
+
+			for i := range l {
+				if !reflect.DeepEqual(*l[i], tc.Expected[i]) {
+					t.Fatal(tc.Name, l)
+				}
+			}
+		})
+	}
+}
diff --git a/mdb/main_test.go b/mdb/main_test.go
new file mode 100644
index 0000000..d9e09aa
--- /dev/null
+++ b/mdb/main_test.go
@@ -0,0 +1,11 @@
+package mdb
+
+import (
+	"testing"
+
+	"go.uber.org/goleak"
+)
+
+func TestMain(m *testing.M) {
+	goleak.VerifyTestMain(m)
+}
diff --git a/mdb/txaggregator.go b/mdb/txaggregator.go
new file mode 100644
index 0000000..e073e1f
--- /dev/null
+++ b/mdb/txaggregator.go
@@ -0,0 +1,107 @@
+package mdb
+
+/*
+type txAggregator struct {
+	Stop     chan struct{}
+	Done     *sync.WaitGroup
+	ModChan  chan txMod
+	W        *cswal.Writer
+	Index    *pagefile.Index
+	Snapshot *atomic.Pointer[Snapshot]
+}
+
+// Run batches queued modifications: it applies as many immediately available
+// txMods as possible to a single snapshot, writes the combined changes to the
+// WAL once, publishes the new snapshot, and then notifies every waiting caller.
+func (p txAggregator) Run() {
+	defer p.Done.Done()
+	defer p.W.Close()
+
+	var (
+		tx       *Snapshot
+		mod      txMod
+		rec      cswal.Record
+		err      error
+		toNotify = make([]chan error, 0, 1024)
+	)
+
+READ_FIRST:
+
+	toNotify = toNotify[:0]
+
+	select {
+	case mod = <-p.ModChan:
+		goto BEGIN
+	case <-p.Stop:
+		goto END
+	}
+
+BEGIN:
+
+	tx = p.Snapshot.Load().begin()
+	goto APPLY_MOD
+
+CLONE:
+
+	tx = tx.clone()
+	goto APPLY_MOD
+
+APPLY_MOD:
+
+	if err = mod.Update(tx); err != nil {
+		mod.Resp <- err
+		goto ROLLBACK
+	}
+
+	toNotify = append(toNotify, mod.Resp)
+	goto NEXT
+
+ROLLBACK:
+
+	if len(toNotify) == 0 {
+		goto READ_FIRST
+	}
+
+	tx = tx.rollback()
+	goto NEXT
+
+NEXT:
+
+	select {
+	case mod = <-p.ModChan:
+		goto CLONE
+	default:
+		goto WRITE
+	}
+
+WRITE:
+
+	rec, err = writeChangesToWAL(tx.changes, p.Index, p.W)
+	if err == nil {
+		tx.seqNum = rec.SeqNum
+		tx.updatedAt = rec.CreatedAt
+		tx.setReadOnly()
+		p.Snapshot.Store(tx)
+	}
+
+	for i := range toNotify {
+		toNotify[i] <- err
+	}
+
+	goto READ_FIRST
+
+END:
+	return
+}
+*/
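+
+// Hypothetical usage sketch: a Database.Update method (not defined in this
+// file) might hand a modification to the aggregator by sending a txMod on
+// ModChan and waiting on its Resp channel. The field names match their use
+// in Run above; the method body is only a sketch.
+//
+//	func (db *Database) Update(update func(tx *Snapshot) error) error {
+//		resp := make(chan error, 1)
+//		db.modChan <- txMod{Update: update, Resp: resp}
+//		return <-resp
+//	}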