Parallel Processing with Asynchronous Calls for OpeanAI API

I have written an app to fetch news from certain rss feeds and do some processes on them using chatgpt api.

I have scheduled fetching news and processing them every minute task, Then it caused problem. Every minute let's say script fetches 300 news, and chatgpt api tries to process all of these 300 news. After a minute without completing previous 300 it starts to process the next batch, so automatically can't complete the previous batch. What I need is, it will still fetch news every minute and once it was fetched, it will start to process it.

All the processing for each news article by chatgpt api was going to happen simultaneously and adding new articles won't interrupt previous. For example let's say 300 news has been fetched, all of them will be processed by chatgpt at the same time seperately.

I have updated my code like the following per OpenAI documentation, but can't make it work for some reason.

import openai
from django.utils import timezone
from .models import NewsArticle, UserPreference, SelectedNews
from .telegram_integration import send_telegram_message
import random
import logging
from openai import AsyncOpenAI


key = 'sk-xxxx' 

client = openai.AsyncOpenAI(api_key=key)


async def chunk_list(data, chunk_size):
    """Splits the list into smaller chunks."""
    for i in range(0, len(data), chunk_size):
        yield data[i:i + chunk_size]

async def save_news_to_database(news):
    # Implement logic to save news to the database
    pass



async def rate_news_headlines(news_articles):
    chunk_size = 5  # Adjust based on token size estimates and the number of articles
    for batch in chunk_list(news_articles, chunk_size):
        for article in batch:
            if "Azerbaijan" in article.headline:
                article.rating = 10
            else:
                completion = await client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {"role": "system", "content": "You are an assistant that rates headlines."},
                        {"role": "user", "content": f"Rate the headline '{article.headline}' on a scale of 1 to 10 based on attention-grabbing potential. If it is about Azerbaijan or anyone Azerbaijani or officials, rate it 10. Only give out number as an output"}
                    ] 
                )

                article.rating = int(completion.choices[0].message.content)
                logging.info(f"Received response: {article.rating}")
                

async def paraphrase_content(news_articles):
    chunk_size = 5
    for batch in chunk_list(news_articles, chunk_size):
        for article in batch:
            if article.rating > 6:
                completion = await client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {"role": "system", "content": "You are an assistant that accesses links to retrieve full news article content and paraphrases that content."},
                        {"role": "user", "content": f"Access the full content via provided link and paraphrase the full news content while keeping at least 75% of its size: {article.content}"}
                    ]
                )
                article.paraphrased_content = completion.choices[0].message.content
            article.save()

async def translate_content(news_articles):
    chunk_size = 5
    for batch in chunk_list(news_articles, chunk_size):
        for article in batch:
            completion = await client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "You are an assistant that translates content."},
                    {"role": "user", "content": f"If this news is in English, translate this content to Azerbaijani; if in another language, translate it into English first, then Azerbaijani. Give only Azerbaijani text as output: {article.paraphrased_content}"}
                ]
            )
            article.translated_content = completion.choices[0].message.content
            article.save()

async def categorize_processed_news(news_articles):
    """Categorize processed and translated news articles in Azerbaijani."""
    chunk_size = 5
    for batch in chunk_list(news_articles, chunk_size):
        for article in batch:
            if article.translated_content:  # Ensure the article has been translated
                completion = await client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {"role": "system", "content": "You are an assistant that categorizes news articles."},
                        {"role": "user", "content": f"Categorize this news and name categories in Azerbaijani: {article.headline}\n\n{article.translated_content}"}
                    ]
                )
                article.category = completion.choices[0].message.content
                article.save()



async def process_news():
    recent_news = NewsArticle.objects.filter(
        timestamp__gte=timezone.now() - timezone.timedelta(hours=6)
    )
    
    # Step 1: Rate headlines in batches
    rate_news_headlines(recent_news)

    # Step 2: Paraphrase content for high-rated articles
    high_priority_news = [article for article in recent_news if article.rating > 6]
    paraphrase_content(high_priority_news)

    # Step 3: Translate content for high-priority articles
    translate_content(high_priority_news)

    # Step 4: Categorize processed and translated news
    categorize_processed_news(high_priority_news)


