package rep

import (
	"bytes"
	"encoding/binary"
	"encoding/json"
	"io"
	"math/rand"
	"net"
	"sync"
	"testing"
	"time"

	"git.crumpington.com/public/jldb/lib/errs"
	"git.crumpington.com/public/jldb/lib/wal"
)

// ----------------------------------------------------------------------------

// testCmdSize is the encoded size of a TestCmd in bytes: three 8-byte
// little-endian integers.
const testCmdSize = 24

// TestCmd is the command written to the write-ahead log by TestApp.
type TestCmd struct {
	Set int64 // 1 for set, 0 for delete (any non-zero value is treated as set)
	Key int64
	Val int64
}

// marshal encodes the command as Set, Key, Val, each as an 8-byte
// little-endian integer, for a total of testCmdSize bytes.
func (c TestCmd) marshal() []byte {
	b := make([]byte, testCmdSize)
	binary.LittleEndian.PutUint64(b, uint64(c.Set))
	binary.LittleEndian.PutUint64(b[8:], uint64(c.Key))
	binary.LittleEndian.PutUint64(b[16:], uint64(c.Val))
	return b
}

// unmarshal decodes a command produced by marshal. The caller must supply at
// least testCmdSize bytes; shorter input panics inside encoding/binary.
func (c *TestCmd) unmarshal(b []byte) {
	c.Set = int64(binary.LittleEndian.Uint64(b))
	c.Key = int64(binary.LittleEndian.Uint64(b[8:]))
	c.Val = int64(binary.LittleEndian.Uint64(b[16:]))
}

// CmdFromRec reads the full payload of rec and decodes it as a TestCmd.
// It panics if the payload cannot be read or is not exactly testCmdSize
// bytes long.
func CmdFromRec(rec wal.Record) TestCmd {
	cmd := TestCmd{}

	buf, err := io.ReadAll(rec.Reader)
	if err != nil {
		panic(err)
	}
	if len(buf) != testCmdSize {
		panic(len(buf))
	}

	cmd.unmarshal(buf)
	return cmd
}

// ----------------------------------------------------------------------------

// storage holds the simulated persistent state of every TestApp, keyed by
// TestApp.ID. It is not guarded by a lock.
var storage = map[int64]map[int64]int64{}

// ----------------------------------------------------------------------------

// TestApp is a small key/value application used to exercise the Replicator.
type TestApp struct {
	ID      int64
	storage map[int64]int64 // this app's entry in the package-level storage map
	rep     *Replicator
	lock    sync.Mutex // guards m
	m       map[int64]int64
}

// newApp creates a TestApp with the given id and opens a Replicator for it
// using conf. Any error from Open fails the test immediately.
func newApp(t *testing.T, id int64, conf Config) *TestApp {
	t.Helper()

	a := &TestApp{
		ID: id,
		m:  map[int64]int64{},
	}

	var err error
	a.rep, err = Open(App{
		SendState:       a.sendState,
		RecvState:       a.recvState,
		InitStorage:     a.initStorage,
		Replay:          a.replay,
		LoadFromStorage: a.loadFromStorage,
		Apply:           a.apply,
	}, conf)
	if err != nil {
		t.Fatal(errs.FmtDetails(err))
	}

	return a
}

// _set stores v under k in the in-memory map while holding the lock.
func (a *TestApp) _set(k, v int64) {
	a.lock.Lock()
	defer a.lock.Unlock()
	a.m[k] = v
}

// _del removes k from the in-memory map while holding the lock.
func (a *TestApp) _del(k int64) {
	a.lock.Lock()
	defer a.lock.Unlock()
	delete(a.m, k)
}

// Get returns the in-memory value stored under k, or 0 if k is absent.
func (a *TestApp) Get(k int64) int64 {
	a.lock.Lock()
	defer a.lock.Unlock()
	return a.m[k]
}

// Close closes the underlying Replicator.
func (a *TestApp) Close() {
	a.rep.Close()
}

// Set appends a set command to the Replicator and, only if the append
// succeeds, stores the value in the in-memory map.
func (a *TestApp) Set(k, v int64) error {
	cmd := TestCmd{Set: 1, Key: k, Val: v}
	if _, _, err := a.rep.Append(testCmdSize, bytes.NewBuffer(cmd.marshal())); err != nil {
		return err
	}
	a._set(k, v)
	return nil
}

// Del appends a delete command to the Replicator and, only if the append
// succeeds, removes the key from the in-memory map.
func (a *TestApp) Del(k int64) error {
	cmd := TestCmd{Set: 0, Key: k, Val: 0}
	if _, _, err := a.rep.Append(testCmdSize, bytes.NewBuffer(cmd.marshal())); err != nil {
		return err
	}
	a._del(k)
	return nil
}

