Cleanup and testing.
parent
f1d34b0370
commit
6e8a6e491c
|
@ -1,6 +1,8 @@
|
|||
package wal
|
||||
|
||||
const sqlSchema = `
|
||||
BEGIN IMMEDIATE;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS wal(
|
||||
SeqNum INTEGER NOT NULL PRIMARY KEY,
|
||||
CreatedAt INTEGER NOT NULL,
|
||||
|
@ -11,6 +13,8 @@ CREATE TABLE IF NOT EXISTS wal(
|
|||
) WITHOUT ROWID;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS wal_created_at_index ON wal(CreatedAt);
|
||||
|
||||
COMMIT;
|
||||
`
|
||||
|
||||
const sqlWALMaxSeqNum = `
|
||||
|
|
|
@ -94,8 +94,9 @@ func TestShipping(t *testing.T) {
|
|||
defer f.Close()
|
||||
|
||||
for f.MaxSeqNum() < uint64(N) {
|
||||
conn := nw.Accept()
|
||||
sender.SendWAL(conn)
|
||||
if conn := nw.Accept(); conn != nil {
|
||||
sender.SendWAL(conn)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -107,8 +108,9 @@ func TestShipping(t *testing.T) {
|
|||
w := NewWriterSecondary(fWALPath)
|
||||
|
||||
for f.MaxSeqNum() < uint64(N) {
|
||||
conn := nw.Dial()
|
||||
w.RecvWAL(conn)
|
||||
if conn := nw.Dial(); conn != nil {
|
||||
w.RecvWAL(conn)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -146,7 +148,7 @@ func TestShipping(t *testing.T) {
|
|||
}
|
||||
|
||||
// Delete everything.
|
||||
w.DeleteBefore(time.Now().Unix() + 2)
|
||||
w.DeleteBefore(-1)
|
||||
|
||||
// Run a sender in the background.
|
||||
go func() {
|
||||
|
@ -164,7 +166,7 @@ func TestShipping(t *testing.T) {
|
|||
w.RecvWAL(conn)
|
||||
}()
|
||||
|
||||
time.Sleep(1)
|
||||
time.Sleep(time.Second)
|
||||
|
||||
f := NewFollower(fWALPath)
|
||||
defer f.Close()
|
||||
|
|
|
@ -204,7 +204,7 @@ func (w *Writer) RecvWAL(conn net.Conn) {
|
|||
}
|
||||
}
|
||||
|
||||
func (w *Writer) DeleteBefore(ts int64) {
|
||||
_, err := w.db.Exec(sqlWALDeleteQuery, ts)
|
||||
func (w *Writer) DeleteBefore(seconds int64) {
|
||||
_, err := w.db.Exec(sqlWALDeleteQuery, time.Now().Unix()-seconds)
|
||||
must(err)
|
||||
}
|
||||
|
|
Reference in New Issue