Skip to content

Commit

Permalink
wait until nats v2.11
Browse files Browse the repository at this point in the history
  • Loading branch information
jibon57 committed Sep 1, 2024
1 parent 8dab328 commit 7f3269e
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 81 deletions.
4 changes: 2 additions & 2 deletions pkg/models/analytics_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func (m *AnalyticsModel) PrepareToExportAnalytics(roomId, sid, meta string) {

// lock to prevent this room re-creation until process finish
// otherwise will give an unexpected result
_ = m.natsService.LockRoomCreation(roomId, time.Second*5)
defer m.natsService.UnlockRoomCreation(roomId)
_ = m.rs.LockRoomCreation(roomId, time.Second*5)
defer m.rs.UnlockRoomCreation(roomId)

if _, err := os.Stat(*m.app.AnalyticsSettings.FilesStorePath); os.IsNotExist(err) {
err = os.MkdirAll(*m.app.AnalyticsSettings.FilesStorePath, os.ModePerm)
Expand Down
2 changes: 1 addition & 1 deletion pkg/models/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewRoomModel(app *config.AppConfig, ds *dbservice.DatabaseService, rs *redi
// CheckAndWaitUntilRoomCreationInProgress will check the process & wait if needed
func (m *RoomModel) CheckAndWaitUntilRoomCreationInProgress(roomId string) {
for {
locked := m.natsService.IsRoomCreationLock(roomId)
locked := m.rs.IsRoomCreationLock(roomId)
if locked {
log.Println(roomId, "room creation locked, waiting for:", config.WaitDurationIfRoomCreationLocked)
time.Sleep(config.WaitDurationIfRoomCreationLocked)
Expand Down
4 changes: 2 additions & 2 deletions pkg/models/room_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (m *RoomModel) CreateRoom(r *plugnmeet.CreateRoomReq) (*plugnmeet.ActiveRoo
m.preRoomCreationTasks(r)
// in preRoomCreationTasks we've added this room in progress list
// so, we'll just use deferring to clean this room at the end of this function
defer m.natsService.UnlockRoomCreation(r.RoomId)
defer m.rs.UnlockRoomCreation(r.RoomId)

// check if room already exists in db or not
roomDbInfo, err := m.ds.GetRoomInfoByRoomId(r.RoomId, 1)
Expand Down Expand Up @@ -188,7 +188,7 @@ func (m *RoomModel) preRoomCreationTasks(r *plugnmeet.CreateRoomReq) {
// set maximum 1 minute as TTL
// this way we can ensure that there will not be any deadlock
// otherwise in various reason key may stay in kv & create deadlock
err := m.natsService.LockRoomCreation(r.GetRoomId(), time.Minute*1)
err := m.rs.LockRoomCreation(r.GetRoomId(), time.Minute*1)
if err != nil {
log.Errorln(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/models/room_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (m *RoomModel) EndRoom(r *plugnmeet.RoomEndReq) (bool, string) {
func (m *RoomModel) OnAfterRoomEnded(roomId, roomSid, metadata string) {
// lock room creation otherwise may have an unexpected result
// if recreated before clean up completed
err := m.natsService.LockRoomCreation(roomId, config.WaitBeforeTriggerOnAfterRoomEnded+(time.Second*5))
err := m.rs.LockRoomCreation(roomId, config.WaitBeforeTriggerOnAfterRoomEnded+(time.Second*5))
if err != nil {
log.Errorln(err)
}
Expand Down Expand Up @@ -141,7 +141,7 @@ func (m *RoomModel) OnAfterRoomEnded(roomId, roomSid, metadata string) {

log.Infoln(fmt.Sprintf("roomId: %s has been cleaned properly", roomId))
// release the room
m.natsService.UnlockRoomCreation(roomId)
m.rs.UnlockRoomCreation(roomId)

// finally, create the analytics file
analyticsModel := NewAnalyticsModel(m.app, m.ds, m.rs)
Expand Down
6 changes: 3 additions & 3 deletions pkg/models/scheduler_duration.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ import (
)

func (m *SchedulerModel) checkRoomWithDuration() {
locked := m.natsService.IsSchedulerTaskLock("checkRoomWithDuration")
locked := m.rs.IsSchedulerTaskLock("checkRoomWithDuration")
if locked {
// if lock then we will not perform here
return
}

// now set lock
_ = m.natsService.LockSchedulerTask("checkRoomWithDuration", time.Minute*1)
_ = m.rs.LockSchedulerTask("checkRoomWithDuration", time.Minute*1)
// clean at the end
defer m.natsService.UnlockSchedulerTask("checkRoomWithDuration")
defer m.rs.UnlockSchedulerTask("checkRoomWithDuration")

rooms := m.rmDuration.GetRoomsWithDurationMap()
for i, r := range rooms {
Expand Down
6 changes: 3 additions & 3 deletions pkg/models/scheduler_room.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ import (

// activeRoomChecker will check & do reconciliation between DB & livekit
func (m *SchedulerModel) activeRoomChecker() {
locked := m.natsService.IsSchedulerTaskLock("activeRoomChecker")
locked := m.rs.IsSchedulerTaskLock("activeRoomChecker")
if locked {
// if lock then we will not perform here
return
}

// now set lock
_ = m.natsService.LockSchedulerTask("activeRoomChecker", time.Minute*10)
_ = m.rs.LockSchedulerTask("activeRoomChecker", time.Minute*10)
// clean at the end
defer m.natsService.UnlockSchedulerTask("activeRoomChecker")
defer m.rs.UnlockSchedulerTask("activeRoomChecker")

activeRooms, err := m.ds.GetActiveRoomsInfo()
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/models/scheduler_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ import (
// checkOnlineUsersStatus will compare last ping result
// and take the decision to update user's status
func (m *SchedulerModel) checkOnlineUsersStatus() {
locked := m.natsService.IsSchedulerTaskLock("checkOnlineUsersStatus")
locked := m.rs.IsSchedulerTaskLock("checkOnlineUsersStatus")
if locked {
// if lock then we will not perform here
return
}
// now set lock
_ = m.natsService.LockSchedulerTask("checkOnlineUsersStatus", time.Minute*1)
_ = m.rs.LockSchedulerTask("checkOnlineUsersStatus", time.Minute*1)
// clean at the end
defer m.natsService.UnlockSchedulerTask("checkOnlineUsersStatus")
defer m.rs.UnlockSchedulerTask("checkOnlineUsersStatus")

kl := m.app.JetStream.KeyValueStoreNames(context.Background())
for s := range kl.Name() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/models/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (m *UserModel) CommonValidation(c *fiber.Ctx) error {
// CheckAndWaitUntilRoomCreationInProgress will check the process & wait if needed
func (m *UserModel) CheckAndWaitUntilRoomCreationInProgress(roomId string) {
for {
locked := m.natsService.IsRoomCreationLock(roomId)
locked := m.rs.IsRoomCreationLock(roomId)
if locked {
log.Println(roomId, "joining not possible because of room locked, waiting for:", config.WaitDurationIfRoomCreationLocked)
time.Sleep(config.WaitDurationIfRoomCreationLocked)
Expand Down
64 changes: 0 additions & 64 deletions pkg/services/nats/lock.go

This file was deleted.

67 changes: 67 additions & 0 deletions pkg/services/redis/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package redisservice

import (
"fmt"
"time"
)

const (
RoomCreationLockKey = Prefix + "roomCreationLock-%s"
SchedulerLockKey = Prefix + "schedulerLock-%s"
)

func (s *RedisService) LockRoomCreation(roomId string, ttl time.Duration) error {
key := fmt.Sprintf(RoomCreationLockKey, roomId)
_, err := s.rc.Set(s.ctx, key, fmt.Sprintf("%d", time.Now().Unix()), ttl).Result()
if err != nil {
return err
}

return nil
}

func (s *RedisService) IsRoomCreationLock(roomId string) bool {
key := fmt.Sprintf(RoomCreationLockKey, roomId)
result, err := s.rc.Get(s.ctx, key).Result()
if err != nil {
return false
}

if result != "" {
return true
}

return false
}

func (s *RedisService) UnlockRoomCreation(roomId string) {
_, _ = s.rc.Del(s.ctx, fmt.Sprintf(RoomCreationLockKey, roomId)).Result()
}

func (s *RedisService) LockSchedulerTask(task string, ttl time.Duration) error {
key := fmt.Sprintf(SchedulerLockKey, task)
_, err := s.rc.Set(s.ctx, key, fmt.Sprintf("%d", time.Now().Unix()), ttl).Result()
if err != nil {
return err
}

return nil
}

func (s *RedisService) IsSchedulerTaskLock(task string) bool {
key := fmt.Sprintf(SchedulerLockKey, task)
result, err := s.rc.Get(s.ctx, key).Result()
if err != nil {
return false
}

if result != "" {
return true
}

return false
}

func (s *RedisService) UnlockSchedulerTask(task string) {
_, _ = s.rc.Del(s.ctx, fmt.Sprintf(SchedulerLockKey, task)).Result()
}

0 comments on commit 7f3269e

Please sign in to comment.