
FastAPI Background Tasks

Handle long-running operations with BackgroundTasks and Celery for distributed task queues.

Sebastián Ramírez · Updated Mar 20, 2024

Overview

FastAPI's BackgroundTasks provides a simple mechanism for executing code after the response has been returned, without making the client wait for the operation to complete. This is ideal for fire-and-forget operations such as sending emails, generating reports, or logging analytics, where fast feedback to the user matters more than task completion.

BackgroundTasks runs in the same process as the request, which makes it unsuitable for CPU-intensive work or for operations that must not be lost. Tasks execute after the response is sent, so an exception raised in a task cannot affect the HTTP response that has already gone out; it can only be caught and logged by the task itself. Operations that require guaranteed execution and retry logic belong in a proper task queue such as Celery or RQ (Redis Queue).

Dependency management matters for background tasks, too. If a task needs data from the request, pass the required values explicitly rather than handing it references to request-scoped objects. Database sessions in particular should be recreated inside the task, because the request's session is closed once the response is returned.

For production applications with high reliability requirements, BackgroundTasks alone is insufficient. Celery provides distributed task execution with multiple broker options (Redis, RabbitMQ), result storage, retry policies, and scheduling. Integrating Celery with FastAPI requires a separate worker process, but it delivers the reliability needed for critical workflows like payment processing and transactional email.
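The "pass values, not object references" rule can be illustrated without FastAPI at all. In the sketch below, `Session`, `User`, `add_task`, and `send_receipt` are stand-ins invented for this illustration; `add_task` only mimics what `BackgroundTasks.add_task` stores internally:

```python
# Minimal sketch (no FastAPI required) of why background tasks should be
# given plain values, not request-scoped objects. All names here are
# stand-ins invented for the illustration.

class Session:
    def __init__(self):
        self.open = True

    def close(self):
        self.open = False

class User:
    """Mimics an ORM model: attribute access needs a live session."""
    def __init__(self, session, email):
        self._session, self._email = session, email

    @property
    def email(self):
        if not self._session.open:
            raise RuntimeError("detached instance: session is closed")
        return self._email

queued = []  # stands in for the task list FastAPI drains after the response

def add_task(fn, *args):
    queued.append((fn, args))

def send_receipt(email: str) -> str:
    return f"sent to {email}"

# During the request: the session is open, so capture the scalar now.
session = Session()
user = User(session, "a@example.com")
add_task(send_receipt, user.email)               # safe: a plain str is stored
add_task(lambda u: send_receipt(u.email), user)  # unsafe: live object stored
session.close()  # request ends, session closes

# After the response: the queued tasks run.
fn, args = queued[0]
ok = fn(*args)  # works: the value was captured while the session was open

fn, args = queued[1]
try:
    fn(*args)
    failed = False
except RuntimeError:
    failed = True  # ORM access fails once the request is over
```

The same reasoning applies to any request-scoped resource: capture the scalar fields a task needs at scheduling time, and open fresh connections inside the task itself.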

Code Example

from fastapi import FastAPI, BackgroundTasks, Depends, HTTPException
from pydantic import BaseModel, EmailStr
from sqlalchemy.orm import Session
from typing import Optional
import logging

from .database import get_db, engine
from .models import User, Task
from .email_service import send_verification_email, send_welcome_email
from .security import generate_verification_token  # app-defined token helper
from .celery_app import celery_app

app = FastAPI()
logger = logging.getLogger(__name__)

class UserCreate(BaseModel):
    email: EmailStr
    password: str
    full_name: str

class UserResponse(BaseModel):
    id: int
    email: str
    full_name: str
    is_verified: bool

    class Config:
        from_attributes = True

def send_confirmation_email_task(email: str, token: str):
    """This runs in the same process - for light tasks only"""
    try:
        send_verification_email(email, token)
        logger.info(f"Verification email sent to {email}")
    except Exception as e:
        logger.error(f"Failed to send email to {email}: {e}")

def heavy_processing_task(task_id: int, data: dict):
    """CPU intensive task - should use Celery in production"""
    # This would block the worker if run inline
    import time
    time.sleep(5)
    # Process data...
    logger.info(f"Task {task_id} completed")

@app.post("/users/", response_model=UserResponse, status_code=201)
async def create_user(
    user_data: UserCreate,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db)
):
    # Check existing user
    existing = db.query(User).filter(User.email == user_data.email).first()
    if existing:
        raise HTTPException(status_code=400, detail="Email already registered")

    # Create user
    user = User(
        email=user_data.email,
        full_name=user_data.full_name,
        is_verified=False
    )
    user.set_password(user_data.password)

    db.add(user)
    db.commit()
    db.refresh(user)

    # Queue lightweight background task
    token = generate_verification_token(user.id)
    background_tasks.add_task(send_confirmation_email_task, user.email, token)

    return user

@app.post("/tasks/long-running/{task_id}")
async def start_long_task(
    task_id: int,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db)
):
    task = db.query(Task).filter(Task.id == task_id).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    # Heavy work goes to Celery, not BackgroundTasks
    celery_task = process_data_task.delay(task_id, {"key": "value"})

    return {
        "task_id": task_id,
        "celery_task_id": celery_task.id,
        "status": "queued"
    }

# Celery task definitions
@celery_app.task(bind=True, max_retries=3)
def process_data_task(self, task_id: int, data: dict):
    try:
        # Simulate work
        import time
        time.sleep(10)

        # Update task status in DB
        from .database import SessionLocal
        db = SessionLocal()
        try:
            task = db.get(Task, task_id)  # Query.get is deprecated in SQLAlchemy 2.0
            if task is not None:
                task.status = "completed"
                task.result = {"processed": True}
                db.commit()
        finally:
            db.close()

        return {"status": "success", "task_id": task_id}
    except Exception as e:
        # retry() raises celery.exceptions.Retry; re-raise it to stop execution here
        raise self.retry(exc=e, countdown=60)
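The `celery_app` module imported at the top is not shown above. A minimal sketch of what it might contain follows; the broker and backend URLs are placeholders, and the settings shown are one reasonable starting point rather than the definitive configuration:

```python
# celery_app.py - sketch of the module imported by the API code above.
# Broker/backend URLs are placeholders; point them at your Redis instance.
from celery import Celery

celery_app = Celery(
    "app",
    broker="redis://localhost:6379/0",   # where queued tasks are delivered
    backend="redis://localhost:6379/1",  # where task results are stored
)

celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_acks_late=True,   # re-queue a task if its worker dies mid-run
    result_expires=3600,   # keep results for one hour
)
```

The worker runs as a separate process alongside Uvicorn, started with something like `celery -A app.celery_app worker --loglevel=info` (the `-A` path must match wherever this module lives in your project).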
