80 lines
1.3 KiB
Go
80 lines
1.3 KiB
Go
|
package wal
|
||
|
|
||
|
import "sync"
|
||
|
|
||
|
type segmentState struct {
|
||
|
Closed bool
|
||
|
Archived bool
|
||
|
FirstSeqNum int64
|
||
|
LastSeqNum int64
|
||
|
}
|
||
|
|
||
|
func newSegmentState(closed bool, header segmentHeader) segmentState {
|
||
|
return segmentState{
|
||
|
Closed: closed,
|
||
|
Archived: header.ArchivedAt != 0,
|
||
|
FirstSeqNum: header.FirstSeqNum,
|
||
|
LastSeqNum: header.LastSeqNum,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
type notifyMux struct {
|
||
|
lock sync.Mutex
|
||
|
nextID int64
|
||
|
recvrs map[int64]chan segmentState
|
||
|
}
|
||
|
|
||
|
type stateRecvr struct {
|
||
|
// Each recvr will always get the most recent sequence number on C.
|
||
|
// When the segment is closed, a -1 is sent.
|
||
|
C chan segmentState
|
||
|
Close func()
|
||
|
}
|
||
|
|
||
|
func newNotifyMux() *notifyMux {
|
||
|
return ¬ifyMux{
|
||
|
recvrs: map[int64]chan segmentState{},
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (m *notifyMux) NewRecvr(header segmentHeader) stateRecvr {
|
||
|
state := newSegmentState(false, header)
|
||
|
|
||
|
m.lock.Lock()
|
||
|
defer m.lock.Unlock()
|
||
|
|
||
|
m.nextID++
|
||
|
|
||
|
recvrID := m.nextID
|
||
|
|
||
|
recvr := stateRecvr{
|
||
|
C: make(chan segmentState, 1),
|
||
|
Close: func() {
|
||
|
m.lock.Lock()
|
||
|
defer m.lock.Unlock()
|
||
|
delete(m.recvrs, recvrID)
|
||
|
},
|
||
|
}
|
||
|
|
||
|
recvr.C <- state
|
||
|
m.recvrs[recvrID] = recvr.C
|
||
|
|
||
|
return recvr
|
||
|
}
|
||
|
|
||
|
func (m *notifyMux) Notify(closed bool, header segmentHeader) {
|
||
|
|
||
|
state := newSegmentState(closed, header)
|
||
|
|
||
|
m.lock.Lock()
|
||
|
defer m.lock.Unlock()
|
||
|
|
||
|
for _, c := range m.recvrs {
|
||
|
select {
|
||
|
case c <- state:
|
||
|
case <-c:
|
||
|
c <- state
|
||
|
}
|
||
|
}
|
||
|
}
|