Compare commits
	
		
			12 Commits
		
	
	
		
			429f410681
			...
			v0.7.1
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | 875957f662 | ||
|  | b251368b09 | ||
|  | c2a1a7f247 | ||
|  | 9785637b3b | ||
|  | 728b34b684 | ||
|  | 8be663a0a0 | ||
|  | 6da018353a | ||
|  | 5528c264d3 | ||
|  | 9078335d70 | ||
|  | 3c9e5505ab | ||
| ba1990e379 | |||
| 526196ef9d | 
| @@ -4,6 +4,7 @@ Replicated in-memory database and file store. | |||||||
|  |  | ||||||
| ## TODO | ## TODO | ||||||
|  |  | ||||||
|  | * [ ] mdb: Tests for using `nil` snapshots ? | ||||||
| * [ ] mdb: tests for sanitize and validate functions | * [ ] mdb: tests for sanitize and validate functions | ||||||
| * [ ] Test: lib/wal iterator w/ corrupt file (random corruptions) | * [ ] Test: lib/wal iterator w/ corrupt file (random corruptions) | ||||||
| * [ ] Test: lib/wal io.go | * [ ] Test: lib/wal io.go | ||||||
|   | |||||||
| @@ -12,6 +12,7 @@ const ( | |||||||
| 	pathStreamWAL = "stream-wal" | 	pathStreamWAL = "stream-wal" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | // TODO: Remove this! | ||||||
| func (rep *Replicator) Handle(w http.ResponseWriter, r *http.Request) { | func (rep *Replicator) Handle(w http.ResponseWriter, r *http.Request) { | ||||||
| 	// We'll handle two types of requests: HTTP GET requests for JSON, or | 	// We'll handle two types of requests: HTTP GET requests for JSON, or | ||||||
| 	// streaming requets for state or wall. | 	// streaming requets for state or wall. | ||||||
|   | |||||||
| @@ -15,7 +15,7 @@ func (rep *Replicator) runWALGC() { | |||||||
| 		select { | 		select { | ||||||
| 		case <-ticker.C: | 		case <-ticker.C: | ||||||
| 			state := rep.getState() | 			state := rep.getState() | ||||||
| 			before := time.Now().Unix() - rep.conf.WALSegMaxAgeSec | 			before := time.Now().Unix() - rep.conf.WALSegGCAgeSec | ||||||
| 			if err := rep.wal.DeleteBefore(before, state.SeqNum); err != nil { | 			if err := rep.wal.DeleteBefore(before, state.SeqNum); err != nil { | ||||||
| 				log.Printf("[WAL-GC] failed to delete wal segments: %v", err) | 				log.Printf("[WAL-GC] failed to delete wal segments: %v", err) | ||||||
| 			} | 			} | ||||||
|   | |||||||
| @@ -2,6 +2,7 @@ package rep | |||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"io" | 	"io" | ||||||
|  | 	"log" | ||||||
| 	"net" | 	"net" | ||||||
| 	"os" | 	"os" | ||||||
| 	"sync" | 	"sync" | ||||||
| @@ -94,41 +95,49 @@ func Open(app App, conf Config) (*Replicator, error) { | |||||||
| 	rep.client = newClient(rep.conf.PrimaryEndpoint, rep.conf.ReplicationPSK, rep.conf.NetTimeout) | 	rep.client = newClient(rep.conf.PrimaryEndpoint, rep.conf.ReplicationPSK, rep.conf.NetTimeout) | ||||||
|  |  | ||||||
| 	if err := rep.initDirectories(); err != nil { | 	if err := rep.initDirectories(); err != nil { | ||||||
|  | 		log.Printf("Failed to init directories: %v", err) | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if err := rep.acquireLock(); err != nil { | 	if err := rep.acquireLock(); err != nil { | ||||||
| 		rep.Close() | 		rep.Close() | ||||||
|  | 		log.Printf("Failed to acquire lock: %v", err) | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if err := rep.loadLocalState(); err != nil { | 	if err := rep.loadLocalState(); err != nil { | ||||||
| 		rep.Close() | 		rep.Close() | ||||||
|  | 		log.Printf("Failed to load local state: %v", err) | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if err := rep.openWAL(); err != nil { | 	if err := rep.openWAL(); err != nil { | ||||||
| 		rep.Close() | 		rep.Close() | ||||||
|  | 		log.Printf("Failed to open WAL: %v", err) | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if err := rep.recvStateIfNecessary(); err != nil { | 	if err := rep.recvStateIfNecessary(); err != nil { | ||||||
| 		rep.Close() | 		rep.Close() | ||||||
|  | 		log.Printf("Failed to recv state: %v", err) | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if err := rep.app.InitStorage(); err != nil { | 	if err := rep.app.InitStorage(); err != nil { | ||||||
| 		rep.Close() | 		rep.Close() | ||||||
|  | 		log.Printf("Failed to init storage: %v", err) | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if err := rep.replay(); err != nil { | 	if err := rep.replay(); err != nil { | ||||||
| 		rep.Close() | 		rep.Close() | ||||||
|  | 		log.Printf("Failed to replay: %v", err) | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if err := rep.app.LoadFromStorage(); err != nil { | 	if err := rep.app.LoadFromStorage(); err != nil { | ||||||
| 		rep.Close() | 		rep.Close() | ||||||
|  | 		log.Printf("Failed to load from storage: %v", err) | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
| @@ -21,10 +21,10 @@ type Collection[T any] struct { | |||||||
| 	sanitize func(*T) | 	sanitize func(*T) | ||||||
| 	validate func(*T) error | 	validate func(*T) error | ||||||
|  |  | ||||||
| 	indices       []Index[T] | 	indices       []*Index[T] | ||||||
| 	uniqueIndices []Index[T] | 	uniqueIndices []*Index[T] | ||||||
|  |  | ||||||
| 	ByID Index[T] | 	ByID *Index[T] | ||||||
|  |  | ||||||
| 	buf *bytes.Buffer | 	buf *bytes.Buffer | ||||||
| } | } | ||||||
| @@ -65,8 +65,8 @@ func NewCollection[T any](db *Database, name string, conf *CollectionConfig[T]) | |||||||
| 		copy:          conf.Copy, | 		copy:          conf.Copy, | ||||||
| 		sanitize:      conf.Sanitize, | 		sanitize:      conf.Sanitize, | ||||||
| 		validate:      conf.Validate, | 		validate:      conf.Validate, | ||||||
| 		indices:       []Index[T]{}, | 		indices:       []*Index[T]{}, | ||||||
| 		uniqueIndices: []Index[T]{}, | 		uniqueIndices: []*Index[T]{}, | ||||||
| 		buf:           &bytes.Buffer{}, | 		buf:           &bytes.Buffer{}, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -92,7 +92,7 @@ func NewCollection[T any](db *Database, name string, conf *CollectionConfig[T]) | |||||||
| 	return c | 	return c | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c Collection[T]) Name() string { | func (c *Collection[T]) Name() string { | ||||||
| 	return c.name | 	return c.name | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -108,35 +108,8 @@ type indexConfig[T any] struct { | |||||||
| 	Include func(item *T) bool | 	Include func(item *T) bool | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c Collection[T]) Get(tx *Snapshot, id uint64) (*T, bool) { |  | ||||||
| 	x := new(T) |  | ||||||
| 	c.setID(x, id) |  | ||||||
| 	return c.ByID.Get(tx, x) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (c Collection[T]) List(tx *Snapshot, ids []uint64, out []*T) []*T { |  | ||||||
| 	if len(ids) == 0 { |  | ||||||
| 		return out[:0] |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if cap(out) < len(ids) { |  | ||||||
| 		out = make([]*T, len(ids)) |  | ||||||
| 	} |  | ||||||
| 	out = out[:0] |  | ||||||
|  |  | ||||||
| 	for _, id := range ids { |  | ||||||
| 		item, ok := c.Get(tx, id) |  | ||||||
| 		if ok { |  | ||||||
| 			out = append(out, item) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return out |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // AddIndex: Add an index to the collection. | // AddIndex: Add an index to the collection. | ||||||
| func (c *Collection[T]) addIndex(conf indexConfig[T]) Index[T] { | func (c *Collection[T]) addIndex(conf indexConfig[T]) *Index[T] { | ||||||
|  |  | ||||||
| 	var less func(*T, *T) bool | 	var less func(*T, *T) bool | ||||||
|  |  | ||||||
| 	if conf.Unique { | 	if conf.Unique { | ||||||
| @@ -160,7 +133,8 @@ func (c *Collection[T]) addIndex(conf indexConfig[T]) Index[T] { | |||||||
| 		BTree: btree.NewG(256, less), | 		BTree: btree.NewG(256, less), | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	index := Index[T]{ | 	index := &Index[T]{ | ||||||
|  | 		db:           c.db, | ||||||
| 		collectionID: c.collectionID, | 		collectionID: c.collectionID, | ||||||
| 		name:         conf.Name, | 		name:         conf.Name, | ||||||
| 		indexID:      c.getState(c.db.Snapshot()).addIndex(indexState), | 		indexID:      c.getState(c.db.Snapshot()).addIndex(indexState), | ||||||
| @@ -176,7 +150,26 @@ func (c *Collection[T]) addIndex(conf indexConfig[T]) Index[T] { | |||||||
| 	return index | 	return index | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c Collection[T]) Insert(tx *Snapshot, userItem *T) error { | func (c *Collection[T]) Get(tx *Snapshot, id uint64) *T { | ||||||
|  | 	if tx == nil { | ||||||
|  | 		tx = c.db.Snapshot() | ||||||
|  | 	} | ||||||
|  | 	item := new(T) | ||||||
|  | 	c.setID(item, id) | ||||||
|  | 	return c.ByID.Get(tx, item) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *Collection[T]) Insert(tx *Snapshot, userItem *T) error { | ||||||
|  | 	if tx == nil { | ||||||
|  | 		return c.db.Update(func(tx *Snapshot) error { | ||||||
|  | 			return c.insert(tx, userItem) | ||||||
|  | 		}) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return c.insert(tx, userItem) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *Collection[T]) insert(tx *Snapshot, userItem *T) error { | ||||||
| 	if err := c.ensureMutable(tx); err != nil { | 	if err := c.ensureMutable(tx); err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| @@ -203,7 +196,16 @@ func (c Collection[T]) Insert(tx *Snapshot, userItem *T) error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c Collection[T]) Update(tx *Snapshot, userItem *T) error { | func (c *Collection[T]) Update(tx *Snapshot, userItem *T) error { | ||||||
|  | 	if tx == nil { | ||||||
|  | 		return c.db.Update(func(tx *Snapshot) error { | ||||||
|  | 			return c.update(tx, userItem) | ||||||
|  | 		}) | ||||||
|  | 	} | ||||||
|  | 	return c.update(tx, userItem) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *Collection[T]) update(tx *Snapshot, userItem *T) error { | ||||||
| 	if err := c.ensureMutable(tx); err != nil { | 	if err := c.ensureMutable(tx); err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| @@ -235,7 +237,16 @@ func (c Collection[T]) Update(tx *Snapshot, userItem *T) error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c Collection[T]) Upsert(tx *Snapshot, item *T) error { | func (c *Collection[T]) Upsert(tx *Snapshot, item *T) error { | ||||||
|  | 	if tx == nil { | ||||||
|  | 		return c.db.Update(func(tx *Snapshot) error { | ||||||
|  | 			return c.upsert(tx, item) | ||||||
|  | 		}) | ||||||
|  | 	} | ||||||
|  | 	return c.upsert(tx, item) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *Collection[T]) upsert(tx *Snapshot, item *T) error { | ||||||
| 	err := c.Insert(tx, item) | 	err := c.Insert(tx, item) | ||||||
| 	if err == nil { | 	if err == nil { | ||||||
| 		return nil | 		return nil | ||||||
| @@ -246,7 +257,16 @@ func (c Collection[T]) Upsert(tx *Snapshot, item *T) error { | |||||||
| 	return err | 	return err | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c Collection[T]) Delete(tx *Snapshot, itemID uint64) error { | func (c *Collection[T]) Delete(tx *Snapshot, itemID uint64) error { | ||||||
|  | 	if tx == nil { | ||||||
|  | 		return c.db.Update(func(tx *Snapshot) error { | ||||||
|  | 			return c.delete(tx, itemID) | ||||||
|  | 		}) | ||||||
|  | 	} | ||||||
|  | 	return c.delete(tx, itemID) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *Collection[T]) delete(tx *Snapshot, itemID uint64) error { | ||||||
| 	if err := c.ensureMutable(tx); err != nil { | 	if err := c.ensureMutable(tx); err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| @@ -254,13 +274,20 @@ func (c Collection[T]) Delete(tx *Snapshot, itemID uint64) error { | |||||||
| 	return c.deleteItem(tx, itemID) | 	return c.deleteItem(tx, itemID) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c Collection[T]) getByID(tx *Snapshot, itemID uint64) (*T, bool) { | func (c *Collection[T]) Count(tx *Snapshot) int { | ||||||
|  | 	if tx == nil { | ||||||
|  | 		tx = c.db.Snapshot() | ||||||
|  | 	} | ||||||
|  | 	return c.ByID.Count(tx) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *Collection[T]) getByID(tx *Snapshot, itemID uint64) (*T, bool) { | ||||||
| 	x := new(T) | 	x := new(T) | ||||||
| 	c.setID(x, itemID) | 	c.setID(x, itemID) | ||||||
| 	return c.ByID.get(tx, x) | 	return c.ByID.get(tx, x) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c Collection[T]) ensureMutable(tx *Snapshot) error { | func (c *Collection[T]) ensureMutable(tx *Snapshot) error { | ||||||
| 	if !tx.writable() { | 	if !tx.writable() { | ||||||
| 		return errs.ReadOnly | 		return errs.ReadOnly | ||||||
| 	} | 	} | ||||||
| @@ -274,7 +301,7 @@ func (c Collection[T]) ensureMutable(tx *Snapshot) error { | |||||||
| } | } | ||||||
|  |  | ||||||
| // For initial data loading. | // For initial data loading. | ||||||
| func (c Collection[T]) insertItem(tx *Snapshot, itemID uint64, data []byte) error { | func (c *Collection[T]) insertItem(tx *Snapshot, itemID uint64, data []byte) error { | ||||||
| 	item := new(T) | 	item := new(T) | ||||||
| 	if err := json.Unmarshal(data, item); err != nil { | 	if err := json.Unmarshal(data, item); err != nil { | ||||||
| 		return errs.Encoding.WithErr(err).WithCollection(c.name) | 		return errs.Encoding.WithErr(err).WithCollection(c.name) | ||||||
| @@ -295,7 +322,7 @@ func (c Collection[T]) insertItem(tx *Snapshot, itemID uint64, data []byte) erro | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c Collection[T]) deleteItem(tx *Snapshot, itemID uint64) error { | func (c *Collection[T]) deleteItem(tx *Snapshot, itemID uint64) error { | ||||||
| 	item, ok := c.getByID(tx, itemID) | 	item, ok := c.getByID(tx, itemID) | ||||||
| 	if !ok { | 	if !ok { | ||||||
| 		return errs.NotFound | 		return errs.NotFound | ||||||
| @@ -312,7 +339,7 @@ func (c Collection[T]) deleteItem(tx *Snapshot, itemID uint64) error { | |||||||
|  |  | ||||||
| // upsertItem inserts or updates the item with itemID and the given serialized | // upsertItem inserts or updates the item with itemID and the given serialized | ||||||
| // form. It's called by | // form. It's called by | ||||||
| func (c Collection[T]) upsertItem(tx *Snapshot, itemID uint64, data []byte) error { | func (c *Collection[T]) upsertItem(tx *Snapshot, itemID uint64, data []byte) error { | ||||||
| 	item, ok := c.getByID(tx, itemID) | 	item, ok := c.getByID(tx, itemID) | ||||||
| 	if ok { | 	if ok { | ||||||
| 		tx.delete(c.collectionID, itemID) | 		tx.delete(c.collectionID, itemID) | ||||||
| @@ -335,14 +362,14 @@ func (c Collection[T]) upsertItem(tx *Snapshot, itemID uint64, data []byte) erro | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c Collection[T]) getID(t *T) uint64 { | func (c *Collection[T]) getID(t *T) uint64 { | ||||||
| 	return *((*uint64)(unsafe.Pointer(t))) | 	return *((*uint64)(unsafe.Pointer(t))) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c Collection[T]) setID(t *T, id uint64) { | func (c *Collection[T]) setID(t *T, id uint64) { | ||||||
| 	*((*uint64)(unsafe.Pointer(t))) = id | 	*((*uint64)(unsafe.Pointer(t))) = id | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c Collection[T]) getState(tx *Snapshot) *collectionState[T] { | func (c *Collection[T]) getState(tx *Snapshot) *collectionState[T] { | ||||||
| 	return tx.collections[c.collectionID].(*collectionState[T]) | 	return tx.collections[c.collectionID].(*collectionState[T]) | ||||||
| } | } | ||||||
|   | |||||||
| @@ -54,8 +54,8 @@ var testDBTestCases = []DBTestCase{{ | |||||||
| 		Name: "Update", | 		Name: "Update", | ||||||
|  |  | ||||||
| 		Update: func(t *testing.T, db TestDB, tx *Snapshot) error { | 		Update: func(t *testing.T, db TestDB, tx *Snapshot) error { | ||||||
| 			user, ok := db.Users.ByID.Get(tx, &User{ID: 1}) | 			user := db.Users.ByID.Get(tx, &User{ID: 1}) | ||||||
| 			if !ok { | 			if user == nil { | ||||||
| 				return errs.NotFound | 				return errs.NotFound | ||||||
| 			} | 			} | ||||||
| 			user.Name = "Bob" | 			user.Name = "Bob" | ||||||
| @@ -323,8 +323,8 @@ var testDBTestCases = []DBTestCase{{ | |||||||
| 		Name: "Update", | 		Name: "Update", | ||||||
|  |  | ||||||
| 		Update: func(t *testing.T, db TestDB, tx *Snapshot) error { | 		Update: func(t *testing.T, db TestDB, tx *Snapshot) error { | ||||||
| 			user, ok := db.Users.ByID.Get(tx, &User{ID: 1}) | 			user := db.Users.ByID.Get(tx, &User{ID: 1}) | ||||||
| 			if !ok { | 			if user == nil { | ||||||
| 				return errs.NotFound | 				return errs.NotFound | ||||||
| 			} | 			} | ||||||
| 			user.Name = "Bob" | 			user.Name = "Bob" | ||||||
| @@ -493,8 +493,8 @@ var testDBTestCases = []DBTestCase{{ | |||||||
| 		Name: "Update", | 		Name: "Update", | ||||||
|  |  | ||||||
| 		Update: func(t *testing.T, db TestDB, tx *Snapshot) error { | 		Update: func(t *testing.T, db TestDB, tx *Snapshot) error { | ||||||
| 			u, ok := db.Users.ByID.Get(tx, &User{ID: 2}) | 			u := db.Users.ByID.Get(tx, &User{ID: 2}) | ||||||
| 			if !ok { | 			if u == nil { | ||||||
| 				return errs.NotFound | 				return errs.NotFound | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| @@ -609,16 +609,16 @@ var testDBTestCases = []DBTestCase{{ | |||||||
| 		Update: func(t *testing.T, db TestDB, tx *Snapshot) error { | 		Update: func(t *testing.T, db TestDB, tx *Snapshot) error { | ||||||
| 			expected := &User{ID: 1, Name: "Alice", Email: "a@b.com"} | 			expected := &User{ID: 1, Name: "Alice", Email: "a@b.com"} | ||||||
|  |  | ||||||
| 			u, ok := db.Users.ByID.Get(tx, &User{ID: 1}) | 			u := db.Users.ByID.Get(tx, &User{ID: 1}) | ||||||
| 			if !ok { | 			if u == nil { | ||||||
| 				return errs.NotFound | 				return errs.NotFound | ||||||
| 			} | 			} | ||||||
| 			if !reflect.DeepEqual(u, expected) { | 			if !reflect.DeepEqual(u, expected) { | ||||||
| 				return errors.New("Not equal (id)") | 				return errors.New("Not equal (id)") | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			u, ok = db.Users.ByEmail.Get(tx, &User{Email: "a@b.com"}) | 			u = db.Users.ByEmail.Get(tx, &User{Email: "a@b.com"}) | ||||||
| 			if !ok { | 			if u == nil { | ||||||
| 				return errs.NotFound | 				return errs.NotFound | ||||||
| 			} | 			} | ||||||
| 			if !reflect.DeepEqual(u, expected) { | 			if !reflect.DeepEqual(u, expected) { | ||||||
| @@ -637,11 +637,11 @@ var testDBTestCases = []DBTestCase{{ | |||||||
| 		Name: "Get not found", | 		Name: "Get not found", | ||||||
|  |  | ||||||
| 		Update: func(t *testing.T, db TestDB, tx *Snapshot) error { | 		Update: func(t *testing.T, db TestDB, tx *Snapshot) error { | ||||||
| 			if _, ok := db.Users.ByID.Get(tx, &User{ID: 2}); ok { | 			if u := db.Users.ByID.Get(tx, &User{ID: 2}); u != nil { | ||||||
| 				return errors.New("Found (id)") | 				return errors.New("Found (id)") | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			if _, ok := db.Users.ByEmail.Get(tx, &User{Email: "x@b.com"}); ok { | 			if u := db.Users.ByEmail.Get(tx, &User{Email: "x@b.com"}); u != nil { | ||||||
| 				return errors.New("Found (email)") | 				return errors.New("Found (email)") | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| @@ -753,8 +753,8 @@ var testDBTestCases = []DBTestCase{{ | |||||||
| 					return true | 					return true | ||||||
| 				} | 				} | ||||||
|  |  | ||||||
| 				prev, ok := db.Users.ByID.Get(tx, &User{ID: u.ID - 1}) | 				prev := db.Users.ByID.Get(tx, &User{ID: u.ID - 1}) | ||||||
| 				if !ok { | 				if prev == nil { | ||||||
| 					err = errors.New("Previous user not found") | 					err = errors.New("Previous user not found") | ||||||
| 					return false | 					return false | ||||||
| 				} | 				} | ||||||
| @@ -811,8 +811,8 @@ var testDBTestCases = []DBTestCase{{ | |||||||
| 					return true | 					return true | ||||||
| 				} | 				} | ||||||
|  |  | ||||||
| 				prev, ok := db.Users.ByID.Get(tx, &User{ID: u.ID + 1}) | 				prev := db.Users.ByID.Get(tx, &User{ID: u.ID + 1}) | ||||||
| 				if !ok { | 				if prev == nil { | ||||||
| 					err = errors.New("Previous user not found") | 					err = errors.New("Previous user not found") | ||||||
| 					return false | 					return false | ||||||
| 				} | 				} | ||||||
|   | |||||||
| @@ -1,138 +0,0 @@ | |||||||
| package mdb |  | ||||||
|  |  | ||||||
| import ( |  | ||||||
| 	"fmt" |  | ||||||
| 	"reflect" |  | ||||||
| 	"testing" |  | ||||||
| ) |  | ||||||
|  |  | ||||||
| func TestDBList(t *testing.T) { |  | ||||||
| 	db := NewTestDBPrimary(t, t.TempDir()) |  | ||||||
|  |  | ||||||
| 	var ( |  | ||||||
| 		user1 = User{ |  | ||||||
| 			ID:    NewID(), |  | ||||||
| 			Name:  "User1", |  | ||||||
| 			Email: "user1@gmail.com", |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		user2 = User{ |  | ||||||
| 			ID:    NewID(), |  | ||||||
| 			Name:  "User2", |  | ||||||
| 			Email: "user2@gmail.com", |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		user3 = User{ |  | ||||||
| 			ID:    NewID(), |  | ||||||
| 			Name:  "User3", |  | ||||||
| 			Email: "user3@gmail.com", |  | ||||||
| 		} |  | ||||||
| 		user1Data = make([]UserDataItem, 10) |  | ||||||
| 		user2Data = make([]UserDataItem, 4) |  | ||||||
| 		user3Data = make([]UserDataItem, 8) |  | ||||||
| 	) |  | ||||||
|  |  | ||||||
| 	err := db.Update(func(tx *Snapshot) error { |  | ||||||
| 		if err := db.Users.Insert(tx, &user1); err != nil { |  | ||||||
| 			return err |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		if err := db.Users.Insert(tx, &user2); err != nil { |  | ||||||
| 			return err |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		for i := range user1Data { |  | ||||||
| 			user1Data[i] = UserDataItem{ |  | ||||||
| 				ID:     NewID(), |  | ||||||
| 				UserID: user1.ID, |  | ||||||
| 				Name:   fmt.Sprintf("Name1: %d", i), |  | ||||||
| 				Data:   fmt.Sprintf("Data: %d", i), |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			if err := db.UserData.Insert(tx, &user1Data[i]); err != nil { |  | ||||||
| 				return err |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		for i := range user2Data { |  | ||||||
| 			user2Data[i] = UserDataItem{ |  | ||||||
| 				ID:     NewID(), |  | ||||||
| 				UserID: user2.ID, |  | ||||||
| 				Name:   fmt.Sprintf("Name2: %d", i), |  | ||||||
| 				Data:   fmt.Sprintf("Data: %d", i), |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			if err := db.UserData.Insert(tx, &user2Data[i]); err != nil { |  | ||||||
| 				return err |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		for i := range user3Data { |  | ||||||
| 			user3Data[i] = UserDataItem{ |  | ||||||
| 				ID:     NewID(), |  | ||||||
| 				UserID: user3.ID, |  | ||||||
| 				Name:   fmt.Sprintf("Name3: %d", i), |  | ||||||
| 				Data:   fmt.Sprintf("Data: %d", i), |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			if err := db.UserData.Insert(tx, &user3Data[i]); err != nil { |  | ||||||
| 				return err |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		return nil |  | ||||||
| 	}) |  | ||||||
|  |  | ||||||
| 	if err != nil { |  | ||||||
| 		t.Fatal(err) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	type TestCase struct { |  | ||||||
| 		Name     string |  | ||||||
| 		Args     ListArgs[UserDataItem] |  | ||||||
| 		Expected []UserDataItem |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	cases := []TestCase{ |  | ||||||
| 		{ |  | ||||||
| 			Name: "User1 all", |  | ||||||
| 			Args: ListArgs[UserDataItem]{ |  | ||||||
| 				After: &UserDataItem{ |  | ||||||
| 					UserID: user1.ID, |  | ||||||
| 				}, |  | ||||||
| 				While: func(item *UserDataItem) bool { |  | ||||||
| 					return item.UserID == user1.ID |  | ||||||
| 				}, |  | ||||||
| 			}, |  | ||||||
| 			Expected: user1Data, |  | ||||||
| 		}, { |  | ||||||
| 			Name: "User1 limited", |  | ||||||
| 			Args: ListArgs[UserDataItem]{ |  | ||||||
| 				After: &UserDataItem{ |  | ||||||
| 					UserID: user1.ID, |  | ||||||
| 				}, |  | ||||||
| 				While: func(item *UserDataItem) bool { |  | ||||||
| 					return item.UserID == user1.ID |  | ||||||
| 				}, |  | ||||||
| 				Limit: 4, |  | ||||||
| 			}, |  | ||||||
| 			Expected: user1Data[:4], |  | ||||||
| 		}, |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	for _, tc := range cases { |  | ||||||
| 		t.Run(tc.Name, func(t *testing.T) { |  | ||||||
| 			tx := db.Snapshot() |  | ||||||
| 			l := db.UserData.ByName.List(tx, tc.Args, nil) |  | ||||||
| 			if len(l) != len(tc.Expected) { |  | ||||||
| 				t.Fatal(tc.Name, l) |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			for i := range l { |  | ||||||
| 				if !reflect.DeepEqual(*l[i], tc.Expected[i]) { |  | ||||||
| 					t.Fatal(tc.Name, l) |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
| 		}) |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| @@ -134,23 +134,23 @@ func checkSlicesEqual[T any](t *testing.T, name string, actual, expected []T) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func checkMinMaxEqual[T any](t *testing.T, name string, tx *Snapshot, index Index[T], expected []T) { | func checkMinMaxEqual[T any](t *testing.T, name string, tx *Snapshot, index *Index[T], expected []T) { | ||||||
| 	if len(expected) == 0 { | 	if len(expected) == 0 { | ||||||
| 		if min, ok := index.Min(tx); ok { | 		if min := index.Min(tx); min != nil { | ||||||
| 			t.Fatal(min) | 			t.Fatal(min) | ||||||
| 		} | 		} | ||||||
| 		if max, ok := index.Max(tx); ok { | 		if max := index.Max(tx); max != nil { | ||||||
| 			t.Fatal(max) | 			t.Fatal(max) | ||||||
| 		} | 		} | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	min, ok := index.Min(tx) | 	min := index.Min(tx) | ||||||
| 	if !ok { | 	if min == nil { | ||||||
| 		t.Fatal("No min") | 		t.Fatal("No min") | ||||||
| 	} | 	} | ||||||
| 	max, ok := index.Max(tx) | 	max := index.Max(tx) | ||||||
| 	if !ok { | 	if max == nil { | ||||||
| 		t.Fatal("No max") | 		t.Fatal("No max") | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
| @@ -14,7 +14,7 @@ type UserDataItem struct { | |||||||
|  |  | ||||||
| type UserData struct { | type UserData struct { | ||||||
| 	*Collection[UserDataItem] | 	*Collection[UserDataItem] | ||||||
| 	ByName Index[UserDataItem] // Unique index on (Token). | 	ByName *Index[UserDataItem] // Unique index on (Token). | ||||||
| } | } | ||||||
|  |  | ||||||
| func NewUserDataCollection(db *Database) UserData { | func NewUserDataCollection(db *Database) UserData { | ||||||
|   | |||||||
| @@ -12,9 +12,9 @@ type User struct { | |||||||
|  |  | ||||||
| type Users struct { | type Users struct { | ||||||
| 	*Collection[User] | 	*Collection[User] | ||||||
| 	ByEmail   Index[User] // Unique index on (Email). | 	ByEmail   *Index[User] // Unique index on (Email). | ||||||
| 	ByName    Index[User] // Index on (Name). | 	ByName    *Index[User] // Index on (Name). | ||||||
| 	ByBlocked Index[User] // Partial index on (Blocked,Email). | 	ByBlocked *Index[User] // Partial index on (Blocked,Email). | ||||||
| } | } | ||||||
|  |  | ||||||
| func NewUserCollection(db *Database) Users { | func NewUserCollection(db *Database) Users { | ||||||
|   | |||||||
| @@ -73,6 +73,10 @@ type Database struct { | |||||||
| } | } | ||||||
|  |  | ||||||
| func New(conf Config) *Database { | func New(conf Config) *Database { | ||||||
|  | 	if conf.NetTimeout <= 0 { | ||||||
|  | 		conf.NetTimeout = time.Minute | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	if conf.MaxConcurrentUpdates <= 0 { | 	if conf.MaxConcurrentUpdates <= 0 { | ||||||
| 		conf.MaxConcurrentUpdates = 32 | 		conf.MaxConcurrentUpdates = 32 | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -21,8 +21,8 @@ func (i Index[T]) AssertEqual(t *testing.T, tx1, tx2 *Snapshot) { | |||||||
|  |  | ||||||
| 	errStr := "" | 	errStr := "" | ||||||
| 	i.Ascend(tx1, func(item1 *T) bool { | 	i.Ascend(tx1, func(item1 *T) bool { | ||||||
| 		item2, ok := i.Get(tx2, item1) | 		item2 := i.Get(tx2, item1) | ||||||
| 		if !ok { | 		if item2 == nil { | ||||||
| 			errStr = fmt.Sprintf("Indices don't match. %v not found.", item1) | 			errStr = fmt.Sprintf("Indices don't match. %v not found.", item1) | ||||||
| 			return false | 			return false | ||||||
| 		} | 		} | ||||||
|   | |||||||
							
								
								
									
										130
									
								
								mdb/index.go
									
									
									
									
									
								
							
							
						
						
									
										130
									
								
								mdb/index.go
									
									
									
									
									
								
							| @@ -10,7 +10,7 @@ func NewIndex[T any]( | |||||||
| 	c *Collection[T], | 	c *Collection[T], | ||||||
| 	name string, | 	name string, | ||||||
| 	compare func(lhs, rhs *T) int, | 	compare func(lhs, rhs *T) int, | ||||||
| ) Index[T] { | ) *Index[T] { | ||||||
| 	return c.addIndex(indexConfig[T]{ | 	return c.addIndex(indexConfig[T]{ | ||||||
| 		Name:    name, | 		Name:    name, | ||||||
| 		Unique:  false, | 		Unique:  false, | ||||||
| @@ -24,7 +24,7 @@ func NewPartialIndex[T any]( | |||||||
| 	name string, | 	name string, | ||||||
| 	compare func(lhs, rhs *T) int, | 	compare func(lhs, rhs *T) int, | ||||||
| 	include func(*T) bool, | 	include func(*T) bool, | ||||||
| ) Index[T] { | ) *Index[T] { | ||||||
| 	return c.addIndex(indexConfig[T]{ | 	return c.addIndex(indexConfig[T]{ | ||||||
| 		Name:    name, | 		Name:    name, | ||||||
| 		Unique:  false, | 		Unique:  false, | ||||||
| @@ -37,7 +37,7 @@ func NewUniqueIndex[T any]( | |||||||
| 	c *Collection[T], | 	c *Collection[T], | ||||||
| 	name string, | 	name string, | ||||||
| 	compare func(lhs, rhs *T) int, | 	compare func(lhs, rhs *T) int, | ||||||
| ) Index[T] { | ) *Index[T] { | ||||||
| 	return c.addIndex(indexConfig[T]{ | 	return c.addIndex(indexConfig[T]{ | ||||||
| 		Name:    name, | 		Name:    name, | ||||||
| 		Unique:  true, | 		Unique:  true, | ||||||
| @@ -51,7 +51,7 @@ func NewUniquePartialIndex[T any]( | |||||||
| 	name string, | 	name string, | ||||||
| 	compare func(lhs, rhs *T) int, | 	compare func(lhs, rhs *T) int, | ||||||
| 	include func(*T) bool, | 	include func(*T) bool, | ||||||
| ) Index[T] { | ) *Index[T] { | ||||||
| 	return c.addIndex(indexConfig[T]{ | 	return c.addIndex(indexConfig[T]{ | ||||||
| 		Name:    name, | 		Name:    name, | ||||||
| 		Unique:  true, | 		Unique:  true, | ||||||
| @@ -63,6 +63,7 @@ func NewUniquePartialIndex[T any]( | |||||||
| // ---------------------------------------------------------------------------- | // ---------------------------------------------------------------------------- | ||||||
|  |  | ||||||
| type Index[T any] struct { | type Index[T any] struct { | ||||||
|  | 	db           *Database | ||||||
| 	name         string | 	name         string | ||||||
| 	collectionID uint64 | 	collectionID uint64 | ||||||
| 	indexID      uint64 | 	indexID      uint64 | ||||||
| @@ -70,117 +71,86 @@ type Index[T any] struct { | |||||||
| 	copy         func(*T) *T | 	copy         func(*T) *T | ||||||
| } | } | ||||||
|  |  | ||||||
| func (i Index[T]) Get(tx *Snapshot, in *T) (item *T, ok bool) { | func (i *Index[T]) ensureSnapshot(tx *Snapshot) *Snapshot { | ||||||
| 	tPtr, ok := i.get(tx, in) | 	if tx == nil { | ||||||
| 	if !ok { | 		tx = i.db.Snapshot() | ||||||
| 		return item, false |  | ||||||
| 	} | 	} | ||||||
| 	return i.copy(tPtr), true | 	return tx | ||||||
| } | } | ||||||
|  |  | ||||||
| func (i Index[T]) get(tx *Snapshot, in *T) (*T, bool) { | func (i *Index[T]) Get(tx *Snapshot, in *T) *T { | ||||||
|  | 	tx = i.ensureSnapshot(tx) | ||||||
|  | 	if tPtr, ok := i.get(tx, in); ok { | ||||||
|  | 		return i.copy(tPtr) | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (i *Index[T]) get(tx *Snapshot, in *T) (*T, bool) { | ||||||
| 	return i.btree(tx).Get(in) | 	return i.btree(tx).Get(in) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (i Index[T]) Has(tx *Snapshot, in *T) bool { | func (i *Index[T]) Has(tx *Snapshot, in *T) bool { | ||||||
|  | 	tx = i.ensureSnapshot(tx) | ||||||
| 	return i.btree(tx).Has(in) | 	return i.btree(tx).Has(in) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (i Index[T]) Min(tx *Snapshot) (item *T, ok bool) { | func (i *Index[T]) Min(tx *Snapshot) *T { | ||||||
| 	tPtr, ok := i.btree(tx).Min() | 	tx = i.ensureSnapshot(tx) | ||||||
| 	if !ok { | 	if tPtr, ok := i.btree(tx).Min(); ok { | ||||||
| 		return item, false | 		return i.copy(tPtr) | ||||||
| 	} | 	} | ||||||
| 	return i.copy(tPtr), true | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (i Index[T]) Max(tx *Snapshot) (item *T, ok bool) { | func (i *Index[T]) Max(tx *Snapshot) *T { | ||||||
| 	tPtr, ok := i.btree(tx).Max() | 	tx = i.ensureSnapshot(tx) | ||||||
| 	if !ok { | 	if tPtr, ok := i.btree(tx).Max(); ok { | ||||||
| 		return item, false | 		return i.copy(tPtr) | ||||||
| 	} | 	} | ||||||
| 	return i.copy(tPtr), true | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (i Index[T]) Ascend(tx *Snapshot, each func(*T) bool) { | func (i *Index[T]) Ascend(tx *Snapshot, each func(*T) bool) { | ||||||
|  | 	tx = i.ensureSnapshot(tx) | ||||||
| 	i.btreeForIter(tx).Ascend(func(t *T) bool { | 	i.btreeForIter(tx).Ascend(func(t *T) bool { | ||||||
| 		return each(i.copy(t)) | 		return each(i.copy(t)) | ||||||
| 	}) | 	}) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (i Index[T]) AscendAfter(tx *Snapshot, after *T, each func(*T) bool) { | func (i *Index[T]) AscendAfter(tx *Snapshot, after *T, each func(*T) bool) { | ||||||
|  | 	tx = i.ensureSnapshot(tx) | ||||||
| 	i.btreeForIter(tx).AscendGreaterOrEqual(after, func(t *T) bool { | 	i.btreeForIter(tx).AscendGreaterOrEqual(after, func(t *T) bool { | ||||||
| 		return each(i.copy(t)) | 		return each(i.copy(t)) | ||||||
| 	}) | 	}) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (i Index[T]) Descend(tx *Snapshot, each func(*T) bool) { | func (i *Index[T]) Descend(tx *Snapshot, each func(*T) bool) { | ||||||
|  | 	tx = i.ensureSnapshot(tx) | ||||||
| 	i.btreeForIter(tx).Descend(func(t *T) bool { | 	i.btreeForIter(tx).Descend(func(t *T) bool { | ||||||
| 		return each(i.copy(t)) | 		return each(i.copy(t)) | ||||||
| 	}) | 	}) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (i Index[T]) DescendAfter(tx *Snapshot, after *T, each func(*T) bool) { | func (i *Index[T]) DescendAfter(tx *Snapshot, after *T, each func(*T) bool) { | ||||||
|  | 	tx = i.ensureSnapshot(tx) | ||||||
| 	i.btreeForIter(tx).DescendLessOrEqual(after, func(t *T) bool { | 	i.btreeForIter(tx).DescendLessOrEqual(after, func(t *T) bool { | ||||||
| 		return each(i.copy(t)) | 		return each(i.copy(t)) | ||||||
| 	}) | 	}) | ||||||
| } | } | ||||||
|  |  | ||||||
| type ListArgs[T any] struct { | func (i *Index[T]) Count(tx *Snapshot) int { | ||||||
| 	Desc  bool          // True for descending order, otherwise ascending. | 	tx = i.ensureSnapshot(tx) | ||||||
| 	After *T            // If after is given, iterate after (and including) the value. | 	return i.btree(tx).Len() | ||||||
| 	While func(*T) bool // Continue iterating until While is false. |  | ||||||
| 	Limit int           // Maximum number of items to return. 0 => All. |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (i Index[T]) List(tx *Snapshot, args ListArgs[T], out []*T) []*T { |  | ||||||
| 	if args.Limit < 0 { |  | ||||||
| 		return nil |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if args.While == nil { |  | ||||||
| 		args.While = func(*T) bool { return true } |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	size := args.Limit |  | ||||||
| 	if size == 0 { |  | ||||||
| 		size = 32 // Why not? |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	items := out[:0] |  | ||||||
|  |  | ||||||
| 	each := func(item *T) bool { |  | ||||||
| 		if !args.While(item) { |  | ||||||
| 			return false |  | ||||||
| 		} |  | ||||||
| 		items = append(items, item) |  | ||||||
| 		return args.Limit == 0 || len(items) < args.Limit |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if args.Desc { |  | ||||||
| 		if args.After != nil { |  | ||||||
| 			i.DescendAfter(tx, args.After, each) |  | ||||||
| 		} else { |  | ||||||
| 			i.Descend(tx, each) |  | ||||||
| 		} |  | ||||||
| 	} else { |  | ||||||
| 		if args.After != nil { |  | ||||||
| 			i.AscendAfter(tx, args.After, each) |  | ||||||
| 		} else { |  | ||||||
| 			i.Ascend(tx, each) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return items |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // ---------------------------------------------------------------------------- | // ---------------------------------------------------------------------------- | ||||||
|  |  | ||||||
| func (i Index[T]) insertConflict(tx *Snapshot, item *T) bool { | func (i *Index[T]) insertConflict(tx *Snapshot, item *T) bool { | ||||||
| 	return i.btree(tx).Has(item) | 	return i.btree(tx).Has(item) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (i Index[T]) updateConflict(tx *Snapshot, item *T) bool { | func (i *Index[T]) updateConflict(tx *Snapshot, item *T) bool { | ||||||
| 	current, ok := i.btree(tx).Get(item) | 	current, ok := i.btree(tx).Get(item) | ||||||
| 	return ok && i.getID(current) != i.getID(item) | 	return ok && i.getID(current) != i.getID(item) | ||||||
| } | } | ||||||
| @@ -188,7 +158,7 @@ func (i Index[T]) updateConflict(tx *Snapshot, item *T) bool { | |||||||
| // This should only be called after insertConflict. Additionally, the caller | // This should only be called after insertConflict. Additionally, the caller | ||||||
| // should ensure that the index has been properly cloned for write before | // should ensure that the index has been properly cloned for write before | ||||||
| // writing. | // writing. | ||||||
| func (i Index[T]) insert(tx *Snapshot, item *T) { | func (i *Index[T]) insert(tx *Snapshot, item *T) { | ||||||
| 	if i.include != nil && !i.include(item) { | 	if i.include != nil && !i.include(item) { | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| @@ -196,7 +166,7 @@ func (i Index[T]) insert(tx *Snapshot, item *T) { | |||||||
| 	i.btree(tx).ReplaceOrInsert(item) | 	i.btree(tx).ReplaceOrInsert(item) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (i Index[T]) update(tx *Snapshot, old, new *T) { | func (i *Index[T]) update(tx *Snapshot, old, new *T) { | ||||||
| 	bt := i.btree(tx) | 	bt := i.btree(tx) | ||||||
| 	bt.Delete(old) | 	bt.Delete(old) | ||||||
|  |  | ||||||
| @@ -204,22 +174,22 @@ func (i Index[T]) update(tx *Snapshot, old, new *T) { | |||||||
| 	i.insert(tx, new) | 	i.insert(tx, new) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (i Index[T]) delete(tx *Snapshot, item *T) { | func (i *Index[T]) delete(tx *Snapshot, item *T) { | ||||||
| 	i.btree(tx).Delete(item) | 	i.btree(tx).Delete(item) | ||||||
| } | } | ||||||
|  |  | ||||||
| // ---------------------------------------------------------------------------- | // ---------------------------------------------------------------------------- | ||||||
|  |  | ||||||
| func (i Index[T]) getState(tx *Snapshot) indexState[T] { | func (i *Index[T]) getState(tx *Snapshot) indexState[T] { | ||||||
| 	return tx.collections[i.collectionID].(*collectionState[T]).Indices[i.indexID] | 	return tx.collections[i.collectionID].(*collectionState[T]).Indices[i.indexID] | ||||||
| } | } | ||||||
|  |  | ||||||
| // Get the current btree for get/has/update/delete, etc. | // Get the current btree for get/has/update/delete, etc. | ||||||
| func (i Index[T]) btree(tx *Snapshot) *btree.BTreeG[*T] { | func (i *Index[T]) btree(tx *Snapshot) *btree.BTreeG[*T] { | ||||||
| 	return i.getState(tx).BTree | 	return i.getState(tx).BTree | ||||||
| } | } | ||||||
|  |  | ||||||
| func (i Index[T]) btreeForIter(tx *Snapshot) *btree.BTreeG[*T] { | func (i *Index[T]) btreeForIter(tx *Snapshot) *btree.BTreeG[*T] { | ||||||
| 	cState := tx.collections[i.collectionID].(*collectionState[T]) | 	cState := tx.collections[i.collectionID].(*collectionState[T]) | ||||||
| 	bt := cState.Indices[i.indexID].BTree | 	bt := cState.Indices[i.indexID].BTree | ||||||
|  |  | ||||||
| @@ -231,6 +201,6 @@ func (i Index[T]) btreeForIter(tx *Snapshot) *btree.BTreeG[*T] { | |||||||
| 	return bt | 	return bt | ||||||
| } | } | ||||||
|  |  | ||||||
| func (i Index[T]) getID(t *T) uint64 { | func (i *Index[T]) getID(t *T) uint64 { | ||||||
| 	return *((*uint64)(unsafe.Pointer(t))) | 	return *((*uint64)(unsafe.Pointer(t))) | ||||||
| } | } | ||||||
|   | |||||||
| @@ -51,6 +51,10 @@ func (f *freeList) Push(pages ...uint64) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (f *freeList) SetNextPage(nextPage uint64) { | ||||||
|  | 	f.nextPage = nextPage | ||||||
|  | } | ||||||
|  |  | ||||||
| func (f *freeList) Pop(count int, out []uint64) []uint64 { | func (f *freeList) Pop(count int, out []uint64) []uint64 { | ||||||
| 	out = out[:0] | 	out = out[:0] | ||||||
|  |  | ||||||
|   | |||||||
| @@ -13,14 +13,19 @@ type Index struct { | |||||||
| } | } | ||||||
|  |  | ||||||
| func NewIndex(f *File) (*Index, error) { | func NewIndex(f *File) (*Index, error) { | ||||||
|  | 	firstPage, err := f.pageCount() | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	idx := &Index{ | 	idx := &Index{ | ||||||
| 		fList: newFreeList(0), | 		fList: newFreeList(firstPage), | ||||||
| 		aList: *newAllocList(), | 		aList: *newAllocList(), | ||||||
| 		seen:  map[[2]uint64]struct{}{}, | 		seen:  map[[2]uint64]struct{}{}, | ||||||
| 		mask:  []bool{}, | 		mask:  []bool{}, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	err := f.iterate(func(pageID uint64, page dataPage) error { | 	err = f.iterate(func(pageID uint64, page dataPage) error { | ||||||
| 		header := page.Header() | 		header := page.Header() | ||||||
| 		switch header.PageType { | 		switch header.PageType { | ||||||
| 		case pageTypeHead: | 		case pageTypeHead: | ||||||
|   | |||||||
| @@ -134,6 +134,21 @@ func (pf *File) writePage(page dataPage, id uint64) error { | |||||||
| // Reading | // Reading | ||||||
| // ---------------------------------------------------------------------------- | // ---------------------------------------------------------------------------- | ||||||
|  |  | ||||||
|  | func (pf *File) pageCount() (uint64, error) { | ||||||
|  | 	fi, err := pf.f.Stat() | ||||||
|  | 	if err != nil { | ||||||
|  | 		return 0, errs.IO.WithErr(err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	fileSize := fi.Size() | ||||||
|  | 	if fileSize%pageSize != 0 { | ||||||
|  | 		return 0, errs.Corrupt.WithMsg("File size isn't a multiple of page size.") | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	maxPage := uint64(fileSize / pageSize) | ||||||
|  | 	return maxPage, nil | ||||||
|  | } | ||||||
|  |  | ||||||
| func (pf *File) iterate(each func(pageID uint64, page dataPage) error) error { | func (pf *File) iterate(each func(pageID uint64, page dataPage) error) error { | ||||||
| 	pf.lock.RLock() | 	pf.lock.RLock() | ||||||
| 	defer pf.lock.RUnlock() | 	defer pf.lock.RUnlock() | ||||||
|   | |||||||
| @@ -133,8 +133,8 @@ func (db DataDB) modifyOnce() { | |||||||
| func (db DataDB) ComputeCRC(tx *Snapshot) uint32 { | func (db DataDB) ComputeCRC(tx *Snapshot) uint32 { | ||||||
| 	h := crc32.NewIEEE() | 	h := crc32.NewIEEE() | ||||||
| 	for dataID := uint64(1); dataID < 10; dataID++ { | 	for dataID := uint64(1); dataID < 10; dataID++ { | ||||||
| 		d, ok := db.Datas.ByID.Get(tx, &DataItem{ID: dataID}) | 		d := db.Datas.ByID.Get(tx, &DataItem{ID: dataID}) | ||||||
| 		if !ok { | 		if d == nil { | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		h.Write(d.Data) | 		h.Write(d.Data) | ||||||
| @@ -143,8 +143,8 @@ func (db DataDB) ComputeCRC(tx *Snapshot) uint32 { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (db DataDB) ReadCRC(tx *Snapshot) uint32 { | func (db DataDB) ReadCRC(tx *Snapshot) uint32 { | ||||||
| 	r, ok := db.CRCs.ByID.Get(tx, &CRCItem{ID: 1}) | 	r := db.CRCs.ByID.Get(tx, &CRCItem{ID: 1}) | ||||||
| 	if !ok { | 	if r == nil { | ||||||
| 		return 0 | 		return 0 | ||||||
| 	} | 	} | ||||||
| 	return r.CRC32 | 	return r.CRC32 | ||||||
|   | |||||||
| @@ -136,8 +136,8 @@ func (db DataDB) modifyOnce() { | |||||||
| func (db DataDB) ComputeCRC(tx *mdb.Snapshot) uint32 { | func (db DataDB) ComputeCRC(tx *mdb.Snapshot) uint32 { | ||||||
| 	h := crc32.NewIEEE() | 	h := crc32.NewIEEE() | ||||||
| 	for dataID := uint64(1); dataID < 10; dataID++ { | 	for dataID := uint64(1); dataID < 10; dataID++ { | ||||||
| 		d, ok := db.Datas.ByID.Get(tx, &DataItem{ID: dataID}) | 		d := db.Datas.ByID.Get(tx, &DataItem{ID: dataID}) | ||||||
| 		if !ok { | 		if d == nil { | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		h.Write(d.Data) | 		h.Write(d.Data) | ||||||
| @@ -146,8 +146,8 @@ func (db DataDB) ComputeCRC(tx *mdb.Snapshot) uint32 { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (db DataDB) ReadCRC(tx *mdb.Snapshot) uint32 { | func (db DataDB) ReadCRC(tx *mdb.Snapshot) uint32 { | ||||||
| 	r, ok := db.CRCs.ByID.Get(tx, &CRCItem{ID: 1}) | 	r := db.CRCs.ByID.Get(tx, &CRCItem{ID: 1}) | ||||||
| 	if !ok { | 	if r == nil { | ||||||
| 		return 0 | 		return 0 | ||||||
| 	} | 	} | ||||||
| 	return r.CRC32 | 	return r.CRC32 | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user