From ba1990e379930a7f8c9589bd5eb916e70a581489 Mon Sep 17 00:00:00 2001 From: "J. David Lee" Date: Fri, 17 Nov 2023 10:06:56 +0000 Subject: [PATCH] v2 (#2) Co-authored-by: jdl Reviewed-on: https://git.crumpington.com/public/jldb/pulls/2 --- mdb/collection.go | 45 +++++++------ mdb/db-primary.go | 67 -------------------- mdb/db-secondary.go | 129 -------------------------------------- mdb/db-testrunner_test.go | 2 +- mdb/db-userdata_test.go | 2 +- mdb/db-users_test.go | 6 +- mdb/index.go | 46 +++++++------- 7 files changed, 50 insertions(+), 247 deletions(-) delete mode 100644 mdb/db-primary.go delete mode 100644 mdb/db-secondary.go diff --git a/mdb/collection.go b/mdb/collection.go index 7e20015..73b035d 100644 --- a/mdb/collection.go +++ b/mdb/collection.go @@ -21,10 +21,10 @@ type Collection[T any] struct { sanitize func(*T) validate func(*T) error - indices []Index[T] - uniqueIndices []Index[T] + indices []*Index[T] + uniqueIndices []*Index[T] - ByID Index[T] + ByID *Index[T] buf *bytes.Buffer } @@ -65,8 +65,8 @@ func NewCollection[T any](db *Database, name string, conf *CollectionConfig[T]) copy: conf.Copy, sanitize: conf.Sanitize, validate: conf.Validate, - indices: []Index[T]{}, - uniqueIndices: []Index[T]{}, + indices: []*Index[T]{}, + uniqueIndices: []*Index[T]{}, buf: &bytes.Buffer{}, } @@ -92,7 +92,7 @@ func NewCollection[T any](db *Database, name string, conf *CollectionConfig[T]) return c } -func (c Collection[T]) Name() string { +func (c *Collection[T]) Name() string { return c.name } @@ -108,13 +108,13 @@ type indexConfig[T any] struct { Include func(item *T) bool } -func (c Collection[T]) Get(tx *Snapshot, id uint64) (*T, bool) { +func (c *Collection[T]) Get(tx *Snapshot, id uint64) (*T, bool) { x := new(T) c.setID(x, id) return c.ByID.Get(tx, x) } -func (c Collection[T]) List(tx *Snapshot, ids []uint64, out []*T) []*T { +func (c *Collection[T]) List(tx *Snapshot, ids []uint64, out []*T) []*T { if len(ids) == 0 { return out[:0] } @@ -135,8 +135,7 @@ func (c Collection[T]) List(tx *Snapshot, ids []uint64, out []*T) []*T { } // AddIndex: Add an index to the collection. -func (c *Collection[T]) addIndex(conf indexConfig[T]) Index[T] { - +func (c *Collection[T]) addIndex(conf indexConfig[T]) *Index[T] { var less func(*T, *T) bool if conf.Unique { @@ -160,7 +159,7 @@ func (c *Collection[T]) addIndex(conf indexConfig[T]) Index[T] { BTree: btree.NewG(256, less), } - index := Index[T]{ + index := &Index[T]{ collectionID: c.collectionID, name: conf.Name, indexID: c.getState(c.db.Snapshot()).addIndex(indexState), @@ -176,7 +175,7 @@ func (c *Collection[T]) addIndex(conf indexConfig[T]) Index[T] { return index } -func (c Collection[T]) Insert(tx *Snapshot, userItem *T) error { +func (c *Collection[T]) Insert(tx *Snapshot, userItem *T) error { if err := c.ensureMutable(tx); err != nil { return err } @@ -203,7 +202,7 @@ func (c Collection[T]) Insert(tx *Snapshot, userItem *T) error { return nil } -func (c Collection[T]) Update(tx *Snapshot, userItem *T) error { +func (c *Collection[T]) Update(tx *Snapshot, userItem *T) error { if err := c.ensureMutable(tx); err != nil { return err } @@ -235,7 +234,7 @@ func (c Collection[T]) Update(tx *Snapshot, userItem *T) error { return nil } -func (c Collection[T]) Upsert(tx *Snapshot, item *T) error { +func (c *Collection[T]) Upsert(tx *Snapshot, item *T) error { err := c.Insert(tx, item) if err == nil { return nil @@ -246,7 +245,7 @@ func (c Collection[T]) Upsert(tx *Snapshot, item *T) error { return err } -func (c Collection[T]) Delete(tx *Snapshot, itemID uint64) error { +func (c *Collection[T]) Delete(tx *Snapshot, itemID uint64) error { if err := c.ensureMutable(tx); err != nil { return err } @@ -254,13 +253,13 @@ func (c Collection[T]) Delete(tx *Snapshot, itemID uint64) error { return c.deleteItem(tx, itemID) } -func (c Collection[T]) getByID(tx *Snapshot, itemID uint64) (*T, bool) { +func (c *Collection[T]) getByID(tx *Snapshot, itemID uint64) (*T, bool) { x := new(T) c.setID(x, itemID) return c.ByID.get(tx, x) } -func (c Collection[T]) ensureMutable(tx *Snapshot) error { +func (c *Collection[T]) ensureMutable(tx *Snapshot) error { if !tx.writable() { return errs.ReadOnly } @@ -274,7 +273,7 @@ func (c Collection[T]) ensureMutable(tx *Snapshot) error { } // For initial data loading. -func (c Collection[T]) insertItem(tx *Snapshot, itemID uint64, data []byte) error { +func (c *Collection[T]) insertItem(tx *Snapshot, itemID uint64, data []byte) error { item := new(T) if err := json.Unmarshal(data, item); err != nil { return errs.Encoding.WithErr(err).WithCollection(c.name) @@ -295,7 +294,7 @@ func (c Collection[T]) insertItem(tx *Snapshot, itemID uint64, data []byte) erro return nil } -func (c Collection[T]) deleteItem(tx *Snapshot, itemID uint64) error { +func (c *Collection[T]) deleteItem(tx *Snapshot, itemID uint64) error { item, ok := c.getByID(tx, itemID) if !ok { return errs.NotFound @@ -312,7 +311,7 @@ func (c Collection[T]) deleteItem(tx *Snapshot, itemID uint64) error { // upsertItem inserts or updates the item with itemID and the given serialized // form. It's called by -func (c Collection[T]) upsertItem(tx *Snapshot, itemID uint64, data []byte) error { +func (c *Collection[T]) upsertItem(tx *Snapshot, itemID uint64, data []byte) error { item, ok := c.getByID(tx, itemID) if ok { tx.delete(c.collectionID, itemID) @@ -335,14 +334,14 @@ func (c Collection[T]) upsertItem(tx *Snapshot, itemID uint64, data []byte) erro return nil } -func (c Collection[T]) getID(t *T) uint64 { +func (c *Collection[T]) getID(t *T) uint64 { return *((*uint64)(unsafe.Pointer(t))) } -func (c Collection[T]) setID(t *T, id uint64) { +func (c *Collection[T]) setID(t *T, id uint64) { *((*uint64)(unsafe.Pointer(t))) = id } -func (c Collection[T]) getState(tx *Snapshot) *collectionState[T] { +func (c *Collection[T]) getState(tx *Snapshot) *collectionState[T] { return tx.collections[c.collectionID].(*collectionState[T]) } diff --git a/mdb/db-primary.go b/mdb/db-primary.go deleted file mode 100644 index df6ba5b..0000000 --- a/mdb/db-primary.go +++ /dev/null @@ -1,67 +0,0 @@ -package mdb - -/* -func (db *Database) openPrimary() (err error) { - wal, err := cwal.Open(db.walRootDir, cwal.Config{ - SegMinCount: db.conf.WALSegMinCount, - SegMaxAgeSec: db.conf.WALSegMaxAgeSec, - }) - - pFile, err := pfile.Open(db.pageFilePath, - - pFile, err := openPageFileAndReplayWAL(db.rootDir) - if err != nil { - return err - } - defer pFile.Close() - - pfHeader, err := pFile.ReadHeader() - if err != nil { - return err - } - - tx := db.Snapshot() - tx.seqNum = pfHeader.SeqNum - tx.updatedAt = pfHeader.UpdatedAt - - pIndex, err := pagefile.NewIndex(pFile) - if err != nil { - return err - } - - err = pFile.IterateAllocated(pIndex, func(cID, iID uint64, data []byte) error { - return db.loadItem(tx, cID, iID, data) - }) - if err != nil { - return err - } - - w, err := cwal.OpenWriter(db.walRootDir, &cwal.WriterConfig{ - SegMinCount: db.conf.WALSegMinCount, - SegMaxAgeSec: db.conf.WALSegMaxAgeSec, - }) - if err != nil { - return err - } - - db.done.Add(1) - go txAggregator{ - Stop: db.stop, - Done: db.done, - ModChan: db.modChan, - W: w, - Index: pIndex, - Snapshot: db.snapshot, - }.Run() - - db.done.Add(1) - go (&fileWriter{ - Stop: db.stop, - Done: db.done, - PageFilePath: db.pageFilePath, - WALRootDir: db.walRootDir, - }).Run() - - return nil -} -*/ diff --git a/mdb/db-secondary.go b/mdb/db-secondary.go deleted file mode 100644 index f03b01f..0000000 --- a/mdb/db-secondary.go +++ /dev/null @@ -1,129 +0,0 @@ -package mdb - -/* -func (db *Database) openSecondary() (err error) { - if db.shouldLoadFromPrimary() { - if err := db.loadFromPrimary(); err != nil { - return err - } - } - - log.Printf("Opening page-file...") - - pFile, err := openPageFileAndReplayWAL(db.rootDir) - if err != nil { - return err - } - defer pFile.Close() - - pfHeader, err := pFile.ReadHeader() - if err != nil { - return err - } - - log.Printf("Building page-file index...") - - pIndex, err := pagefile.NewIndex(pFile) - if err != nil { - return err - } - - tx := db.Snapshot() - tx.seqNum = pfHeader.SeqNum - tx.updatedAt = pfHeader.UpdatedAt - - log.Printf("Loading data into memory...") - - err = pFile.IterateAllocated(pIndex, func(cID, iID uint64, data []byte) error { - return db.loadItem(tx, cID, iID, data) - }) - if err != nil { - return err - } - - log.Printf("Creating writer...") - - w, err := cswal.OpenWriter(db.walRootDir, &cswal.WriterConfig{ - SegMinCount: db.conf.WALSegMinCount, - SegMaxAgeSec: db.conf.WALSegMaxAgeSec, - }) - if err != nil { - return err - } - - db.done.Add(1) - go (&walFollower{ - Stop: db.stop, - Done: db.done, - W: w, - Client: NewClient(db.conf.PrimaryURL, db.conf.ReplicationPSK, db.conf.NetTimeout), - }).Run() - - db.done.Add(1) - go (&follower{ - Stop: db.stop, - Done: db.done, - WALRootDir: db.walRootDir, - SeqNum: pfHeader.SeqNum, - ApplyChanges: db.applyChanges, - }).Run() - - db.done.Add(1) - go (&fileWriter{ - Stop: db.stop, - Done: db.done, - PageFilePath: db.pageFilePath, - WALRootDir: db.walRootDir, - }).Run() - - return nil -} - -func (db *Database) shouldLoadFromPrimary() bool { - if _, err := os.Stat(db.walRootDir); os.IsNotExist(err) { - log.Printf("WAL doesn't exist.") - return true - } - if _, err := os.Stat(db.pageFilePath); os.IsNotExist(err) { - log.Printf("Page-file doesn't exist.") - return true - } - return false -} - -func (db *Database) loadFromPrimary() error { - client := NewClient(db.conf.PrimaryURL, db.conf.ReplicationPSK, db.conf.NetTimeout) - defer client.Disconnect() - - log.Printf("Loading data from primary...") - - if err := os.RemoveAll(db.pageFilePath); err != nil { - log.Printf("Failed to remove page-file: %s", err) - return errs.IO.WithErr(err) // Caller can retry. - } - - if err := os.RemoveAll(db.walRootDir); err != nil { - log.Printf("Failed to remove WAL: %s", err) - return errs.IO.WithErr(err) // Caller can retry. - } - - err := client.DownloadPageFile(db.pageFilePath+".tmp", db.pageFilePath) - if err != nil { - log.Printf("Failed to get page-file from primary: %s", err) - return err // Caller can retry. - } - - pfHeader, err := pagefile.ReadHeader(db.pageFilePath) - if err != nil { - log.Printf("Failed to read page-file sequence number: %s", err) - return err // Caller can retry. - } - - if err = cswal.CreateEx(db.walRootDir, pfHeader.SeqNum+1); err != nil { - log.Printf("Failed to initialize WAL: %s", err) - return err // Caller can retry. - } - - return nil -} -*/ diff --git a/mdb/db-testrunner_test.go b/mdb/db-testrunner_test.go index a812c8e..a011ffa 100644 --- a/mdb/db-testrunner_test.go +++ b/mdb/db-testrunner_test.go @@ -134,7 +134,7 @@ func checkSlicesEqual[T any](t *testing.T, name string, actual, expected []T) { } } -func checkMinMaxEqual[T any](t *testing.T, name string, tx *Snapshot, index Index[T], expected []T) { +func checkMinMaxEqual[T any](t *testing.T, name string, tx *Snapshot, index *Index[T], expected []T) { if len(expected) == 0 { if min, ok := index.Min(tx); ok { t.Fatal(min) diff --git a/mdb/db-userdata_test.go b/mdb/db-userdata_test.go index ce946fb..b6a786e 100644 --- a/mdb/db-userdata_test.go +++ b/mdb/db-userdata_test.go @@ -14,7 +14,7 @@ type UserDataItem struct { type UserData struct { *Collection[UserDataItem] - ByName Index[UserDataItem] // Unique index on (Token). + ByName *Index[UserDataItem] // Unique index on (Token). } func NewUserDataCollection(db *Database) UserData { diff --git a/mdb/db-users_test.go b/mdb/db-users_test.go index 63dbcc3..4109f2f 100644 --- a/mdb/db-users_test.go +++ b/mdb/db-users_test.go @@ -12,9 +12,9 @@ type User struct { type Users struct { *Collection[User] - ByEmail Index[User] // Unique index on (Email). - ByName Index[User] // Index on (Name). - ByBlocked Index[User] // Partial index on (Blocked,Email). + ByEmail *Index[User] // Unique index on (Email). + ByName *Index[User] // Index on (Name). + ByBlocked *Index[User] // Partial index on (Blocked,Email). } func NewUserCollection(db *Database) Users { diff --git a/mdb/index.go b/mdb/index.go index ed769ee..9335e64 100644 --- a/mdb/index.go +++ b/mdb/index.go @@ -10,7 +10,7 @@ func NewIndex[T any]( c *Collection[T], name string, compare func(lhs, rhs *T) int, -) Index[T] { +) *Index[T] { return c.addIndex(indexConfig[T]{ Name: name, Unique: false, @@ -24,7 +24,7 @@ func NewPartialIndex[T any]( name string, compare func(lhs, rhs *T) int, include func(*T) bool, -) Index[T] { +) *Index[T] { return c.addIndex(indexConfig[T]{ Name: name, Unique: false, @@ -37,7 +37,7 @@ func NewUniqueIndex[T any]( c *Collection[T], name string, compare func(lhs, rhs *T) int, -) Index[T] { +) *Index[T] { return c.addIndex(indexConfig[T]{ Name: name, Unique: true, @@ -51,7 +51,7 @@ func NewUniquePartialIndex[T any]( name string, compare func(lhs, rhs *T) int, include func(*T) bool, -) Index[T] { +) *Index[T] { return c.addIndex(indexConfig[T]{ Name: name, Unique: true, @@ -70,7 +70,7 @@ type Index[T any] struct { copy func(*T) *T } -func (i Index[T]) Get(tx *Snapshot, in *T) (item *T, ok bool) { +func (i *Index[T]) Get(tx *Snapshot, in *T) (item *T, ok bool) { tPtr, ok := i.get(tx, in) if !ok { return item, false @@ -78,15 +78,15 @@ func (i Index[T]) Get(tx *Snapshot, in *T) (item *T, ok bool) { return i.copy(tPtr), true } -func (i Index[T]) get(tx *Snapshot, in *T) (*T, bool) { +func (i *Index[T]) get(tx *Snapshot, in *T) (*T, bool) { return i.btree(tx).Get(in) } -func (i Index[T]) Has(tx *Snapshot, in *T) bool { +func (i *Index[T]) Has(tx *Snapshot, in *T) bool { return i.btree(tx).Has(in) } -func (i Index[T]) Min(tx *Snapshot) (item *T, ok bool) { +func (i *Index[T]) Min(tx *Snapshot) (item *T, ok bool) { tPtr, ok := i.btree(tx).Min() if !ok { return item, false @@ -94,7 +94,7 @@ func (i Index[T]) Min(tx *Snapshot) (item *T, ok bool) { return i.copy(tPtr), true } -func (i Index[T]) Max(tx *Snapshot) (item *T, ok bool) { +func (i *Index[T]) Max(tx *Snapshot) (item *T, ok bool) { tPtr, ok := i.btree(tx).Max() if !ok { return item, false @@ -102,25 +102,25 @@ func (i Index[T]) Max(tx *Snapshot) (item *T, ok bool) { return i.copy(tPtr), true } -func (i Index[T]) Ascend(tx *Snapshot, each func(*T) bool) { +func (i *Index[T]) Ascend(tx *Snapshot, each func(*T) bool) { i.btreeForIter(tx).Ascend(func(t *T) bool { return each(i.copy(t)) }) } -func (i Index[T]) AscendAfter(tx *Snapshot, after *T, each func(*T) bool) { +func (i *Index[T]) AscendAfter(tx *Snapshot, after *T, each func(*T) bool) { i.btreeForIter(tx).AscendGreaterOrEqual(after, func(t *T) bool { return each(i.copy(t)) }) } -func (i Index[T]) Descend(tx *Snapshot, each func(*T) bool) { +func (i *Index[T]) Descend(tx *Snapshot, each func(*T) bool) { i.btreeForIter(tx).Descend(func(t *T) bool { return each(i.copy(t)) }) } -func (i Index[T]) DescendAfter(tx *Snapshot, after *T, each func(*T) bool) { +func (i *Index[T]) DescendAfter(tx *Snapshot, after *T, each func(*T) bool) { i.btreeForIter(tx).DescendLessOrEqual(after, func(t *T) bool { return each(i.copy(t)) }) @@ -133,7 +133,7 @@ type ListArgs[T any] struct { Limit int // Maximum number of items to return. 0 => All. } -func (i Index[T]) List(tx *Snapshot, args ListArgs[T], out []*T) []*T { +func (i *Index[T]) List(tx *Snapshot, args ListArgs[T], out []*T) []*T { if args.Limit < 0 { return nil } @@ -176,11 +176,11 @@ func (i Index[T]) List(tx *Snapshot, args ListArgs[T], out []*T) []*T { // ---------------------------------------------------------------------------- -func (i Index[T]) insertConflict(tx *Snapshot, item *T) bool { +func (i *Index[T]) insertConflict(tx *Snapshot, item *T) bool { return i.btree(tx).Has(item) } -func (i Index[T]) updateConflict(tx *Snapshot, item *T) bool { +func (i *Index[T]) updateConflict(tx *Snapshot, item *T) bool { current, ok := i.btree(tx).Get(item) return ok && i.getID(current) != i.getID(item) } @@ -188,7 +188,7 @@ func (i Index[T]) updateConflict(tx *Snapshot, item *T) bool { // This should only be called after insertConflict. Additionally, the caller // should ensure that the index has been properly cloned for write before // writing. -func (i Index[T]) insert(tx *Snapshot, item *T) { +func (i *Index[T]) insert(tx *Snapshot, item *T) { if i.include != nil && !i.include(item) { return } @@ -196,7 +196,7 @@ func (i Index[T]) insert(tx *Snapshot, item *T) { i.btree(tx).ReplaceOrInsert(item) } -func (i Index[T]) update(tx *Snapshot, old, new *T) { +func (i *Index[T]) update(tx *Snapshot, old, new *T) { bt := i.btree(tx) bt.Delete(old) @@ -204,22 +204,22 @@ func (i Index[T]) update(tx *Snapshot, old, new *T) { i.insert(tx, new) } -func (i Index[T]) delete(tx *Snapshot, item *T) { +func (i *Index[T]) delete(tx *Snapshot, item *T) { i.btree(tx).Delete(item) } // ---------------------------------------------------------------------------- -func (i Index[T]) getState(tx *Snapshot) indexState[T] { +func (i *Index[T]) getState(tx *Snapshot) indexState[T] { return tx.collections[i.collectionID].(*collectionState[T]).Indices[i.indexID] } // Get the current btree for get/has/update/delete, etc. -func (i Index[T]) btree(tx *Snapshot) *btree.BTreeG[*T] { +func (i *Index[T]) btree(tx *Snapshot) *btree.BTreeG[*T] { return i.getState(tx).BTree } -func (i Index[T]) btreeForIter(tx *Snapshot) *btree.BTreeG[*T] { +func (i *Index[T]) btreeForIter(tx *Snapshot) *btree.BTreeG[*T] { cState := tx.collections[i.collectionID].(*collectionState[T]) bt := cState.Indices[i.indexID].BTree @@ -231,6 +231,6 @@ func (i Index[T]) btreeForIter(tx *Snapshot) *btree.BTreeG[*T] { return bt } -func (i Index[T]) getID(t *T) uint64 { +func (i *Index[T]) getID(t *T) uint64 { return *((*uint64)(unsafe.Pointer(t))) }