async def select_and_send_news():
    users = UserPreference.objects.all()
    for user in users:
        # Handle cases where excluded_categories might be None
        excluded_categories = user.excluded_categories.split(',') if user.excluded_categories else []

        # Filter news that does not belong to excluded categories and is processed
        available_news = NewsArticle.objects.filter(
            translated_content__isnull=False
        )

        if excluded_categories:
            available_news = available_news.exclude(category__in=excluded_categories)

        # Randomly select 5 news articles
        selected_news = random.sample(list(available_news), min(5, len(available_news)))

        for article in selected_news:
            # Save the selected news to the new table
            selected_news_entry = SelectedNews(
                headline=article.headline,
                content=article.content,
                paraphrased_content=article.paraphrased_content,
                translated_content=article.translated_content,
                category=article.category,
            )
            selected_news_entry.save()


            # Construct the message
            message = f"Content: {article.translated_content}\nLink: {article.link}"

            # Send the news to the user's Telegram
            send_telegram_message(user.telegram_number, message)

I keep getting this error, even tho I am doing as per OpenAI documentation:

Here is the error I get:

jango.core.exceptions.SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.

Full details:

D:\Downloads\script\news_system\news\utils.py changed, reloading.
Watching for file changes with StatReloader
Performing system checks...
 
Exception in thread django-main-thread:
Traceback (most recent call last):
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\threading.py", line 1073, in _bootstrap_inner
    self.run()
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\utils\autoreload.py", line 64, in wrapper
    fn(*args, **kwargs)
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\core\management\commands\runserver.py", line 133, in inner_run
    self.check(display_num_errors=True)
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\core\management\base.py", line 486, in check
    all_issues = checks.run_checks(
                 ^^^^^^^^^^^^^^^^^^
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\core\checks\registry.py", line 88, in run_checks     
    new_errors = check(app_configs=app_configs, databases=databases)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\core\checks\urls.py", line 14, in check_url_config   
    return check_resolver(resolver)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\core\checks\urls.py", line 24, in check_resolver     
    return check_method()
           ^^^^^^^^^^^^^^
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\urls\resolvers.py", line 519, in check
    for pattern in self.url_patterns:
                   ^^^^^^^^^^^^^^^^^
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\utils\functional.py", line 47, in __get__
    res = instance.__dict__[self.name] = self.func(instance)
                                         ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\urls\resolvers.py", line 738, in url_patterns        
    patterns = getattr(self.urlconf_module, "urlpatterns", self.urlconf_module)
                       ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\utils\functional.py", line 47, in __get__
    res = instance.__dict__[self.name] = self.func(instance)
                                         ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\urls\resolvers.py", line 731, in urlconf_module      
    return import_module(self.urlconf_name)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\importlib\__init__.py", line 90, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1331, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 935, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 995, in exec_module
  File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
  File "D:\Downloads\script\news_system\news_system\urls.py", line 7, in <module>
    path('news/', include('news.urls')),
                  ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\urls\conf.py", line 39, in include
    urlconf_module = import_module(urlconf_module)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\importlib\__init__.py", line 90, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1331, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 935, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 995, in exec_module
  File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
  File "D:\Downloads\script\news_system\news\urls.py", line 142, in <module>
    schedule.run_pending()
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\schedule\__init__.py", line 854, in run_pending
    default_scheduler.run_pending()
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\schedule\__init__.py", line 101, in run_pending
    self._run_job(job)
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\schedule\__init__.py", line 173, in _run_job
    ret = job.run()
          ^^^^^^^^^
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\schedule\__init__.py", line 691, in run
    ret = self.job_func()
          ^^^^^^^^^^^^^^^
  File "D:\Downloads\script\news_system\news\urls.py", line 137, in job_collect_and_process
    asyncio.run(process_news())
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py", line 687, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "D:\Downloads\script\news_system\news\urls.py", line 102, in process_news
    await process_news_articles(recent_news)
  File "D:\Downloads\script\news_system\news\urls.py", line 82, in process_news_articles
    rate_tasks = [rate_article(article, session) for article in news_articles]
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\db\models\query.py", line 400, in __iter__
    self._fetch_all()
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\db\models\query.py", line 1928, in _fetch_all        
    self._result_cache = list(self._iterable_class(self))
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\db\models\query.py", line 91, in __iter__
    results = compiler.execute_sql(
              ^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\db\models\sql\compiler.py", line 1560, in execute_sql
    cursor = self.connection.cursor()
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\rasul\AppData\Local\Programs\Python\Python312\Lib\site-packages\django\utils\asyncio.py", line 24, in inner
    raise SynchronousOnlyOperation(message)
django.core.exceptions.SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.

How can I fix this issue?

I have tried to check the code for bugs, couldn't pinpoint what's wrong

Back to Top