Parallel Processing with Asynchronous Calls for OpeanAI API
I have written an app to fetch news from certain rss feeds and do some processes on them using chatgpt api.
I have scheduled fetching news and processing them every minute task, Then it caused problem. Every minute let's say script fetches 300 news, and chatgpt api tries to process all of these 300 news. After a minute without completing previous 300 it starts to process the next batch, so automatically can't complete the previous batch. What I need is, it will still fetch news every minute and once it was fetched, it will start to process it.
All the processing for each news article by chatgpt api was going to happen simultaneously and adding new articles won't interrupt previous. For example let's say 300 news has been fetched, all of them will be processed by chatgpt at the same time seperately.
I have updated my code like the following per OpenAI documentation, but can't make it work for some reason.
import openai
from django.utils import timezone
from .models import NewsArticle, UserPreference, SelectedNews
from .telegram_integration import send_telegram_message
import random
import logging
from openai import AsyncOpenAI
key = 'sk-xxxx'
client = openai.AsyncOpenAI(api_key=key)
async def chunk_list(data, chunk_size):
"""Splits the list into smaller chunks."""
for i in range(0, len(data), chunk_size):
yield data[i:i + chunk_size]
async def save_news_to_database(news):
# Implement logic to save news to the database
pass
async def rate_news_headlines(news_articles):
chunk_size = 5 # Adjust based on token size estimates and the number of articles
for batch in chunk_list(news_articles, chunk_size):
for article in batch:
if "Azerbaijan" in article.headline:
article.rating = 10
else:
completion = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are an assistant that rates headlines."},
{"role": "user", "content": f"Rate the headline '{article.headline}' on a scale of 1 to 10 based on attention-grabbing potential. If it is about Azerbaijan or anyone Azerbaijani or officials, rate it 10. Only give out number as an output"}
]
)
article.rating = int(completion.choices[0].message.content)
logging.info(f"Received response: {article.rating}")
async def paraphrase_content(news_articles):
chunk_size = 5
for batch in chunk_list(news_articles, chunk_size):
for article in batch:
if article.rating > 6:
completion = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are an assistant that accesses links to retrieve full news article content and paraphrases that content."},
{"role": "user", "content": f"Access the full content via provided link and paraphrase the full news content while keeping at least 75% of its size: {article.content}"}
]
)
article.paraphrased_content = completion.choices[0].message.content
article.save()
async def translate_content(news_articles):
chunk_size = 5
for batch in chunk_list(news_articles, chunk_size):
for article in batch:
completion = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are an assistant that translates content."},
{"role": "user", "content": f"If this news is in English, translate this content to Azerbaijani; if in another language, translate it into English first, then Azerbaijani. Give only Azerbaijani text as output: {article.paraphrased_content}"}
]
)
article.translated_content = completion.choices[0].message.content
article.save()
async def categorize_processed_news(news_articles):
"""Categorize processed and translated news articles in Azerbaijani."""
chunk_size = 5
for batch in chunk_list(news_articles, chunk_size):
for article in batch:
if article.translated_content: # Ensure the article has been translated
completion = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are an assistant that categorizes news articles."},
{"role": "user", "content": f"Categorize this news and name categories in Azerbaijani: {article.headline}\n\n{article.translated_content}"}
]
)
article.category = completion.choices[0].message.content
article.save()
async def process_news():
recent_news = NewsArticle.objects.filter(
timestamp__gte=timezone.now() - timezone.timedelta(hours=6)
)
# Step 1: Rate headlines in batches
rate_news_headlines(recent_news)
# Step 2: Paraphrase content for high-rated articles
high_priority_news = [article for article in recent_news if article.rating > 6]
paraphrase_content(high_priority_news)
# Step 3: Translate content for high-priority articles
translate_content(high_priority_news)
# Step 4: Categorize processed and translated news
categorize_processed_news(high_priority_news)
async def select_and_send_news():
users = UserPreference.objects.all()
for user in users:
# Handle cases where excluded_categories might be None
excluded_categories = user.excluded_categories.split(',') if user.excluded_categories else []
# Filter news that does not belong to excluded categories and is processed
available_news = NewsArticle.objects.filter(
translated_content__isnull=False
)
if excluded_categories:
available_news = available_news.exclude(category__in=excluded_categories)
# Randomly select 5 news articles
selected_news = random.sample(list(available_news), min(5, len(available_news)))
for article in selected_news:
# Save the selected news to the new table
selected_news_entry = SelectedNews(
headline=article.headline,
content=article.content,
paraphrased_content=article.paraphrased_content,
translated_content=article.translated_content,
category=article.category,
)
selected_news_entry.save()
# Construct the message
message = f"Content: {article.translated_content}\nLink: {article.link}"
# Send the news to the user's Telegram
send_telegram_message(user.telegram_number, message)
I keep getting this error, even tho I am doing as per OpenAI documentation:
Here is the error I get:
jango.core.exceptions.SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.
Full details:
D:\Downloads\script\news_system\news\utils.py changed, reloading.
Watching for file changes with StatReloader
Performing system checks...
Exception in thread django-main-thread:
Traceback (most recent call last):
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\threading.py", line 1073, in _bootstrap_inner
self.run()
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\threading.py", line 1010, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\utils\autoreload.py", line 64, in wrapper
fn(*args, **kwargs)
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\core\management\commands\runserver.py", line 133, in inner_run
self.check(display_num_errors=True)
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\core\management\base.py", line 486, in check
all_issues = checks.run_checks(
^^^^^^^^^^^^^^^^^^
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\core\checks\registry.py", line 88, in run_checks
new_errors = check(app_configs=app_configs, databases=databases)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\core\checks\urls.py", line 14, in check_url_config
return check_resolver(resolver)
^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\core\checks\urls.py", line 24, in check_resolver
return check_method()
^^^^^^^^^^^^^^
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\urls\resolvers.py", line 519, in check
for pattern in self.url_patterns:
^^^^^^^^^^^^^^^^^
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\utils\functional.py", line 47, in __get__
res = instance.__dict__[self.name] = self.func(instance)
^^^^^^^^^^^^^^^^^^^
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\urls\resolvers.py", line 738, in url_patterns
patterns = getattr(self.urlconf_module, "urlpatterns", self.urlconf_module)
^^^^^^^^^^^^^^^^^^^
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\utils\functional.py", line 47, in __get__
res = instance.__dict__[self.name] = self.func(instance)
^^^^^^^^^^^^^^^^^^^
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\urls\resolvers.py", line 731, in urlconf_module
return import_module(self.urlconf_name)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\importlib\__init__.py", line 90, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
File "<frozen importlib._bootstrap>", line 1331, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 935, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 995, in exec_module
File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
File "D:\Downloads\script\news_system\news_system\urls.py", line 7, in <module>
path('news/', include('news.urls')),
^^^^^^^^^^^^^^^^^^^^
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\urls\conf.py", line 39, in include
urlconf_module = import_module(urlconf_module)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\importlib\__init__.py", line 90, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
File "<frozen importlib._bootstrap>", line 1331, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 935, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 995, in exec_module
File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
File "D:\Downloads\script\news_system\news\urls.py", line 142, in <module>
schedule.run_pending()
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\schedule\__init__.py", line 854, in run_pending
default_scheduler.run_pending()
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\schedule\__init__.py", line 101, in run_pending
self._run_job(job)
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\schedule\__init__.py", line 173, in _run_job
ret = job.run()
^^^^^^^^^
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\schedule\__init__.py", line 691, in run
ret = self.job_func()
^^^^^^^^^^^^^^^
File "D:\Downloads\script\news_system\news\urls.py", line 137, in job_collect_and_process
asyncio.run(process_news())
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 194, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py", line 687, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "D:\Downloads\script\news_system\news\urls.py", line 102, in process_news
await process_news_articles(recent_news)
File "D:\Downloads\script\news_system\news\urls.py", line 82, in process_news_articles
rate_tasks = [rate_article(article, session) for article in news_articles]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\db\models\query.py", line 400, in __iter__
self._fetch_all()
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\db\models\query.py", line 1928, in _fetch_all
self._result_cache = list(self._iterable_class(self))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\db\models\query.py", line 91, in __iter__
results = compiler.execute_sql(
^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\db\models\sql\compiler.py", line 1560, in execute_sql
cursor = self.connection.cursor()
^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\utils\asyncio.py", line 24, in inner
raise SynchronousOnlyOperation(message)
django.core.exceptions.SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.
How can I fix this issue?
I have tried to check the code for bugs, couldn't pinpoint what's wrong