Compare commits

...

8 Commits

Author SHA1 Message Date
jdl 7e7af68494 Some test code 2021-04-07 08:46:39 +02:00
J. David Lee d53a869235 kvmemcache: tests complete 2021-04-06 17:51:16 +02:00
jdl a6290db03b WIP: kvmemcache 2021-04-06 08:58:17 +02:00
jdl bd37278092 WIP 2021-04-02 20:57:08 +02:00
jdl 5cab19280e Added kv in-memory cache. 2021-04-02 17:14:32 +02:00
jdl 90cde9b4fd Cleanup 2021-04-02 08:44:05 +02:00
jdl d80dad4a40 Added rate limiter. 2021-04-02 08:42:22 +02:00
jdl a6f623ca6d Added keyed mutex. 2021-04-01 22:08:32 +02:00
11 changed files with 836 additions and 1 deletions

5
go.mod
View File

@ -2,4 +2,7 @@ module git.crumpington.com/public/toolbox
go 1.16
require golang.org/x/term v0.0.0-20210317153231-de623e64d2a6
require (
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68
golang.org/x/term v0.0.0-20210317153231-de623e64d2a6
)

67
keyedmutex/keyedmutex.go Normal file
View File

@ -0,0 +1,67 @@
package keyedmutex
import (
"container/list"
"sync"
)
type KeyedMutex struct {
mu *sync.Mutex
waitList map[string]*list.List
}
func New() KeyedMutex {
return KeyedMutex{
mu: new(sync.Mutex),
waitList: map[string]*list.List{},
}
}
func (m KeyedMutex) Lock(key string) {
if ch := m.lock(key); ch != nil {
<-ch
}
}
func (m KeyedMutex) lock(key string) chan struct{} {
m.mu.Lock()
defer m.mu.Unlock()
if waitList, ok := m.waitList[key]; ok {
ch := make(chan struct{})
waitList.PushBack(ch)
return ch
}
m.waitList[key] = list.New()
return nil
}
func (m KeyedMutex) TryLock(key string) bool {
m.mu.Lock()
defer m.mu.Unlock()
if _, ok := m.waitList[key]; ok {
return false
}
m.waitList[key] = list.New()
return true
}
func (m KeyedMutex) Unlock(key string) {
m.mu.Lock()
defer m.mu.Unlock()
waitList, ok := m.waitList[key]
if !ok {
panic("unlock of unlocked mutex")
}
if waitList.Len() == 0 {
delete(m.waitList, key)
} else {
ch := waitList.Remove(waitList.Front()).(chan struct{})
ch <- struct{}{}
}
}

View File

@ -0,0 +1,122 @@
package keyedmutex
import (
"sync"
"testing"
"time"
)
func TestKeyedMutex(t *testing.T) {
checkState := func(t *testing.T, m KeyedMutex, keys ...string) {
if len(m.waitList) != len(keys) {
t.Fatal(m.waitList, keys)
}
for _, key := range keys {
if _, ok := m.waitList[key]; !ok {
t.Fatal(key)
}
}
}
m := New()
checkState(t, m)
m.Lock("a")
checkState(t, m, "a")
m.Lock("b")
checkState(t, m, "a", "b")
m.Lock("c")
checkState(t, m, "a", "b", "c")
if m.TryLock("a") {
t.Fatal("a")
}
if m.TryLock("b") {
t.Fatal("b")
}
if m.TryLock("c") {
t.Fatal("c")
}
if !m.TryLock("d") {
t.Fatal("d")
}
checkState(t, m, "a", "b", "c", "d")
if !m.TryLock("e") {
t.Fatal("e")
}
checkState(t, m, "a", "b", "c", "d", "e")
m.Unlock("c")
checkState(t, m, "a", "b", "d", "e")
m.Unlock("a")
checkState(t, m, "b", "d", "e")
m.Unlock("e")
checkState(t, m, "b", "d")
wg := sync.WaitGroup{}
for i := 0; i < 8; i++ {
wg.Add(1)
go func() {
defer wg.Done()
m.Lock("b")
m.Unlock("b")
}()
}
time.Sleep(100 * time.Millisecond)
m.Unlock("b")
wg.Wait()
checkState(t, m, "d")
m.Unlock("d")
checkState(t, m)
}
func TestKeyedMutex_unlockUnlocked(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Fatal(r)
}
}()
m := New()
m.Unlock("aldkfj")
}
func BenchmarkUncontendedMutex(b *testing.B) {
m := New()
key := "xyz"
for i := 0; i < b.N; i++ {
m.Lock(key)
m.Unlock(key)
}
}
func BenchmarkContendedMutex(b *testing.B) {
m := New()
key := "xyz"
m.Lock(key)
wg := sync.WaitGroup{}
for i := 0; i < b.N; i++ {
wg.Add(1)
go func() {
defer wg.Done()
m.Lock(key)
m.Unlock(key)
}()
}
time.Sleep(time.Second)
b.ResetTimer()
m.Unlock(key)
wg.Wait()
}

