Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V1 #8

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
33 changes: 19 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ cache:
ttl: 24h
amnesia: 0
compression: true

my-user-cache:
soft-ttl: 2h
layers:
Expand All @@ -75,25 +76,28 @@ cache:

`soft-ttl` is an instance-wide TTL which when expired will **NOT** remove the data from the instance, but warns that the data is old

Each cache layer can be of types `redis`, `gaurdian`, `memory` or `tiny`. `redis` is used for a single node Redis server, `gaurdian` is used for a master-slave Redis cluster configuration, `memory` uses the BigCache library to provide an efficient and fast in-memory cache, `tiny` uses the native sync.map data structure to store smaller cache values in memory (used for low-write caches).
Each cache layer can be of the following types:
`redis` is used for a single node Redis server.
`gaurdian` [Depricated] is used for a master-slave Redis cluster configuration but it's being depricated in favor of `rediscluster`.
`rediscluster` is an all-encompassing configuration for both client side sharding as well as cluster Redis (or both at the same time).
`memory` uses the BigCache library to provide an efficient and fast in-memory cache.
`tiny` uses the native sync.map data structure to store smaller cache values in memory (used for low-write caches).
Note: all of the cache types are sync-safe, meaning they can be safely used from simultaneously running goroutines.

#### Common layer configs:

`amnesia` is a stochastic fall-through mechanism which allows for a higher layer to be updated from a lower layer by the way of an artificial cache-miss,
a 0 amnesia means that the layers will never miss a data that they actually have, a 10 amnesia means when a key is present in the cache, 90% of the time it is returned but 10% of the time it is ignored and is treated as a cache-miss. a 100 amnesia effectively turns the layer off. (Default: 0)

`compression` is whther the data is compressed before being put into the cache memory. Currently only Zlib compression is supported. (Default: false)

`ttl` is the hard Time To Live for the data in this particular layer, after which the data is expired and is expected to be removed.
an amnesia value of 0 means that the layers will never miss a data that they actually have, an amnesia value of 10 means when a key is present in the cache, 90% of the time it is returned but 10% of the time it is ignored and is treated as a cache-miss. a 100% amnesia effectively turns the layer off. (Default: 0)
`compression` dictates whther the data is compressed before being put into the cache memory. Currently only Zlib compression is supported. (Default: false)
`ttl` is the hard Time-To-Live for the data in this particular layer, after which the data is expired and is expected to be removed.

#### Type-spesific layer configs:

`db` [`redis` - `gaurdian`] is the Redis DB number to be used. (Default:0)
`idle-timeout` [`redis` - `gaurdian`] is the timeout for idle connections to the Redis Server (see Redis documentation) (Default:0 - no timeout)
`address` [`redis` - `gaurdian`] is the Redis Server's Address (the master's address in case of a cluster)
`slaves` [`gaurdian`] is a **list** of Redis servers addresses pertaining to the slave nodes.
`max-memory` [`memory`] is the maximum amount of system memory which can be used by this particular layer.
`db` [`redis` - `gaurdian`] is the Redis DB number to be used. (Default:0)
`idle-timeout` [`redis` - `gaurdian`] is the timeout for idle connections to the Redis Server (see Redis documentation) (Default:0 - no timeout)
`address` [`redis` - `gaurdian` - `rediscluster`] is the Redis Server's Address (the master's address in case of a cluster)
`slaves` [`gaurdian` - `rediscluster`] is a **list** of Redis servers addresses pertaining to the slave nodes.
`max-memory` [`memory`] is the maximum amount of system memory which can be used by this particular layer.


