Skip to content

Commit

Permalink
Merge pull request #1450 from DiceDB/arpit-readiness-cli-chat
Browse files Browse the repository at this point in the history
Handshake and cleaner WatchManager implementation
  • Loading branch information
arpitbbhayani authored Feb 5, 2025
2 parents a2e53d9 + 3c0ceed commit 9de4871
Show file tree
Hide file tree
Showing 13 changed files with 151 additions and 74 deletions.
2 changes: 0 additions & 2 deletions examples/chatroom-go/Makefile

This file was deleted.

7 changes: 5 additions & 2 deletions examples/chatroom-go/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## Potential Risks
Chatroom
===

1. may have update when multiple updates happen at same time (granualrity)?
```sh
$ go run main.go <username>
```
2 changes: 1 addition & 1 deletion examples/chatroom-go/db/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ var (

func init() {
var err error
Client, err = dicedb.NewClient("localhost", 7379, dicedb.WithWatch())
Client, err = dicedb.NewClient("localhost", 7379)
if err != nil {
panic(err)
}
Expand Down
3 changes: 2 additions & 1 deletion examples/chatroom-go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ require (
github.com/charmbracelet/bubbles v0.20.0
github.com/charmbracelet/bubbletea v1.2.4
github.com/charmbracelet/lipgloss v1.0.0
github.com/dicedb/dicedb-go v1.0.1
)

require (
github.com/atotto/clipboard v0.1.4 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/charmbracelet/x/ansi v0.4.5 // indirect
github.com/charmbracelet/x/term v0.2.1 // indirect
github.com/dicedb/dicedb-go v1.0.1 // indirect
github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-localereader v0.0.1 // indirect
Expand Down
6 changes: 6 additions & 0 deletions examples/chatroom-go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ github.com/charmbracelet/x/term v0.2.1 h1:AQeHeLZ1OqSXhrAWpYUtZyX1T3zVxfpZuEQMIQ
github.com/charmbracelet/x/term v0.2.1/go.mod h1:oQ4enTYFV7QN4m0i9mzHrViD7TQKvNEEkHUMCmsxdUg=
github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f h1:Y/CXytFA4m6baUTXGLOoWe4PQhGxaX0KpnayAqC48p4=
github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f/go.mod h1:vw97MGsxSvLiUE2X8qFplwetxpGLQrlU1Q9AUEIzCaM=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY=
github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
Expand All @@ -41,5 +45,7 @@ golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM=
google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
9 changes: 5 additions & 4 deletions examples/chatroom-go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@

package main

// A simple program demonstrating the text area component from the Bubbles
// component library.

import (
"chatroom-go/db"
"fmt"
Expand Down Expand Up @@ -33,7 +30,11 @@ func init() {
}

func loop() {
for resp := range db.Client.WatchCh() {
ch, err := db.Client.WatchCh()
if err != nil {
panic(err)
}
for resp := range ch {
fmt.Println(resp)
}
}
Expand Down
29 changes: 0 additions & 29 deletions internal/cmd/cmd_client_id.go

This file was deleted.

27 changes: 27 additions & 0 deletions internal/cmd/cmd_handshake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2022-present, DiceDB contributors
// All rights reserved. Licensed under the BSD 3-Clause License. See LICENSE file in the project root for full license information.

package cmd

import (
dstore "github.com/dicedb/dice/internal/store"
)

var cHANDSHAKE = &DiceDBCommand{
Name: "HANDSHAKE",
HelpShort: "HANDSHAKE is used to handshake with the database; sends client_id and execution mode",
Eval: evalHANDSHAKE,
}

func init() {
commandRegistry.AddCommand(cHANDSHAKE)
}

func evalHANDSHAKE(c *Cmd, s *dstore.Store) (*CmdRes, error) {
if len(c.C.Args) != 2 {
return cmdResNil, errWrongArgumentCount("HANDSHAKE")
}
c.ClientID = c.C.Args[0]
c.Mode = c.C.Args[1]
return cmdResOK, nil
}
22 changes: 22 additions & 0 deletions internal/cmd/cmd_mode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) 2022-present, DiceDB contributors
// All rights reserved. Licensed under the BSD 3-Clause License. See LICENSE file in the project root for full license information.

package cmd

import (
dstore "github.com/dicedb/dice/internal/store"
)

var cMODE = &DiceDBCommand{
Name: "MODE",
HelpShort: "MODE sets the mode of the client",
Eval: evalMODE,
}

func init() {
commandRegistry.AddCommand(cMODE)
}

func evalMODE(c *Cmd, s *dstore.Store) (*CmdRes, error) {
return cmdResOK, nil
}
2 changes: 2 additions & 0 deletions internal/cmd/cmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Cmd struct {
C *wire.Command
IsReplay bool
ClientID string
Mode string
}

func (c *Cmd) String() string {
Expand Down Expand Up @@ -83,6 +84,7 @@ func Execute(c *Cmd, s *store.Store) (*CmdRes, error) {
slog.Debug("command executed",
slog.Any("cmd", c.String()),
slog.String("client_id", c.ClientID),
slog.String("mode", c.Mode),
slog.Int("shard_id", s.ShardID),
slog.Any("took_ns", time.Since(start).Nanoseconds()))
return resp, err
Expand Down
7 changes: 7 additions & 0 deletions internal/server/ironhawk/iothread.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

type IOThread struct {
ClientID string
Mode string
IoHandler *IOHandler
Session *auth.Session
}
Expand All @@ -39,6 +40,7 @@ func (t *IOThread) StartSync(ctx context.Context, shardManager *ShardManager, wa

_c := &cmd.Cmd{C: c}
_c.ClientID = t.ClientID
_c.Mode = t.Mode

res, err := shardManager.Execute(_c)
if err != nil {
Expand All @@ -49,6 +51,11 @@ func (t *IOThread) StartSync(ctx context.Context, shardManager *ShardManager, wa
// Also, CLientID is duplicated in command and io-thread.
t.ClientID = _c.ClientID

if c.Cmd == "HANDSHAKE" {
t.ClientID = _c.C.Args[0]
t.Mode = _c.C.Args[1]
}

if strings.HasSuffix(c.Cmd, ".WATCH") {
watchManager.HandleWatch(_c, t)
}
Expand Down
10 changes: 8 additions & 2 deletions internal/server/ironhawk/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,15 @@ func (s *Server) startIOThread(ctx context.Context, wg *sync.WaitGroup, thread *
if err != nil {
if err == io.EOF {
s.watchManager.CleanupThreadWatchSubscriptions(thread)
slog.Debug("client disconnected. io-thread stopped", slog.String("client_id", thread.ClientID))
slog.Debug("client disconnected. io-thread stopped",
slog.String("client_id", thread.ClientID),
slog.String("mode", thread.Mode),
)
} else {
slog.Debug("io-thread errored out", slog.String("client_id", thread.ClientID), slog.Any("error", err))
slog.Debug("io-thread errored out",
slog.String("client_id", thread.ClientID),
slog.String("mode", thread.Mode),
slog.Any("error", err))
}
}
}
Expand Down
99 changes: 66 additions & 33 deletions internal/server/ironhawk/watch_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,88 +12,111 @@ import (
)

type WatchManager struct {
clientWatchThreadMap map[string]*IOThread

keyFPMap map[string]map[uint32]bool
fpThreadMap map[uint32]map[*IOThread]bool
fpClientMap map[uint32]map[string]bool
fpCmdMap map[uint32]*cmd.Cmd
}

func NewWatchManager() *WatchManager {
return &WatchManager{
clientWatchThreadMap: map[string]*IOThread{},

keyFPMap: map[string]map[uint32]bool{},
fpThreadMap: map[uint32]map[*IOThread]bool{},
fpClientMap: map[uint32]map[string]bool{},
fpCmdMap: map[uint32]*cmd.Cmd{},
}
}

func (w *WatchManager) HandleWatch(c *cmd.Cmd, t *IOThread) {
fp := c.GetFingerprint()
key := c.Key()
fp, key := c.GetFingerprint(), c.Key()
slog.Debug("creating a new subscription",
slog.String("key", key),
slog.String("cmd", c.String()),
slog.Any("fingerprint", fp))

// For the key that will be watched through any .WATCH command
// Create an entry in the map that holds, key <--> [command fingerprint] as map
if _, ok := w.keyFPMap[key]; !ok {
w.keyFPMap[key] = make(map[uint32]bool)
}
w.keyFPMap[key][fp] = true

if _, ok := w.fpThreadMap[fp]; !ok {
w.fpThreadMap[fp] = make(map[*IOThread]bool)
// For the fingerprint
// Create an entry in the map that holds, fingerprint <--> [client id] as map
// This tells us which clients are subscribed to a particular fingerprint
if _, ok := w.fpClientMap[fp]; !ok {
w.fpClientMap[fp] = make(map[string]bool)
}
w.fpThreadMap[fp][t] = true
w.fpClientMap[fp][t.ClientID] = true

// Store the fingerprint <--> command mapping
// so that we understand what should we execute when the data changes
w.fpCmdMap[fp] = c

// If the thread is a WATCH thread, store it in the map
// so that we can notify the clients when the data changes
if t.Mode == "WATCH" {
w.clientWatchThreadMap[t.ClientID] = t
}
}

func (w *WatchManager) HandleUnwatch(c *cmd.Cmd, t *IOThread) {
if len(c.C.Args) != 1 {
return
}

// Parse the fingerprint from the command
_fp, err := strconv.ParseUint(c.C.Args[0], 10, 32)
if err != nil {
return
}
fp := uint32(_fp)

delete(w.fpThreadMap[fp], t)
if len(w.fpThreadMap[fp]) == 0 {
delete(w.fpThreadMap, fp)
}
// Multiple clients can unsubscribe from the same fingerprint
// So, we need to delete the one that is unsubscribing
delete(w.fpClientMap[fp], t.ClientID)

for key, fpMap := range w.keyFPMap {
if _, ok := fpMap[fp]; ok {
delete(w.keyFPMap[key], fp)
}
if len(w.keyFPMap[key]) == 0 {
delete(w.keyFPMap, key)
}
// If a fingerprint has no clients subscribed to it, delete the fingerprint from the map.
if len(w.fpClientMap[fp]) == 0 {
delete(w.fpClientMap, fp)

// If we have deleted the fingerprint, delete the command from the map
delete(w.fpCmdMap, fp)
}

// TODO: Maintain ref count for gp -> cmd mapping
// delete it from delete(fpCmdMap, fp) only when ref count is 0
// check if any easier way to do this
// Delete the mapping where we have the key <--> [command fingerprint]
// This seems to be an O(n) operation.
// Downside of keeping this entry laying around is that
// If key k changes, then we may be iterating through the fingerprint that does not have any active watcher
// Hence we should do a lazy deletion.

// TODO: If the key gets deleted from the database
// delete the subscriptions against that key from all the places.
}

func (w *WatchManager) CleanupThreadWatchSubscriptions(t *IOThread) {
for fp, threadMap := range w.fpThreadMap {
if _, ok := threadMap[t]; ok {
delete(w.fpThreadMap[fp], t)
}
if len(w.fpThreadMap[fp]) == 0 {
delete(w.fpThreadMap, fp)
// Delete the mapping of Watch thread to client id
delete(w.clientWatchThreadMap, t.ClientID)

// Delete all the subscriptions of the client from the fingerprint maps
// Note: this is an O(n) operation and hence if there are large number of clients, this might be expensive.
// We can do a lazy deletion of the fingerprint map if this becomes a problem.
for fp := range w.fpClientMap {
delete(w.fpClientMap[fp], t.ClientID)
if len(w.fpClientMap[fp]) == 0 {
delete(w.fpClientMap, fp)
}
}
}

func (w *WatchManager) NotifyWatchers(c *cmd.Cmd, shardManager *ShardManager, t *IOThread) {
// TODO: During first WATCH call, we are getting the response multiple times on the Client
// Check if this is happening because of the way we are notifying the watchers
key := c.Key()
for fp := range w.keyFPMap[key] {
_c := w.fpCmdMap[fp]
if _c == nil {
// TODO: We might want to remove the key from keyFPMap if we don't have a command for it.
// TODO: Not having a command for a fingerprint is a bug.
continue
}

Expand All @@ -105,13 +128,23 @@ func (w *WatchManager) NotifyWatchers(c *cmd.Cmd, shardManager *ShardManager, t
continue
}

for thread := range w.fpThreadMap[fp] {
for clientID := range w.fpClientMap[fp] {
thread := w.clientWatchThreadMap[clientID]
if thread == nil {
// if there is no thread against the client, delete the client from the map
delete(w.clientWatchThreadMap, clientID)
continue
}

err := thread.IoHandler.WriteSync(context.Background(), r.R)
if err != nil {
slog.Error("failed to write response to thread", slog.Any("client_id", thread.ClientID), slog.Any("error", err))
slog.Error("failed to write response to thread",
slog.Any("client_id", thread.ClientID),
slog.String("mode", thread.Mode),
slog.Any("error", err))
}
}

slog.Debug("notifying watchers for key", slog.String("key", key), slog.Int("watchers", len(w.fpThreadMap[fp])))
slog.Debug("notifying watchers for key", slog.String("key", key), slog.Int("watchers", len(w.fpClientMap[fp])))
}
}

0 comments on commit 9de4871

Please sign in to comment.