View File

@ -0,0 +1,76 @@
package main
import (
"fmt"
"math/rand"
"sync"
"time"
"git.crumpington.com/public/toolbox/kvmemcache"
)
const (
mean = 1000000
stdDev = 100
computeTime = 2 * time.Millisecond
cacheSize = 800
cacheTTL = time.Second
numThreads = 8
numCalls = 1024 * 1024
)
func randKey() string {
fSample := rand.NormFloat64()*stdDev + mean
sample := int64(fSample)
return fmt.Sprintf("%d", sample)
}
func run(c *kvmemcache.Cache, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < numCalls; i++ {
key := randKey()
_, _ = c.Get(key)
}
}
func main() {
c := kvmemcache.New(kvmemcache.Config{
MaxSize: cacheSize,
TTL: cacheTTL,
Src: func(key string) (interface{}, error) {
time.Sleep(computeTime)
return key, nil
},
})
wg := &sync.WaitGroup{}
wg.Add(numThreads)
t0 := time.Now()
for i := 0; i < numThreads; i++ {
go run(c, wg)
}
wg.Wait()
dt := time.Since(t0)
dtSec := float64(dt) / float64(time.Second)
stats := c.Stats()
fmt.Printf(`
Time taken: %12.3f sec
Hits: %12d
Misses: %12d
Calls/sec: %12.3f
Speedup: %12.3f
`,
dtSec,
stats.Hits,
stats.Misses,
float64(stats.Hits+stats.Misses)/dtSec,
float64(numThreads*numCalls*computeTime)/float64(dt),
)
}

View File

@ -0,0 +1,45 @@
package main
import (
"log"
"sync"
"time"
"git.crumpington.com/public/toolbox/kvmemcache"
)
func main() {
N := 1024 * 2048
trigger := make(chan bool)
c := kvmemcache.New(kvmemcache.Config{
MaxSize: 4,
Src: func(key string) (interface{}, error) {
<-trigger
return key, nil
},
})
readyGroup := sync.WaitGroup{}
doneGroup := sync.WaitGroup{}
readyGroup.Add(N)
doneGroup.Add(N)
for i := 0; i < N; i++ {
go func() {
readyGroup.Done()
c.Get("a")
doneGroup.Done()
}()
}
readyGroup.Wait()
t0 := time.Now()
trigger <- true
doneGroup.Wait()
dt := time.Since(t0)
log.Printf("%v", dt)
}

68
kvmemcache/cache.go Normal file
View File

@ -0,0 +1,68 @@
package kvmemcache
import (
"container/list"
"sync"
"time"
"git.crumpington.com/public/toolbox/keyedmutex"
)
type Cache struct {
updateLock keyedmutex.KeyedMutex
src func(string) (interface{}, error)
ttl time.Duration
maxSize int
// Lock protects cache, ll, and stats.
lock sync.Mutex
cache map[string]*list.Element
ll *list.List
stats Stats
}
type lruItem struct {
key string
createdAt time.Time
value interface{}
err error
}
type Config struct {
MaxSize int
TTL time.Duration // Zero to ignore.
Src func(string) (interface{}, error)
}
func New(conf Config) *Cache {
return &Cache{
updateLock: keyedmutex.New(),
src: conf.Src,
ttl: conf.TTL,
maxSize: conf.MaxSize,
lock: sync.Mutex{},
cache: make(map[string]*list.Element, conf.MaxSize+1),
ll: list.New(),
}
}
func (c *Cache) Get(key string) (interface{}, error) {
ok, val, err := c.get(key)
if ok {
return val, err
}
return c.load(key)
}
func (c *Cache) Evict(key string) {
c.lock.Lock()
defer c.lock.Unlock()
c.evict(key)
}
func (c *Cache) Stats() Stats {
c.lock.Lock()
defer c.lock.Unlock()
return c.stats
}

207
kvmemcache/cache_test.go Normal file
View File

