package mdb /* type txAggregator struct { Stop chan struct{} Done *sync.WaitGroup ModChan chan txMod W *cswal.Writer Index *pagefile.Index Snapshot *atomic.Pointer[Snapshot] } func (p txAggregator) Run() { defer p.Done.Done() defer p.W.Close() var ( tx *Snapshot mod txMod rec cswal.Record err error toNotify = make([]chan error, 0, 1024) ) READ_FIRST: toNotify = toNotify[:0] select { case mod = <-p.ModChan: goto BEGIN case <-p.Stop: goto END } BEGIN: tx = p.Snapshot.Load().begin() goto APPLY_MOD CLONE: tx = tx.clone() goto APPLY_MOD APPLY_MOD: if err = mod.Update(tx); err != nil { mod.Resp <- err goto ROLLBACK } toNotify = append(toNotify, mod.Resp) goto NEXT ROLLBACK: if len(toNotify) == 0 { goto READ_FIRST } tx = tx.rollback() goto NEXT NEXT: select { case mod = <-p.ModChan: goto CLONE default: goto WRITE } WRITE: rec, err = writeChangesToWAL(tx.changes, p.Index, p.W) if err == nil { tx.seqNum = rec.SeqNum tx.updatedAt = rec.CreatedAt tx.setReadOnly() p.Snapshot.Store(tx) } for i := range toNotify { toNotify[i] <- err } goto READ_FIRST END: } */