diff --git a/go.mod b/go.mod index 3b5ce6d..e261e13 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.22 require ( github.com/google/btree v1.1.2 + go.uber.org/goleak v1.3.0 golang.org/x/net v0.15.0 golang.org/x/sys v0.12.0 ) diff --git a/go.sum b/go.sum index 5921adb..bb51033 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,16 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/lib/rep/http-handler.go b/lib/rep/http-handler.go index f1721a9..843cb7a 100644 --- a/lib/rep/http-handler.go +++ b/lib/rep/http-handler.go @@ -12,7 +12,6 @@ const ( pathStreamWAL = "stream-wal" ) -// TODO: Remove this! func (rep *Replicator) Handle(w http.ResponseWriter, r *http.Request) { // We'll handle two types of requests: HTTP GET requests for JSON, or // streaming requets for state or wall. diff --git a/lib/rep/main_test.go b/lib/rep/main_test.go new file mode 100644 index 0000000..d922a30 --- /dev/null +++ b/lib/rep/main_test.go @@ -0,0 +1,11 @@ +package rep + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/lib/rep/replicator.go b/lib/rep/replicator.go index 93c76ca..b3a4583 100644 --- a/lib/rep/replicator.go +++ b/lib/rep/replicator.go @@ -36,8 +36,8 @@ type App struct { // SendState: The primary may need to send storage state to a secondary node. SendState func(conn net.Conn) error - // (1) RecvState: Secondary nodes may need to load state from the primary if the - // WAL is too far behind. + // (1) RecvState: Secondary nodes may need to load state from the primary if + // the WAL is too far behind. RecvState func(conn net.Conn) error // (2) InitStorage: Prepare application storage for possible calls to diff --git a/lib/rep/testapp-harness_test.go b/lib/rep/testapp-harness_test.go index bfcf4e0..dd9803f 100644 --- a/lib/rep/testapp-harness_test.go +++ b/lib/rep/testapp-harness_test.go @@ -56,6 +56,7 @@ func (h TestAppHarness) Run(t *testing.T) { WALSegMaxAgeSec: 1, WALSegGCAgeSec: 1, }) + defer app2.Close() val.MethodByName(method.Name).Call([]reflect.Value{ reflect.ValueOf(t), diff --git a/mdb/collection.go b/mdb/collection.go index 40b603d..f9d6e86 100644 --- a/mdb/collection.go +++ b/mdb/collection.go @@ -1,7 +1,6 @@ package mdb import ( - "bytes" "encoding/json" "errors" "hash/crc64" @@ -25,8 +24,6 @@ type Collection[T any] struct { uniqueIndices []*Index[T] ByID *Index[T] - - buf *bytes.Buffer } type CollectionConfig[T any] struct { @@ -67,7 +64,6 @@ func NewCollection[T any](db *Database, name string, conf *CollectionConfig[T]) validate: conf.Validate, indices: []*Index[T]{}, uniqueIndices: []*Index[T]{}, - buf: &bytes.Buffer{}, } db.addCollection(c.collectionID, c, &collectionState[T]{ diff --git a/mdb/db-rep.go b/mdb/db-rep.go index 70dd3eb..1f230fc 100644 --- a/mdb/db-rep.go +++ b/mdb/db-rep.go @@ -99,6 +99,7 @@ func (db *Database) repApply(rec wal.Record) (err error) { } tx.seqNum = rec.SeqNum tx.timestampMS = rec.TimestampMS + tx.setReadOnly() db.snapshot.Store(tx) return nil } diff --git a/mdb/db-testrunner_test.go b/mdb/db-testrunner_test.go index a303f2a..c157ba5 100644 --- a/mdb/db-testrunner_test.go +++ b/mdb/db-testrunner_test.go @@ -72,7 +72,7 @@ func testRunner_testCase(t *testing.T, testCase DBTestCase) { } // TODO: Why is this necessary? - time.Sleep(time.Second) + //time.Sleep(time.Second) finalStep := testCase.Steps[len(testCase.Steps)-1] secondarySnapshot := db2.Snapshot() diff --git a/mdb/txaggregator.go b/mdb/txaggregator.go deleted file mode 100644 index e073e1f..0000000 --- a/mdb/txaggregator.go +++ /dev/null @@ -1,92 +0,0 @@ -package mdb - -/* -type txAggregator struct { - Stop chan struct{} - Done *sync.WaitGroup - ModChan chan txMod - W *cswal.Writer - Index *pagefile.Index - Snapshot *atomic.Pointer[Snapshot] -} - -func (p txAggregator) Run() { - defer p.Done.Done() - defer p.W.Close() - - var ( - tx *Snapshot - mod txMod - rec cswal.Record - err error - toNotify = make([]chan error, 0, 1024) - ) - -READ_FIRST: - - toNotify = toNotify[:0] - - select { - case mod = <-p.ModChan: - goto BEGIN - case <-p.Stop: - goto END - } - -BEGIN: - - tx = p.Snapshot.Load().begin() - goto APPLY_MOD - -CLONE: - - tx = tx.clone() - goto APPLY_MOD - -APPLY_MOD: - - if err = mod.Update(tx); err != nil { - mod.Resp <- err - goto ROLLBACK - } - - toNotify = append(toNotify, mod.Resp) - goto NEXT - -ROLLBACK: - - if len(toNotify) == 0 { - goto READ_FIRST - } - - tx = tx.rollback() - goto NEXT - -NEXT: - - select { - case mod = <-p.ModChan: - goto CLONE - default: - goto WRITE - } - -WRITE: - - rec, err = writeChangesToWAL(tx.changes, p.Index, p.W) - if err == nil { - tx.seqNum = rec.SeqNum - tx.updatedAt = rec.CreatedAt - tx.setReadOnly() - p.Snapshot.Store(tx) - } - - for i := range toNotify { - toNotify[i] <- err - } - - goto READ_FIRST - -END: -} -*/