Django каналы self.send() из пользовательской функции
Возможно ли self.send() из пользовательской функции внутри consumer.py?
У меня есть игра в рулетку, которая запускается в цикле через каждые 30 секунд. Внутри этого цикла я вызываю функцию для сохранения информации о выигрыше в игре и обновления модели пользователя с его выигрышем. Я хотел бы также отправить эту информацию на внешний экран, чтобы пользователю не нужно было обновлять страницу для обновления своего баланса.
В настоящее время проблема заключается в том, что способ, которым я думал это сделать, не работает, поскольку self.send() внутри моей пользовательской функции save_roll() не работает.
Как можно достичь чего-то подобного?
consumers.py
@database_sync_to_async
def save_roll(self, result, roll_color, round_number):
winners = Bets.objects.filter(round_number=round_number, bet_choice=roll_color)
multiplier = 0
if roll_color == 'blue' or roll_color == 'red':
multiplier = 2
elif roll_color == 'green':
multiplier = 14
for winner in winners:
winnerId = winner.id
winnerBet = winner.bet_value
userObj = CustomUser.objects.filter(id=winnerId)[0]
newBalance = userObj.user_coins + (winnerBet * multiplier)
CustomUser.objects.filter(id=winnerId).update(user_coins=newBalance)
self.send(text_data = json.dumps({
'balance': newBalance
}))
Roulette.objects.create(win = result, roll_color = roll_color, round_number = round_number, is_over = True)
class RouletteGame(AsyncWebsocketConsumer):
async def connect(self):
self.connected = True
await self.accept()
while self.connected:
await asyncio.sleep(30)
server_seed = get_random_string(length=64)
public_seed = random.randint(0, 999999)
round = await get_round() + 1
print(round)
hash = pbkdf2_hmac('sha256', bytes(server_seed, 'utf-8'), bytes(public_seed), round)
result = int(hash[0:8].hex(), 16) % 15
roll_color = ''
if result == 0:
roll_color = 'green'
elif result >= 1 and result <= 7:
roll_color = 'blue'
elif result >= 8 and result <= 14:
roll_color = 'red'
await save_roll(self, result, roll_color, round)
await self.send(text_data = json.dumps({
'round_result': result
}))
async def disconnect(self):
self.connected = False