FastAPI Background Tasks
Handle long-running operations with BackgroundTasks and Celery for distributed task queues.
Overview
FastAPI's BackgroundTasks provides a simple mechanism for executing code after returning a response, without waiting for the operation to complete. This is ideal for fire-and-forget operations like sending emails, generating reports, or logging analytics where immediate feedback to the user is more important than the task completion. BackgroundTasks executes in the same process as the request, making it unsuitable for CPU-intensive work or operations that might fail. Tasks are processed after the response is sent, and any exceptions raised won't affect the HTTP response—they'll only be logged. For critical operations requiring guaranteed execution and retry logic, a proper task queue like Celery or Redis Queue is appropriate. Task dependency management matters for BackgroundTasks. If a background task depends on objects from the request, those values must be explicitly passed rather than object references. Database connections especially should be recreated in background tasks since the original session will be closed when the request completes. For production applications with high reliability requirements, BackgroundTasks alone is insufficient. Celery provides distributed task execution with multiple broker options (Redis, RabbitMQ), result storage, retry policies, and scheduling. Integrating Celery with FastAPI requires a separate worker process but provides enterprise-grade reliability for critical workflows like payment processing or email sending.
Code Example
from fastapi import FastAPI, BackgroundTasks, Depends, HTTPException
from pydantic import BaseModel, EmailStr
from sqlalchemy.orm import Session
from typing import Optional
import logging
from .database import get_db, engine
from .models import User, Task
from .email_service import send_verification_email, send_welcome_email
from .celery_app import celery_app
app = FastAPI()
logger = logging.getLogger(__name__)
class UserCreate(BaseModel):
email: EmailStr
password: str
full_name: str
class UserResponse(BaseModel):
id: int
email: str
full_name: str
is_verified: bool
class Config:
from_attributes = True
def send_confirmation_email_task(email: str, token: str):
"""This runs in the same process - for light tasks only"""
try:
send_verification_email(email, token)
logger.info(f"Verification email sent to {email}")
except Exception as e:
logger.error(f"Failed to send email to {email}: {e}")
def heavy_processing_task(task_id: int, data: dict):
"""CPU intensive task - should use Celery in production"""
# This would block the worker if run inline
import time
time.sleep(5)
# Process data...
logger.info(f"Task {task_id} completed")
@app.post("/users/", response_model=UserResponse, status_code=201)
async def create_user(
user_data: UserCreate,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db)
):
# Check existing user
existing = db.query(User).filter(User.email == user_data.email).first()
if existing:
raise HTTPException(status_code=400, detail="Email already registered")
# Create user
user = User(
email=user_data.email,
full_name=user_data.full_name,
is_verified=False
)
user.set_password(user_data.password)
db.add(user)
db.commit()
db.refresh(user)
# Queue lightweight background task
token = generate_verification_token(user.id)
background_tasks.add_task(send_confirmation_email_task, user.email, token)
return user
@app.post("/tasks/long-running/{task_id}")
async def start_long_task(
task_id: int,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db)
):
task = db.query(Task).filter(Task.id == task_id).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
# For production, use Celery instead
celery_task = process_data_task.delay(task_id, {"key": "value"})
return {
"task_id": task_id,
"celery_task_id": celery_task.id,
"status": "queued"
}
# Celery task definitions
@celery_app.task(bind=True, max_retries=3)
def process_data_task(self, task_id: int, data: dict):
try:
# Simulate work
import time
time.sleep(10)
# Update task status in DB
from .database import SessionLocal
db = SessionLocal()
try:
task = db.query(Task).get(task_id)
task.status = "completed"
task.result = {"processed": True}
db.commit()
finally:
db.close()
return {"status": "success", "task_id": task_id}
except Exception as e:
self.retry(exc=e, countdown=60)More FastAPI Rules
FastAPI Dependency Injection Pattern
Use FastAPI's dependency injection system for authentication, database sessions, and shared business logic.
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from typing im...FastAPI Async Database Patterns
Implement async database operations with SQLAlchemy 2.0 for high-concurrency FastAPI applications.
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base
from sqlalchemy i...FastAPI WebSocket Real-time Patterns
Build real-time features with FastAPI WebSockets and connection manager patterns.
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from typing import Dict,...