Что такое Celery
Celery — это распределённая система очередей задач для Python. Она позволяет выполнять задачи асинхронно, планировать выполнение и обрабатывать ошибки.
▸Компоненты Celery
Настройка
▸Установка
1pip install celery[redis]
▸Конфигурация (celery.py)
1from celery import Celery23app = Celery('myproject')45app.config_from_object('django.conf:settings', namespace='CELERY')67# Автоматическое обнаружение задач8app.autodiscover_tasks()910# Конфигурация (settings.py)11CELERY_BROKER_URL = 'redis://localhost:6379/0'12CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'13CELERY_ACCEPT_CONTENT = ['json']14CELERY_TASK_SERIALIZER = 'json'15CELERY_RESULT_SERIALIZER = 'json'
Определение задач
▸Простые задачи
1# tasks.py2from celery import shared_task3import time45@shared_task6def add(x, y):7 return x + y89@shared_task10def process_data(data_id):11 data = Data.objects.get(id=data_id)12 # Длительная обработка13 result = heavy_computation(data)14 data.result = result15 data.save()16 return result
▸Задачи с результатами
1@shared_task(bind=True, max_retries=3)2def send_email(self, to, subject, body):3 try:4 # Отправка email5 smtp_send(to, subject, body)6 except Exception as exc:7 # Повторная попытка через 60 секунд8 self.retry(exc=exc, countdown=60)
▸Задачи с таймаутами
1@shared_task(time_limit=300, soft_time_limit=250)2def long_running_task():3 # 5 минут максимум, предупреждение через 4 минуты 10 секунд4 time.sleep(300)
Вызов задач
▸Асинхронный вызов
1# Из Django view2from .tasks import process_data34def my_view(request):5 data_id = request.POST['data_id']6 # Задача будет выполнена асинхронно7 task = process_data.delay(data_id)8 return JsonResponse({'task_id': task.id})
▸Ожидание результата
1# Синхронное ожидание (не рекомендуется в web)2result = process_data.delay(data_id).get(timeout=10)34# Асинхронное ожидание5from celery.result import AsyncResult67task = process_data.delay(data_id)8# Позже проверяем результат9result = AsyncResult(task.id)10if result.ready():11 data = result.get()
Планировщик (Celery Beat)
1# settings.py2CELERY_BEAT_SCHEDULE = {3 'cleanup-every-hour': {4 'task': 'myapp.tasks.cleanup',5 'schedule': crontab(minute=0), # Каждый час6 },7 'daily-report': {8 'task': 'myapp.tasks.generate_report',9 'schedule': crontab(hour=8, minute=0), # Каждый день в 8:0010 },11 'weekly-cleanup': {12 'task': 'myapp.tasks.weekly_cleanup',13 'schedule': crontab(hour=2, minute=0, day_of_week=0), # Каждое воскресенье в 2:0014 },15}
Группы и цепочки
1from celery import group, chain, chord23# Группа — параллельное выполнение4job = group(add.s(i, i) for i in range(10))5result = job.apply_async()67# Цепочка — последовательное выполнение8job = chain(9 fetch_data.s(url),10 process_data.s(),11 save_result.s()12)13result = job.apply_async()1415# Аккорд — цепочка с колбэком16header = group(fetch_data.s(url) for url in urls)17callback = process_results.s()18result = chord(header)(callback)
Мониторинг
▸Flower
1pip install flower2celery -A myproject flower
▸Просмотр задач
1from celery.result import AsyncResult23task_id = 'abc123'4result = AsyncResult(task_id)56print(result.state) # PENDING, STARTED, SUCCESS, FAILURE7print(result.info) # Информация о задаче8print(result.result) # Результат (если есть)
Обработка ошибок
1from celery.exceptions import MaxRetriesExceededError23@shared_task(bind=True, max_retries=3)4def reliable_task(self):5 try:6 # Основная логика7 do_something()8 except ConnectionError as exc:9 # Повторная попытка10 raise self.retry(exc=exc, countdown=60)11 except MaxRetriesExceededError:12 # Логика после исчерпания попыток13 notify_admin("Task failed after 3 retries")14 except Exception as exc:15 # Логирование неожиданных ошибок16 logger.error(f"Unexpected error: {exc}")17 raise
Best Practices
▸Идемпотентность
1@shared_task2def process_order(order_id):3 order = Order.objects.get(id=order_id)4 if order.status == 'processed':5 return # Уже обработан6 # Обработка...7 order.status = 'processed'8 order.save()
▸Размер задач
1# Плохо: большая задача2@shared_task3def process_all_users():4 for user in User.objects.all():5 process_user(user)67# Хорошо: маленькие задачи8@shared_task9def process_user(user_id):10 user = User.objects.get(id=user_id)11 # Обработка одного пользователя
Заключение
Celery — стандарт для фоновых задач в Python-приложениях. Понимание broker, workers, планировщика и паттернов использования критически важно для масштабируемых приложений. На собеседовании спрашивают про идемпотентность, обработку ошибок и мониторинг задач.