// UpdateRandomFor issues random Set and Del calls on keys in [1, 10] (values
// in [1, 10]) roughly once per millisecond until dt has elapsed, then writes
// the sentinel pair (999, 999) that WaitForEOF looks for. It panics if any
// write fails, including the sentinel write.
func (a *TestApp) UpdateRandomFor(dt time.Duration) {
	tStart := time.Now()
	for time.Since(tStart) < dt {
		if rand.Float32() < 0.5 {
			if err := a.Set(1+rand.Int63n(10), 1+rand.Int63n(10)); err != nil {
				panic(err)
			}
		} else {
			if err := a.Del(1 + rand.Int63n(10)); err != nil {
				panic(err)
			}
		}
		time.Sleep(time.Millisecond)
	}

	// Without the sentinel, WaitForEOF would poll forever, so a failure here
	// is handled the same way as every other write in this function.
	if err := a.Set(999, 999); err != nil {
		panic(err)
	}
}

// WaitForEOF polls once per millisecond until the sentinel pair (999, 999)
// written by UpdateRandomFor is visible in the in-memory map. It has no
// timeout.
func (a *TestApp) WaitForEOF() {
	for a.Get(999) != 999 {
		time.Sleep(time.Millisecond)
	}
}

// AssertEqual fails the test unless the in-memory maps of a and rhs contain
// exactly the same keys with the same values.
func (a *TestApp) AssertEqual(t *testing.T, rhs *TestApp) {
	t.Helper()

	// sync.Mutex is not reentrant: locking the same app twice would deadlock.
	if a == rhs {
		return
	}

	a.lock.Lock()
	defer a.lock.Unlock()
	rhs.lock.Lock()
	defer rhs.lock.Unlock()

	if len(a.m) != len(rhs.m) {
		t.Fatal(len(a.m), len(rhs.m))
	}

	for k, v := range a.m {
		// Check presence explicitly: a key missing from rhs reads as 0 and
		// would otherwise compare equal to a stored 0.
		rv, ok := rhs.m[k]
		if !ok || rv != v {
			t.Fatal(k, v, rv)
		}
	}
}

// ----------------------------------------------------------------------------

// sendState writes the in-memory map to conn as a single JSON object. The
// map is marshalled while holding the lock; the write happens after the lock
// is released.
func (a *TestApp) sendState(conn net.Conn) error {
	a.lock.Lock()
	b, err := json.Marshal(a.m)
	a.lock.Unlock()
	if err != nil {
		return err
	}

	_, err = conn.Write(b)
	return err
}

// recvState decodes one JSON object from conn and stores it in the
// package-level storage map under this app's ID. It does not touch
// a.storage or the in-memory map.
// NOTE(review): presumably the Replicator calls initStorage and
// loadFromStorage afterwards to pick up the new state - confirm.
func (a *TestApp) recvState(conn net.Conn) error {
	m := map[int64]int64{}
	if err := json.NewDecoder(conn).Decode(&m); err != nil {
		return err
	}
	storage[a.ID] = m
	return nil
}

// initStorage makes sure the package-level storage map has an entry for this
// app's ID and points a.storage at it.
func (a *TestApp) initStorage() error {
	if _, ok := storage[a.ID]; !ok {
		storage[a.ID] = map[int64]int64{}
	}
	a.storage = storage[a.ID]
	return nil
}

// replay applies the command in rec to a.storage only; the in-memory map is
// left untouched.
func (a *TestApp) replay(rec wal.Record) error {
	cmd := CmdFromRec(rec)
	if cmd.Set != 0 {
		a.storage[cmd.Key] = cmd.Val
	} else {
		delete(a.storage, cmd.Key)
	}
	return nil
}

// loadFromStorage replaces the in-memory map with a copy of a.storage. The
// copy is built first and swapped in under the lock, like every other access
// to the in-memory map.
func (a *TestApp) loadFromStorage() error {
	m := make(map[int64]int64, len(a.storage))
	for k, v := range a.storage {
		m[k] = v
	}

	a.lock.Lock()
	a.m = m
	a.lock.Unlock()
	return nil
}

// apply applies the command in rec to a.storage. When the Replicator reports
// that it is not primary, the command is also applied to the in-memory map;
// on a primary, Set and Del update that map themselves after appending.
func (a *TestApp) apply(rec wal.Record) error {
	cmd := CmdFromRec(rec)
	if cmd.Set != 0 {
		a.storage[cmd.Key] = cmd.Val
	} else {
		delete(a.storage, cmd.Key)
	}

	// For primary, only update storage.
	if a.rep.Primary() {
		return nil
	}

	// For secondary, update the map.
	if cmd.Set != 0 {
		a._set(cmd.Key, cmd.Val)
	} else {
		a._del(cmd.Key)
	}
	return nil
}