From 36cec05daa2eb08f8d071fcde86e5ef4979640ef Mon Sep 17 00:00:00 2001
From: Andrei Aaron <aaaron@luxoft.com>
Date: Fri, 3 Jan 2025 10:05:07 +0000
Subject: [PATCH] feat(redis): redis implementation for MetaDB

Signed-off-by: Andrei Aaron <aaaron@luxoft.com>
---
 pkg/api/controller_test.go       |   91 ++
 pkg/meta/meta_test.go            |   80 +-
 pkg/meta/redisdb/buckets.go      |   12 +
 pkg/meta/redisdb/redis.go        | 2151 +++++++++++++++++++++++++++---
 pkg/meta/redisdb/redis_test.go   |  242 ++++
 pkg/meta/version/patches.go      |    5 +
 pkg/meta/version/version_test.go |  173 +++
 7 files changed, 2598 insertions(+), 156 deletions(-)
 create mode 100644 pkg/meta/redisdb/buckets.go
 create mode 100644 pkg/meta/redisdb/redis_test.go

diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go
index cf8636f9b..6e86efc47 100644
--- a/pkg/api/controller_test.go
+++ b/pkg/api/controller_test.go
@@ -26,6 +26,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/alicebob/miniredis/v2"
 	"github.com/google/go-github/v62/github"
 	"github.com/gorilla/mux"
 	"github.com/gorilla/securecookie"
@@ -154,6 +155,44 @@ func TestCreateCacheDatabaseDriver(t *testing.T) {
 		So(err, ShouldBeNil)
 		So(driver, ShouldBeNil)
 	})
