Create production-ready FastAPI projects with async patterns, dependency injection, and comprehensive error handling. Use when building new FastAPI applications or setting up backend API projects.
Inherits all available tools
Additional assets for this skill
This skill inherits all available tools. When active, it can use any tool Claude has access to.
Production-ready FastAPI project structures with async patterns, dependency injection, middleware, and best practices for building high-performance APIs.
Recommended Layout:
app/
├── api/ # API routes
│ ├── v1/
│ │ ├── endpoints/
│ │ │ ├── users.py
│ │ │ ├── auth.py
│ │ │ └── items.py
│ │ └── router.py
│ └── dependencies.py # Shared dependencies
├── core/ # Core configuration
│ ├── config.py
│ ├── security.py
│ └── database.py
├── models/ # Database models
│ ├── user.py
│ └── item.py
├── schemas/ # Pydantic schemas
│ ├── user.py
│ └── item.py
├── services/ # Business logic
│ ├── user_service.py
│ └── auth_service.py
├── repositories/ # Data access
│ ├── user_repository.py
│ └── item_repository.py
└── main.py # Application entry
FastAPI's built-in DI system using Depends:
Proper async/await usage:
# main.py
from fastapi import FastAPI, Depends
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan events."""
# Startup
await database.connect()
yield
# Shutdown
await database.disconnect()
app = FastAPI(
title="API Template",
version="1.0.0",
lifespan=lifespan
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
from app.api.v1.router import api_router
app.include_router(api_router, prefix="/api/v1")
# core/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
"""Application settings."""
DATABASE_URL: str
SECRET_KEY: str
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
API_V1_STR: str = "/api/v1"
class Config:
env_file = ".env"
@lru_cache()
def get_settings() -> Settings:
return Settings()
# core/database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from app.core.config import get_settings
settings = get_settings()
engine = create_async_engine(
settings.DATABASE_URL,
echo=True,
future=True
)
AsyncSessionLocal = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False
)
Base = declarative_base()
async def get_db() -> AsyncSession:
"""Dependency for database session."""
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
# repositories/base_repository.py
from typing import Generic, TypeVar, Type, Optional, List
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from pydantic import BaseModel
ModelType = TypeVar("ModelType")
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class BaseRepository(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
"""Base repository for CRUD operations."""
def __init__(self, model: Type[ModelType]):
self.model = model
async def get(self, db: AsyncSession, id: int) -> Optional[ModelType]:
"""Get by ID."""
result = await db.execute(
select(self.model).where(self.model.id == id)
)
return result.scalars().first()
async def get_multi(
self,
db: AsyncSession,
skip: int = 0,
limit: int = 100
) -> List[ModelType]:
"""Get multiple records."""
result = await db.execute(
select(self.model).offset(skip).limit(limit)
)
return result.scalars().all()
async def create(
self,
db: AsyncSession,
obj_in: CreateSchemaType
) -> ModelType:
"""Create new record."""
db_obj = self.model(**obj_in.dict())
db.add(db_obj)
await db.flush()
await db.refresh(db_obj)
return db_obj
async def update(
self,
db: AsyncSession,
db_obj: ModelType,
obj_in: UpdateSchemaType
) -> ModelType:
"""Update record."""
update_data = obj_in.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(db_obj, field, value)
await db.flush()
await db.refresh(db_obj)
return db_obj
async def delete(self, db: AsyncSession, id: int) -> bool:
"""Delete record."""
obj = await self.get(db, id)
if obj:
await db.delete(obj)
return True
return False
# repositories/user_repository.py
from app.repositories.base_repository import BaseRepository
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
class UserRepository(BaseRepository[User, UserCreate, UserUpdate]):
"""User-specific repository."""
async def get_by_email(self, db: AsyncSession, email: str) -> Optional[User]:
"""Get user by email."""
result = await db.execute(
select(User).where(User.email == email)
)
return result.scalars().first()
async def is_active(self, db: AsyncSession, user_id: int) -> bool:
"""Check if user is active."""
user = await self.get(db, user_id)
return user.is_active if user else False
user_repository = UserRepository(User)
# services/user_service.py
from typing import Optional
from sqlalchemy.ext.asyncio import AsyncSession
from app.repositories.user_repository import user_repository
from app.schemas.user import UserCreate, UserUpdate, User
from app.core.security import get_password_hash, verify_password
class UserService:
"""Business logic for users."""
def __init__(self):
self.repository = user_repository
async def create_user(
self,
db: AsyncSession,
user_in: UserCreate
) -> User:
"""Create new user with hashed password."""
# Check if email exists
existing = await self.repository.get_by_email(db, user_in.email)
if existing:
raise ValueError("Email already registered")
# Hash password
user_in_dict = user_in.dict()
user_in_dict["hashed_password"] = get_password_hash(user_in_dict.pop("password"))
# Create user
user = await self.repository.create(db, UserCreate(**user_in_dict))
return user
async def authenticate(
self,
db: AsyncSession,
email: str,
password: str
) -> Optional[User]:
"""Authenticate user."""
user = await self.repository.get_by_email(db, email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
async def update_user(
self,
db: AsyncSession,
user_id: int,
user_in: UserUpdate
) -> Optional[User]:
"""Update user."""
user = await self.repository.get(db, user_id)
if not user:
return None
if user_in.password:
user_in_dict = user_in.dict(exclude_unset=True)
user_in_dict["hashed_password"] = get_password_hash(
user_in_dict.pop("password")
)
user_in = UserUpdate(**user_in_dict)
return await self.repository.update(db, user, user_in)
user_service = UserService()
# api/v1/endpoints/users.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from app.core.database import get_db
from app.schemas.user import User, UserCreate, UserUpdate
from app.services.user_service import user_service
from app.api.dependencies import get_current_user
router = APIRouter()
@router.post("/", response_model=User, status_code=status.HTTP_201_CREATED)
async def create_user(
user_in: UserCreate,
db: AsyncSession = Depends(get_db)
):
"""Create new user."""
try:
user = await user_service.create_user(db, user_in)
return user
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/me", response_model=User)
async def read_current_user(
current_user: User = Depends(get_current_user)
):
"""Get current user."""
return current_user
@router.get("/{user_id}", response_model=User)
async def read_user(
user_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Get user by ID."""
user = await user_service.repository.get(db, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@router.patch("/{user_id}", response_model=User)
async def update_user(
user_id: int,
user_in: UserUpdate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Update user."""
if current_user.id != user_id:
raise HTTPException(status_code=403, detail="Not authorized")
user = await user_service.update_user(db, user_id, user_in)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
user_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Delete user."""
if current_user.id != user_id:
raise HTTPException(status_code=403, detail="Not authorized")
deleted = await user_service.repository.delete(db, user_id)
if not deleted:
raise HTTPException(status_code=404, detail="User not found")
# core/security.py
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from app.core.config import get_settings
settings = get_settings()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
ALGORITHM = "HS256"
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""Create JWT access token."""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify password against hash."""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Hash password."""
return pwd_context.hash(password)
# api/dependencies.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.core.security import ALGORITHM
from app.core.config import get_settings
from app.repositories.user_repository import user_repository
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")
async def get_current_user(
db: AsyncSession = Depends(get_db),
token: str = Depends(oauth2_scheme)
):
"""Get current authenticated user."""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
user_id: int = payload.get("sub")
if user_id is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = await user_repository.get(db, user_id)
if user is None:
raise credentials_exception
return user
# tests/conftest.py
import pytest
import asyncio
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from app.main import app
from app.core.database import get_db, Base
TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"
@pytest.fixture(scope="session")
def event_loop():
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture
async def db_session():
engine = create_async_engine(TEST_DATABASE_URL, echo=True)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
AsyncSessionLocal = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
async with AsyncSessionLocal() as session:
yield session
@pytest.fixture
async def client(db_session):
async def override_get_db():
yield db_session
app.dependency_overrides[get_db] = override_get_db
async with AsyncClient(app=app, base_url="http://test") as client:
yield client
# tests/test_users.py
import pytest
@pytest.mark.asyncio
async def test_create_user(client):
response = await client.post(
"/api/v1/users/",
json={
"email": "test@example.com",
"password": "testpass123",
"name": "Test User"
}
)
assert response.status_code == 201
data = response.json()
assert data["email"] == "test@example.com"
assert "id" in data