Unexpected behaviour with django with django channels

class SessionTakeOverAPIView(generics.GenericAPIView):
    """
    This API view allows a human or AI to take over a chat session.
    The view handles session takeover validation, updates the session state, 
    and broadcasts relevant events to the chat group.
    
    A POST request is used to trigger either a human or AI takeover of a session.

    Authentication is required to access this view.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.room_group_name = None

    permission_classes = [BotUserHasRequiredPermissionForMethod]
    post_permission_required = ['session.reply_session']
    queryset = Session.objects.select_related('bot').all()
    serializer_class = SessionTakeOverSerializer

    def get_object(self):
        """
        Retrieves the session object based on the session_id provided in the request data.
        Raises a 404 error if the session is not found.
        """
        try:
            return super().get_queryset().get(session_id=self.request.data.get('session_id'))
        except Session.DoesNotExist:
            raise Http404  # Return 404 error if session not found


    def handle_human_take_over(self):
        """
        Handles the logic when a human takes over the chat session. It performs the following:
        - Validates if the session is already taken over by another human.
        - Updates the session to reflect the current user as the human taking over.
        - Sends a message to the chat group about the takeover.
        - Creates a log entry for the takeover asynchronously.
        """
        request = self.request
        session: Session = self.get_object()

        # Check if the session is already taken over by another human
        if session.is_session_currently_taken_over:
            raise ValidationError({
                "detail" :f"Session is currently taken over by {session.currently_taken_over_by.get_full_name}"})

        # prepare the takeover message for the bot
        take_over_msg = f"{session.bot.human_take_over_msg} {request.user.get_full_name}"

        # Prepare the response message to send to the group
        response = {
            "event_type": EventTypeChoicesChat.HUMAN_TAKEOVER,
            "user_type": "client",
            "sender_icon": request.user.profile_icon_url,

            "chat_data": {
                "msg": take_over_msg,
                "recorded_audio_msg_link": None,
                "images_link": [],
                "files_link": []
            }
        }

        # Update the session state to reflect the human takeover
        session.currently_taken_over_by = request.user
        session.is_session_currently_taken_over = True
        session.is_ai_active = False  # AI is no longer in control

        session.save(
            update_fields=[
                'currently_taken_over_by',
                'is_session_currently_taken_over',
                'is_ai_active'
            ]
        )

        transaction.on_commit(
            lambda: send_data_to_channel_layer(
                self.room_group_name, 
                'send_chat_message', 
                response
            )
        )

        # Create an entry to log the session takeover action asynchronously
        create_session_take_over_info.delay(session.id, request.user.id)

    def handle_ai_take_over(self):
        """
        Handles the logic when an AI takes over the chat session. It performs the following:
        - Validates if AI is already active in the session.
        - Updates the session to reflect AI control and sends a message to the chat group.
        """
        session: Session = self.get_object()

        # Check if AI is already active in the session
        if session.is_ai_active:
            raise ValidationError({"detail": "AI is already in action"})

        # Prepare the AI takeover message from the bot
        take_over_msg = session.bot.ai_take_over_msg

        # Prepare the response message to send to the chat room group
        response = {
            "event_type": EventTypeChoicesChat.AI_TAKE_OVER,
            "user_type": "ai",
            "sender_icon": session.bot.bot_icon,

            "chat_data": {
                "msg": take_over_msg,
                "recorded_audio_msg_link": None,
                "images_link": [],
                "files_link": []
            }
        }

        # Update the session state to reflect AI takeover
        session.currently_taken_over_by = None  # No human is in control
        session.is_session_currently_taken_over = False
        session.is_ai_active = True  # AI is now in control
        
        session.save(
            update_fields=[
                'currently_taken_over_by',
                'is_session_currently_taken_over',
                'is_ai_active'
            ]
        )

        transaction.on_commit(
            lambda: send_data_to_channel_layer(
                self.room_group_name, 
                'send_chat_message', 
                response
            )
        )


    def post(self, request, *args, **kwargs):
        """
        Handles the POST request to take over the session (either by human or AI).
        It validates the session ID and checks if the session is active before 
        allowing a human or AI to take over.
        """

        serializer = self.serializer_class(data=request.data)
        serializer.is_valid(raise_exception=True)

        # Determine the type of takeover (human or AI) based on request data
        take_over_type = serializer.validated_data.get('take_over_type')
        self.room_group_name = constants.CHAT_SESSION_ROOM_GROUP_NAME.format(
            session_id=request.data.get('session_id')
        )  

        session: Session = self.get_object()
        if session.session_has_ended:
            raise ValidationError({"detail" :"Session is not active."})

        with transaction.atomic():
            if take_over_type == EventTypeChoicesChat.HUMAN_TAKEOVER:
                self.handle_human_take_over()  # Handle human takeover
            elif take_over_type == EventTypeChoicesChat.AI_TAKE_OVER:
                self.handle_ai_take_over()  # Handle AI takeover
            else:
                raise ValidationError({"detail": "Invalid takeover type."})

        return Response({"message": "Take over succeeded"})

this is an api to update my session model.

now here is the consumer

class ChatConsumer(AsyncWebsocketConsumer):
    """ WebSocket consumer for handling chat messages in a specific session."""

    async def connect(self):
        """Handle WebSocket connection initialization."""
        self.room_name = self.scope["url_route"]["kwargs"]["room_name"]
        self.room_group_name = constants.CHAT_SESSION_ROOM_GROUP_NAME.format(session_id=self.room_name)

        session = await self.__get_session({"session_id": self.room_name, "session_has_ended": False})
        if not session:
            logger.info(f"Session [{self.room_name}] expired or does not exist")
            # close the connection
            await self.close()
            return

        self.session = session

        await self.__set_attributes()
        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        await self.accept()

        # Verify connection conditions
        if not await self.__validate_connection():
            await self.__send_validation_failed_message()
            return

        # Send intro message if no messages exist
        if not await database_sync_to_async(lambda: session.has_messages)():
            await self.__send_intro_message()


    async def __set_attributes(self):
        self.org = self.session.org
        self.bot = self.session.bot
        self.platform = self.bot.platform
        self.language = self.bot.language
        self.ai_response_url = settings.AI_BASE_URL + f'/api/v1.0/ai/conversation/{self.org.id}/{self.bot.id}'
        self.goal = 'you are an intelligent customer service agent'
        self.customer_name = self.session.user_name if self.session.user_name else ""
        self.customer_email = self.session.user_email if self.session.user_email else ""
        self.user_type = None
        self.user = self.scope["user"] if not self.scope["user"].is_anonymous else None
    @database_sync_to_async
    def __get_session(self, filter_kwargs: dict):
        """Fetch a session with related data."""
        try:
            session = Session.objects.select_related(
                "bot",
                "org",
                "bot__language", 
                "bot__platform"
            ).get(**filter_kwargs)
            return session
        except Exception as err:
            logger.error(f"Error fetching session: {err}")
            return None

now when i am connected to my consumer then if i try to use the api that i have given above doesn't actually updates the database but if i am not connected to my consumer then the api works

more details is if i try to update those fields via django admin while connected to the consumer the update works fine but it doesn't work with the api i created

and the most unexpected scenario is if i remove `self.org` from the consumer then the api works, something is related to the organization model that i am trying to fetch

if i remove the part where i am sending a message to the channel layer via the api then the update works

i tried background task but didn't work as well

The issue occurs because your WebSocket consumer holds database objects (self.org, self.session) in memory, creating connection conflicts with your synchronous API.
This is a well-documented Django Channels problem related to database connection isolation between async and sync contexts. For more details, check the Django Channels documentation.

To solve your issue, try replacing the database object storage in your consumer's __set_attributes() Method you have like this:

async def __set_attributes(self):
    # DON'T store database objects - which causes connection conflicts
    # self.org = self.session.org  # Remove this
    # self.bot = self.session.bot  # Remove this
    
    # DO store only IDs and refresh data when needed
    session = await self.__get_session({"session_id": self.session_id})
    if not session:
        return
        
    self.org_id = session.org.id
    self.bot_id = session.bot.id
    self.ai_response_url = settings.AI_BASE_URL + f'/api/v1.0/ai/conversation/{session.org.id}/{session.bot.id}'
    # ... rest of your attributes

async def __get_fresh_session(self):
    """Get fresh session data from database when needed."""
    return await self.__get_session({"session_id": self.session_id})

By storing only IDs instead of full database objects, you prevent the consumer from holding stale database connections that conflict with your synchronous API operations.

Hope this will solve your issue.

Вернуться на верх