diff --git a/pkg/models/analytics_export.go b/pkg/models/analytics_export.go index 5a27feeb..d93fb165 100644 --- a/pkg/models/analytics_export.go +++ b/pkg/models/analytics_export.go @@ -46,8 +46,8 @@ func (m *AnalyticsModel) PrepareToExportAnalytics(roomId, sid, meta string) { // lock to prevent this room re-creation until process finish // otherwise will give an unexpected result - _ = m.natsService.LockRoomCreation(roomId, time.Second*5) - defer m.natsService.UnlockRoomCreation(roomId) + _ = m.rs.LockRoomCreation(roomId, time.Second*5) + defer m.rs.UnlockRoomCreation(roomId) if _, err := os.Stat(*m.app.AnalyticsSettings.FilesStorePath); os.IsNotExist(err) { err = os.MkdirAll(*m.app.AnalyticsSettings.FilesStorePath, os.ModePerm) diff --git a/pkg/models/room.go b/pkg/models/room.go index 30acedeb..6cf5d022 100644 --- a/pkg/models/room.go +++ b/pkg/models/room.go @@ -46,7 +46,7 @@ func NewRoomModel(app *config.AppConfig, ds *dbservice.DatabaseService, rs *redi // CheckAndWaitUntilRoomCreationInProgress will check the process & wait if needed func (m *RoomModel) CheckAndWaitUntilRoomCreationInProgress(roomId string) { for { - locked := m.natsService.IsRoomCreationLock(roomId) + locked := m.rs.IsRoomCreationLock(roomId) if locked { log.Println(roomId, "room creation locked, waiting for:", config.WaitDurationIfRoomCreationLocked) time.Sleep(config.WaitDurationIfRoomCreationLocked) diff --git a/pkg/models/room_create.go b/pkg/models/room_create.go index e87c9e96..fe9afddc 100644 --- a/pkg/models/room_create.go +++ b/pkg/models/room_create.go @@ -24,7 +24,7 @@ func (m *RoomModel) CreateRoom(r *plugnmeet.CreateRoomReq) (*plugnmeet.ActiveRoo m.preRoomCreationTasks(r) // in preRoomCreationTasks we've added this room in progress list // so, we'll just use deferring to clean this room at the end of this function - defer m.natsService.UnlockRoomCreation(r.RoomId) + defer m.rs.UnlockRoomCreation(r.RoomId) // check if room already exists in db or not roomDbInfo, err := m.ds.GetRoomInfoByRoomId(r.RoomId, 1) @@ -188,7 +188,7 @@ func (m *RoomModel) preRoomCreationTasks(r *plugnmeet.CreateRoomReq) { // set maximum 1 minute as TTL // this way we can ensure that there will not be any deadlock // otherwise in various reason key may stay in kv & create deadlock - err := m.natsService.LockRoomCreation(r.GetRoomId(), time.Minute*1) + err := m.rs.LockRoomCreation(r.GetRoomId(), time.Minute*1) if err != nil { log.Errorln(err) } diff --git a/pkg/models/room_end.go b/pkg/models/room_end.go index 1209b899..fc6466bd 100644 --- a/pkg/models/room_end.go +++ b/pkg/models/room_end.go @@ -58,7 +58,7 @@ func (m *RoomModel) EndRoom(r *plugnmeet.RoomEndReq) (bool, string) { func (m *RoomModel) OnAfterRoomEnded(roomId, roomSid, metadata string) { // lock room creation otherwise may have an unexpected result // if recreated before clean up completed - err := m.natsService.LockRoomCreation(roomId, config.WaitBeforeTriggerOnAfterRoomEnded+(time.Second*5)) + err := m.rs.LockRoomCreation(roomId, config.WaitBeforeTriggerOnAfterRoomEnded+(time.Second*5)) if err != nil { log.Errorln(err) } @@ -141,7 +141,7 @@ func (m *RoomModel) OnAfterRoomEnded(roomId, roomSid, metadata string) { log.Infoln(fmt.Sprintf("roomId: %s has been cleaned properly", roomId)) // release the room - m.natsService.UnlockRoomCreation(roomId) + m.rs.UnlockRoomCreation(roomId) // finally, create the analytics file analyticsModel := NewAnalyticsModel(m.app, m.ds, m.rs) diff --git a/pkg/models/scheduler_duration.go b/pkg/models/scheduler_duration.go index b9535ba7..cfb063a4 100644 --- a/pkg/models/scheduler_duration.go +++ b/pkg/models/scheduler_duration.go @@ -6,16 +6,16 @@ import ( ) func (m *SchedulerModel) checkRoomWithDuration() { - locked := m.natsService.IsSchedulerTaskLock("checkRoomWithDuration") + locked := m.rs.IsSchedulerTaskLock("checkRoomWithDuration") if locked { // if lock then we will not perform here return } // now set lock - _ = m.natsService.LockSchedulerTask("checkRoomWithDuration", time.Minute*1) + _ = m.rs.LockSchedulerTask("checkRoomWithDuration", time.Minute*1) // clean at the end - defer m.natsService.UnlockSchedulerTask("checkRoomWithDuration") + defer m.rs.UnlockSchedulerTask("checkRoomWithDuration") rooms := m.rmDuration.GetRoomsWithDurationMap() for i, r := range rooms { diff --git a/pkg/models/scheduler_room.go b/pkg/models/scheduler_room.go index 8819f212..8b18f469 100644 --- a/pkg/models/scheduler_room.go +++ b/pkg/models/scheduler_room.go @@ -9,16 +9,16 @@ import ( // activeRoomChecker will check & do reconciliation between DB & livekit func (m *SchedulerModel) activeRoomChecker() { - locked := m.natsService.IsSchedulerTaskLock("activeRoomChecker") + locked := m.rs.IsSchedulerTaskLock("activeRoomChecker") if locked { // if lock then we will not perform here return } // now set lock - _ = m.natsService.LockSchedulerTask("activeRoomChecker", time.Minute*10) + _ = m.rs.LockSchedulerTask("activeRoomChecker", time.Minute*10) // clean at the end - defer m.natsService.UnlockSchedulerTask("activeRoomChecker") + defer m.rs.UnlockSchedulerTask("activeRoomChecker") activeRooms, err := m.ds.GetActiveRoomsInfo() if err != nil { diff --git a/pkg/models/scheduler_user.go b/pkg/models/scheduler_user.go index 554407f9..fdcf09b3 100644 --- a/pkg/models/scheduler_user.go +++ b/pkg/models/scheduler_user.go @@ -13,15 +13,15 @@ import ( // checkOnlineUsersStatus will compare last ping result // and take the decision to update user's status func (m *SchedulerModel) checkOnlineUsersStatus() { - locked := m.natsService.IsSchedulerTaskLock("checkOnlineUsersStatus") + locked := m.rs.IsSchedulerTaskLock("checkOnlineUsersStatus") if locked { // if lock then we will not perform here return } // now set lock - _ = m.natsService.LockSchedulerTask("checkOnlineUsersStatus", time.Minute*1) + _ = m.rs.LockSchedulerTask("checkOnlineUsersStatus", time.Minute*1) // clean at the end - defer m.natsService.UnlockSchedulerTask("checkOnlineUsersStatus") + defer m.rs.UnlockSchedulerTask("checkOnlineUsersStatus") kl := m.app.JetStream.KeyValueStoreNames(context.Background()) for s := range kl.Name() { diff --git a/pkg/models/user.go b/pkg/models/user.go index c3ff96aa..8e8d34c1 100644 --- a/pkg/models/user.go +++ b/pkg/models/user.go @@ -59,7 +59,7 @@ func (m *UserModel) CommonValidation(c *fiber.Ctx) error { // CheckAndWaitUntilRoomCreationInProgress will check the process & wait if needed func (m *UserModel) CheckAndWaitUntilRoomCreationInProgress(roomId string) { for { - locked := m.natsService.IsRoomCreationLock(roomId) + locked := m.rs.IsRoomCreationLock(roomId) if locked { log.Println(roomId, "joining not possible because of room locked, waiting for:", config.WaitDurationIfRoomCreationLocked) time.Sleep(config.WaitDurationIfRoomCreationLocked) diff --git a/pkg/services/nats/lock.go b/pkg/services/nats/lock.go deleted file mode 100644 index a1321248..00000000 --- a/pkg/services/nats/lock.go +++ /dev/null @@ -1,64 +0,0 @@ -package natsservice - -import ( - "fmt" - "github.com/nats-io/nats.go/jetstream" - "time" -) - -const ( - RoomCreationLockBucket = Prefix + "roomCreationLock-%s" - SchedulerLockBucket = Prefix + "schedulerLock-%s" -) - -func (s *NatsService) LockRoomCreation(roomId string, ttl time.Duration) error { - _, err := s.js.CreateOrUpdateKeyValue(s.ctx, jetstream.KeyValueConfig{ - Bucket: fmt.Sprintf(RoomCreationLockBucket, roomId), - TTL: ttl, - }) - if err != nil { - return err - } - return nil -} - -func (s *NatsService) IsRoomCreationLock(roomId string) bool { - kv, err := s.js.KeyValue(s.ctx, fmt.Sprintf(RoomCreationLockBucket, roomId)) - if err != nil { - return false - } - if kv != nil { - return true - } - return false -} - -func (s *NatsService) UnlockRoomCreation(roomId string) { - _ = s.js.DeleteKeyValue(s.ctx, fmt.Sprintf(RoomCreationLockBucket, roomId)) -} - -func (s *NatsService) LockSchedulerTask(task string, ttl time.Duration) error { - _, err := s.js.CreateOrUpdateKeyValue(s.ctx, jetstream.KeyValueConfig{ - Bucket: fmt.Sprintf(SchedulerLockBucket, task), - TTL: ttl, - }) - if err != nil { - return err - } - return nil -} - -func (s *NatsService) IsSchedulerTaskLock(task string) bool { - kv, err := s.js.KeyValue(s.ctx, fmt.Sprintf(SchedulerLockBucket, task)) - if err != nil { - return false - } - if kv != nil { - return true - } - return false -} - -func (s *NatsService) UnlockSchedulerTask(task string) { - _ = s.js.DeleteKeyValue(s.ctx, fmt.Sprintf(SchedulerLockBucket, task)) -} diff --git a/pkg/services/redis/lock.go b/pkg/services/redis/lock.go new file mode 100644 index 00000000..a9d18058 --- /dev/null +++ b/pkg/services/redis/lock.go @@ -0,0 +1,67 @@ +package redisservice + +import ( + "fmt" + "time" +) + +const ( + RoomCreationLockKey = Prefix + "roomCreationLock-%s" + SchedulerLockKey = Prefix + "schedulerLock-%s" +) + +func (s *RedisService) LockRoomCreation(roomId string, ttl time.Duration) error { + key := fmt.Sprintf(RoomCreationLockKey, roomId) + _, err := s.rc.Set(s.ctx, key, fmt.Sprintf("%d", time.Now().Unix()), ttl).Result() + if err != nil { + return err + } + + return nil +} + +func (s *RedisService) IsRoomCreationLock(roomId string) bool { + key := fmt.Sprintf(RoomCreationLockKey, roomId) + result, err := s.rc.Get(s.ctx, key).Result() + if err != nil { + return false + } + + if result != "" { + return true + } + + return false +} + +func (s *RedisService) UnlockRoomCreation(roomId string) { + _, _ = s.rc.Del(s.ctx, fmt.Sprintf(RoomCreationLockKey, roomId)).Result() +} + +func (s *RedisService) LockSchedulerTask(task string, ttl time.Duration) error { + key := fmt.Sprintf(SchedulerLockKey, task) + _, err := s.rc.Set(s.ctx, key, fmt.Sprintf("%d", time.Now().Unix()), ttl).Result() + if err != nil { + return err + } + + return nil +} + +func (s *RedisService) IsSchedulerTaskLock(task string) bool { + key := fmt.Sprintf(SchedulerLockKey, task) + result, err := s.rc.Get(s.ctx, key).Result() + if err != nil { + return false + } + + if result != "" { + return true + } + + return false +} + +func (s *RedisService) UnlockSchedulerTask(task string) { + _, _ = s.rc.Del(s.ctx, fmt.Sprintf(SchedulerLockKey, task)).Result() +}