Celery task completion check isn't checking the correct task

I'm trying to have a task to process multiple orders in parallel. I used a chord, and send a the chord id back to the front-end to periodically check the task progress, however, the id I send is actually getting the results of the parent task that I nested the chord in? I'm not really sure what's wrong.

Here is my parent task and Django endpoint

@shared_task
def bulk_create_shipping_labels_task(skus, carrier, service, measurements, user_id):
    start_time = time.time()
    logger.info(f"Starting bulk shipping label creation for SKUs: {skus}")
    
    orders = Order.objects.filter(
        Q(items__sku__sku__in=skus) & Q(order_status='pending_fulfillment')
    ).distinct().values_list('order_id', flat=True)

    # Create a chord of tasks
    header = [process_single_order.s(order_id, user_id, carrier, service, measurements) for order_id in orders]
    callback = collect_bulk_results.s()
    
    # Execute the chord
    chord_result = chord(header)(callback)

    print('chord_result', chord_result)
    
    end_time = time.time()
    total_time = end_time - start_time

    logger.info(f"Bulk shipping labels processing started. Total setup time: {total_time:.2f} seconds")

    chord_info = {
        'id': chord_result.id,
        'state': chord_result.state,
        'ready': chord_result.ready(),
        'successful': chord_result.successful(),
        'failed': chord_result.failed(),
    }

    return {
        'message': 'Bulk shipping labels processing started',
        'task_id': chord_result.id,
        'chord_info': chord_info
    }
@api_view(['GET'])
def check_bulk_shipping_status(request, task_id):
    logger.info(f"Checking status of bulk shipping task: {task_id}")
    try:
        task_result = AsyncResult(task_id)
        
        if task_result.ready():
            result = task_result.get()
            print('result', result)
            return Response({
                'status': 'completed',
                'message': 'All tasks have finished processing',
            })
        else:
            return Response({
                'status': 'processing',
                'message': 'Tasks are still processing'
            })
    except Exception as e:
        return Response({
            'status': 'error',
            'error': str(e)
        }, status=500)

I tried using groups instead and calling the group ID there, but it would always say the group doesn't exist.

Вернуться на верх