jldb/lib/wal/notify.go
2023-10-13 11:43:27 +02:00

80 lines
1.3 KiB
Go

package wal
import "sync"
// segmentState is the snapshot of a segment's status that the notifyMux
// delivers to receivers over their channels.
type segmentState struct {
// Closed is the closed flag supplied by the caller of newSegmentState.
Closed bool
// Archived is true when the segment header's ArchivedAt is non-zero.
Archived bool
// FirstSeqNum is copied from the segment header's FirstSeqNum.
FirstSeqNum int64
// LastSeqNum is copied from the segment header's LastSeqNum.
LastSeqNum int64
}
// newSegmentState builds the state snapshot for a segment from its header.
//
// closed is stored verbatim in the Closed field. Archived is derived from the
// header: it is true exactly when header.ArchivedAt is non-zero. The sequence
// number bounds are copied from the header unchanged.
func newSegmentState(closed bool, header segmentHeader) segmentState {
	var st segmentState
	st.Closed = closed
	st.Archived = header.ArchivedAt != 0
	st.FirstSeqNum = header.FirstSeqNum
	st.LastSeqNum = header.LastSeqNum
	return st
}
// notifyMux fans segment state updates out to every registered receiver
// channel. Create one with newNotifyMux so that the recvrs map is initialized.
type notifyMux struct {
// lock guards nextID and recvrs; NewRecvr, Notify and a receiver's Close
// function all take it before touching either field.
lock sync.Mutex
// nextID is the ID most recently handed out by NewRecvr. It is incremented
// before use, so the first receiver gets ID 1.
nextID int64
// recvrs holds the channel of each registered receiver, keyed by receiver ID.
recvrs map[int64]chan segmentState
}
// stateRecvr is the handle returned by notifyMux.NewRecvr: a channel of
// segment state updates plus a function that unregisters it.
type stateRecvr struct {
// C delivers segmentState values. NewRecvr creates it with a buffer of one
// and pre-loads the state at registration time; Notify replaces an unread
// value instead of queueing behind it, so the value waiting on C is always
// the most recent state. A closed segment is reported through the Closed
// field of the delivered state (no sentinel value is sent).
C chan segmentState
// Close removes this receiver from the mux so it gets no further updates.
// It does not close C.
Close func()
}
// newNotifyMux returns an empty mux with its receiver table allocated, ready
// for NewRecvr and Notify calls.
func newNotifyMux() *notifyMux {
	mux := new(notifyMux)
	mux.recvrs = make(map[int64]chan segmentState)
	return mux
}
// NewRecvr registers a new receiver and returns its handle.
//
// The receiver's channel is pre-loaded with the (not closed) state derived
// from header, so the first read on C never has to wait for a Notify call.
// Calling the returned Close function removes the receiver from the mux; the
// channel itself is left open.
func (m *notifyMux) NewRecvr(header segmentHeader) stateRecvr {
	// The channel is private until it is stored in m.recvrs, so it can be
	// primed before the lock is taken; the single buffer slot means this send
	// cannot block.
	ch := make(chan segmentState, 1)
	ch <- newSegmentState(false, header)

	m.lock.Lock()
	defer m.lock.Unlock()

	m.nextID++
	id := m.nextID
	m.recvrs[id] = ch

	return stateRecvr{
		C: ch,
		Close: func() {
			m.lock.Lock()
			delete(m.recvrs, id)
			m.lock.Unlock()
		},
	}
}
// Notify publishes the state derived from closed and header to every
// registered receiver.
//
// Each receiver channel holds at most one value. If a receiver has not yet
// consumed the previous state, that stale value is discarded and replaced, so
// a receiver always reads the latest state and Notify never waits on a slow
// consumer.
func (m *notifyMux) Notify(closed bool, header segmentHeader) {
	latest := newSegmentState(closed, header)

	m.lock.Lock()
	defer m.lock.Unlock()

	for _, ch := range m.recvrs {
		// Drop an unread value, if any. Sends to registered channels only
		// happen here under the lock, so after this step the single buffer
		// slot is free.
		select {
		case <-ch:
		default:
		}
		ch <- latest
	}
}