jldb/lib/rep/replicator-walfollower.go

67 lines
1.0 KiB
Go
Raw Permalink Normal View History

2023-10-13 09:43:27 +00:00
package rep
import (
"log"
"time"
)
func (rep *Replicator) runWALFollower() {
defer rep.done.Done()
for {
rep.followOnce()
select {
case <-rep.stop:
return
default:
time.Sleep(time.Second)
}
}
}
func (rep *Replicator) followOnce() {
logf := func(pattern string, args ...any) {
log.Printf("[WAL-FOLLOWER] "+pattern, args...)
}
state := rep.getState()
it, err := rep.wal.Iterator(state.SeqNum + 1)
if err != nil {
logf("Failed to create WAL iterator: %v", err)
return
}
defer it.Close()
for {
select {
case <-rep.stop:
logf("Stopped")
return
default:
}
if it.Next(time.Second) {
rec := it.Record()
if err := rep.app.Apply(rec); err != nil {
logf("App failed to apply change: %v", err)
return
}
if err := rep.ack(rec.SeqNum, rec.TimestampMS); err != nil {
logf("App failed to update local state: %v", err)
return
}
select {
case rep.appendNotify <- struct{}{}:
default:
}
} else if it.Error() != nil {
logf("Iteration error: %v", err)
return
}
}
}