From 5a9e355df838648800201a2a0a63e3bc2df94102 Mon Sep 17 00:00:00 2001 From: jdl Date: Tue, 26 Jul 2022 14:02:53 +0200 Subject: [PATCH] wip --- go.mod | 9 ++++++ go.sum | 6 ++++ keyedmutex/mutex.go | 67 ++++++++++++++++++++++++++++++++++++++ testconn/net.go | 79 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 161 insertions(+) create mode 100644 go.mod create mode 100644 go.sum create mode 100644 keyedmutex/mutex.go create mode 100644 testconn/net.go diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..aec7824 --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module git.crumpington.com/private/mdb + +go 1.18 + +require ( + github.com/google/btree v1.1.2 + github.com/mattn/go-sqlite3 v1.14.14 + golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e0874d9 --- /dev/null +++ b/go.sum @@ -0,0 +1,6 @@ +github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= +github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= +github.com/mattn/go-sqlite3 v1.14.14 h1:qZgc/Rwetq+MtyE18WhzjokPD93dNqLGNT3QJuLvBGw= +github.com/mattn/go-sqlite3 v1.14.14/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/keyedmutex/mutex.go b/keyedmutex/mutex.go new file mode 100644 index 0000000..699e4b1 --- /dev/null +++ b/keyedmutex/mutex.go @@ -0,0 +1,67 @@ +package keyedmutex + +import ( + "container/list" + "sync" +) + +type KeyedMutex[K comparable] struct { + mu *sync.Mutex + waitList map[K]*list.List +} + +func New[K comparable]() KeyedMutex[K] { + return KeyedMutex[K]{ + mu: new(sync.Mutex), + waitList: map[K]*list.List{}, + } +} + +func (m KeyedMutex[K]) Lock(key K) { + if ch := m.lock(key); ch != nil { + <-ch + } +} + +func (m KeyedMutex[K]) lock(key K) chan struct{} { + m.mu.Lock() + defer m.mu.Unlock() + + if waitList, ok := m.waitList[key]; ok { + ch := make(chan struct{}) + waitList.PushBack(ch) + return ch + } + + m.waitList[key] = list.New() + return nil +} + +func (m KeyedMutex[K]) TryLock(key K) bool { + m.mu.Lock() + defer m.mu.Unlock() + + if _, ok := m.waitList[key]; ok { + return false + } + + m.waitList[key] = list.New() + return true +} + +func (m KeyedMutex[K]) Unlock(key K) { + m.mu.Lock() + defer m.mu.Unlock() + + waitList, ok := m.waitList[key] + if !ok { + panic("unlock of unlocked mutex") + } + + if waitList.Len() == 0 { + delete(m.waitList, key) + } else { + ch := waitList.Remove(waitList.Front()).(chan struct{}) + ch <- struct{}{} + } +} diff --git a/testconn/net.go b/testconn/net.go new file mode 100644 index 0000000..b2ab532 --- /dev/null +++ b/testconn/net.go @@ -0,0 +1,79 @@ +package testconn + +import ( + "net" + "sync" + "time" +) + +type Network struct { + lock sync.Mutex + // Current connections. + cConn net.Conn + sConn net.Conn + + acceptQ chan net.Conn +} + +func NewNetwork() *Network { + return &Network{ + acceptQ: make(chan net.Conn, 1), + } +} + +func (n *Network) Dial() net.Conn { + cc, sc := net.Pipe() + func() { + n.lock.Lock() + defer n.lock.Unlock() + if n.cConn != nil { + n.cConn.Close() + n.cConn = nil + } + select { + case n.acceptQ <- sc: + n.cConn = cc + default: + cc = nil + } + }() + return cc +} + +func (n *Network) Accept() net.Conn { + var sc net.Conn + select { + case sc = <-n.acceptQ: + case <-time.After(time.Second): + return nil + } + + func() { + n.lock.Lock() + defer n.lock.Unlock() + if n.sConn != nil { + n.sConn.Close() + n.sConn = nil + } + n.sConn = sc + }() + return sc +} + +func (n *Network) CloseClient() { + n.lock.Lock() + defer n.lock.Unlock() + if n.cConn != nil { + n.cConn.Close() + n.cConn = nil + } +} + +func (n *Network) CloseServer() { + n.lock.Lock() + defer n.lock.Unlock() + if n.sConn != nil { + n.sConn.Close() + n.sConn = nil + } +}