-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumers.py
127 lines (100 loc) · 4.35 KB
/
consumers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import json
from django.shortcuts import get_object_or_404
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from django.utils.timezone import now
from django.conf import settings
from typing import Generator
from djangochannelsrestframework.generics import GenericAsyncAPIConsumer, AsyncAPIConsumer
from djangochannelsrestframework.observer.generics import (ObserverModelInstanceMixin, action)
from djangochannelsrestframework.observer import model_observer
from .models import Room, Message
from .serializers import MessageSerializer, RoomSerializer
from users.models import User
from users.serializers import UserSerializer
class UserConsumerObserver(GenericAsyncAPIConsumer):
    """Consumer that relays observed ``User`` model changes to its client.

    On accept it subscribes to the ``model_change`` observer; every message
    that observer delivers is sent to the client as JSON.
    """

    serializer_class = UserSerializer
    queryset = User.objects.all()

    @model_observer(User)
    async def model_change(self, message, **kwargs):
        """Send the observed change ``message`` to the client as JSON."""
        await self.send_json(message)

    async def accept(self, **kwargs):
        """Accept the connection, then subscribe to the ``User`` observer."""
        await super().accept(**kwargs)
        observer = self.model_change
        await observer.subscribe()
class RoomConsumer(ObserverModelInstanceMixin, GenericAsyncAPIConsumer):
    """Consumer for chat rooms, which are addressed by their ``code``.

    Exposes actions to join and leave a room, to create messages in the
    joined room and to subscribe to message changes of a room. The code of
    the room joined through ``join_room`` is kept in ``self.room_subscribe``.
    """

    queryset = Room.objects.all()
    serializer_class = RoomSerializer
    lookup_field = "code"

    async def disconnect(self, code):
        """Remove the user from the joined room, if any, and notify its groups.

        Args:
            code: Close code, passed through to the parent ``disconnect``.
        """
        try:
            if hasattr(self, "room_subscribe"):
                # ``room_subscribe`` holds a room *code*, whereas the
                # relation's ``remove`` expects a model instance or a primary
                # key, so resolve the room before removing it.
                room = await self.get_room(self.room_subscribe)
                await self.remove_user_from_room(room)
                await self.notify_users()
        finally:
            # Always let the parent class run its own disconnect handling,
            # even when the room cleanup above raised.
            await super().disconnect(code)

    def get_object(self, **kwargs) -> Room:
        """Return the room whose code is ``kwargs["code"]``, or raise a 404."""
        return get_object_or_404(Room, code=kwargs["code"])

    @action()
    async def join_room(self, code, **kwargs):
        """Add the connected user to the room ``code`` and notify the groups."""
        await self.add_user_to_room(code)
        # Only remember the room once the user was actually added, so a failed
        # join does not leave a stale code behind for ``disconnect``.
        self.room_subscribe = code
        await self.notify_users()

    @action()
    async def leave_room(self, pk, **kwargs):
        """Remove the connected user from the room with primary key ``pk``."""
        # NOTE(review): unlike ``disconnect`` this neither notifies the groups
        # nor clears ``room_subscribe`` - confirm that this is intended.
        await self.remove_user_from_room(pk)

    @action()
    async def create_message(self, message, **kwargs):
        """Create a ``Message`` with text ``message`` in the joined room.

        The message is authored by ``self.scope["user"]``. Requires a prior
        ``join_room`` call, since the room is read from ``room_subscribe``.
        """
        room: Room = await self.get_room(code=self.room_subscribe)
        await database_sync_to_async(Message.objects.create)(
            room=room,
            user=self.scope["user"],
            text=message,
        )

    @action()
    async def subscribe_to_messages_in_room(self, code, **kwargs):
        """Subscribe this consumer to message activity of the room ``code``."""
        room: Room = await self.get_room(code)
        await self.message_activity.subscribe(room=room.pk)

    @model_observer(Message)
    async def message_activity(self, message, observer=None, **kwargs):
        """Send an observed ``Message`` change to the client as JSON."""
        await self.send_json(message)

    @message_activity.groups_for_signal
    def message_activity(self, instance: Message, **kwargs) -> Generator[str, None, None]:
        """Yield the group names a change of ``instance`` is published to."""
        yield f"room__{instance.room_id}"
        yield f"pk__{instance.pk}"

    @message_activity.groups_for_consumer
    def message_activity(self, room=None, **kwargs) -> Generator[str, None, None]:
        """Yield the group names to subscribe to for the given ``room`` pk."""
        if room is not None:
            yield f"room__{room}"

    @message_activity.serializer
    def message_activity(self, instance: Message, action, **kwargs):
        """Build the payload sent for a change of ``instance``.

        Returns:
            A dict with the serialized message (``data``), the value of
            ``action`` (``action``) and the message primary key (``pk``).
        """
        return dict(
            data=MessageSerializer(instance).data,
            action=action.value,
            pk=instance.pk,
        )

    async def notify_users(self):
        """Send the joined room's current users to every group in ``self.groups``."""
        room: Room = await self.get_room(self.room_subscribe)
        # The user list is the same for every group: query it once.
        usuarios = await self.current_users(room)
        for group in self.groups:
            await self.channel_layer.group_send(
                group,
                {
                    'type': 'update_users',
                    'usuarios': usuarios,
                },
            )

    async def update_users(self, event: dict):
        """Handle an ``update_users`` event by sending its user list to the client."""
        await self.send(text_data=json.dumps({'usuarios': event["usuarios"]}))

    @database_sync_to_async
    def get_room(self, code: str) -> Room:
        """Return the room with the given ``code`` (``Room.objects.get``)."""
        return Room.objects.get(code=code)

    @database_sync_to_async
    def current_users(self, room: Room):
        """Return the serialized users of ``room.current_users`` as a list."""
        return [UserSerializer(usuario).data for usuario in room.current_users.all()]

    @database_sync_to_async
    def remove_user_from_room(self, room):
        """Remove ``room`` from the connected user's ``current_rooms``.

        Args:
            room: A ``Room`` instance or a room primary key.
        """
        user: User = self.scope["user"]
        user.current_rooms.remove(room)

    @database_sync_to_async
    def add_user_to_room(self, code):
        """Add the room ``code`` to the user's ``current_rooms`` if missing."""
        user: User = self.scope["user"]
        if not user.current_rooms.filter(code=code).exists():
            user.current_rooms.add(Room.objects.get(code=code))