package mdb import ( "bytes" "git.crumpington.com/public/jldb/mdb/change" ) type txMod struct { Update func(tx *Snapshot) error Resp chan error } func (db *Database) runTXAggreagtor() { defer db.done.Done() var ( tx *Snapshot mod txMod seqNum int64 timestampMS int64 err error buf = &bytes.Buffer{} toNotify = make([]chan error, 0, db.conf.MaxConcurrentUpdates) ) READ_FIRST: toNotify = toNotify[:0] select { case mod = <-db.modChan: goto BEGIN case <-db.stop: goto END } BEGIN: tx = db.snapshot.Load().begin() goto APPLY_MOD CLONE: tx = tx.clone() goto APPLY_MOD APPLY_MOD: if err = mod.Update(tx); err != nil { mod.Resp <- err goto ROLLBACK } toNotify = append(toNotify, mod.Resp) goto NEXT ROLLBACK: if len(toNotify) == 0 { goto READ_FIRST } tx = tx.rollback() goto NEXT NEXT: select { case mod = <-db.modChan: goto CLONE default: goto WRITE } WRITE: db.idx.StageChanges(tx.changes) buf.Reset() if err = change.Write(tx.changes, buf); err != nil { db.idx.UnstageChanges(tx.changes) } if err == nil { seqNum, timestampMS, err = db.rep.Append(int64(buf.Len()), buf) } if err != nil { db.idx.UnstageChanges(tx.changes) } else { db.idx.ApplyChanges(tx.changes) tx.seqNum = seqNum tx.timestampMS = timestampMS tx.setReadOnly() db.snapshot.Store(tx) } for i := range toNotify { toNotify[i] <- err } goto READ_FIRST END: }