@ -0,0 +1,207 @@
package kvmemcache
import (
"errors"
"fmt"
"sync"
"testing"
"time"
)
type State struct {
Keys []string
Stats Stats
}
func (c *Cache) assert(state State) error {
c.lock.Lock()
defer c.lock.Unlock()
if len(c.cache) != len(state.Keys) {
return fmt.Errorf(
"Expected %d keys but found %d.",
len(state.Keys),
len(c.cache))
}
for _, k := range state.Keys {
if _, ok := c.cache[k]; !ok {
return fmt.Errorf(
"Expected key %s not found.",
k)
}
}
if c.stats.Hits != state.Stats.Hits {
return fmt.Errorf(
"Expected %d hits, but found %d.",
state.Stats.Hits,
c.stats.Hits)
}
if c.stats.Misses != state.Stats.Misses {
return fmt.Errorf(
"Expected %d misses, but found %d.",
state.Stats.Misses,
c.stats.Misses)
}
return nil
}
var ErrTest = errors.New("Hello")
func TestCache_basic(t *testing.T) {
c := New(Config{
MaxSize: 4,
TTL: 50 * time.Millisecond,
Src: func(key string) (interface{}, error) {
if key == "err" {
return nil, ErrTest
}
return key, nil
},
})
type testCase struct {
name string
sleep time.Duration
key string
evict bool
state State
}
cases := []testCase{
{
name: "get a",
key: "a",
state: State{
Keys: []string{"a"},
Stats: Stats{Hits: 0, Misses: 1},
},
}, {
name: "get a again",
key: "a",
state: State{
Keys: []string{"a"},
Stats: Stats{Hits: 1, Misses: 1},
},
}, {
name: "sleep, then get a again",
sleep: 55 * time.Millisecond,
key: "a",
state: State{
Keys: []string{"a"},
Stats: Stats{Hits: 1, Misses: 2},
},
}, {
name: "get b",
key: "b",
state: State{
Keys: []string{"a", "b"},
Stats: Stats{Hits: 1, Misses: 3},
},
}, {
name: "get c",
key: "c",
state: State{
Keys: []string{"a", "b", "c"},
Stats: Stats{Hits: 1, Misses: 4},
},
}, {
name: "get d",
key: "d",
state: State{
Keys: []string{"a", "b", "c", "d"},
Stats: Stats{Hits: 1, Misses: 5},
},
}, {
name: "get e",
key: "e",
state: State{
Keys: []string{"b", "c", "d", "e"},
Stats: Stats{Hits: 1, Misses: 6},
},
}, {
name: "get c again",
key: "c",
state: State{
Keys: []string{"b", "c", "d", "e"},
Stats: Stats{Hits: 2, Misses: 6},
},
}, {
name: "get err",
key: "err",
state: State{
Keys: []string{"c", "d", "e", "err"},
Stats: Stats{Hits: 2, Misses: 7},
},
}, {
name: "get err again",
key: "err",
state: State{
Keys: []string{"c", "d", "e", "err"},
Stats: Stats{Hits: 3, Misses: 7},
},
}, {
name: "evict c",
key: "c",
evict: true,
state: State{
Keys: []string{"d", "e", "err"},
Stats: Stats{Hits: 3, Misses: 7},
},
},
}
for _, tc := range cases {
time.Sleep(tc.sleep)
if !tc.evict {
val, err := c.Get(tc.key)
if tc.key == "err" && err != ErrTest {
t.Fatal(tc.name, val)
}
if tc.key != "err" && val.(string) != tc.key {
t.Fatal(tc.name, tc.key, val)
}
} else {
c.Evict(tc.key)
}
if err := c.assert(tc.state); err != nil {
t.Fatal(err)
}
}
}
func TestCache_thunderingHerd(t *testing.T) {
c := New(Config{
MaxSize: 4,
Src: func(key string) (interface{}, error) {
time.Sleep(time.Second)
return key, nil
},
})
wg := sync.WaitGroup{}
for i := 0; i < 1024; i++ {
wg.Add(1)
go func() {
defer wg.Done()
val, err := c.Get("a")
if err != nil {
panic(err)
}
if val != "a" {
panic(err)
}
}()
}
wg.Wait()
stats := c.Stats()
if stats.Hits != 1023 || stats.Misses != 1 {
t.Fatal(stats)
}
}

67
kvmemcache/internal.go Normal file
View File

@ -0,0 +1,67 @@
package kvmemcache
import (
"time"
)
func (c *Cache) put(key string, value interface{}, err error) {
c.lock.Lock()
defer c.lock.Unlock()
c.stats.Misses++
c.cache[key] = c.ll.PushFront(lruItem{
key: key,
createdAt: time.Now(),
value: value,
err: err,
})
if c.maxSize != 0 && len(c.cache) > c.maxSize {
li := c.ll.Back()
c.ll.Remove(li)
delete(c.cache, li.Value.(lruItem).key)
}
}
func (c *Cache) evict(key string) {
elem := c.cache[key]
if elem != nil {
delete(c.cache, key)
c.ll.Remove(elem)
}
}
func (c *Cache) get(key string) (ok bool, val interface{}, err error) {
c.lock.Lock()
defer c.lock.Unlock()
li := c.cache[key]
if li == nil {
return false, nil, nil
}
item := li.Value.(lruItem)
// Maybe evict.
if c.ttl != 0 && time.Since(item.createdAt) > c.ttl {
c.evict(key)
return false, nil, nil
}
c.stats.Hits++
return true, item.value, item.err
}
func (c *Cache) load(key string) (interface{}, error) {
c.updateLock.Lock(key)
defer c.updateLock.Unlock(key)
// Check again in case we lost the update race.
ok, val, err := c.get(key)
if ok {
return val, err
}
// Won the update race.
val, err = c.src(key)
c.put(key, val, err)
return val, err
}

