package wal import "sync" type segmentState struct { Closed bool Archived bool FirstSeqNum int64 LastSeqNum int64 } func newSegmentState(closed bool, header segmentHeader) segmentState { return segmentState{ Closed: closed, Archived: header.ArchivedAt != 0, FirstSeqNum: header.FirstSeqNum, LastSeqNum: header.LastSeqNum, } } type notifyMux struct { lock sync.Mutex nextID int64 recvrs map[int64]chan segmentState } type stateRecvr struct { // Each recvr will always get the most recent sequence number on C. // When the segment is closed, a -1 is sent. C chan segmentState Close func() } func newNotifyMux() *notifyMux { return ¬ifyMux{ recvrs: map[int64]chan segmentState{}, } } func (m *notifyMux) NewRecvr(header segmentHeader) stateRecvr { state := newSegmentState(false, header) m.lock.Lock() defer m.lock.Unlock() m.nextID++ recvrID := m.nextID recvr := stateRecvr{ C: make(chan segmentState, 1), Close: func() { m.lock.Lock() defer m.lock.Unlock() delete(m.recvrs, recvrID) }, } recvr.C <- state m.recvrs[recvrID] = recvr.C return recvr } func (m *notifyMux) Notify(closed bool, header segmentHeader) { state := newSegmentState(closed, header) m.lock.Lock() defer m.lock.Unlock() for _, c := range m.recvrs { select { case c <- state: case <-c: c <- state } } }