## Documentation
Expand All @@ -119,9 +123,10 @@ We use [SemVer](http://semver.org/) for versioning. For the versions available,

## Authors

* **Ramtin Rostami** - *Initial work* - [rrostami](https://github.com/rrostami)
* **Pedram Teymoori** - *Initial work* - [pedramteymoori](https://github.com/pedramteymoori)
* **Parsa abdollahi** - *Initial work* - []()
* **Ramtin Rostami** [rrostami](https://github.com/rrostami) - *Initial work & Maintaining*
* **Pedram Teymoori** [pedramteymoori](https://github.com/pedramteymoori) - *Initial work*
* **Parsa abdollahi** - *Initial work*
* **Ava Abderezaei** [avv-va](https://github.com/avv-va) - *Tests*

See also the list of [contributors](https://github.com/cafebazaar/Mnemosyne/graphs/contributors) who participated in this project.

Expand Down
84 changes: 84 additions & 0 deletions c-memory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package mnemosyne

import (
"context"
"math/rand"
"time"

"github.com/allegro/bigcache"
"github.com/sirupsen/logrus"
)

type inMemoryCache struct {
layerName string
base *bigcache.BigCache
amnesiaChance int
compressionEnabled bool
cacheTTL time.Duration
watcher ITimer
}

func NewInMemoryCache(opts *CacheOpts, watcher ITimer) *inMemoryCache {
internalOpts := bigcache.Config{
Shards: 1024,
LifeWindow: opts.cacheTTL,
MaxEntriesInWindow: 1100 * 10 * 60,
MaxEntrySize: 500,
Verbose: false,
HardMaxCacheSize: opts.memOpts.maxMem,
CleanWindow: 1 * time.Minute,
}
cacheInstance, err := bigcache.NewBigCache(internalOpts)
if err != nil {
logrus.Errorf("InMemCache Error: %v", err)
}
return &inMemoryCache{
layerName: opts.layerName,
base: cacheInstance,
amnesiaChance: opts.amnesiaChance,
compressionEnabled: opts.compressionEnabled,
cacheTTL: opts.cacheTTL,
watcher: watcher,
}
}

func (mc *inMemoryCache) Get(ctx context.Context, key string) (*cachableRet, error) {
if mc.amnesiaChance > rand.Intn(100) {
return nil, NewAmnesiaError(mc.amnesiaChance)
}
rawBytes, err := mc.base.Get(key)
if err != nil {
return nil, err
}
return finalizeCacheResponse(rawBytes, mc.compressionEnabled)
}

func (mc *inMemoryCache) Set(ctx context.Context, key string, value interface{}) error {
if mc.amnesiaChance == 100 {
return NewAmnesiaError(mc.amnesiaChance)
}
finalData, err := prepareCachePayload(value, mc.compressionEnabled)
if err != nil {
return err
}
return mc.base.Set(key, finalData)
}

func (mc *inMemoryCache) Delete(ctx context.Context, key string) error {
if mc.amnesiaChance == 100 {
return NewAmnesiaError(mc.amnesiaChance)
}
return mc.base.Delete(key)
}

func (mc *inMemoryCache) Clear() error {
return mc.base.Reset()
}

func (mc *inMemoryCache) TTL(ctx context.Context, key string) time.Duration {
return time.Second * 0
}

func (mc *inMemoryCache) Name() string {
return mc.layerName
}
164 changes: 164 additions & 0 deletions c-redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package mnemosyne

import (
"context"
"hash/fnv"
"math/rand"
"time"

"github.com/go-redis/redis"
"github.com/sirupsen/logrus"
)

type RedisClusterAddress struct {
MasterAddr string `mapstructure:"address"`
SlaveAddrs []string `mapstructure:"slaves"`
}

type RedisOpts struct {
db int
idleTimeout time.Duration
shards []*RedisClusterAddress
}

type clusterClient struct {
master *redis.Client
slaves []*redis.Client
}

type redisCache struct {
layerName string
baseClients []*clusterClient
amnesiaChance int
compressionEnabled bool
cacheTTL time.Duration
watcher ITimer
}

func makeClient(addr string, db int, idleTimeout time.Duration) *redis.Client {
redisOptions := &redis.Options{
Addr: addr,
DB: db,
}
if idleTimeout >= time.Second {
redisOptions.IdleTimeout = idleTimeout
}
newClient := redis.NewClient(redisOptions)

if err := newClient.Ping().Err(); err != nil {
logrus.WithError(err).WithField("address", addr).Error("error pinging Redis")
}
return newClient
}

func NewShardedClusterRedisCache(opts *CacheOpts, watcher ITimer) *redisCache {
rc := &redisCache{
layerName: opts.layerName,
amnesiaChance: opts.amnesiaChance,
compressionEnabled: opts.compressionEnabled,
cacheTTL: opts.cacheTTL,
watcher: watcher,
}
rc.baseClients = make([]*clusterClient, len(opts.redisOpts.shards))
for i, shard := range opts.redisOpts.shards {
rc.baseClients[i].master = makeClient(shard.MasterAddr,
opts.redisOpts.db,
opts.redisOpts.idleTimeout)

rc.baseClients[i].slaves = make([]*redis.Client, len(shard.SlaveAddrs))
for j, slv := range shard.SlaveAddrs {
rc.baseClients[i].slaves[j] = makeClient(slv,
opts.redisOpts.db,
opts.redisOpts.idleTimeout)
}
}
return rc
}

func (rc *redisCache) Get(ctx context.Context, key string) (*cachableRet, error) {
if rc.amnesiaChance > rand.Intn(100) {
return nil, NewAmnesiaError(rc.amnesiaChance)
}
client := rc.pickClient(key, false).WithContext(ctx)
startMarker := rc.watcher.Start()
strValue, err := client.Get(key).Result()
if err == nil {
rc.watcher.Done(startMarker, rc.layerName, "get", "ok")
} else if err == redis.Nil {
rc.watcher.Done(startMarker, rc.layerName, "get", "miss")
} else {
rc.watcher.Done(startMarker, rc.layerName, "get", "error")
}
rawBytes := []byte(strValue)
return finalizeCacheResponse(rawBytes, rc.compressionEnabled)
}

func (rc *redisCache) Set(ctx context.Context, key string, value interface{}) error {
if rc.amnesiaChance == 100 {
return NewAmnesiaError(rc.amnesiaChance)
}
finalData, err := prepareCachePayload(value, rc.compressionEnabled)
if err != nil {
return err
}
client := rc.pickClient(key, true).WithContext(ctx)
startMarker := rc.watcher.Start()
setError := client.SetNX(key, finalData, rc.cacheTTL).Err()
if setError != nil {
rc.watcher.Done(startMarker, rc.layerName, "set", "error")
} else {
rc.watcher.Done(startMarker, rc.layerName, "set", "ok")
}
return setError
}
func (rc *redisCache) Delete(ctx context.Context, key string) error {
if rc.amnesiaChance == 100 {
return NewAmnesiaError(rc.amnesiaChance)
}
client := rc.pickClient(key, true).WithContext(ctx)
return client.Del(key).Err()
}

func (rc *redisCache) Clear() error {
for _, cl := range rc.baseClients {
client := cl.master
err := client.FlushDB().Err()
if err != nil {
return err
}
}
return nil
}

func (rc *redisCache) TTL(ctx context.Context, key string) time.Duration {
client := rc.pickClient(key, false).WithContext(ctx)
res, err := client.TTL(key).Result()
if err != nil {
return time.Second * 0
}
return res
}

func (rc *redisCache) pickClient(key string, modification bool) *redis.Client {
shard := rc.shardKey(key)
if modification || len(rc.baseClients[shard].slaves) == 0 {
return rc.baseClients[shard].master
}
cl := rand.Intn(len(rc.baseClients[shard].slaves))
return rc.baseClients[shard].slaves[cl]
}

func (rc *redisCache) shardKey(key string) int {
shards := len(rc.baseClients)
if shards == 1 {
return 0
}
hasher := fnv.New32a()
hasher.Write([]byte(key))
keyHash := int(hasher.Sum32())
return keyHash % shards
}

func (rc *redisCache) Name() string {
return rc.layerName
}
80 changes: 80 additions & 0 deletions c-tiny.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package mnemosyne

import (
"context"
"errors"
"math/rand"
"sync"
"time"
)

type tinyCache struct {
layerName string
base *sync.Map
amnesiaChance int
compressionEnabled bool
cacheTTL time.Duration
watcher ITimer
}

func NewTinyCache(opts *CacheOpts, watcher ITimer) *tinyCache {
data := sync.Map{}
return &tinyCache{
layerName: opts.layerName,
base: &data,
amnesiaChance: opts.amnesiaChance,
compressionEnabled: opts.compressionEnabled,
cacheTTL: time.Hour * 9999,
watcher: watcher,
}
}

func (tc *tinyCache) Get(ctx context.Context, key string) (*cachableRet, error) {
if tc.amnesiaChance > rand.Intn(100) {
return nil, NewAmnesiaError(tc.amnesiaChance)
}
var rawBytes []byte
val, ok := tc.base.Load(key)
if !ok {
return nil, errors.New("Failed to load from syncmap")
} else {
rawBytes, ok = val.([]byte)
if !ok {
return nil, errors.New("Failed to load from syncmap")
}
}
return finalizeCacheResponse(rawBytes, tc.compressionEnabled)
}

func (tc *tinyCache) Set(ctx context.Context, key string, value interface{}) error {
if tc.amnesiaChance == 100 {
return NewAmnesiaError(tc.amnesiaChance)
}
finalData, err := prepareCachePayload(value, tc.compressionEnabled)
if err != nil {
return err
}
tc.base.Store(key, finalData)
return nil
}

func (tc *tinyCache) Delete(ctx context.Context, key string) error {
if tc.amnesiaChance == 100 {
return NewAmnesiaError(tc.amnesiaChance)
}
tc.base.Delete(key)
return nil
}

func (tc *tinyCache) Clear() error {
tc.base = &sync.Map{}
return nil
}

func (tc *tinyCache) TTL(ctx context.Context, key string) time.Duration {
return time.Second * 0
}

func (tc *tinyCache) Name() string {
return tc.layerName
}
Loading