93 lines
1.2 KiB
Go
93 lines
1.2 KiB
Go
package mdb
|
|
|
|
/*
|
|
type txAggregator struct {
|
|
Stop chan struct{}
|
|
Done *sync.WaitGroup
|
|
ModChan chan txMod
|
|
W *cswal.Writer
|
|
Index *pagefile.Index
|
|
Snapshot *atomic.Pointer[Snapshot]
|
|
}
|
|
|
|
func (p txAggregator) Run() {
|
|
defer p.Done.Done()
|
|
defer p.W.Close()
|
|
|
|
var (
|
|
tx *Snapshot
|
|
mod txMod
|
|
rec cswal.Record
|
|
err error
|
|
toNotify = make([]chan error, 0, 1024)
|
|
)
|
|
|
|
READ_FIRST:
|
|
|
|
toNotify = toNotify[:0]
|
|
|
|
select {
|
|
case mod = <-p.ModChan:
|
|
goto BEGIN
|
|
case <-p.Stop:
|
|
goto END
|
|
}
|
|
|
|
BEGIN:
|
|
|
|
tx = p.Snapshot.Load().begin()
|
|
goto APPLY_MOD
|
|
|
|
CLONE:
|
|
|
|
tx = tx.clone()
|
|
goto APPLY_MOD
|
|
|
|
APPLY_MOD:
|
|
|
|
if err = mod.Update(tx); err != nil {
|
|
mod.Resp <- err
|
|
goto ROLLBACK
|
|
}
|
|
|
|
toNotify = append(toNotify, mod.Resp)
|
|
goto NEXT
|
|
|
|
ROLLBACK:
|
|
|
|
if len(toNotify) == 0 {
|
|
goto READ_FIRST
|
|
}
|
|
|
|
tx = tx.rollback()
|
|
goto NEXT
|
|
|
|
NEXT:
|
|
|
|
select {
|
|
case mod = <-p.ModChan:
|
|
goto CLONE
|
|
default:
|
|
goto WRITE
|
|
}
|
|
|
|
WRITE:
|
|
|
|
rec, err = writeChangesToWAL(tx.changes, p.Index, p.W)
|
|
if err == nil {
|
|
tx.seqNum = rec.SeqNum
|
|
tx.updatedAt = rec.CreatedAt
|
|
tx.setReadOnly()
|
|
p.Snapshot.Store(tx)
|
|
}
|
|
|
|
for i := range toNotify {
|
|
toNotify[i] <- err
|
|
}
|
|
|
|
goto READ_FIRST
|
|
|
|
END:
|
|
}
|
|
*/
|