+	Convey("Test CreateCacheDatabaseDriver redisdb", t, func() {
+		miniRedis := miniredis.RunT(t)
+
+		log := log.NewLogger("debug", "")
+
+		// fail create db, no perm
+		dir := t.TempDir()
+		conf := config.New()
+		conf.Storage.RootDirectory = dir
+		conf.Storage.Dedupe = true
+		conf.Storage.RemoteCache = true
+		conf.Storage.CacheDriver = map[string]interface{}{
+			"name": "redis",
+			"url":  "redis://" + miniRedis.Addr(),
+		}
+
+		// test initialization for S3 storage
+		conf.Storage.StorageDriver = map[string]interface{}{
+			"name":          "s3",
+			"rootdirectory": "/zot",
+			"url":           "us-east-2",
+		}
+
+		driver, err := storage.CreateCacheDatabaseDriver(conf.Storage.StorageConfig, log)
+		So(err, ShouldBeNil)
+		So(driver, ShouldNotBeNil)
+		So(driver.Name(), ShouldEqual, "redis")
+		So(driver.UsesRelativePaths(), ShouldEqual, false)
+
+		// test initialization for local storage
+		conf.Storage.StorageDriver = nil
+
+		driver, err = storage.CreateCacheDatabaseDriver(conf.Storage.StorageConfig, log)
+		So(err, ShouldBeNil)
+		So(driver, ShouldNotBeNil)
+		So(driver.Name(), ShouldEqual, "redis")
+		So(driver.UsesRelativePaths(), ShouldEqual, true)
+	})
 	tskip.SkipDynamo(t)
 	tskip.SkipS3(t)
 	Convey("Test CreateCacheDatabaseDriver dynamodb", t, func() {
@@ -303,6 +342,58 @@ func TestCreateMetaDBDriver(t *testing.T) {
 		So(testFunc, ShouldNotPanic)
 	})
 
+	Convey("Test create MetaDB redis", t, func() {
+		miniRedis := miniredis.RunT(t)
+
+		log := log.NewLogger("debug", "")
+		dir := t.TempDir()
+		conf := config.New()
+		conf.Storage.RootDirectory = dir
+		conf.Storage.Dedupe = true
+		conf.Storage.RemoteCache = true
+		conf.Storage.StorageDriver = map[string]interface{}{
+			"name":          "s3",
+			"rootdirectory": "/zot",
+			"region":        "us-east-2",
+			"bucket":        "zot-storage",
+			"secure":        true,
+			"skipverify":    false,
+		}
+
+		conf.Storage.CacheDriver = map[string]interface{}{
+			"name": "dummy",
+		}
+
+		metaDB, err := meta.New(conf.Storage.StorageConfig, log)
+		So(err, ShouldNotBeNil)
+		So(metaDB, ShouldBeNil)
+
+		conf.Storage.CacheDriver = map[string]interface{}{
+			"name": "redis",
+		}
+
+		testFunc := func() { _, _ = meta.New(conf.Storage.StorageConfig, log) }
+		So(testFunc, ShouldPanic)
+
+		conf.Storage.CacheDriver = map[string]interface{}{
+			"name": "redis",
+			"url":  "url",
+		}
+
+		metaDB, err = meta.New(conf.Storage.StorageConfig, log)
+		So(err, ShouldNotBeNil)
+		So(metaDB, ShouldBeNil)
+
+		conf.Storage.CacheDriver = map[string]interface{}{
+			"name": "redis",
+			"url":  "redis://" + miniRedis.Addr(),
+		}
+
+		metaDB, err = meta.New(conf.Storage.StorageConfig, log)
+		So(err, ShouldBeNil)
+		So(metaDB, ShouldNotBeNil)
+	})
+
 	Convey("Test create MetaDB bolt", t, func() {
 		log := log.NewLogger("debug", "")
 		dir := t.TempDir()
diff --git a/pkg/meta/meta_test.go b/pkg/meta/meta_test.go
index c918bf4be..5dd926a75 100644
--- a/pkg/meta/meta_test.go
+++ b/pkg/meta/meta_test.go
@@ -11,6 +11,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/alicebob/miniredis/v2"
 	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
 	guuid "github.com/gofrs/uuid"
 	"github.com/notaryproject/notation-core-go/signature/jws"
@@ -18,6 +19,7 @@ import (
 	"github.com/notaryproject/notation-go/signer"
 	godigest "github.com/opencontainers/go-digest"
 	ispec "github.com/opencontainers/image-spec/specs-go/v1"
+	"github.com/redis/go-redis/v9"
 	. "github.com/smartystreets/goconvey/convey"
 
 	zcommon "zotregistry.dev/zot/pkg/common"
@@ -28,6 +30,7 @@ import (
 	"zotregistry.dev/zot/pkg/meta/boltdb"
 	"zotregistry.dev/zot/pkg/meta/common"
 	mdynamodb "zotregistry.dev/zot/pkg/meta/dynamodb"
+	"zotregistry.dev/zot/pkg/meta/redisdb"
 	mTypes "zotregistry.dev/zot/pkg/meta/types"
 	reqCtx "zotregistry.dev/zot/pkg/requestcontext"
 	tCommon "zotregistry.dev/zot/pkg/test/common"
@@ -164,6 +167,35 @@ func TestDynamoDBWrapper(t *testing.T) {
 	})
 }
 
+func TestRedisDB(t *testing.T) {
+	miniRedis := miniredis.RunT(t)
+
+	Convey("RedisDB Wrapper", t, func() {
+		rootDir := t.TempDir()
+		log := log.NewLogger("debug", "")
+
+		redisDriver, err := redisdb.GetRedisClient("redis://" + miniRedis.Addr())
+		So(err, ShouldBeNil)
+
+		metaDB, err := redisdb.New(redisDriver, log)
+		So(metaDB, ShouldNotBeNil)
+		So(err, ShouldBeNil)
+
+		imgTrustStore, err := imagetrust.NewLocalImageTrustStore(rootDir)
+		So(err, ShouldBeNil)
+
+		metaDB.SetImageTrustStore(imgTrustStore)
+
+		defer func() {
+			metaDB.ResetDB() //nolint: errcheck
+			os.RemoveAll(path.Join(rootDir, "_cosign"))
+			os.RemoveAll(path.Join(rootDir, "_notation"))
+		}()
+
+		RunMetaDBTests(t, metaDB)
+	})
+}
+
 func RunMetaDBTests(t *testing.T, metaDB mTypes.MetaDB, preparationFuncs ...func() error) { //nolint: thelper
 	ctx := context.Background()
 
@@ -1616,9 +1648,20 @@ func RunMetaDBTests(t *testing.T, metaDB mTypes.MetaDB, preparationFuncs ...func
 				So(err, ShouldBeNil)
 				So(len(repoMetaList), ShouldEqual, 2)
 
-				So(repoMetaList[0].Tags[tag1].Digest, ShouldResemble, image1.DigestStr())
-				So(repoMetaList[0].Tags[tag2].Digest, ShouldResemble, image2.DigestStr())
-				So(repoMetaList[1].Tags[tag3].Digest, ShouldResemble, image3.DigestStr())
+				repos := map[string]map[string]string{}
+				for _, repoMeta := range repoMetaList {
+					if _, exists := repos[repoMeta.Name]; !exists {
+						repos[repoMeta.Name] = map[string]string{}
+					}
+
+					for tag, descriptor := range repoMeta.Tags {
+						repos[repoMeta.Name][tag] = descriptor.Digest
+					}
+				}
+
+				So(repos[repo1][tag1], ShouldEqual, image1.DigestStr())
+				So(repos[repo1][tag2], ShouldEqual, image2.DigestStr())
+				So(repos[repo2][tag3], ShouldEqual, image3.DigestStr())
 			})
 
 			Convey("Search a repo by name", func() {
@@ -2577,3 +2620,34 @@ func TestCreateBoltDB(t *testing.T) {
 		So(err, ShouldNotBeNil)
 	})
 }
+
+func TestCreateRedisDB(t *testing.T) {
+	Convey("Create", t, func() {
+		miniRedis := miniredis.RunT(t)
+
+		log := log.NewLogger("debug", "")
+		So(log, ShouldNotBeNil)
+
+		redisDriver, err := redisdb.GetRedisClient("redis://" + miniRedis.Addr())
+		So(err, ShouldBeNil)
+
+		metaDB, err := meta.Create("redis", redisDriver, nil, log)
+		So(metaDB, ShouldNotBeNil)
+		So(err, ShouldBeNil)
+	})
+
+	Convey("fails", t, func() {
+		log := log.NewLogger("debug", "")
+
+		_, err := meta.Create("redis", nil, mdynamodb.DBDriverParameters{}, log)
+		So(err, ShouldNotBeNil)
+
+		// Redis client will not be responding
+		redisURL := "redis://127.0.0.1:" + tCommon.GetFreePort() // must not match miniRedis.Addr()
+		connOpts, _ := redis.ParseURL(redisURL)
+		cacheDB := redis.NewClient(connOpts)
+
+		_, err = meta.Create("redis", cacheDB, nil, log)
+		So(err, ShouldNotBeNil)
+	})
+}
diff --git a/pkg/meta/redisdb/buckets.go b/pkg/meta/redisdb/buckets.go
new file mode 100644
index 000000000..08fb2a998
--- /dev/null
+++ b/pkg/meta/redisdb/buckets.go
@@ -0,0 +1,12 @@
+package redisdb
+
+// MetadataDB.
+const (
+	ImageMetaBuck       = "zot:ImageMeta"
+	RepoMetaBuck        = "zot:RepoMeta"
+	RepoBlobsBuck       = "zot:RepoBlobsMeta"
+	RepoLastUpdatedBuck = "zot:RepoLastUpdated"
+	UserDataBucket      = "zot:UserData"
+	VersionBucket       = "zot:Version"
+	UserAPIKeysBucket   = "zot:UserAPIKeys" //nolint: gosec // these are not hardcoded credentials
+)
diff --git a/pkg/meta/redisdb/redis.go b/pkg/meta/redisdb/redis.go
index 76f3d6778..54dccd25a 100644
--- a/pkg/meta/redisdb/redis.go
+++ b/pkg/meta/redisdb/redis.go
@@ -2,29 +2,53 @@ package redisdb
 
 import (
 	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"strings"
 	"time"
 
 	godigest "github.com/opencontainers/go-digest"
+	ispec "github.com/opencontainers/image-spec/specs-go/v1"
 	"github.com/redis/go-redis/v9"
+	"google.golang.org/protobuf/proto"
+	"google.golang.org/protobuf/types/known/timestamppb"
 
+	zerr "zotregistry.dev/zot/errors"
+	"zotregistry.dev/zot/pkg/api/constants"
+	zcommon "zotregistry.dev/zot/pkg/common"
 	"zotregistry.dev/zot/pkg/log"
+	"zotregistry.dev/zot/pkg/meta/common"
+	mConvert "zotregistry.dev/zot/pkg/meta/convert"
+	proto_go "zotregistry.dev/zot/pkg/meta/proto/gen"
 	mTypes "zotregistry.dev/zot/pkg/meta/types"
+	"zotregistry.dev/zot/pkg/meta/version"
+	reqCtx "zotregistry.dev/zot/pkg/requestcontext"
 )
 
 type RedisDB struct {
-	Client        *redis.Client
+	Client        redis.UniversalClient
 	imgTrustStore mTypes.ImageTrustStore
+	Patches       []func(client redis.UniversalClient) error
+	Version       string
 	Log           log.Logger
 }
 
-func New(client *redis.Client, log log.Logger) (*RedisDB, error) {
+func New(client redis.UniversalClient, log log.Logger) (*RedisDB, error) {
 	redisWrapper := RedisDB{
 		Client:        client,
-		imgTrustStore: nil,
 		Log:           log,
+		Patches:       version.GetRedisDBPatches(),
+		Version:       version.CurrentVersion,
+		imgTrustStore: nil,
+	}
+
+	if err := client.Ping(context.Background()).Err(); err != nil {
+		log.Error().Err(err).Msg("failed to ping redis DB")
+
+		return nil, err
 	}
 
-	// Using the Config value, create the DynamoDB client
 	return &redisWrapper, nil
 }
 
@@ -39,219 +63,1906 @@ func GetRedisClient(url string) (*redis.Client, error) {
 
 // GetStarredRepos returns starred repos and takes current user in consideration.
 func (rc *RedisDB) GetStarredRepos(ctx context.Context) ([]string, error) {
-	return []string{}, nil
+	userData, err := rc.GetUserData(ctx)
+	if errors.Is(err, zerr.ErrUserDataNotFound) || errors.Is(err, zerr.ErrUserDataNotAllowed) {
+		return []string{}, nil
+	}
+
+	return userData.StarredRepos, err
 }
 
 // GetBookmarkedRepos returns bookmarked repos and takes current user in consideration.
 func (rc *RedisDB) GetBookmarkedRepos(ctx context.Context) ([]string, error) {
-	return []string{}, nil
+	userData, err := rc.GetUserData(ctx)
+	if errors.Is(err, zerr.ErrUserDataNotFound) || errors.Is(err, zerr.ErrUserDataNotAllowed) {
+		return []string{}, nil
+	}
+
+	return userData.BookmarkedRepos, err
 }
 
 // ToggleStarRepo adds/removes stars on repos.
-func (rc *RedisDB) ToggleStarRepo(ctx context.Context, reponame string) (mTypes.ToggleState, error) {
-	return 0, nil
+func (rc *RedisDB) ToggleStarRepo(ctx context.Context, repo string) (mTypes.ToggleState, error) {
+	userAc, err := reqCtx.UserAcFromContext(ctx)
+	if err != nil {
+		return mTypes.NotChanged, err
+	}
+
+	if userAc.IsAnonymous() || !userAc.Can(constants.ReadPermission, repo) {
+		return mTypes.NotChanged, zerr.ErrUserDataNotAllowed
+	}
+
+	userid := userAc.GetUsername()
+
+	var res mTypes.ToggleState
+
+	userData, err := rc.GetUserData(ctx)
+	if err != nil && !errors.Is(err, zerr.ErrUserDataNotFound) {
+		return mTypes.NotChanged, err
+	}
+
+	isRepoStarred := zcommon.Contains(userData.StarredRepos, repo)
+
+	if isRepoStarred {
+		res = mTypes.Removed
+
+		userData.StarredRepos = zcommon.RemoveFrom(userData.StarredRepos, repo)
+	} else {
+		res = mTypes.Added
+
+		userData.StarredRepos = append(userData.StarredRepos, repo)
+	}
+
+	userDataBlob, err := json.Marshal(userData)
+	if err != nil {
+		return mTypes.NotChanged, err
+	}
+
+	protoRepoMeta, err := rc.getProtoRepoMeta(ctx, repo)
+	if err != nil {
+		return mTypes.NotChanged, err
+	}
+
+	switch res {
+	case mTypes.Added:
+		protoRepoMeta.Stars++
+	case mTypes.Removed:
+		protoRepoMeta.Stars--
+	}
+
+	repoMetaBlob, err := proto.Marshal(protoRepoMeta)
+	if err != nil {
+		return mTypes.NotChanged, err
+	}
+
+	_, err = rc.Client.TxPipelined(ctx, func(txrp redis.Pipeliner) error {
+		if err = txrp.HSet(ctx, UserDataBucket, userid, userDataBlob).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("hset", UserDataBucket).Str("userid", userid).
+				Msg("failed to set user data record")
+
+			return fmt.Errorf("failed to set user data for identity %s: %w", userid, err)
+		}
+
+		if err := txrp.HSet(ctx, RepoMetaBuck, repo, repoMetaBlob).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("hset", RepoMetaBuck).Str("repo", repo).
+				Msg("failed to put repo meta record")
+
+			return fmt.Errorf("failed to set repometa for repo %s: %w", repo, err)
+		}
+
+		return nil
+	})
+
+	return res, err
 }
 
 // ToggleBookmarkRepo adds/removes bookmarks on repos.
-func (rc *RedisDB) ToggleBookmarkRepo(ctx context.Context, reponame string) (mTypes.ToggleState, error) {
-	return 0, nil
+func (rc *RedisDB) ToggleBookmarkRepo(ctx context.Context, repo string) (mTypes.ToggleState, error) {
+	userAc, err := reqCtx.UserAcFromContext(ctx)
+	if err != nil {
+		return mTypes.NotChanged, err
+	}
+
+	if userAc.IsAnonymous() || !userAc.Can(constants.ReadPermission, repo) {
+		return mTypes.NotChanged, zerr.ErrUserDataNotAllowed
+	}
+
+	userid := userAc.GetUsername()
+
+	var res mTypes.ToggleState
+
+	userData, err := rc.GetUserData(ctx)
+	if err != nil && !errors.Is(err, zerr.ErrUserDataNotFound) {
+		return mTypes.NotChanged, err
+	}
+
+	isRepoBookmarked := zcommon.Contains(userData.BookmarkedRepos, repo)
+
+	if isRepoBookmarked {
+		res = mTypes.Removed
+
+		userData.BookmarkedRepos = zcommon.RemoveFrom(userData.BookmarkedRepos, repo)
+	} else {
+		res = mTypes.Added
+
+		userData.BookmarkedRepos = append(userData.BookmarkedRepos, repo)
+	}
+
+	userDataBlob, err := json.Marshal(userData)
+	if err != nil {
+		return mTypes.NotChanged, err
+	}
+
+	err = rc.Client.HSet(ctx, UserDataBucket, userid, userDataBlob).Err()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hset", UserDataBucket).Str("userid", userid).
+			Msg("failed to set user data record")
+
+		return mTypes.NotChanged, fmt.Errorf("failed to set user data for identity %s: %w", userid, err)
+	}
+
+	return res, nil
 }
 
 // UserDB profile/api key CRUD.
 func (rc *RedisDB) GetUserData(ctx context.Context) (mTypes.UserData, error) {
-	return mTypes.UserData{}, nil
+	var userData mTypes.UserData
+
+	userAc, err := reqCtx.UserAcFromContext(ctx)
+	if err != nil {
+		return userData, err
+	}
+
+	if userAc.IsAnonymous() {
+		return userData, zerr.ErrUserDataNotAllowed
+	}
+
+	userid := userAc.GetUsername()
+
+	userDataBlob, err := rc.Client.HGet(ctx, UserDataBucket, userid).Bytes()
+	if err != nil && !errors.Is(err, redis.Nil) {
+		rc.Log.Error().Err(err).Str("hget", UserDataBucket).Str("userid", userid).
+			Msg("failed to get user data record")
+
+		return mTypes.UserData{}, fmt.Errorf("failed to get user data record for identity %s: %w", userid, err)
+	}
+
+	if errors.Is(err, redis.Nil) {
+		return userData, zerr.ErrUserDataNotFound
+	}
+
+	err = json.Unmarshal(userDataBlob, &userData)
+
+	return userData, err
 }
 
 func (rc *RedisDB) SetUserData(ctx context.Context, userData mTypes.UserData) error {
+	userAc, err := reqCtx.UserAcFromContext(ctx)
+	if err != nil {
+		return err
+	}
+
+	if userAc.IsAnonymous() {
+		return zerr.ErrUserDataNotAllowed
+	}
+
+	userid := userAc.GetUsername()
+
+	userDataBlob, err := json.Marshal(userData)
+	if err != nil {
+		return err
+	}
+
+	err = rc.Client.HSet(ctx, UserDataBucket, userid, userDataBlob).Err()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hset", UserDataBucket).Str("userid", userid).
+			Msg("failed to set user data record")
+
+		return fmt.Errorf("failed to set user data for identity %s: %w", userid, err)
+	}
+
 	return nil
 }
 
 func (rc *RedisDB) SetUserGroups(ctx context.Context, groups []string) error {
+	userAc, err := reqCtx.UserAcFromContext(ctx)
+	if err != nil {
+		return err
+	}
+
+	if userAc.IsAnonymous() {
+		return zerr.ErrUserDataNotAllowed
+	}
+
+	userid := userAc.GetUsername()
+
+	userData, err := rc.GetUserData(ctx)
+	if err != nil && !errors.Is(err, zerr.ErrUserDataNotFound) {
+		return err
+	}
+
+	userData.Groups = groups
+
+	userDataBlob, err := json.Marshal(userData)
+	if err != nil {
+		return err
+	}
+
+	err = rc.Client.HSet(ctx, UserDataBucket, userid, userDataBlob).Err()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hset", UserDataBucket).Str("userid", userid).
+			Msg("failed to set user data record")
+
+		return fmt.Errorf("failed to set user data for identity %s: %w", userid, err)
+	}
+
 	return nil
 }
 
 func (rc *RedisDB) GetUserGroups(ctx context.Context) ([]string, error) {
-	return []string{}, nil
+	userData, err := rc.GetUserData(ctx)
+
+	return userData.Groups, err
 }
 
 func (rc *RedisDB) DeleteUserData(ctx context.Context) error {
-	return nil
-}
+	userAc, err := reqCtx.UserAcFromContext(ctx)
+	if err != nil {
+		return err
+	}
 
-func (rc *RedisDB) GetUserAPIKeyInfo(hashedKey string) (string, error) {
-	return "", nil
-}
+	if userAc.IsAnonymous() {
+		return zerr.ErrUserDataNotAllowed
+	}
 
-func (rc *RedisDB) GetUserAPIKeys(ctx context.Context) ([]mTypes.APIKeyDetails, error) {
-	return []mTypes.APIKeyDetails{}, nil
-}
+	userid := userAc.GetUsername()
 
-func (rc *RedisDB) AddUserAPIKey(ctx context.Context, hashedKey string, apiKeyDetails *mTypes.APIKeyDetails) error {
-	return nil
-}
+	_, err = rc.GetUserData(ctx)
+	if err != nil && errors.Is(err, zerr.ErrUserDataNotFound) {
+		return zerr.ErrBucketDoesNotExist
+	}
 
-func (rc *RedisDB) IsAPIKeyExpired(ctx context.Context, hashedKey string) (bool, error) {
-	return false, nil
-}
+	err = rc.Client.HDel(ctx, UserDataBucket, userid).Err()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hdel", UserDataBucket).Str("userid", userid).
+			Msg("failed to delete user data record")
 
-func (rc *RedisDB) UpdateUserAPIKeyLastUsed(ctx context.Context, hashedKey string) error {
-	return nil
-}
+		return fmt.Errorf("failed to delete user data for identity %s: %w", userid, err)
+	}
 
-func (rc *RedisDB) DeleteUserAPIKey(ctx context.Context, id string) error {
-	return nil
+	return err
 }
 
-func (rc *RedisDB) SetImageMeta(digest godigest.Digest, imageMeta mTypes.ImageMeta) error {
-	return nil
-}
+func (rc *RedisDB) GetUserAPIKeyInfo(hashedKey string) (string, error) {
+	ctx := context.Background()
 
-// SetRepoReference sets the given image data to the repo metadata.
-func (rc *RedisDB) SetRepoReference(ctx context.Context, repo string,
-	reference string, imageMeta mTypes.ImageMeta,
-) error {
-	return nil
-}
+	userid, err := rc.Client.HGet(ctx, UserAPIKeysBucket, hashedKey).Result()
+	if err != nil && !errors.Is(err, redis.Nil) {
+		rc.Log.Error().Err(err).Str("hget", UserAPIKeysBucket).Str("userid", userid).
+			Msg("failed to get api key record")
 
-// SearchRepos searches for repos given a search string.
-func (rc *RedisDB) SearchRepos(ctx context.Context, searchText string) ([]mTypes.RepoMeta, error) {
-	return []mTypes.RepoMeta{}, nil
-}
+		return userid, fmt.Errorf("failed to get api key record for identity %s: %w", userid, err)
+	}
 
-// SearchTags searches for images(repo:tag) given a search string.
-func (rc *RedisDB) SearchTags(ctx context.Context, searchText string) ([]mTypes.FullImageMeta, error) {
-	return []mTypes.FullImageMeta{}, nil
-}
+	if len(userid) == 0 || errors.Is(err, redis.Nil) {
+		return userid, zerr.ErrUserAPIKeyNotFound
+	}
 
-// FilterTags filters for images given a filter function.
-func (rc *RedisDB) FilterTags(ctx context.Context, filterRepoTag mTypes.FilterRepoTagFunc,
-	filterFunc mTypes.FilterFunc,
-) ([]mTypes.FullImageMeta, error) {
-	return []mTypes.FullImageMeta{}, nil
+	return userid, err
 }
 
-// FilterRepos filters for repos given a filter function.
-func (rc *RedisDB) FilterRepos(ctx context.Context, rankName mTypes.FilterRepoNameFunc,
-	filterFunc mTypes.FilterFullRepoFunc,
-) ([]mTypes.RepoMeta, error) {
-	return []mTypes.RepoMeta{}, nil
-}
+func (rc *RedisDB) GetUserAPIKeys(ctx context.Context) ([]mTypes.APIKeyDetails, error) {
+	apiKeys := make([]mTypes.APIKeyDetails, 0)
 
-// GetRepoMeta returns the full information about a repo.
-func (rc *RedisDB) GetRepoMeta(ctx context.Context, repo string) (mTypes.RepoMeta, error) {
-	return mTypes.RepoMeta{}, nil
-}
+	userAc, err := reqCtx.UserAcFromContext(ctx)
+	if err != nil {
+		return nil, err
+	}
 
-// GetFullImageMeta returns the full information about an image.
-func (rc *RedisDB) GetFullImageMeta(ctx context.Context, repo string, tag string) (mTypes.FullImageMeta, error) {
-	return mTypes.FullImageMeta{}, nil
-}
+	if userAc.IsAnonymous() {
+		return nil, zerr.ErrUserDataNotAllowed
+	}
 
-// GetImageMeta returns the raw information about an image.
-func (rc *RedisDB) GetImageMeta(digest godigest.Digest) (mTypes.ImageMeta, error) {
-	return mTypes.ImageMeta{}, nil
-}
+	userid := userAc.GetUsername()
 
-// GetMultipleRepoMeta returns a list of all repos that match the given filter function.
-func (rc *RedisDB) GetMultipleRepoMeta(ctx context.Context, filter func(repoMeta mTypes.RepoMeta) bool) (
-	[]mTypes.RepoMeta, error,
-) {
-	return []mTypes.RepoMeta{}, nil
-}
+	userData, err := rc.GetUserData(ctx)
+	if err != nil && !errors.Is(err, zerr.ErrUserDataNotFound) {
+		return apiKeys, err
+	}
 
-// AddManifestSignature adds signature metadata to a given manifest in the database.
-func (rc *RedisDB) AddManifestSignature(repo string, signedManifestDigest godigest.Digest,
-	sm mTypes.SignatureMetadata,
-) error {
-	return nil
-}
+	changed := false
 
-// DeleteSignature deletes signature metadata to a given manifest from the database.
-func (rc *RedisDB) DeleteSignature(repo string, signedManifestDigest godigest.Digest,
-	sigMeta mTypes.SignatureMetadata,
-) error {
-	return nil
-}
+	for hashedKey, apiKeyDetails := range userData.APIKeys {
+		// if expiresAt is not nil value
+		if !apiKeyDetails.ExpirationDate.Equal(time.Time{}) && time.Now().After(apiKeyDetails.ExpirationDate) {
+			apiKeyDetails.IsExpired = true
 
-// UpdateSignaturesValidity checks and updates signatures validity of a given manifest.
-func (rc *RedisDB) UpdateSignaturesValidity(ctx context.Context, repo string, manifestDigest godigest.Digest) error {
-	return nil
-}
+			changed = true
+		}
 
-// IncrementRepoStars adds 1 to the star count of an image.
-func (rc *RedisDB) IncrementRepoStars(repo string) error {
-	return nil
-}
+		userData.APIKeys[hashedKey] = apiKeyDetails
 
-// DecrementRepoStars subtracts 1 from the star count of an image.
-func (rc *RedisDB) DecrementRepoStars(repo string) error {
-	return nil
-}
+		apiKeys = append(apiKeys, apiKeyDetails)
+	}
 
-// SetRepoMeta returns RepoMetadata of a repo from the database.
-func (rc *RedisDB) SetRepoMeta(repo string, repoMeta mTypes.RepoMeta) error {
-	return nil
-}
+	if !changed {
+		// return early, no need to make a call to update key expiry in the DB
+		return apiKeys, nil
+	}
 
-// DeleteRepoMeta.
-func (rc *RedisDB) DeleteRepoMeta(repo string) error {
-	return nil
-}
+	userDataBlob, err := json.Marshal(userData)
+	if err != nil {
+		return apiKeys, err
+	}
 
-// GetReferrersInfo returns a list of  for all referrers of the given digest that match one of the
-// artifact types.
-func (rc *RedisDB) GetReferrersInfo(repo string, referredDigest godigest.Digest,
-	artifactTypes []string,
-) ([]mTypes.ReferrerInfo, error) {
-	return []mTypes.ReferrerInfo{}, nil
-}
+	err = rc.Client.HSet(ctx, UserDataBucket, userid, userDataBlob).Err()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hset", UserDataBucket).Str("userid", userid).
+			Msg("failed to set user data record")
 
-// UpdateStatsOnDownload adds 1 to the download count of an image and sets the timestamp of download.
-func (rc *RedisDB) UpdateStatsOnDownload(repo string, reference string) error {
-	return nil
-}
+		return apiKeys, fmt.Errorf("failed to set user data for identity %s: %w", userid, err)
+	}
 
-// FilterImageMeta returns the image data for the given digests.
-func (rc *RedisDB) FilterImageMeta(ctx context.Context,
-	digests []string,
-) (map[mTypes.ImageDigest]mTypes.ImageMeta, error) {
-	return map[mTypes.ImageDigest]mTypes.ImageMeta{}, nil
+	return apiKeys, nil
 }
 
-/*
-	RemoveRepoReference removes the tag from RepoMetadata if the reference is a tag,
+func (rc *RedisDB) AddUserAPIKey(ctx context.Context, hashedKey string, apiKeyDetails *mTypes.APIKeyDetails) error {
+	userAc, err := reqCtx.UserAcFromContext(ctx)
+	if err != nil {
+		return err
+	}
 
-it also removes its corresponding digest from Statistics, Signatures and Referrers if there are no tags
-pointing to it.
-If the reference is a digest then it will remove the digest from Statistics, Signatures and Referrers only
-if there are no tags pointing to the digest, otherwise it's noop.
-*/
-func (rc *RedisDB) RemoveRepoReference(repo, reference string, manifestDigest godigest.Digest) error {
-	return nil
-}
+	if userAc.IsAnonymous() {
+		return zerr.ErrUserDataNotAllowed
+	}
 
-// ResetRepoReferences resets all layout specific data (tags, signatures, referrers, etc.) but keep user and image
-// specific metadata such as star count, downloads other statistics.
-func (rc *RedisDB) ResetRepoReferences(repo string) error {
-	return nil
-}
+	userid := userAc.GetUsername()
 
-func (rc *RedisDB) GetRepoLastUpdated(repo string) time.Time {
-	return time.Now()
-}
+	userData, err := rc.GetUserData(ctx)
+	if err != nil && !errors.Is(err, zerr.ErrUserDataNotFound) {
+		return err
+	}
 
-func (rc *RedisDB) GetAllRepoNames() ([]string, error) {
-	return []string{}, nil
-}
+	if userData.APIKeys == nil {
+		userData.APIKeys = make(map[string]mTypes.APIKeyDetails)
+	}
 
-// ResetDB will delete all data in the DB.
-func (rc *RedisDB) ResetDB() error {
-	return nil
-}
+	userData.APIKeys[hashedKey] = *apiKeyDetails
 
-func (rc *RedisDB) PatchDB() error {
-	return nil
-}
+	userDataBlob, err := json.Marshal(userData)
+	if err != nil {
+		return err
+	}
+
+	_, err = rc.Client.TxPipelined(ctx, func(txrp redis.Pipeliner) error {
+		if err := txrp.HSet(ctx, UserDataBucket, userid, userDataBlob).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("hset", UserDataBucket).Str("userid", userid).
+				Msg("failed to set user data record")
+
+			return fmt.Errorf("failed to set user data for identity %s: %w", userid, err)
+		}
+
+		if err := txrp.HSet(ctx, UserAPIKeysBucket, hashedKey, userid).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("hset", UserAPIKeysBucket).Str("userid", userid).
+				Msg("failed to set api key record")
+
+			return fmt.Errorf("failed to set api key for identity %s: %w", userid, err)
+		}
+
+		return nil
+	})
+
+	return err
+}
+
+func (rc *RedisDB) IsAPIKeyExpired(ctx context.Context, hashedKey string) (bool, error) {
+	userAc, err := reqCtx.UserAcFromContext(ctx)
+	if err != nil {
+		return false, err
+	}
+
+	if userAc.IsAnonymous() {
+		return false, zerr.ErrUserDataNotAllowed
+	}
+
+	userid := userAc.GetUsername()
+
+	var isExpired bool
+
+	userData, err := rc.GetUserData(ctx)
+	if err != nil && !errors.Is(err, zerr.ErrUserDataNotFound) {
+		return isExpired, err
+	}
+
+	apiKeyDetails := userData.APIKeys[hashedKey]
+	if apiKeyDetails.IsExpired {
+		isExpired = true
+
+		return isExpired, nil
+	}
+
+	// if expiresAt is not nil value
+	if !apiKeyDetails.ExpirationDate.Equal(time.Time{}) && time.Now().After(apiKeyDetails.ExpirationDate) {
+		isExpired = true
+		apiKeyDetails.IsExpired = true
+	}
+
+	userData.APIKeys[hashedKey] = apiKeyDetails
+
+	userDataBlob, err := json.Marshal(userData)
+	if err != nil {
+		return isExpired, err
+	}
+
+	err = rc.Client.HSet(ctx, UserDataBucket, userid, userDataBlob).Err()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hset", UserDataBucket).Str("userid", userid).
+			Msg("failed to set user data record")
+
+		return isExpired, fmt.Errorf("failed to set user data for identity %s: %w", userid, err)
+	}
+
+	return isExpired, nil
+}
+
+func (rc *RedisDB) UpdateUserAPIKeyLastUsed(ctx context.Context, hashedKey string) error {
+	userAc, err := reqCtx.UserAcFromContext(ctx)
+	if err != nil {
+		return err
+	}
+
+	if userAc.IsAnonymous() {
+		return zerr.ErrUserDataNotAllowed
+	}
+
+	userid := userAc.GetUsername()
+
+	userData, err := rc.GetUserData(ctx)
+	if err != nil && !errors.Is(err, zerr.ErrUserDataNotFound) {
+		return err
+	}
+
+	apiKeyDetails := userData.APIKeys[hashedKey]
+	apiKeyDetails.LastUsed = time.Now()
+
+	userData.APIKeys[hashedKey] = apiKeyDetails
+
+	userDataBlob, err := json.Marshal(userData)
+	if err != nil {
+		return err
+	}
+
+	err = rc.Client.HSet(ctx, UserDataBucket, userid, userDataBlob).Err()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hset", UserDataBucket).Str("userid", userid).
+			Msg("failed to set user data record")
+
+		return fmt.Errorf("failed to set user data for identity %s: %w", userid, err)
+	}
+
+	return err
+}
+
+func (rc *RedisDB) DeleteUserAPIKey(ctx context.Context, keyID string) error {
+	userAc, err := reqCtx.UserAcFromContext(ctx)
+	if err != nil {
+		return err
+	}
+
+	if userAc.IsAnonymous() {
+		return zerr.ErrUserDataNotAllowed
+	}
+
+	userid := userAc.GetUsername()
+
+	userData, err := rc.GetUserData(ctx)
+	if err != nil {
+		return err
+	}
+
+	for hash, apiKeyDetails := range userData.APIKeys {
+		if apiKeyDetails.UUID != keyID {
+			continue
+		}
+
+		delete(userData.APIKeys, hash)
+
+		userDataBlob, err := json.Marshal(userData)
+		if err != nil {
+			return err
+		}
+
+		_, err = rc.Client.TxPipelined(ctx, func(txrp redis.Pipeliner) error {
+			if err = txrp.HSet(ctx, UserDataBucket, userid, userDataBlob).Err(); err != nil {
+				rc.Log.Error().Err(err).Str("hset", UserDataBucket).Str("userid", userid).
+					Msg("failed to set user data record")
+
+				return fmt.Errorf("failed to set user data for identity %s: %w", userid, err)
+			}
+
+			if err = txrp.HDel(ctx, UserAPIKeysBucket, hash).Err(); err != nil {
+				rc.Log.Error().Err(err).Str("hdel", UserAPIKeysBucket).Str("userid", userid).
+					Msg("failed to delete api key record")
+
+				return fmt.Errorf("failed to delete api key record for identity %s: %w", userid, err)
+			}
+
+			return nil
+		})
+	}
+
+	return nil
+}
+
+func (rc *RedisDB) SetImageMeta(digest godigest.Digest, imageMeta mTypes.ImageMeta) error {
+	protoImageMeta := &proto_go.ImageMeta{}
+	ctx := context.Background()
+
+	switch imageMeta.MediaType {
+	case ispec.MediaTypeImageManifest:
+		manifest := imageMeta.Manifests[0]
+
+		protoImageMeta = mConvert.GetProtoImageManifestData(manifest.Manifest, manifest.Config,
+			manifest.Size, manifest.Digest.String())
+	case ispec.MediaTypeImageIndex:
+		protoImageMeta = mConvert.GetProtoImageIndexMeta(*imageMeta.Index, imageMeta.Size, imageMeta.Digest.String())
+	}
+
+	pImageMetaBlob, err := proto.Marshal(protoImageMeta)
+	if err != nil {
+		return fmt.Errorf("failed to calculate blob for manifest with digest %s %w", digest, err)
+	}
+
+	err = rc.Client.HSet(ctx, ImageMetaBuck, digest.String(), pImageMetaBlob).Err()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hset", ImageMetaBuck).Str("digest", digest.String()).
+			Msg("failed to set image meta record")
+
+		return fmt.Errorf("failed to set image meta record for digest %s: %w", digest.String(), err)
+	}
+
+	return nil
+}
+
+// SetRepoReference sets the given image data to the repo metadata.
+func (rc *RedisDB) SetRepoReference(ctx context.Context, repo string,
+	reference string, imageMeta mTypes.ImageMeta,
+) error {
+	if err := common.ValidateRepoReferenceInput(repo, reference, imageMeta.Digest); err != nil {
+		return err
+	}
+
+	var userid string
+
+	userAc, err := reqCtx.UserAcFromContext(ctx)
+	if err == nil {
+		userid = userAc.GetUsername()
+	}
+
+	// 1. Add image data to db if needed
+	protoImageMeta := mConvert.GetProtoImageMeta(imageMeta)
+
+	imageMetaBlob, err := proto.Marshal(protoImageMeta)
+	if err != nil {
+		return err
+	}
+
+	err = rc.Client.HSet(ctx, ImageMetaBuck, imageMeta.Digest.String(), imageMetaBlob).Err()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hset", ImageMetaBuck).Str("digest", imageMeta.Digest.String()).
+			Msg("failed to set image meta record")
+
+		return fmt.Errorf("failed to set image meta record for digest %s: %w", imageMeta.Digest.String(), err)
+	}
+
+	protoRepoMeta, err := rc.getProtoRepoMeta(ctx, repo)
+	if err != nil && !errors.Is(err, zerr.ErrRepoMetaNotFound) {
+		return err
+	}
+
+	// 2. Referrers
+	if subject := mConvert.GetImageSubject(protoImageMeta); subject != nil {
+		refInfo := &proto_go.ReferrersInfo{}
+		if protoRepoMeta.Referrers[subject.Digest.String()] != nil {
+			refInfo = protoRepoMeta.Referrers[subject.Digest.String()]
+		}
+
+		foundReferrer := false
+
+		for i := range refInfo.List {
+			if refInfo.List[i].Digest == mConvert.GetImageDigestStr(protoImageMeta) {
+				foundReferrer = true
+				refInfo.List[i].Count += 1
+
+				break
+			}
+		}
+
+		if !foundReferrer {
+			refInfo.List = append(refInfo.List, &proto_go.ReferrerInfo{
+				Count:        1,
+				MediaType:    protoImageMeta.MediaType,
+				Digest:       mConvert.GetImageDigestStr(protoImageMeta),
+				ArtifactType: mConvert.GetImageArtifactType(protoImageMeta),
+				Size:         mConvert.GetImageManifestSize(protoImageMeta),
+				Annotations:  mConvert.GetImageAnnotations(protoImageMeta),
+			})
+		}
+
+		protoRepoMeta.Referrers[subject.Digest.String()] = refInfo
+	}
+
+	// 3. Update tag
+	if !common.ReferenceIsDigest(reference) {
+		protoRepoMeta.Tags[reference] = &proto_go.TagDescriptor{
+			Digest:    imageMeta.Digest.String(),
+			MediaType: imageMeta.MediaType,
+		}
+	}
+
+	if _, ok := protoRepoMeta.Statistics[imageMeta.Digest.String()]; !ok {
+		protoRepoMeta.Statistics[imageMeta.Digest.String()] = &proto_go.DescriptorStatistics{
+			DownloadCount:     0,
+			LastPullTimestamp: &timestamppb.Timestamp{},
+			PushTimestamp:     timestamppb.Now(),
+			PushedBy:          userid,
+		}
+	} else if protoRepoMeta.Statistics[imageMeta.Digest.String()].PushTimestamp.AsTime().IsZero() {
+		protoRepoMeta.Statistics[imageMeta.Digest.String()].PushTimestamp = timestamppb.Now()
+	}
+
+	if _, ok := protoRepoMeta.Signatures[imageMeta.Digest.String()]; !ok {
+		protoRepoMeta.Signatures[imageMeta.Digest.String()] = &proto_go.ManifestSignatures{
+			Map: map[string]*proto_go.SignaturesInfo{"": {}},
+		}
+	}
+
+	if _, ok := protoRepoMeta.Referrers[imageMeta.Digest.String()]; !ok {
+		protoRepoMeta.Referrers[imageMeta.Digest.String()] = &proto_go.ReferrersInfo{
+			List: []*proto_go.ReferrerInfo{},
+		}
+	}
+
+	// 4. Blobs
+	repoBlobsBytes, err := rc.Client.HGet(ctx, RepoBlobsBuck, repo).Bytes()
+	if err != nil && !errors.Is(err, redis.Nil) {
+		rc.Log.Error().Err(err).Str("hget", RepoBlobsBuck).Str("repo", repo).
+			Msg("failed to get repo blobs record")
+
+		return fmt.Errorf("failed to get repo blobs record for repo %s: %w", repo, err)
+	}
+
+	repoBlobs, err := unmarshalProtoRepoBlobs(repo, repoBlobsBytes)
+	if err != nil {
+		return err
+	}
+
+	protoRepoMeta, repoBlobs = common.AddImageMetaToRepoMeta(protoRepoMeta, repoBlobs, reference, imageMeta)
+	protoTime := timestamppb.New(time.Now())
+
+	protoTimeBlob, err := proto.Marshal(protoTime)
+	if err != nil {
+		return err
+	}
+
+	repoBlobsBytes, err = proto.Marshal(repoBlobs)
+	if err != nil {
+		return err
+	}
+
+	repoMetaBlob, err := proto.Marshal(protoRepoMeta)
+	if err != nil {
+		return err
+	}
+
+	_, err = rc.Client.TxPipelined(ctx, func(txrp redis.Pipeliner) error {
+		if err := txrp.HSet(ctx, RepoLastUpdatedBuck, repo, protoTimeBlob).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("hset", RepoLastUpdatedBuck).Str("repo", repo).
+				Msg("failed to put repo last updated timestamp")
+
+			return fmt.Errorf("failed to put repo last updated record for repo %s: %w", repo, err)
+		}
+
+		if err := txrp.HSet(ctx, RepoBlobsBuck, repo, repoBlobsBytes).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("hset", RepoBlobsBuck).Str("repo", repo).
+				Msg("failed to put repo blobs record")
+
+			return fmt.Errorf("failed to set repo blobs record for repo %s: %w", repo, err)
+		}
+
+		if err := txrp.HSet(ctx, RepoMetaBuck, repo, repoMetaBlob).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("hset", RepoMetaBuck).Str("repo", repo).
+				Msg("failed to put repo meta record")
+
+			return fmt.Errorf("failed to put repometa record for repo %s: %w", repo, err)
+		}
+
+		return nil
+	})
+
+	return err
+}
+
+// SearchRepos searches for repos given a search string.
+func (rc *RedisDB) SearchRepos(ctx context.Context, searchText string) ([]mTypes.RepoMeta, error) {
+	foundRepos := []mTypes.RepoMeta{}
+
+	repoMetaEntries, err := rc.Client.HGetAll(ctx, RepoMetaBuck).Result()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hgetall", RepoMetaBuck).Msg("failed to get all repo meta records")
+
+		return foundRepos, fmt.Errorf("failed to get all repo meta records: %w", err)
+	}
+
+	userBookmarks, userStars := rc.getUserBookmarksAndStarsNoError(ctx)
+
+	for repo, repoMetaBlob := range repoMetaEntries {
+		if ok, err := reqCtx.RepoIsUserAvailable(ctx, repo); !ok || err != nil {
+			continue
+		}
+
+		rank := common.RankRepoName(searchText, repo)
+		if rank == -1 {
+			continue
+		}
+
+		protoRepoMeta, err := unmarshalProtoRepoMeta(repo, []byte(repoMetaBlob))
+		if err != nil {
+			// similarly with other metadb implementations, do not return a partial result on error
+			return []mTypes.RepoMeta{}, err
+		}
+
+		delete(protoRepoMeta.Tags, "")
+
+		if len(protoRepoMeta.Tags) == 0 {
+			continue
+		}
+
+		protoRepoMeta.Rank = int32(rank) //nolint:gosec // ignore overflow
+		protoRepoMeta.IsBookmarked = zcommon.Contains(userBookmarks, protoRepoMeta.Name)
+		protoRepoMeta.IsStarred = zcommon.Contains(userStars, protoRepoMeta.Name)
+
+		repoMeta := mConvert.GetRepoMeta(protoRepoMeta)
+		foundRepos = append(foundRepos, repoMeta)
+	}
+
+	return foundRepos, err
+}
+
+// SearchTags searches for images(repo:tag) given a search string.
+func (rc *RedisDB) SearchTags(ctx context.Context, searchText string) ([]mTypes.FullImageMeta, error) {
+	images := []mTypes.FullImageMeta{}
+
+	searchedRepo, searchedTag, err := common.GetRepoTag(searchText)
+	if err != nil {
+		return images, fmt.Errorf("failed to parse search text, invalid format %w", err)
+	}
+
+	repoMetaEntries, err := rc.Client.HGetAll(ctx, RepoMetaBuck).Result()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hgetall", RepoMetaBuck).Msg("failed to get all repo meta records")
+
+		return images, fmt.Errorf("failed to get all repo meta records: %w", err)
+	}
+
+	userBookmarks, userStars := rc.getUserBookmarksAndStarsNoError(ctx)
+
+	for repo, repoMetaBlob := range repoMetaEntries {
+		if repo != searchedRepo {
+			continue
+		}
+
+		if ok, err := reqCtx.RepoIsUserAvailable(ctx, repo); !ok || err != nil {
+			return images, err
+		}
+
+		protoRepoMeta, err := unmarshalProtoRepoMeta(repo, []byte(repoMetaBlob))
+		if err != nil {
+			return images, err
+		}
+
+		delete(protoRepoMeta.Tags, "")
+
+		protoRepoMeta.IsBookmarked = zcommon.Contains(userBookmarks, protoRepoMeta.Name)
+		protoRepoMeta.IsStarred = zcommon.Contains(userStars, protoRepoMeta.Name)
+
+		for tag, descriptor := range protoRepoMeta.Tags {
+			if !strings.HasPrefix(tag, searchedTag) || tag == "" {
+				continue
+			}
+
+			var protoImageMeta *proto_go.ImageMeta
+
+			switch descriptor.MediaType {
+			case ispec.MediaTypeImageManifest:
+				manifestDigest := descriptor.Digest
+
+				imageManifestData, err := rc.getProtoImageMeta(ctx, manifestDigest)
+				if err != nil {
+					return images, fmt.Errorf("failed to fetch manifest meta for manifest with digest %s %w",
+						manifestDigest, err)
+				}
+
+				protoImageMeta = imageManifestData
+			case ispec.MediaTypeImageIndex:
+				indexDigest := descriptor.Digest
+
+				imageIndexData, err := rc.getProtoImageMeta(ctx, indexDigest)
+				if err != nil {
+					return images, fmt.Errorf("failed to fetch manifest meta for manifest with digest %s %w",
+						indexDigest, err)
+				}
+
+				_, manifestDataList, err := rc.getAllContainedMeta(ctx, imageIndexData)
+				if err != nil {
+					return images, err
+				}
+
+				imageIndexData.Manifests = manifestDataList
+
+				protoImageMeta = imageIndexData
+			default:
+				rc.Log.Error().Str("mediaType", descriptor.MediaType).Msg("unsupported media type")
+
+				continue
+			}
+
+			images = append(images, mConvert.GetFullImageMetaFromProto(tag, protoRepoMeta, protoImageMeta))
+		}
+	}
+
+	return images, err
+}
+
+// FilterTags filters for images given a filter function.
+func (rc *RedisDB) FilterTags(ctx context.Context, filterRepoTag mTypes.FilterRepoTagFunc,
+	filterFunc mTypes.FilterFunc,
+) ([]mTypes.FullImageMeta, error) {
+	images := []mTypes.FullImageMeta{}
+
+	repoMetaEntries, err := rc.Client.HGetAll(ctx, RepoMetaBuck).Result()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hgetall", RepoMetaBuck).Msg("failed to get all repo meta records")
+
+		return images, fmt.Errorf("failed to get all repo meta records: %w", err)
+	}
+
+	userBookmarks, userStars := rc.getUserBookmarksAndStarsNoError(ctx)
+
+	var unifiedErr error
+
+	for repo, repoMetaBlob := range repoMetaEntries {
+		if ok, err := reqCtx.RepoIsUserAvailable(ctx, repo); !ok || err != nil {
+			continue
+		}
+
+		protoRepoMeta, err := unmarshalProtoRepoMeta(repo, []byte(repoMetaBlob))
+		if err != nil {
+			unifiedErr = errors.Join(unifiedErr, err)
+
+			continue
+		}
+
+		delete(protoRepoMeta.Tags, "")
+		protoRepoMeta.IsBookmarked = zcommon.Contains(userBookmarks, protoRepoMeta.Name)
+		protoRepoMeta.IsStarred = zcommon.Contains(userStars, protoRepoMeta.Name)
+		repoMeta := mConvert.GetRepoMeta(protoRepoMeta)
+
+		for tag, descriptor := range protoRepoMeta.Tags {
+			if !filterRepoTag(repo, tag) {
+				continue
+			}
+
+			switch descriptor.MediaType {
+			case ispec.MediaTypeImageManifest:
+				manifestDigest := descriptor.Digest
+
+				imageManifestData, err := rc.getProtoImageMeta(ctx, manifestDigest)
+				if err != nil {
+					unifiedErr = errors.Join(unifiedErr, err)
+
+					continue
+				}
+
+				imageMeta := mConvert.GetImageMeta(imageManifestData)
+
+				if filterFunc(repoMeta, imageMeta) {
+					images = append(images, mConvert.GetFullImageMetaFromProto(tag, protoRepoMeta, imageManifestData))
+				}
+			case ispec.MediaTypeImageIndex:
+				indexDigest := descriptor.Digest
+
+				protoImageIndexMeta, err := rc.getProtoImageMeta(ctx, indexDigest)
+				if err != nil {
+					unifiedErr = errors.Join(unifiedErr, err)
+
+					continue
+				}
+
+				imageIndexMeta := mConvert.GetImageMeta(protoImageIndexMeta)
+				matchedManifests := []*proto_go.ManifestMeta{}
+
+				imageManifestDataList, _, err := rc.getAllContainedMeta(ctx, protoImageIndexMeta)
+				if err != nil {
+					unifiedErr = errors.Join(unifiedErr, err)
+
+					continue
+				}
+
+				for _, imageManifestData := range imageManifestDataList {
+					imageMeta := mConvert.GetImageMeta(imageManifestData)
+					partialImageMeta := common.GetPartialImageMeta(imageIndexMeta, imageMeta)
+
+					if filterFunc(repoMeta, partialImageMeta) {
+						matchedManifests = append(matchedManifests, imageManifestData.Manifests[0])
+					}
+				}
+
+				if len(matchedManifests) > 0 {
+					protoImageIndexMeta.Manifests = matchedManifests
+
+					images = append(images, mConvert.GetFullImageMetaFromProto(tag, protoRepoMeta, protoImageIndexMeta))
+				}
+			default:
+				rc.Log.Error().Str("mediaType", descriptor.MediaType).Msg("unsupported media type")
+
+				continue
+			}
+		}
+	}
+
+	return images, unifiedErr
+}
+
+// FilterRepos filters for repos given a filter function.
+func (rc *RedisDB) FilterRepos(ctx context.Context, acceptName mTypes.FilterRepoNameFunc,
+	filterFunc mTypes.FilterFullRepoFunc,
+) ([]mTypes.RepoMeta, error) {
+	foundRepos := []mTypes.RepoMeta{}
+
+	repoMetaEntries, err := rc.Client.HGetAll(ctx, RepoMetaBuck).Result()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hgetall", RepoMetaBuck).Msg("failed to get all repo meta records")
+
+		return foundRepos, fmt.Errorf("failed to get all repo meta records: %w", err)
+	}
+
+	userBookmarks, userStars := rc.getUserBookmarksAndStarsNoError(ctx)
+
+	for repo, repoMetaBlob := range repoMetaEntries {
+		if ok, err := reqCtx.RepoIsUserAvailable(ctx, repo); !ok || err != nil {
+			continue
+		}
+
+		if !acceptName(repo) {
+			continue
+		}
+
+		protoRepoMeta, err := unmarshalProtoRepoMeta(repo, []byte(repoMetaBlob))
+		if err != nil {
+			// similarly with other metadb implementations, do not return a partial result on error
+			return []mTypes.RepoMeta{}, err
+		}
+
+		protoRepoMeta.IsBookmarked = zcommon.Contains(userBookmarks, protoRepoMeta.Name)
+		protoRepoMeta.IsStarred = zcommon.Contains(userStars, protoRepoMeta.Name)
+
+		repoMeta := mConvert.GetRepoMeta(protoRepoMeta)
+
+		if filterFunc(repoMeta) {
+			foundRepos = append(foundRepos, repoMeta)
+		}
+	}
+
+	return foundRepos, nil
+}
+
+// GetRepoMeta returns the full information about a repo.
+func (rc *RedisDB) GetRepoMeta(ctx context.Context, repo string) (mTypes.RepoMeta, error) {
+	protoRepoMeta, err := rc.getProtoRepoMeta(ctx, repo)
+	if err != nil {
+		return mTypes.RepoMeta{}, err
+	}
+
+	userBookmarks, userStars := rc.getUserBookmarksAndStarsNoError(ctx)
+
+	delete(protoRepoMeta.Tags, "")
+	protoRepoMeta.IsBookmarked = zcommon.Contains(userBookmarks, repo)
+	protoRepoMeta.IsStarred = zcommon.Contains(userStars, repo)
+
+	return mConvert.GetRepoMeta(protoRepoMeta), err
+}
+
+// GetFullImageMeta returns the full information about an image.
+func (rc *RedisDB) GetFullImageMeta(ctx context.Context, repo string, tag string) (mTypes.FullImageMeta, error) {
+	protoImageMeta := &proto_go.ImageMeta{}
+
+	protoRepoMeta, err := rc.getProtoRepoMeta(ctx, repo)
+	if err != nil {
+		return mConvert.GetFullImageMetaFromProto(tag, protoRepoMeta, protoImageMeta), err
+	}
+
+	userBookmarks, userStars := rc.getUserBookmarksAndStarsNoError(ctx)
+
+	delete(protoRepoMeta.Tags, "")
+	protoRepoMeta.IsBookmarked = zcommon.Contains(userBookmarks, repo)
+	protoRepoMeta.IsStarred = zcommon.Contains(userStars, repo)
+
+	descriptor, ok := protoRepoMeta.Tags[tag]
+	if !ok {
+		return mConvert.GetFullImageMetaFromProto(tag, protoRepoMeta, protoImageMeta),
+			fmt.Errorf("%w for tag %s in repo %s", zerr.ErrImageMetaNotFound, tag, repo)
+	}
+
+	protoImageMeta, err = rc.getProtoImageMeta(ctx, descriptor.Digest)
+	if err != nil {
+		return mConvert.GetFullImageMetaFromProto(tag, protoRepoMeta, protoImageMeta), err
+	}
+
+	if protoImageMeta.MediaType == ispec.MediaTypeImageIndex {
+		_, manifestDataList, err := rc.getAllContainedMeta(ctx, protoImageMeta)
+		if err != nil {
+			return mConvert.GetFullImageMetaFromProto(tag, protoRepoMeta, protoImageMeta), err
+		}
+
+		protoImageMeta.Manifests = manifestDataList
+	}
+
+	return mConvert.GetFullImageMetaFromProto(tag, protoRepoMeta, protoImageMeta), nil
+}
+
+// GetImageMeta returns the raw information about an image.
+func (rc *RedisDB) GetImageMeta(digest godigest.Digest) (mTypes.ImageMeta, error) {
+	imageMeta := mTypes.ImageMeta{}
+	ctx := context.Background()
+
+	protoImageMeta, err := rc.getProtoImageMeta(ctx, digest.String())
+	if err != nil {
+		return imageMeta, err
+	}
+
+	if protoImageMeta.MediaType == ispec.MediaTypeImageIndex {
+		_, manifestDataList, err := rc.getAllContainedMeta(ctx, protoImageMeta)
+		if err != nil {
+			return imageMeta, err
+		}
+
+		protoImageMeta.Manifests = manifestDataList
+	}
+
+	imageMeta = mConvert.GetImageMeta(protoImageMeta)
+
+	return imageMeta, err
+}
+
+// GetMultipleRepoMeta returns a list of all repos that match the given filter function.
+func (rc *RedisDB) GetMultipleRepoMeta(ctx context.Context, filter func(repoMeta mTypes.RepoMeta) bool) (
+	[]mTypes.RepoMeta, error,
+) {
+	foundRepos := []mTypes.RepoMeta{}
+
+	repoMetaEntries, err := rc.Client.HGetAll(ctx, RepoMetaBuck).Result()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hgetall", RepoMetaBuck).Msg("failed to get all repo meta records")
+
+		return foundRepos, fmt.Errorf("failed to get all repometa records: %w", err)
+	}
+
+	for repo, repoMetaBlob := range repoMetaEntries {
+		if ok, err := reqCtx.RepoIsUserAvailable(ctx, repo); !ok || err != nil {
+			continue
+		}
+
+		protoRepoMeta, err := unmarshalProtoRepoMeta(repo, []byte(repoMetaBlob))
+		if err != nil {
+			// similarly with other metadb implementations, return a partial result on error
+			return foundRepos, err
+		}
+
+		delete(protoRepoMeta.Tags, "")
+
+		repoMeta := mConvert.GetRepoMeta(protoRepoMeta)
+
+		if filter(repoMeta) {
+			foundRepos = append(foundRepos, repoMeta)
+		}
+	}
+
+	return foundRepos, err
+}
+
+// AddManifestSignature adds signature metadata to a given manifest in the database.
+func (rc *RedisDB) AddManifestSignature(repo string, signedManifestDigest godigest.Digest,
+	sigMeta mTypes.SignatureMetadata,
+) error {
+	ctx := context.Background()
+
+	protoRepoMeta, err := rc.getProtoRepoMeta(ctx, repo)
+	if err != nil && !errors.Is(err, zerr.ErrRepoMetaNotFound) {
+		return err
+	}
+
+	if errors.Is(err, zerr.ErrRepoMetaNotFound) {
+		var err error
+		// create a new object
+		repoMeta := proto_go.RepoMeta{
+			Name: repo,
+			Tags: map[string]*proto_go.TagDescriptor{"": {}},
+			Signatures: map[string]*proto_go.ManifestSignatures{
+				signedManifestDigest.String(): {
+					Map: map[string]*proto_go.SignaturesInfo{
+						sigMeta.SignatureType: {
+							List: []*proto_go.SignatureInfo{
+								{
+									SignatureManifestDigest: sigMeta.SignatureDigest,
+									LayersInfo:              mConvert.GetProtoLayersInfo(sigMeta.LayersInfo),
+								},
+							},
+						},
+					},
+				},
+			},
+			Referrers:  map[string]*proto_go.ReferrersInfo{"": {}},
+			Statistics: map[string]*proto_go.DescriptorStatistics{"": {}},
+		}
+
+		repoMetaBlob, err := proto.Marshal(&repoMeta)
+		if err != nil {
+			return err
+		}
+
+		if err := rc.Client.HSet(ctx, RepoMetaBuck, repo, repoMetaBlob).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("hset", RepoMetaBuck).Str("repo", repo).
+				Msg("failed to put repo meta record")
+
+			return fmt.Errorf("failed to put repometa record for repo %s: %w", repo, err)
+		}
+
+		return nil
+	}
+
+	var (
+		manifestSignatures *proto_go.ManifestSignatures
+		found              bool
+	)
+
+	if manifestSignatures, found = protoRepoMeta.Signatures[signedManifestDigest.String()]; !found {
+		manifestSignatures = &proto_go.ManifestSignatures{Map: map[string]*proto_go.SignaturesInfo{"": {}}}
+	}
+
+	signatureSlice := &proto_go.SignaturesInfo{List: []*proto_go.SignatureInfo{}}
+	if sigSlice, found := manifestSignatures.Map[sigMeta.SignatureType]; found {
+		signatureSlice = sigSlice
+	}
+
+	if !common.ProtoSignatureAlreadyExists(signatureSlice.List, sigMeta) {
+		switch sigMeta.SignatureType {
+		case zcommon.NotationSignature:
+			signatureSlice.List = append(signatureSlice.List, &proto_go.SignatureInfo{
+				SignatureManifestDigest: sigMeta.SignatureDigest,
+				LayersInfo:              mConvert.GetProtoLayersInfo(sigMeta.LayersInfo),
+			})
+		case zcommon.CosignSignature:
+			newCosignSig := &proto_go.SignatureInfo{
+				SignatureManifestDigest: sigMeta.SignatureDigest,
+				LayersInfo:              mConvert.GetProtoLayersInfo(sigMeta.LayersInfo),
+			}
+
+			if zcommon.IsCosignTag(sigMeta.SignatureTag) {
+				// the entry for "sha256-{digest}.sig" signatures should be overwritten if
+				// it exists or added on the first position if it doesn't exist
+				if len(signatureSlice.GetList()) == 0 {
+					signatureSlice.List = []*proto_go.SignatureInfo{newCosignSig}
+				} else {
+					signatureSlice.List[0] = newCosignSig
+				}
+			} else {
+				// the first position should be reserved for "sha256-{digest}.sig" signatures
+				if len(signatureSlice.GetList()) == 0 {
+					signatureSlice.List = []*proto_go.SignatureInfo{{
+						SignatureManifestDigest: "",
+						LayersInfo:              []*proto_go.LayersInfo{},
+					}}
+				}
+
+				signatureSlice.List = append(signatureSlice.List, newCosignSig)
+			}
+		}
+	}
+
+	manifestSignatures.Map[sigMeta.SignatureType] = signatureSlice
+	protoRepoMeta.Signatures[signedManifestDigest.String()] = manifestSignatures
+
+	repoMetaBlob, err := proto.Marshal(protoRepoMeta)
+	if err != nil {
+		return err
+	}
+
+	err = rc.Client.HSet(ctx, RepoMetaBuck, repo, repoMetaBlob).Err()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hset", RepoMetaBuck).Str("repo", repo).
+			Msg("failed to put repo meta record")
+
+		return fmt.Errorf("failed to put repometa record for repo %s: %w", repo, err)
+	}
+
+	return nil
+}
+
+// DeleteSignature deletes signature metadata to a given manifest from the database.
+func (rc *RedisDB) DeleteSignature(repo string, signedManifestDigest godigest.Digest,
+	sigMeta mTypes.SignatureMetadata,
+) error {
+	ctx := context.Background()
+
+	protoRepoMeta, err := rc.getProtoRepoMeta(ctx, repo)
+	if err != nil {
+		return err
+	}
+
+	manifestSignatures, found := protoRepoMeta.Signatures[signedManifestDigest.String()]
+	if !found {
+		return zerr.ErrImageMetaNotFound
+	}
+
+	signatureSlice := manifestSignatures.Map[sigMeta.SignatureType]
+
+	newSignatureSlice := make([]*proto_go.SignatureInfo, 0, len(signatureSlice.List))
+
+	for _, sigInfo := range signatureSlice.List {
+		if sigInfo.SignatureManifestDigest != sigMeta.SignatureDigest {
+			newSignatureSlice = append(newSignatureSlice, sigInfo)
+		}
+	}
+
+	manifestSignatures.Map[sigMeta.SignatureType] = &proto_go.SignaturesInfo{List: newSignatureSlice}
+	protoRepoMeta.Signatures[signedManifestDigest.String()] = manifestSignatures
+
+	repoMetaBlob, err := proto.Marshal(protoRepoMeta)
+	if err != nil {
+		return err
+	}
+
+	err = rc.Client.HSet(ctx, RepoMetaBuck, repo, repoMetaBlob).Err()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hset", RepoMetaBuck).Str("repo", repo).
+			Msg("failed to put repo meta record")
+
+		return fmt.Errorf("failed to put repometa record for repo %s: %w", repo, err)
+	}
+
+	return err
+}
+
+// UpdateSignaturesValidity checks and updates signatures validity of a given manifest.
+func (rc *RedisDB) UpdateSignaturesValidity(ctx context.Context, repo string, manifestDigest godigest.Digest) error {
+	imgTrustStore := rc.ImageTrustStore()
+
+	if imgTrustStore == nil {
+		return nil
+	}
+
+	// get ManifestData of signed manifest
+	protoImageMeta, err := rc.getProtoImageMeta(ctx, manifestDigest.String())
+	if err != nil {
+		if errors.Is(err, zerr.ErrImageMetaNotFound) {
+			// manifest meta not found, updating signatures with details about validity and author will not be performed
+			return nil
+		}
+
+		return err
+	}
+
+	// update signatures with details about validity and author
+	protoRepoMeta, err := rc.getProtoRepoMeta(ctx, repo)
+	if err != nil {
+		return err
+	}
+
+	manifestSignatures := proto_go.ManifestSignatures{Map: map[string]*proto_go.SignaturesInfo{"": {}}}
+
+	for sigType, sigs := range protoRepoMeta.Signatures[manifestDigest.String()].Map {
+		if zcommon.IsContextDone(ctx) {
+			return ctx.Err()
+		}
+
+		signaturesInfo := []*proto_go.SignatureInfo{}
+
+		for _, sigInfo := range sigs.List {
+			layersInfo := []*proto_go.LayersInfo{}
+
+			for _, layerInfo := range sigInfo.LayersInfo {
+				author, date, isTrusted, _ := imgTrustStore.VerifySignature(sigType, layerInfo.LayerContent,
+					layerInfo.SignatureKey, manifestDigest, mConvert.GetImageMeta(protoImageMeta), repo)
+
+				if isTrusted {
+					layerInfo.Signer = author
+				}
+
+				if !date.IsZero() {
+					layerInfo.Signer = author
+					layerInfo.Date = timestamppb.New(date)
+				}
+
+				layersInfo = append(layersInfo, layerInfo)
+			}
+
+			signaturesInfo = append(signaturesInfo, &proto_go.SignatureInfo{
+				SignatureManifestDigest: sigInfo.SignatureManifestDigest,
+				LayersInfo:              layersInfo,
+			})
+		}
+
+		manifestSignatures.Map[sigType] = &proto_go.SignaturesInfo{List: signaturesInfo}
+	}
+
+	protoRepoMeta.Signatures[manifestDigest.String()] = &manifestSignatures
+
+	repoMetaBlob, err := proto.Marshal(protoRepoMeta)
+	if err != nil {
+		return err
+	}
+
+	err = rc.Client.HSet(ctx, RepoMetaBuck, repo, repoMetaBlob).Err()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hset", RepoMetaBuck).Str("repo", repo).
+			Msg("failed to put repo meta record")
+
+		return fmt.Errorf("failed to put repometa record for repo %s: %w", repo, err)
+	}
+
+	return nil
+}
+
+// IncrementRepoStars adds 1 to the star count of an image.
+func (rc *RedisDB) IncrementRepoStars(repo string) error {
+	ctx := context.Background()
+
+	protoRepoMeta, err := rc.getProtoRepoMeta(ctx, repo)
+	if err != nil {
+		return err
+	}
+
+	protoRepoMeta.Stars++
+
+	repoMetaBlob, err := proto.Marshal(protoRepoMeta)
+	if err != nil {
+		return err
+	}
+
+	err = rc.Client.HSet(ctx, RepoMetaBuck, repo, repoMetaBlob).Err()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hset", RepoMetaBuck).Str("repo", repo).
+			Msg("failed to put repo meta record")
+
+		return fmt.Errorf("failed to put repometa record for repo %s: %w", repo, err)
+	}
+
+	return nil
+}
+
+// DecrementRepoStars subtracts 1 from the star count of an image.
+func (rc *RedisDB) DecrementRepoStars(repo string) error {
+	ctx := context.Background()
+
+	protoRepoMeta, err := rc.getProtoRepoMeta(ctx, repo)
+	if err != nil {
+		return err
+	}
+
+	if protoRepoMeta.Stars == 0 {
+		return nil
+	}
+
+	protoRepoMeta.Stars--
+
+	repoMetaBlob, err := proto.Marshal(protoRepoMeta)
+	if err != nil {
+		return err
+	}
+
+	err = rc.Client.HSet(ctx, RepoMetaBuck, repo, repoMetaBlob).Err()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hset", RepoMetaBuck).Str("repo", repo).
+			Msg("failed to put repo meta record")
+
+		return fmt.Errorf("failed to put repometa record for repo %s: %w", repo, err)
+	}
+
+	return nil
+}
+
+// SetRepoMeta returns RepoMetadata of a repo from the database.
+func (rc *RedisDB) SetRepoMeta(repo string, repoMeta mTypes.RepoMeta) error {
+	repoMeta.Name = repo
+
+	repoMetaBlob, err := proto.Marshal(mConvert.GetProtoRepoMeta(repoMeta))
+	if err != nil {
+		return err
+	}
+
+	// The last update time is set to 0 in order to force an update in case of a next storage parsing
+	protoTime := timestamppb.New(time.Time{})
+
+	protoTimeBlob, err := proto.Marshal(protoTime)
+	if err != nil {
+		return err
+	}
+
+	ctx := context.Background()
+
+	_, err = rc.Client.TxPipelined(ctx, func(txrp redis.Pipeliner) error {
+		if err := txrp.HSet(ctx, RepoMetaBuck, repo, repoMetaBlob).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("hset", RepoMetaBuck).Str("repo", repo).
+				Msg("failed to put repo meta record")
+
+			return fmt.Errorf("failed to put repometa record for repo %s: %w", repo, err)
+		}
+
+		if err := txrp.HSet(ctx, RepoLastUpdatedBuck, repo, protoTimeBlob).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("hset", RepoLastUpdatedBuck).Str("repo", repo).
+				Msg("failed to put repo last updated timestamp")
+
+			return fmt.Errorf("failed to put repo last updated record for repo %s: %w", repo, err)
+		}
+
+		return nil
+	})
+
+	return err
+}
+
+// DeleteRepoMeta.
+func (rc *RedisDB) DeleteRepoMeta(repo string) error {
+	ctx := context.Background()
+
+	_, err := rc.Client.TxPipelined(ctx, func(txrp redis.Pipeliner) error {
+		if err := txrp.HDel(ctx, RepoMetaBuck, repo).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("hdel", RepoMetaBuck).Str("repo", repo).
+				Msg("failed to delete repo meta record")
+
+			return fmt.Errorf("failed to delete repometa record for repo %s: %w", repo, err)
+		}
+
+		if err := txrp.HDel(ctx, RepoBlobsBuck, repo).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("hdel", RepoBlobsBuck).Str("repo", repo).
+				Msg("failed to put repo blobs record")
+
+			return fmt.Errorf("failed to delete repo blobs record for repo %s: %w", repo, err)
+		}
+
+		if err := txrp.HDel(ctx, RepoLastUpdatedBuck, repo).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("hdel", RepoLastUpdatedBuck).Str("repo", repo).
+				Msg("failed to put repo last updated timestamp")
+
+			return fmt.Errorf("failed to delete repo last updated record for repo %s: %w", repo, err)
+		}
+
+		return nil
+	})
+
+	return err
+}
+
+// GetReferrersInfo returns a list of  for all referrers of the given digest that match one of the
+// artifact types.
+func (rc *RedisDB) GetReferrersInfo(repo string, referredDigest godigest.Digest,
+	artifactTypes []string,
+) ([]mTypes.ReferrerInfo, error) {
+	referrersInfoResult := []mTypes.ReferrerInfo{}
+	ctx := context.Background()
+
+	protoRepoMeta, err := rc.getProtoRepoMeta(ctx, repo)
+	if err != nil {
+		return referrersInfoResult, err
+	}
+
+	referrersInfo := protoRepoMeta.Referrers[referredDigest.String()].List
+
+	for i := range referrersInfo {
+		if !common.MatchesArtifactTypes(referrersInfo[i].ArtifactType, artifactTypes) {
+			continue
+		}
+
+		referrersInfoResult = append(referrersInfoResult, mTypes.ReferrerInfo{
+			Digest:       referrersInfo[i].Digest,
+			MediaType:    referrersInfo[i].MediaType,
+			ArtifactType: referrersInfo[i].ArtifactType,
+			Size:         int(referrersInfo[i].Size),
+			Annotations:  referrersInfo[i].Annotations,
+		})
+	}
+
+	return referrersInfoResult, err
+}
+
+// UpdateStatsOnDownload adds 1 to the download count of an image and sets the timestamp of download.
+func (rc *RedisDB) UpdateStatsOnDownload(repo string, reference string) error {
+	ctx := context.Background()
+
+	protoRepoMeta, err := rc.getProtoRepoMeta(ctx, repo)
+	if err != nil {
+		return err
+	}
+
+	manifestDigest := reference
+
+	if common.ReferenceIsTag(reference) {
+		descriptor, found := protoRepoMeta.Tags[reference]
+
+		if !found {
+			return zerr.ErrImageMetaNotFound
+		}
+
+		manifestDigest = descriptor.Digest
+	}
+
+	manifestStatistics, ok := protoRepoMeta.Statistics[manifestDigest]
+	if !ok {
+		return zerr.ErrImageMetaNotFound
+	}
+
+	manifestStatistics.DownloadCount++
+	manifestStatistics.LastPullTimestamp = timestamppb.Now()
+	protoRepoMeta.Statistics[manifestDigest] = manifestStatistics
+
+	repoMetaBlob, err := proto.Marshal(protoRepoMeta)
+	if err != nil {
+		return err
+	}
+
+	err = rc.Client.HSet(ctx, RepoMetaBuck, repo, repoMetaBlob).Err()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hset", RepoMetaBuck).Str("repo", repo).
+			Msg("failed to put repo meta record")
+
+		return fmt.Errorf("failed to put repometa record for repo %s: %w", repo, err)
+	}
+
+	return nil
+}
+
+// FilterImageMeta returns the image data for the given digests.
+func (rc *RedisDB) FilterImageMeta(ctx context.Context,
+	digests []string,
+) (map[mTypes.ImageDigest]mTypes.ImageMeta, error) {
+	imageMetaMap := map[string]mTypes.ImageMeta{}
+
+	for _, digest := range digests {
+		protoImageMeta, err := rc.getProtoImageMeta(ctx, digest)
+		if err != nil {
+			return imageMetaMap, err
+		}
+
+		if protoImageMeta.MediaType == ispec.MediaTypeImageIndex {
+			_, manifestDataList, err := rc.getAllContainedMeta(ctx, protoImageMeta)
+			if err != nil {
+				return imageMetaMap, err
+			}
+
+			protoImageMeta.Manifests = manifestDataList
+		}
+
+		imageMetaMap[digest] = mConvert.GetImageMeta(protoImageMeta)
+	}
+
+	return imageMetaMap, nil
+}
+
+/*
+	RemoveRepoReference removes the tag from RepoMetadata if the reference is a tag,
+
+it also removes its corresponding digest from Statistics, Signatures and Referrers if there are no tags
+pointing to it.
+If the reference is a digest then it will remove the digest from Statistics, Signatures and Referrers only
+if there are no tags pointing to the digest, otherwise it's noop.
+*/
+func (rc *RedisDB) RemoveRepoReference(repo, reference string, manifestDigest godigest.Digest) error {
+	ctx := context.Background()
+
+	protoRepoMeta, err := rc.getProtoRepoMeta(ctx, repo)
+	if err != nil {
+		if errors.Is(err, zerr.ErrRepoMetaNotFound) {
+			return nil
+		}
+
+		return err
+	}
+
+	protoImageMeta, err := rc.getProtoImageMeta(ctx, manifestDigest.String())
+	if err != nil {
+		if errors.Is(err, zerr.ErrImageMetaNotFound) {
+			return nil
+		}
+
+		return err
+	}
+
+	// Remove Referrers
+	if subject := mConvert.GetImageSubject(protoImageMeta); subject != nil {
+		referredDigest := subject.Digest.String()
+		refInfo := &proto_go.ReferrersInfo{}
+
+		if protoRepoMeta.Referrers[referredDigest] != nil {
+			refInfo = protoRepoMeta.Referrers[referredDigest]
+		}
+
+		referrers := refInfo.List
+
+		for i := range referrers {
+			if referrers[i].Digest == manifestDigest.String() {
+				referrers[i].Count -= 1
+
+				if referrers[i].Count == 0 || common.ReferenceIsDigest(reference) {
+					referrers = append(referrers[:i], referrers[i+1:]...)
+				}
+
+				break
+			}
+		}
+
+		refInfo.List = referrers
+
+		protoRepoMeta.Referrers[referredDigest] = refInfo
+	}
+
+	if !common.ReferenceIsDigest(reference) {
+		delete(protoRepoMeta.Tags, reference)
+	} else {
+		// remove all tags pointing to this digest
+		for tag, desc := range protoRepoMeta.Tags {
+			if desc.Digest == reference {
+				delete(protoRepoMeta.Tags, tag)
+			}
+		}
+	}
+
+	/* try to find at least one tag pointing to manifestDigest
+	if not found then we can also remove everything related to this digest */
+	var foundTag bool
+
+	for _, desc := range protoRepoMeta.Tags {
+		if desc.Digest == manifestDigest.String() {
+			foundTag = true
+		}
+	}
+
+	if !foundTag {
+		delete(protoRepoMeta.Statistics, manifestDigest.String())
+		delete(protoRepoMeta.Signatures, manifestDigest.String())
+		delete(protoRepoMeta.Referrers, manifestDigest.String())
+	}
+
+	repoBlobsBytes, err := rc.Client.HGet(ctx, RepoBlobsBuck, repo).Bytes()
+	if err != nil && !errors.Is(err, redis.Nil) {
+		rc.Log.Error().Err(err).Str("hget", RepoBlobsBuck).Str("repo", repo).
+			Msg("failed to get repo blobs record")
+
+		return fmt.Errorf("failed to get repo blobs record for repo %s: %w", repo, err)
+	}
+
+	repoBlobs, err := unmarshalProtoRepoBlobs(repo, repoBlobsBytes)
+	if err != nil {
+		return err
+	}
+
+	protoRepoMeta, repoBlobs = common.RemoveImageFromRepoMeta(protoRepoMeta, repoBlobs, reference)
+	protoTime := timestamppb.New(time.Now())
+
+	protoTimeBlob, err := proto.Marshal(protoTime)
+	if err != nil {
+		return err
+	}
+
+	repoBlobsBytes, err = proto.Marshal(repoBlobs)
+	if err != nil {
+		return err
+	}
+
+	repoMetaBlob, err := proto.Marshal(protoRepoMeta)
+	if err != nil {
+		return err
+	}
+
+	_, err = rc.Client.TxPipelined(ctx, func(txrp redis.Pipeliner) error {
+		if err := txrp.HSet(ctx, RepoLastUpdatedBuck, repo, protoTimeBlob).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("hset", RepoLastUpdatedBuck).Str("repo", repo).
+				Msg("failed to put repo last updated timestamp")
+
+			return fmt.Errorf("failed to put repo last updated record for repo %s: %w", repo, err)
+		}
+
+		if err := txrp.HSet(ctx, RepoBlobsBuck, repo, repoBlobsBytes).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("hset", RepoBlobsBuck).Str("repo", repo).
+				Msg("failed to put repo blobs record")
+
+			return fmt.Errorf("failed to set repo blobs record for repo %s: %w", repo, err)
+		}
+
+		if err := txrp.HSet(ctx, RepoMetaBuck, repo, repoMetaBlob).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("hset", RepoMetaBuck).Str("repo", repo).
+				Msg("failed to put repo meta record")
+
+			return fmt.Errorf("failed to put repometa record for repo %s: %w", repo, err)
+		}
+
+		return nil
+	})
+
+	return err
+}
+
+// ResetRepoReferences resets all layout specific data (tags, signatures, referrers, etc.) but keep user and image
+// specific metadata such as star count, downloads other statistics.
+func (rc *RedisDB) ResetRepoReferences(repo string) error {
+	ctx := context.Background()
+
+	protoRepoMeta, err := rc.getProtoRepoMeta(ctx, repo)
+	if err != nil && !errors.Is(err, zerr.ErrRepoMetaNotFound) {
+		return err
+	}
+
+	repoMetaBlob, err := proto.Marshal(&proto_go.RepoMeta{
+		Name:       repo,
+		Statistics: protoRepoMeta.Statistics,
+		Stars:      protoRepoMeta.Stars,
+		Tags:       map[string]*proto_go.TagDescriptor{"": {}},
+		Signatures: map[string]*proto_go.ManifestSignatures{"": {Map: map[string]*proto_go.SignaturesInfo{"": {}}}},
+		Referrers:  map[string]*proto_go.ReferrersInfo{"": {}},
+	})
+	if err != nil {
+		return err
+	}
+
+	if err := rc.Client.HSet(ctx, RepoMetaBuck, repo, repoMetaBlob).Err(); err != nil {
+		rc.Log.Error().Err(err).Str("hset", RepoMetaBuck).Str("repo", repo).
+			Msg("failed to put repo meta record")
+
+		return fmt.Errorf("failed to put repometa record for repo %s: %w", repo, err)
+	}
+
+	return nil
+}
+
+func (rc *RedisDB) GetRepoLastUpdated(repo string) time.Time {
+	ctx := context.Background()
+
+	lastUpdatedBlob, err := rc.Client.HGet(ctx, RepoLastUpdatedBuck, repo).Bytes()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hget", RepoLastUpdatedBuck).Str("repo", repo).
+			Msg("failed to get repo last updated timestamp")
+
+		return time.Time{}
+	}
+
+	if len(lastUpdatedBlob) == 0 {
+		return time.Time{}
+	}
+
+	protoTime := &timestamppb.Timestamp{}
+
+	err = proto.Unmarshal(lastUpdatedBlob, protoTime)
+	if err != nil {
+		return time.Time{}
+	}
+
+	lastUpdated := *mConvert.GetTime(protoTime)
+
+	return lastUpdated
+}
+
+func (rc *RedisDB) GetAllRepoNames() ([]string, error) {
+	foundRepos := []string{}
+	ctx := context.Background()
+
+	repoMetaEntries, err := rc.Client.HGetAll(ctx, RepoMetaBuck).Result()
+	if err != nil {
+		rc.Log.Error().Err(err).Str("hgetall", RepoMetaBuck).Msg("failed to get all repo meta records")
+
+		return foundRepos, fmt.Errorf("failed to get all repometa records %w", err)
+	}
+
+	for repo := range repoMetaEntries {
+		foundRepos = append(foundRepos, repo)
+	}
+
+	return foundRepos, nil
+}
+
+// ResetDB will delete all data in the DB.
+func (rc *RedisDB) ResetDB() error {
+	ctx := context.Background()
+
+	_, err := rc.Client.TxPipelined(ctx, func(txrp redis.Pipeliner) error {
+		if err := txrp.Del(ctx, RepoMetaBuck).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("del", RepoMetaBuck).Msg("failed to delete repo meta bucket")
+
+			return fmt.Errorf("failed to delete repo meta bucket: %w", err)
+		}
+
+		if err := txrp.Del(ctx, ImageMetaBuck).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("del", ImageMetaBuck).Msg("failed to delete image meta bucket")
+
+			return fmt.Errorf("failed to delete image meta bucket: %w", err)
+		}
+
+		if err := txrp.Del(ctx, RepoBlobsBuck).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("del", RepoBlobsBuck).Msg("failed to delete repo blobs bucket")
+
+			return fmt.Errorf("failed to delete repo blobs bucket: %w", err)
+		}
+
+		if err := txrp.Del(ctx, RepoLastUpdatedBuck).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("del", RepoLastUpdatedBuck).Msg("failed to delete repo last updated bucket")
+
+			return fmt.Errorf("failed to delete repo last updated bucket: %w", err)
+		}
+
+		if err := txrp.Del(ctx, UserDataBucket).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("del", UserDataBucket).Msg("failed to delete user data bucket")
+
+			return fmt.Errorf("failed to delete user data bucket: %w", err)
+		}
+
+		if err := txrp.Del(ctx, UserAPIKeysBucket).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("del", UserAPIKeysBucket).Msg("failed to delete user api key bucket")
+
+			return fmt.Errorf("failed to delete user api key bucket: %w", err)
+		}
+
+		if err := txrp.Del(ctx, VersionBucket).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("del", VersionBucket).Msg("failed to delete version bucket")
+
+			return fmt.Errorf("failed to delete version bucket: %w", err)
+		}
+
+		return nil
+	})
+
+	return err
+}
+
+func (rc *RedisDB) PatchDB() error {
+	var DBVersion string
+	ctx := context.Background()
+
+	DBVersion, err := rc.Client.Get(ctx, VersionBucket).Result()
+	if err != nil {
+		if !errors.Is(err, redis.Nil) {
+			rc.Log.Error().Err(err).Str("get", VersionBucket).Msg("failed to get db version")
+
+			return fmt.Errorf("patching the database failed, can't read db version: %w", err)
+		}
+
+		// this is a new DB, we need to initialize the version
+		if err := rc.Client.Set(ctx, VersionBucket, rc.Version, 0).Err(); err != nil {
+			rc.Log.Error().Err(err).Str("set", VersionBucket).
+				Str("value", version.CurrentVersion).Msg("failed to set db version")
+
+			return fmt.Errorf("patching the database failed, can't set db version: %w", err)
+		}
+
+		// No need to apply patches on a new DB
+		return nil
+	}
+
+	if version.GetVersionIndex(DBVersion) == -1 {
+		return fmt.Errorf("%w: %s could not identify patches", zerr.ErrInvalidMetaDBVersion, DBVersion)
+	}
+
+	for patchIndex, patch := range rc.Patches {
+		if patchIndex < version.GetVersionIndex(DBVersion) {
+			continue
+		}
+
+		err := patch(rc.Client)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
 
 func (rc *RedisDB) ImageTrustStore() mTypes.ImageTrustStore {
 	return rc.imgTrustStore
@@ -260,3 +1971,137 @@ func (rc *RedisDB) ImageTrustStore() mTypes.ImageTrustStore {
 func (rc *RedisDB) SetImageTrustStore(imgTrustStore mTypes.ImageTrustStore) {
 	rc.imgTrustStore = imgTrustStore
 }
+
+// getUserBookmarksAndStarsNoError is used in several calls where we don't want
+// to fail if the user data is unavailable, such as the case of getting all repos for
+// anonymous users, or using metaDB internaly for CVE scanning repos.
+func (rc *RedisDB) getUserBookmarksAndStarsNoError(ctx context.Context) ([]string, []string) {
+	userData, err := rc.GetUserData(ctx)
+	if err != nil {
+		return []string{}, []string{}
+	}
+
+	return userData.BookmarkedRepos, userData.StarredRepos
+}
+
+func (rc *RedisDB) getProtoImageMeta(ctx context.Context, digest string) (*proto_go.ImageMeta, error) {
+	imageMetaBlob, err := rc.Client.HGet(ctx, ImageMetaBuck, digest).Bytes()
+	if err != nil && !errors.Is(err, redis.Nil) {
+		rc.Log.Error().Err(err).Str("hget", ImageMetaBuck).Str("digest", digest).
+			Msg("failed to get image meta record")
+
+		return nil, fmt.Errorf("failed to get image meta record for digest %s: %w", digest, err)
+	}
+
+	if len(imageMetaBlob) == 0 || errors.Is(err, redis.Nil) {
+		return nil, fmt.Errorf("%w for digest %s", zerr.ErrImageMetaNotFound, digest)
+	}
+
+	imageMeta := proto_go.ImageMeta{}
+
+	err = proto.Unmarshal(imageMetaBlob, &imageMeta)
+	if err != nil {
+		return nil, err
+	}
+
+	return &imageMeta, nil
+}
+
+func (rc *RedisDB) getAllContainedMeta(ctx context.Context, imageIndexData *proto_go.ImageMeta,
+) ([]*proto_go.ImageMeta, []*proto_go.ManifestMeta, error) {
+	manifestDataList := make([]*proto_go.ManifestMeta, 0, len(imageIndexData.Index.Index.Manifests))
+	imageMetaList := make([]*proto_go.ImageMeta, 0, len(imageIndexData.Index.Index.Manifests))
+
+	for _, manifest := range imageIndexData.Index.Index.Manifests {
+		imageManifestData, err := rc.getProtoImageMeta(ctx, manifest.Digest)
+		if err != nil {
+			return imageMetaList, manifestDataList, err
+		}
+
+		switch imageManifestData.MediaType {
+		case ispec.MediaTypeImageManifest:
+			imageMetaList = append(imageMetaList, imageManifestData)
+			manifestDataList = append(manifestDataList, imageManifestData.Manifests[0])
+		case ispec.MediaTypeImageIndex:
+			partialImageDataList, partialManifestDataList, err := rc.getAllContainedMeta(ctx, imageManifestData)
+			if err != nil {
+				return imageMetaList, manifestDataList, err
+			}
+
+			imageMetaList = append(imageMetaList, partialImageDataList...)
+			manifestDataList = append(manifestDataList, partialManifestDataList...)
+		}
+	}
+
+	return imageMetaList, manifestDataList, nil
+}
+
+func (rc *RedisDB) getProtoRepoMeta(ctx context.Context, repo string) (*proto_go.RepoMeta, error) {
+	repoMetaBlob, err := rc.Client.HGet(ctx, RepoMetaBuck, repo).Bytes()
+	if err != nil && !errors.Is(err, redis.Nil) {
+		rc.Log.Error().Err(err).Str("hget", RepoMetaBuck).Str("repo", repo).
+			Msg("failed to get repo meta record")
+
+		return nil, fmt.Errorf("failed to get repo meta record for repo %s: %w", repo, err)
+	}
+
+	return unmarshalProtoRepoMeta(repo, repoMetaBlob)
+}
+
+// unmarshalProtoRepoMeta will unmarshal the repoMeta blob and initialize nil maps. If the blob is empty
+// an empty initialized object is returned.
+func unmarshalProtoRepoMeta(repo string, repoMetaBlob []byte) (*proto_go.RepoMeta, error) {
+	protoRepoMeta := &proto_go.RepoMeta{
+		Name: repo,
+	}
+
+	if len(repoMetaBlob) > 0 {
+		err := proto.Unmarshal(repoMetaBlob, protoRepoMeta)
+		if err != nil {
+			return protoRepoMeta, err
+		}
+	}
+
+	if protoRepoMeta.Tags == nil {
+		protoRepoMeta.Tags = map[string]*proto_go.TagDescriptor{"": {}}
+	}
+
+	if protoRepoMeta.Statistics == nil {
+		protoRepoMeta.Statistics = map[string]*proto_go.DescriptorStatistics{"": {}}
+	}
+
+	if protoRepoMeta.Signatures == nil {
+		protoRepoMeta.Signatures = map[string]*proto_go.ManifestSignatures{"": {}}
+	}
+
+	if protoRepoMeta.Referrers == nil {
+		protoRepoMeta.Referrers = map[string]*proto_go.ReferrersInfo{"": {}}
+	}
+
+	if len(repoMetaBlob) == 0 {
+		return protoRepoMeta, zerr.ErrRepoMetaNotFound
+	}
+
+	return protoRepoMeta, nil
+}
+
+// unmarshalProtoRepoBlobs will unmarshal the repoBlobs blob and initialize nil maps. If the blob is empty
+// an empty initialized object is returned.
+func unmarshalProtoRepoBlobs(repo string, repoBlobsBytes []byte) (*proto_go.RepoBlobs, error) {
+	repoBlobs := &proto_go.RepoBlobs{
+		Name: repo,
+	}
+
+	if len(repoBlobsBytes) > 0 {
+		err := proto.Unmarshal(repoBlobsBytes, repoBlobs)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	if repoBlobs.Blobs == nil {
+		repoBlobs.Blobs = map[string]*proto_go.BlobInfo{"": {}}
+	}
+
+	return repoBlobs, nil
+}
diff --git a/pkg/meta/redisdb/redis_test.go b/pkg/meta/redisdb/redis_test.go
new file mode 100644
index 000000000..d09d63796
--- /dev/null
+++ b/pkg/meta/redisdb/redis_test.go
@@ -0,0 +1,242 @@
+package redisdb_test
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"testing"
+
+	"github.com/alicebob/miniredis/v2"
+	"github.com/redis/go-redis/v9"
+	. "github.com/smartystreets/goconvey/convey"
+
+	"zotregistry.dev/zot/pkg/log"
+	"zotregistry.dev/zot/pkg/meta/redisdb"
+	mTypes "zotregistry.dev/zot/pkg/meta/types"
+)
+
+func TestRedis(t *testing.T) {
+	miniRedis := miniredis.RunT(t)
+
+	Convey("Test redis metadb implementation", t, func() {
+		log := log.NewLogger("debug", "")
+		So(log, ShouldNotBeNil)
+
+		opts, err := redis.ParseURL("redis://" + miniRedis.Addr())
+		So(err, ShouldBeNil)
+
+		client := redis.NewClient(opts)
+		defer DumpKeys(t, client) // Troubleshoot test failures
+
+		metaDB, err := redisdb.New(client, log)
+		So(err, ShouldBeNil)
+
+		Convey("Test repoMeta ops", func() {
+			ctx := context.Background()
+
+			// Create/Get repo meta
+			for i := range 5 {
+				repoName := fmt.Sprintf("repo%d", i+1)
+				digest := fmt.Sprintf("dig%d", i+1)
+
+				initialRepoMeta := mTypes.RepoMeta{
+					Name: repoName,
+					Tags: map[mTypes.Tag]mTypes.Descriptor{"tag": {Digest: digest}},
+
+					Statistics: map[mTypes.ImageDigest]mTypes.DescriptorStatistics{},
+					Signatures: map[mTypes.ImageDigest]mTypes.ManifestSignatures{},
+					Referrers:  map[mTypes.ImageDigest][]mTypes.ReferrerInfo{"digest": {{Digest: digest}}},
+				}
+
+				err = metaDB.SetRepoMeta(repoName, initialRepoMeta)
+				So(err, ShouldBeNil)
+
+				expectedRepoMeta, err := metaDB.GetRepoMeta(ctx, repoName)
+				So(err, ShouldBeNil)
+
+				So(expectedRepoMeta.Name, ShouldEqual, initialRepoMeta.Name)
+				So(expectedRepoMeta.Tags["tag"].Digest, ShouldEqual, initialRepoMeta.Tags["tag"].Digest)
+			}
+
+			// Get Multiple, Filter and Delete repo meta
+			repoMetas, err := metaDB.GetMultipleRepoMeta(ctx, func(repoMeta mTypes.RepoMeta) bool {
+				if strings.Contains(repoMeta.Name, "repo1") || strings.Contains(repoMeta.Name, "repo4") {
+					return true
+				}
+
+				return false
+			})
+			So(err, ShouldBeNil)
+			So(len(repoMetas), ShouldEqual, 2)
+			So(repoMetas[0].Name, ShouldNotEqual, repoMetas[1].Name)
+
+			for _, repoMeta := range repoMetas {
+				So(repoMeta.Name, ShouldBeIn, []string{"repo1", "repo4"})
+			}
+
+			repoMetas, err = metaDB.FilterRepos(ctx,
+				func(repo string) bool {
+					return true
+				},
+				func(repoMeta mTypes.RepoMeta) bool {
+					if strings.Contains(repoMeta.Tags["tag"].Digest, "dig2") ||
+						strings.Contains(repoMeta.Tags["tag"].Digest, "dig5") {
+						return true
+					}
+
+					return false
+				},
+			)
+			So(err, ShouldBeNil)
+			So(len(repoMetas), ShouldEqual, 2)
+			So(repoMetas[0].Tags["tag"].Digest, ShouldNotEqual, repoMetas[1].Tags["tag"].Digest)
+
+			for _, repoMeta := range repoMetas {
+				So(repoMeta.Tags["tag"].Digest, ShouldBeIn, []string{"dig2", "dig5"})
+			}
+
+			repoNames, err := metaDB.GetAllRepoNames()
+			So(err, ShouldBeNil)
+			So(len(repoNames), ShouldEqual, 5)
+
+			err = metaDB.DeleteRepoMeta("repo2")
+			So(err, ShouldBeNil)
+
+			repoMeta, err := metaDB.GetRepoMeta(ctx, "repo2")
+			So(err, ShouldNotBeNil)
+			So(repoMeta.Name, ShouldBeEmpty)
+
+			repoNames, err = metaDB.GetAllRepoNames()
+			So(err, ShouldBeNil)
+			So(len(repoNames), ShouldEqual, 4)
+
+			repoMetas, err = metaDB.GetMultipleRepoMeta(ctx, func(repoMeta mTypes.RepoMeta) bool { return true })
+			So(err, ShouldBeNil)
+			So(len(repoMetas), ShouldEqual, 4)
+
+			repoMetas, err = metaDB.GetMultipleRepoMeta(ctx, func(repoMeta mTypes.RepoMeta) bool { return false })
+			So(err, ShouldBeNil)
+			So(len(repoMetas), ShouldEqual, 0)
+
+			repoMetas, err = metaDB.FilterRepos(ctx,
+				func(repo string) bool {
+					result := strings.Contains(repo, "repo5")
+
+					return result
+				},
+				func(repoMeta mTypes.RepoMeta) bool {
+					if strings.Contains(repoMeta.Tags["tag"].Digest, "dig3") ||
+						strings.Contains(repoMeta.Tags["tag"].Digest, "dig5") {
+						return true
+					}
+
+					return false
+				},
+			)
+			So(err, ShouldBeNil)
+			So(len(repoMetas), ShouldEqual, 1)
+			So(repoMetas[0].Tags["tag"].Digest, ShouldEqual, "dig5")
+
+			repoMetas, err = metaDB.SearchRepos(ctx, "repo")
+			So(err, ShouldBeNil)
+			So(len(repoMetas), ShouldEqual, 4)
+
+			repoMetas, err = metaDB.SearchRepos(ctx, "epo3")
+			So(err, ShouldBeNil)
+			So(len(repoMetas), ShouldEqual, 1)
+
+			// Stars
+			repoMeta, err = metaDB.GetRepoMeta(ctx, "repo1")
+			So(err, ShouldBeNil)
+			So(repoMeta, ShouldNotBeNil)
+			So(repoMeta.StarCount, ShouldEqual, 0)
+
+			err = metaDB.IncrementRepoStars("repo1")
+			So(err, ShouldBeNil)
+			err = metaDB.IncrementRepoStars("repo1")
+			So(err, ShouldBeNil)
+
+			repoMeta, err = metaDB.GetRepoMeta(ctx, "repo1")
+			So(err, ShouldBeNil)
+			So(repoMeta, ShouldNotBeNil)
+			So(repoMeta.StarCount, ShouldEqual, 2)
+
+			err = metaDB.DecrementRepoStars("repo1")
+			So(err, ShouldBeNil)
+			err = metaDB.DecrementRepoStars("repo1")
+			So(err, ShouldBeNil)
+
+			repoMeta, err = metaDB.GetRepoMeta(ctx, "repo1")
+			So(err, ShouldBeNil)
+			So(repoMeta, ShouldNotBeNil)
+			So(repoMeta.StarCount, ShouldEqual, 0)
+
+			err = metaDB.DecrementRepoStars("repo1")
+			So(err, ShouldBeNil)
+
+			repoMeta, err = metaDB.GetRepoMeta(ctx, "repo1")
+			So(err, ShouldBeNil)
+			So(repoMeta, ShouldNotBeNil)
+			So(repoMeta.StarCount, ShouldEqual, 0)
+		})
+	})
+}
+
+func DumpKeys(t *testing.T, client *redis.Client) {
+	t.Helper()
+
+	// Retrieve all keys
+	keys, err := client.Keys(context.Background(), "*").Result()
+	if err != nil {
+		t.Log("Error retrieving keys:", err)
+
+		return
+	}
+
+	// Print the keys
+	t.Log("Keys in Redis:")
+
+	for _, key := range keys {
+		keyType, err := client.Type(context.Background(), key).Result()
+		if err != nil {
+			t.Logf("Error retrieving type for key %s: %v\n", key, err)
+
+			continue
+		}
+
+		var value string
+
+		switch keyType {
+		case "string":
+			value, err = client.Get(context.Background(), key).Result()
+		case "list":
+			values, err := client.LRange(context.Background(), key, 0, -1).Result()
+			if err == nil {
+				value = fmt.Sprintf("%v", values)
+			}
+		case "hash":
+			values, err := client.HGetAll(context.Background(), key).Result()
+			if err == nil {
+				value = fmt.Sprintf("%v", values)
+			}
+		case "set":
+			values, err := client.SMembers(context.Background(), key).Result()
+			if err == nil {
+				value = fmt.Sprintf("%v", values)
+			}
+		case "zset":
+			values, err := client.ZRange(context.Background(), key, 0, -1).Result()
+			if err == nil {
+				value = fmt.Sprintf("%v", values)
+			}
+		default:
+			value = "Unsupported type"
+		}
+
+		if err != nil {
+			t.Logf("Error retrieving value for key %s: %v\n", key, err)
+		} else {
+			t.Logf("Key: %s, Type: %s, Value: %s\n", key, keyType, value)
+		}
+	}
+}
diff --git a/pkg/meta/version/patches.go b/pkg/meta/version/patches.go
index bc125a051..70d050616 100644
--- a/pkg/meta/version/patches.go
+++ b/pkg/meta/version/patches.go
@@ -2,6 +2,7 @@ package version
 
 import (
 	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
+	"github.com/redis/go-redis/v9"
 	"go.etcd.io/bbolt"
 )
 
@@ -12,3 +13,7 @@ func GetBoltDBPatches() []func(DB *bbolt.DB) error {
 func GetDynamoDBPatches() []func(client *dynamodb.Client, tableNames map[string]string) error {
 	return []func(client *dynamodb.Client, tableNames map[string]string) error{}
 }
+
+func GetRedisDBPatches() []func(client redis.UniversalClient) error {
+	return []func(client redis.UniversalClient) error{}
+}
diff --git a/pkg/meta/version/version_test.go b/pkg/meta/version/version_test.go
index e3bb26bd6..f22b1fa0a 100644
--- a/pkg/meta/version/version_test.go
+++ b/pkg/meta/version/version_test.go
@@ -3,21 +3,25 @@ package version_test
 import (
 	"context"
 	"errors"
+	"fmt"
 	"os"
 	"path"
 	"testing"
 
+	"github.com/alicebob/miniredis/v2"
 	"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
 	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
 	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
 	"github.com/aws/aws-sdk-go/aws"
 	guuid "github.com/gofrs/uuid"
+	"github.com/redis/go-redis/v9"
 	. "github.com/smartystreets/goconvey/convey"
 	"go.etcd.io/bbolt"
 
 	"zotregistry.dev/zot/pkg/log"
 	"zotregistry.dev/zot/pkg/meta/boltdb"
 	mdynamodb "zotregistry.dev/zot/pkg/meta/dynamodb"
+	"zotregistry.dev/zot/pkg/meta/redisdb"
 	"zotregistry.dev/zot/pkg/meta/version"
 	tskip "zotregistry.dev/zot/pkg/test/skip"
 )
@@ -211,3 +215,172 @@ func setDynamoDBVersion(client *dynamodb.Client, versTable, vers string) error {
 
 	return err
 }
+
+func TestVersioningRedisDB(t *testing.T) {
+	miniRedis := miniredis.RunT(t)
+
+	Convey("Tests", t, func() {
+		opts, err := redis.ParseURL("redis://" + miniRedis.Addr())
+		So(err, ShouldBeNil)
+
+		client := redis.NewClient(opts)
+		defer dumpRedisKeys(t, client) // Troubleshoot test failures
+
+		log := log.NewLogger("debug", "")
+		metaDB, err := redisdb.New(client, log)
+		So(err, ShouldBeNil)
+
+		So(metaDB.ResetDB(), ShouldBeNil)
+
+		ctx := context.Background()
+
+		Convey("empty initial version triggers setting the default", func() {
+			// Check no value is initially set
+			actualVersion, err := client.Get(ctx, redisdb.VersionBucket).Result()
+			So(err, ShouldEqual, redis.Nil)
+			So(actualVersion, ShouldEqual, "")
+
+			err = metaDB.PatchDB()
+			So(err, ShouldBeNil)
+
+			// Check default version is added in the DB
+			actualVersion, err = client.Get(ctx, redisdb.VersionBucket).Result()
+			So(err, ShouldBeNil)
+			So(actualVersion, ShouldEqual, version.CurrentVersion)
+		})
+
+		Convey("initial version with a bad value raises an error", func() {
+			// Set invalid initial value
+			err = client.Set(ctx, redisdb.VersionBucket, "VInvalid", 0).Err()
+			So(err, ShouldBeNil)
+
+			// Check error when attempting to patch
+			err = metaDB.PatchDB()
+			So(err, ShouldNotBeNil)
+		})
+
+		Convey("skip iterating patches", func() {
+			// Initialize DB version
+			metaDB.Version = version.Version1
+
+			// Patches have errors so we can check bad upgrade logic
+			metaDB.Patches = []func(client redis.UniversalClient) error{
+				func(client redis.UniversalClient) error { // V1 to V2
+					return ErrTestError
+				},
+				func(client redis.UniversalClient) error { // V2 to V3
+					return ErrTestError
+				},
+			}
+
+			// No patch should be applied for V1 so no error is expected
+			err = metaDB.PatchDB()
+			So(err, ShouldBeNil)
+		})
+
+		Convey("iterate over patches without any errors", func() {
+			// Initialize DB version with a lower version
+			metaDB.Version = version.Version1
+
+			err = metaDB.PatchDB()
+			So(err, ShouldBeNil)
+
+			// Now change to a newer DB version and apply patches
+			metaDB.Version = version.Version3
+
+			metaDB.Patches = []func(redis.UniversalClient) error{
+				func(client redis.UniversalClient) error { // V1 to V2
+					return nil
+				},
+				func(client redis.UniversalClient) error { // V2 to V3
+					return nil
+				},
+			}
+
+			err = metaDB.PatchDB()
+			So(err, ShouldBeNil)
+		})
+
+		Convey("iterate over patches with errors", func() {
+			// initialize DB version with a lower version
+			metaDB.Version = version.Version1
+
+			err = metaDB.PatchDB()
+			So(err, ShouldBeNil)
+
+			// now change to a newer DB version and apply patches
+			metaDB.Version = version.Version3
+
+			metaDB.Patches = []func(client redis.UniversalClient) error{
+				func(client redis.UniversalClient) error { // V1 to V2
+					return nil
+				},
+				func(client redis.UniversalClient) error { // V2 to V3
+					return ErrTestError
+				},
+			}
+
+			err = metaDB.PatchDB()
+			So(err, ShouldNotBeNil)
+		})
+	})
+}
+
+func dumpRedisKeys(t *testing.T, client *redis.Client) {
+	t.Helper()
+
+	// Retrieve all keys
+	keys, err := client.Keys(context.Background(), "*").Result()
+	if err != nil {
+		t.Log("Error retrieving keys:", err)
+
+		return
+	}
+
+	// Print the keys
+	t.Log("Keys in Redis:")
+
+	for _, key := range keys {
+		keyType, err := client.Type(context.Background(), key).Result()
+		if err != nil {
+			t.Logf("Error retrieving type for key %s: %v\n", key, err)
+
+			continue
+		}
+
+		var value string
+
+		switch keyType {
+		case "string":
+			value, err = client.Get(context.Background(), key).Result()
+		case "list":
+			values, err := client.LRange(context.Background(), key, 0, -1).Result()
+			if err == nil {
+				value = fmt.Sprintf("%v", values)
+			}
+		case "hash":
+			values, err := client.HGetAll(context.Background(), key).Result()
+			if err == nil {
+				value = fmt.Sprintf("%v", values)
+			}
+		case "set":
+			values, err := client.SMembers(context.Background(), key).Result()
+			if err == nil {
+				value = fmt.Sprintf("%v", values)
+			}
+		case "zset":
+			values, err := client.ZRange(context.Background(), key, 0, -1).Result()
+			if err == nil {
+				value = fmt.Sprintf("%v", values)
+			}
+		default:
+			value = "Unsupported type"
+		}
+
+		if err != nil {
+			t.Logf("Error retrieving value for key %s: %v\n", key, err)
+		} else {
+			t.Logf("Key: %s, Type: %s, Value: %s\n", key, keyType, value)
+		}
+	}
+}