Ошибка при обновлении таблицы с помощью задачи celery: OperationalError

Я использую задачу Celery и получил ошибку, которую я не понимаю.

Я прохожусь по таблице (содержащей определения запросов) для редактирования отсутствующих/несоответствующих данных в базе данных (используя API) и регистрирую несоответствия в другой таблице.

Если я запускаю запрос по одному, он работает, но когда я пытаюсь выполнить цикл запросов, я получаю ошибку

OperationalError('server closed the connection unexpectedly\n\tThis probably means the server terminated abnormally\n\tbefore or while processing the request.\n')

def DCF_edition(self):

    DCF_BEFORE_UPDATE = pd.DataFrame.from_records(DataCorrectionForm.objects.all().values())
    DCF_BEFORE_UPDATE = DCF_BEFORE_UPDATE.astype({'record_date': str,'created_date': str}) if not DCF_BEFORE_UPDATE.empty else DCF_BEFORE_UPDATE

    data = []
    # load queries definition
    queries = queries_definitions()
    if not queries.empty:
        for index, row in queries.iterrows():
           
            try:
                missing_or_inconsistent =  missing_or_inconsistent_data(row['ide'],row['query_type'],row['crf_name'].lower(),row['crf identification 
                data.append(missing_or_inconsistent[['dcf_ide','category','crf','crf_ide','pat','record_date','field_name','field_label','message','field_value','dcf_status','DEF','deactivated']])

                DCF_AFTER_UPDATE = pd.concat(data)
                DCF_AFTER_UPDATE = DCF_AFTER_UPDATE.drop_duplicates(keep='last')

                DCF_AFTER_UPDATE = DCF_AFTER_UPDATE.drop(['DEF'], axis=1)
                DCF_AFTER_UPDATE.rename(columns = {'pat':'patient',}, inplace = True)

            except Exception as e:
                Log.objects.create(dcf_edition_status=0,dcf_edition_exception=str(e)[:200])
                continue

        # Cast date into string format to be able to dumps data
        DCF_AFTER_UPDATE = DCF_AFTER_UPDATE.astype({'record_date': str}) if not DCF_AFTER_UPDATE.empty else DCF_AFTER_UPDATE
    
        records = json.loads(json.dumps(list(DCF_AFTER_UPDATE.T.to_dict().values())))
        for record in records:
            if not DCF_BEFORE_UPDATE.empty and record['dcf_ide'] in DCF_BEFORE_UPDATE.values:
                DataCorrectionForm.objects.filter(dcf_ide=record['dcf_ide']).update(dcf_status=2)
            else:
                DataCorrectionForm.objects.get_or_create(**record)
    
        # resolved dcf => status=0
        if not DCF_BEFORE_UPDATE.empty:
            records = json.loads(json.dumps(list(DCF_BEFORE_UPDATE.T.to_dict().values())))
            for record in records:
                if record['dcf_ide'] not in DCF_AFTER_UPDATE.values:
                    DataCorrectionForm.objects.filter(dcf_ide=record['dcf_ide']).update(dcf_status=0)
    
    Log.objects.create(dcf_edition_status=1)
    
    return True

Время жизни соединения с базой данных, как целое число секунд. Используйте 0 для закрытия соединений с базой данных в конце каждого запроса - историческое поведение Django - и None для неограниченных постоянных соединений.

Похоже, что ваша задача является долго выполняющейся задачей и должна удерживать соединение с бд в течение длительного периода времени. Вы пытались установить значение None

DATABASES = {
'default': env.db(),
}
# https://docs.djangoproject.com/en/3.1/ref/settings/#conn-max-age
DATABASES['default']['CONN_MAX_AGE'] = None

Сколько времени требуется для завершения вашей задачи? Это может быть проблема с настройками базы данных сервера, например, tcp_keepalives_ilde...

Вернуться на верх