6
kvmemcache/stats.go Normal file
View File

@ -0,0 +1,6 @@
package kvmemcache
type Stats struct {
Hits uint64
Misses uint64
}

View File

@ -0,0 +1,73 @@
package ratelimiter
import (
"errors"
"sync"
"time"
)
var ErrBackoff = errors.New("Backoff")
type Config struct {
BurstLimit int64 // Number of requests to allow to burst.
FillPeriod time.Duration // Add one call per period.
MaxWaitCount int64 // Max number of waiting requests. 0 disables.
}
type Limiter struct {
lock sync.Mutex
fillPeriod time.Duration
minWaitTime time.Duration
maxWaitTime time.Duration
waitTime time.Duration
lastRequest time.Time
}
func New(conf Config) *Limiter {
if conf.BurstLimit < 0 {
panic(conf.BurstLimit)
}
if conf.FillPeriod <= 0 {
panic(conf.FillPeriod)
}
if conf.MaxWaitCount < 0 {
panic(conf.MaxWaitCount)
}
return &Limiter{
fillPeriod: conf.FillPeriod,
waitTime: -conf.FillPeriod * time.Duration(conf.BurstLimit),
minWaitTime: -conf.FillPeriod * time.Duration(conf.BurstLimit),
maxWaitTime: conf.FillPeriod * time.Duration(conf.MaxWaitCount-1),
lastRequest: time.Now(),
}
}
func (lim *Limiter) limit() (time.Duration, error) {
lim.lock.Lock()
defer lim.lock.Unlock()
dt := time.Since(lim.lastRequest)
waitTime := lim.waitTime - dt
if waitTime < lim.minWaitTime {
waitTime = lim.minWaitTime
} else if waitTime >= lim.maxWaitTime {
return 0, ErrBackoff
}
lim.waitTime = waitTime + lim.fillPeriod
lim.lastRequest = lim.lastRequest.Add(dt)
return lim.waitTime, nil
}
// Apply the limiter to the calling thread. The function may sleep for up to
// maxWaitTime before returning. If the timeout would need to be more than
// maxWaitTime to enforce the rate limit, ErrBackoff is returned.
func (lim *Limiter) Limit() error {
dt, err := lim.limit()
time.Sleep(dt) // Will return immediately for dt <= 0.
return err
}

View File

@ -0,0 +1,101 @@
package ratelimiter
import (
"sync"
"testing"
"time"
)
func TestRateLimiter_Limit_Errors(t *testing.T) {
type TestCase struct {
Name string
Conf Config
N int
ErrCount int
DT time.Duration
}
cases := []TestCase{
{
Name: "no burst, no wait",
Conf: Config{
BurstLimit: 0,
FillPeriod: 100 * time.Millisecond,
MaxWaitCount: 0,
},
N: 32,
ErrCount: 31,
DT: 100 * time.Millisecond,
}, {
Name: "no wait",
Conf: Config{
BurstLimit: 10,
FillPeriod: 100 * time.Millisecond,
MaxWaitCount: 0,
},
N: 32,
ErrCount: 22,
DT: 0,
}, {
Name: "no burst",
Conf: Config{
BurstLimit: 0,
FillPeriod: 10 * time.Millisecond,
MaxWaitCount: 10,
},
N: 32,
ErrCount: 22,
DT: 100 * time.Millisecond,
}, {
Name: "burst and wait",
Conf: Config{
BurstLimit: 10,
FillPeriod: 10 * time.Millisecond,
MaxWaitCount: 10,
},
N: 32,
ErrCount: 12,
DT: 100 * time.Millisecond,
},
}
for _, tc := range cases {
wg := sync.WaitGroup{}
l := New(tc.Conf)
errs := make([]error, tc.N)
t0 := time.Now()
for i := 0; i < tc.N; i++ {
wg.Add(1)
go func(i int) {
errs[i] = l.Limit()
wg.Done()
}(i)
}
wg.Wait()
dt := time.Since(t0)
errCount := 0
for _, err := range errs {
if err != nil {
errCount++
}
}
if errCount != tc.ErrCount {
t.Fatalf("%s: Expected %d errors but got %d.",
tc.Name, tc.ErrCount, errCount)
}
if dt < tc.DT {
t.Fatal(tc.Name, dt, tc.DT)
}
if dt > tc.DT+10*time.Millisecond {
t.Fatal(tc.Name, dt, tc.DT)
}
}
}