
moai-lang-python

Python 3.13+ development specialist covering FastAPI, Django, async patterns, data science, testing with pytest, and modern Python features. Use when developing Python APIs, web applications, data pipelines, or writing tests.

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars: 0.

Hot score: 74.

Updated: March 20, 2026.

Overall rating: C (2.5).

Composite score: 2.5.

Best-practice grade: B (73.6).

Install command

npx @skill-hub/cli install junseokandylee-claudeautomate-moai-lang-python

Repository

junseokandylee/ClaudeAutomate

Skill path: .claude/skills/moai-lang-python

Best for

Primary workflow: Write Technical Docs.

Technical facets: Full Stack, Data / AI, Tech Writer, Testing.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: junseokandylee.

This is still a mirrored public skill entry. Review the repository before installing into production workflows.

What it helps with

  • Install moai-lang-python into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/junseokandylee/ClaudeAutomate before adding moai-lang-python to shared team environments
  • Use moai-lang-python for development workflows

Works across

Claude Code, Codex CLI, Gemini CLI, OpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: moai-lang-python
description: Python 3.13+ development specialist covering FastAPI, Django, async patterns, data science, testing with pytest, and modern Python features. Use when developing Python APIs, web applications, data pipelines, or writing tests.
version: 1.0.0
updated: 2025-12-07
status: active
allowed-tools: Read, Grep, Glob, Bash, mcp__context7__resolve-library-id, mcp__context7__get-library-docs
---

## Quick Reference (30 seconds)

Python 3.13+ Development Specialist - FastAPI, Django, async patterns, pytest, and modern Python features.

Auto-Triggers: `.py` files, `pyproject.toml`, `requirements.txt`, `pytest.ini`, FastAPI/Django discussions

Core Capabilities:
- Python 3.13 Features: experimental JIT compiler (PEP 744), experimental free-threaded (GIL-free) build (PEP 703), plus structural pattern matching (available since Python 3.10)
- Web Frameworks: FastAPI 0.115+, Django 5.2 LTS
- Data Validation: Pydantic v2.9 with model_validate patterns
- ORM: SQLAlchemy 2.0 async patterns
- Testing: pytest with fixtures, async testing, parametrize
- Package Management: poetry, uv, pip with pyproject.toml
- Type Hints: Protocol, TypeVar, ParamSpec, modern typing patterns
- Async: asyncio, async generators, task groups
- Data Science: numpy, pandas, polars basics

### Quick Patterns

FastAPI Endpoint:
```python
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class UserCreate(BaseModel):
    name: str
    email: str

class User(UserCreate):
    id: int

# UserService is an application-level service assumed to be defined elsewhere.
@app.post("/users/", status_code=201)
async def create_user(user: UserCreate) -> User:
    return await UserService.create(user)
```

Pydantic v2.9 Validation:
```python
from pydantic import BaseModel, ConfigDict

class User(BaseModel):
    model_config = ConfigDict(from_attributes=True, str_strip_whitespace=True)

    id: int
    name: str
    email: str

user = User.model_validate(orm_obj)  # from ORM object
user = User.model_validate_json(json_data)  # from JSON
```

pytest Async Test:
```python
import pytest

@pytest.mark.asyncio
async def test_create_user(async_client):
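    # UserCreate requires both name and email; omitting email would return 422
    response = await async_client.post(
        "/users/", json={"name": "Test", "email": "test@example.com"}
    )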
    assert response.status_code == 201
```

---

## Implementation Guide (5 minutes)

### Python 3.13 New Features

JIT Compiler (PEP 744):
- Experimental feature, disabled by default
- Enable: `PYTHON_JIT=1` environment variable
- Build option: `--enable-experimental-jit`
- Provides performance improvements for CPU-bound code
- Copy-and-patch JIT that translates specialized bytecode to machine code
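
Whether the JIT helps depends on the workload, so it is worth measuring. The following is a minimal benchmarking sketch, not an official tool: `sum_of_squares` and `best_time` are illustrative names, and the loop is only a stand-in for real CPU-bound code.

```python
import timeit


def sum_of_squares(n: int) -> int:
    """CPU-bound hot loop of the kind a JIT is meant to speed up."""
    total = 0
    for i in range(n):
        total += i * i
    return total


def best_time(n: int = 200_000, repeat: int = 5) -> float:
    """Return the best wall-clock time in seconds over several runs."""
    return min(timeit.repeat(lambda: sum_of_squares(n), number=1, repeat=repeat))


if __name__ == "__main__":
    print(f"best of 5: {best_time():.4f}s")
```

On a build configured with JIT support, run the script once normally and once with `PYTHON_JIT=1` set, then compare the two timings.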

GIL-Free Mode (PEP 703):
- Experimental free-threaded build (python3.13t)
- Allows true parallel thread execution
- Available in official Windows/macOS installers
- Best for: CPU-intensive multi-threaded applications
- Not recommended for production yet
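
To find out which mode an interpreter is running in, the build flag and the runtime GIL state can be queried separately. A small sketch, assuming only the standard library; `free_threading_status` is an illustrative helper name, and the `getattr` fallback covers interpreters older than 3.13 that lack `sys._is_gil_enabled()`.

```python
import sys
import sysconfig


def free_threading_status() -> dict[str, bool]:
    """Report whether this build supports free threading and whether the GIL is on."""
    # Py_GIL_DISABLED is 1 in the build configuration of free-threaded builds.
    built_without_gil = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
    # sys._is_gil_enabled() exists on Python 3.13+; assume the GIL is on elsewhere.
    gil_check = getattr(sys, "_is_gil_enabled", None)
    gil_enabled = gil_check() if gil_check is not None else True
    return {"free_threaded_build": built_without_gil, "gil_enabled": gil_enabled}


if __name__ == "__main__":
    print(free_threading_status())
```

A free-threaded build can still run with the GIL re-enabled, which is why the two values are reported separately.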

Pattern Matching (match/case, available since Python 3.10):
```python
def process_response(response: dict) -> str:
    match response:
        case {"status": "ok", "data": data}:
            return f"Success: {data}"
        case {"status": "error", "message": msg}:
            return f"Error: {msg}"
        case {"status": status} if status in ("pending", "processing"):
            return "In progress..."
        case _:
            return "Unknown response"
```

### FastAPI 0.115+ Patterns

Async Dependency Injection:
```python
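from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession

# init_db, cleanup, async_session, get_user_by_id and UserResponse are assumed
# to be defined elsewhere in the application.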

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    await init_db()
    yield
    # Shutdown
    await cleanup()

app = FastAPI(lifespan=lifespan)

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        yield session

@app.get("/users/{user_id}")
async def get_user(
    user_id: int,
    db: AsyncSession = Depends(get_db)
) -> UserResponse:
    user = await get_user_by_id(db, user_id)
    return UserResponse.model_validate(user)
```

Class-Based Dependencies:
```python
from fastapi import Depends

class Paginator:
    def __init__(self, page: int = 1, size: int = 20):
        self.page = max(1, page)
        self.size = min(100, max(1, size))
        self.offset = (self.page - 1) * self.size

@app.get("/items/")
async def list_items(pagination: Paginator = Depends()) -> list[Item]:
    return await Item.get_page(pagination.offset, pagination.size)
```
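
The clamping rules in `Paginator` can be checked without starting FastAPI. This sketch repeats the same class as plain Python and prints what a few out-of-range inputs resolve to; the sample inputs are arbitrary.

```python
class Paginator:
    """Same clamping logic as the dependency above, without FastAPI."""

    def __init__(self, page: int = 1, size: int = 20):
        self.page = max(1, page)            # pages start at 1
        self.size = min(100, max(1, size))  # size is kept within 1..100
        self.offset = (self.page - 1) * self.size


for page, size in [(1, 20), (0, 500), (3, 25), (-2, 0)]:
    p = Paginator(page, size)
    print(f"page={page}, size={size} -> page={p.page}, size={p.size}, offset={p.offset}")
```

Because invalid values are clamped rather than rejected, a client asking for `size=500` silently receives at most 100 items per page.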

### Django 5.2 LTS Features

Composite Primary Keys:
```python
from django.db import models

class OrderItem(models.Model):
    # CompositePrimaryKey is declared as a model attribute, not inside Meta
    pk = models.CompositePrimaryKey("order_id", "product_id")
    order = models.ForeignKey(Order, on_delete=models.CASCADE)
    product = models.ForeignKey(Product, on_delete=models.CASCADE)
    quantity = models.IntegerField()
```

URL Reverse with Query Parameters:
```python
from django.urls import reverse

url = reverse("search", query={"q": "django", "page": 1}, fragment="results")
# /search/?q=django&page=1#results
```

Automatic Model Imports in Shell:
```bash
python manage.py shell
# Models from all installed apps are automatically imported
```

### Pydantic v2.9 Deep Patterns

Reusable Validators with Annotated:
```python
from typing import Annotated
from pydantic import AfterValidator, BaseModel

def validate_positive(v: int) -> int:
    if v <= 0:
        raise ValueError("Must be positive")
    return v

PositiveInt = Annotated[int, AfterValidator(validate_positive)]

class Product(BaseModel):
    price: PositiveInt
    quantity: PositiveInt
```

Model Validator for Cross-Field Validation:
```python
from datetime import date
from typing import Self

from pydantic import BaseModel, model_validator

class DateRange(BaseModel):
    start_date: date
    end_date: date

    @model_validator(mode="after")
    def validate_dates(self) -> Self:
        if self.end_date < self.start_date:
            raise ValueError("end_date must not be before start_date")
        return self
```

ConfigDict Best Practices:
```python
from pydantic import BaseModel, ConfigDict

class BaseSchema(BaseModel):
    model_config = ConfigDict(
        from_attributes=True,      # ORM object support
        populate_by_name=True,     # Allow aliases
        extra="forbid",            # Fail on unknown fields
        str_strip_whitespace=True, # Clean strings
    )
```

### SQLAlchemy 2.0 Async Patterns

Engine and Session Setup:
```python
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession,
)

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_pre_ping=True,
    echo=True,
)

async_session = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # Prevent detached instance errors
)
```

Repository Pattern:
```python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

class UserRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def get_by_id(self, user_id: int) -> User | None:
        result = await self.session.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()

    async def create(self, user: UserCreate) -> User:
        db_user = User(**user.model_dump())
        self.session.add(db_user)
        await self.session.commit()
        await self.session.refresh(db_user)
        return db_user
```

Streaming Large Results:
```python
async def stream_users(db: AsyncSession):
    result = await db.stream(select(User))
    async for user in result.scalars():
        yield user
```

### pytest Advanced Patterns

Async Fixtures with pytest-asyncio:
```python
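from collections.abc import AsyncGenerator

import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession

@pytest_asyncio.fixture
async def async_client(app) -> AsyncGenerator[AsyncClient, None]:
    # Recent httpx releases dropped the app= shortcut; pass the ASGI app via a transport.
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        yield client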

@pytest_asyncio.fixture
async def db_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        async with session.begin():
            yield session
            await session.rollback()
```

Parametrized Tests:
```python
@pytest.mark.parametrize(
    "input_data,expected_status",
    [
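        ({"name": "Valid", "email": "valid@example.com"}, 201),
        # Assumes UserCreate constrains name with min_length=1
        ({"name": "", "email": "valid@example.com"}, 422),
        ({}, 422),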
    ],
    ids=["valid", "empty_name", "missing_name"],
)
async def test_create_user(async_client, input_data, expected_status):
    response = await async_client.post("/users/", json=input_data)
    assert response.status_code == expected_status
```

Fixture Factories:
```python
@pytest.fixture
def user_factory():
    async def _create_user(db: AsyncSession, **kwargs) -> User:
        defaults = {"name": "Test User", "email": "test@example.com"}
        user = User(**(defaults | kwargs))
        db.add(user)
        await db.commit()
        return user
    return _create_user
```

### Type Hints Modern Patterns

Protocol for Structural Typing:
```python
from typing import Protocol, TypeVar, runtime_checkable

T = TypeVar("T")

@runtime_checkable
class Repository(Protocol[T]):
    async def get(self, id: int) -> T | None: ...
    async def create(self, data: dict) -> T: ...
    async def delete(self, id: int) -> bool: ...
```

ParamSpec for Decorators:
```python
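from collections.abc import Awaitable, Callable
from functools import wraps
from typing import ParamSpec, TypeVar

P = ParamSpec("P")
R = TypeVar("R")

# The wrapper awaits func, so the decorated callable must return an awaitable.
AsyncFunc = Callable[P, Awaitable[R]]

def retry(times: int = 3) -> Callable[[AsyncFunc], AsyncFunc]:
    def decorator(func: AsyncFunc) -> AsyncFunc:
        @wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            for attempt in range(times):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    # Re-raise only after the final attempt has failed
                    if attempt == times - 1:
                        raise
            raise ValueError("times must be at least 1")
        return wrapper
    return decorator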
```
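
To see the retry behaviour end to end, here is a self-contained sketch that repeats a compact version of the decorator and applies it to a coroutine that fails twice before succeeding. `flaky_fetch` and the `calls` counter are hypothetical, and the type annotations are trimmed to keep the example short.

```python
import asyncio
from functools import wraps


def retry(times: int = 3):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(times):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    # Re-raise only after the final attempt has failed
                    if attempt == times - 1:
                        raise
            raise ValueError("times must be at least 1")
        return wrapper
    return decorator


calls = {"count": 0}


@retry(times=3)
async def flaky_fetch(url: str) -> str:
    """Hypothetical coroutine that fails on its first two calls."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return f"fetched {url}"


result = asyncio.run(flaky_fetch("https://example.com"))
print(result, "after", calls["count"], "attempts")
```

This version retries immediately; a real service would usually add a delay or backoff between attempts.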

### Package Management

pyproject.toml (Poetry):
```toml
[tool.poetry]
name = "my-project"
version = "1.0.0"

[tool.poetry.dependencies]
python = "^3.13"
fastapi = "^0.115.0"
pydantic = "^2.9.0"
sqlalchemy = {extras = ["asyncio"], version = "^2.0.0"}

[tool.poetry.group.dev.dependencies]
pytest = "^8.0"
pytest-asyncio = "^0.24"
ruff = "^0.8"

[tool.ruff]
line-length = 100
target-version = "py313"

[tool.pytest.ini_options]
asyncio_mode = "auto"
```

uv (Fast Package Manager):
```bash
# Install uv
curl -LsSf https://astral.sh/uv/install.sh | sh

# Create virtual environment
uv venv

# Install dependencies
uv pip install -r requirements.txt

# Add dependency
uv add fastapi
```

---

## Advanced Implementation (10+ minutes)

For comprehensive coverage including:
- Production deployment patterns (Docker, Kubernetes)
- Advanced async patterns (task groups, semaphores)
- Data science integration (numpy, pandas, polars)
- Performance optimization techniques
- Security best practices (OWASP patterns)
- CI/CD integration patterns

See:
- [reference.md](reference.md) - Complete reference documentation
- [examples.md](examples.md) - Production-ready code examples

---

## Context7 Library Mappings

```
/tiangolo/fastapi - FastAPI async web framework
/django/django - Django web framework
/pydantic/pydantic - Data validation with type annotations
/sqlalchemy/sqlalchemy - SQL toolkit and ORM
/pytest-dev/pytest - Testing framework
/numpy/numpy - Numerical computing
/pandas-dev/pandas - Data analysis library
/pola-rs/polars - Fast DataFrame library
```

---

## Works Well With

- `moai-domain-backend` - REST API and microservices architecture
- `moai-domain-database` - SQL patterns and ORM optimization
- `moai-quality-testing` - TDD and testing strategies
- `moai-essentials-debug` - AI-powered debugging
- `moai-foundation-trust` - TRUST 5 quality principles

---

## Troubleshooting

Common Issues:

Python Version Check:
```bash
python --version  # Should be 3.13+
python -c "import sys; print(sys.version_info)"
```

Async Session Detached Error:
- Solution: Set `expire_on_commit=False` in session config
- Or: Use `await session.refresh(obj)` after commit

pytest asyncio Mode Warning:
```toml
# pyproject.toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
```

Pydantic v2 Migration:
- `parse_obj()` is now `model_validate()`
- `parse_raw()` is now `model_validate_json()`
- `from_orm()` is now `model_validate()` with `from_attributes=True` set in ConfigDict
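
The three replacements side by side, as a short sketch. It assumes Pydantic v2 is installed; `UserRow` is a hypothetical stand-in for an ORM object, and the v1 equivalents appear only in comments.

```python
from pydantic import BaseModel, ConfigDict


class UserRow:
    """Hypothetical stand-in for an ORM object with plain attributes."""

    def __init__(self, id: int, name: str) -> None:
        self.id = id
        self.name = name


class User(BaseModel):
    # from_attributes lets model_validate read attributes instead of dict keys
    model_config = ConfigDict(from_attributes=True)

    id: int
    name: str


from_dict = User.model_validate({"id": 1, "name": "Ada"})        # v1: User.parse_obj(...)
from_json = User.model_validate_json('{"id": 2, "name": "Bo"}')  # v1: User.parse_raw(...)
from_row = User.model_validate(UserRow(3, "Cy"))                 # v1: User.from_orm(...)

print(from_dict, from_json, from_row)
```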

---

Last Updated: 2025-12-07
Status: Active (v1.0.0)


---

## Referenced Files

> The following files are referenced in this skill and included for context.

### reference.md

```markdown
# Python 3.13+ Complete Reference

## Language Features Reference

### Python 3.13 Feature Matrix

| Feature | Status | PEP | Production Ready |
|---------|--------|-----|------------------|
| JIT Compiler | Experimental | PEP 744 | No |
| Free Threading (GIL-free) | Experimental | PEP 703 | No |
| Pattern Matching | Stable | PEP 634-636 | Yes |
| Type Parameter Syntax | Stable | PEP 695 | Yes |
| Exception Groups | Stable | PEP 654 | Yes |

### JIT Compiler Details (PEP 744)

Build Configuration:
```bash
# Build Python with JIT support
./configure --enable-experimental-jit
make

# Or with "disabled by default" mode
./configure --enable-experimental-jit=yes-off
make
```

Runtime Activation:
```bash
# Enable JIT at runtime
PYTHON_JIT=1 python my_script.py

# With debugging info
PYTHON_JIT=1 PYTHON_JIT_DEBUG=1 python my_script.py
```

Expected Benefits:
- 5-10% performance improvement for CPU-bound code
- Better optimization for hot loops
- Future foundation for more aggressive optimizations

### Free Threading (PEP 703)

Installation:
```bash
# macOS/Windows: Use official installers with free-threaded option
# Linux: Build from source
./configure --disable-gil
make

# Verify installation
python3.13t -c "import sys; print(sys._is_gil_enabled())"
```

Thread-Safe Patterns:
```python
import threading
from queue import Queue

def parallel_processing(items: list[str], workers: int = 4) -> list[str]:
    results = Queue()
    threads = []

    def worker(chunk: list[str]):
        for item in chunk:
            processed = heavy_computation(item)
            results.put(processed)

    chunk_size = len(items) // workers
    for i in range(workers):
        start = i * chunk_size
        end = start + chunk_size if i < workers - 1 else len(items)
        t = threading.Thread(target=worker, args=(items[start:end],))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    return [results.get() for _ in range(results.qsize())]
```
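
The queue-based version above returns results in completion order, not input order. When order matters, one alternative is `concurrent.futures.ThreadPoolExecutor`. This is a sketch, not a drop-in replacement: `parallel_map` is an illustrative name, and `heavy_computation` here is a trivial placeholder for the real function.

```python
from concurrent.futures import ThreadPoolExecutor


def heavy_computation(item: str) -> str:
    """Hypothetical CPU-bound step; stands in for the function used above."""
    return item.upper()


def parallel_map(items: list[str], workers: int = 4) -> list[str]:
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Executor.map yields results in input order, regardless of completion order.
        return list(pool.map(heavy_computation, items))


if __name__ == "__main__":
    print(parallel_map(["a", "b", "c"]))
```

On a standard (GIL-enabled) build the threads still take turns for CPU-bound work; the parallel speedup applies to the free-threaded build.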

### Pattern Matching Complete Guide

Literal Patterns:
```python
def http_status(status: int) -> str:
    match status:
        case 200:
            return "OK"
        case 201:
            return "Created"
        case 400:
            return "Bad Request"
        case 404:
            return "Not Found"
        case 500:
            return "Internal Server Error"
        case _:
            return f"Unknown status: {status}"
```

Structural Patterns:
```python
def process_event(event: dict) -> None:
    match event:
        case {"type": "click", "x": x, "y": y}:
            handle_click(x, y)
        case {"type": "keypress", "key": key, "modifiers": [*mods]}:
            handle_keypress(key, mods)
        case {"type": "scroll", "delta": delta, **rest}:
            handle_scroll(delta, rest)
```

Class Patterns:
```python
from dataclasses import dataclass

@dataclass
class Point:
    x: float
    y: float

@dataclass
class Circle:
    center: Point
    radius: float

@dataclass
class Rectangle:
    top_left: Point
    width: float
    height: float

def area(shape) -> float:
    match shape:
        case Circle(center=_, radius=r):
            return 3.14159 * r ** 2
        case Rectangle(width=w, height=h):
            return w * h
        case Point():
            return 0.0
```

Guard Clauses:
```python
def validate_user(user: dict) -> str:
    match user:
        case {"age": age} if age < 0:
            return "Invalid age"
        case {"age": age} if age < 18:
            return "Minor"
        case {"age": age, "verified": True} if age >= 18:
            return "Verified adult"
        case {"age": age} if age >= 18:
            return "Unverified adult"
        case _:
            return "Invalid user data"
```

---

## Web Framework Reference

### FastAPI 0.115+ Complete Reference

Application Structure:
```
project/
├── app/
│   ├── __init__.py
│   ├── main.py              # Application entry point
│   ├── config.py            # Settings and configuration
│   ├── dependencies.py      # Shared dependencies
│   ├── api/
│   │   ├── __init__.py
│   │   ├── v1/
│   │   │   ├── __init__.py
│   │   │   ├── router.py    # API router
│   │   │   └── endpoints/
│   │   │       ├── users.py
│   │   │       └── items.py
│   ├── core/
│   │   ├── security.py      # Auth and security
│   │   └── exceptions.py    # Custom exceptions
│   ├── models/
│   │   ├── __init__.py
│   │   ├── user.py          # SQLAlchemy models
│   │   └── item.py
│   ├── schemas/
│   │   ├── __init__.py
│   │   ├── user.py          # Pydantic schemas
│   │   └── item.py
│   ├── services/
│   │   ├── __init__.py
│   │   └── user_service.py  # Business logic
│   └── repositories/
│       ├── __init__.py
│       └── user_repo.py     # Data access
├── tests/
├── pyproject.toml
└── Dockerfile
```

Configuration with Pydantic Settings:
```python
# app/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
from functools import lru_cache

class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
    )

    # Application
    app_name: str = "My API"
    debug: bool = False
    api_v1_prefix: str = "/api/v1"

    # Database
    database_url: str
    db_pool_size: int = 5
    db_max_overflow: int = 10

    # Security
    secret_key: str
    access_token_expire_minutes: int = 30
    algorithm: str = "HS256"

    # External Services
    redis_url: str | None = None

@lru_cache
def get_settings() -> Settings:
    return Settings()
```

Advanced Dependency Injection:
```python
# app/dependencies.py
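from collections.abc import AsyncGenerator

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from jose import jwt, JWTError

# async_session, SECRET_KEY, ALGORITHM, User and UserRepository are assumed
# to be defined elsewhere in the application.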

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id: int = payload.get("sub")
        if user_id is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception

    user = await UserRepository(db).get_by_id(user_id)
    if user is None:
        raise credentials_exception
    return user

async def get_current_active_user(
    current_user: User = Depends(get_current_user),
) -> User:
    if not current_user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

def require_role(required_role: str):
    async def role_checker(
        current_user: User = Depends(get_current_active_user),
    ) -> User:
        if current_user.role != required_role:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Insufficient permissions",
            )
        return current_user
    return role_checker
```

Background Tasks:
```python
import asyncio

from fastapi import BackgroundTasks

async def send_notification(email: str, message: str):
    # Simulate email sending
    await asyncio.sleep(1)
    print(f"Sent to {email}: {message}")

@app.post("/users/")
async def create_user(
    user: UserCreate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
) -> User:
    db_user = await UserRepository(db).create(user)
    background_tasks.add_task(
        send_notification,
        db_user.email,
        "Welcome to our platform!",
    )
    return db_user
```

### Django 5.2 LTS Reference

Composite Primary Keys:
```python
# models.py
from django.db import models

class Enrollment(models.Model):
    # CompositePrimaryKey is declared as a model attribute, not inside Meta
    pk = models.CompositePrimaryKey("student_id", "course_id")
    student = models.ForeignKey("Student", on_delete=models.CASCADE)
    course = models.ForeignKey("Course", on_delete=models.CASCADE)
    enrolled_at = models.DateTimeField(auto_now_add=True)
    grade = models.CharField(max_length=2, blank=True)

    class Meta:
        verbose_name = "Enrollment"
        verbose_name_plural = "Enrollments"

# Usage
enrollment = Enrollment.objects.get(pk=(student_id, course_id))
```

Async Views and ORM:
```python
# views.py
from django.http import JsonResponse
from asgiref.sync import sync_to_async

async def async_user_list(request):
    users = await sync_to_async(list)(User.objects.all()[:100])
    return JsonResponse({"users": [u.to_dict() for u in users]})

# With native async ORM methods such as aget() (available since Django 4.1)
async def async_user_detail(request, user_id):
    user = await User.objects.aget(pk=user_id)
    return JsonResponse(user.to_dict())
```

Custom Form Rendering:
```python
# forms.py
from django import forms

class CustomBoundField(forms.BoundField):
    def label_tag(self, contents=None, attrs=None, label_suffix=None):
        # Copy attrs so the caller's dict is not mutated
        attrs = dict(attrs or {})
        attrs["class"] = f"{attrs.get('class', '')} custom-label".strip()
        return super().label_tag(contents, attrs, label_suffix)

class UserForm(forms.ModelForm):
    # Django 5.2 lets a form select its BoundField class directly
    bound_field_class = CustomBoundField

    class Meta:
        model = User
        fields = ["name", "email"]

---

## Data Validation Reference

### Pydantic v2.9 Complete Patterns

Discriminated Unions:
```python
from typing import Literal, Union
from pydantic import BaseModel, Field

class EmailNotification(BaseModel):
    type: Literal["email"] = "email"
    recipient: str
    subject: str
    body: str

class SMSNotification(BaseModel):
    type: Literal["sms"] = "sms"
    phone_number: str
    message: str

class PushNotification(BaseModel):
    type: Literal["push"] = "push"
    device_token: str
    title: str
    body: str

Notification = Union[EmailNotification, SMSNotification, PushNotification]

class NotificationRequest(BaseModel):
    notification: Notification = Field(discriminator="type")
```

Computed Fields:
```python
from pydantic import BaseModel, computed_field

class Product(BaseModel):
    name: str
    price: float
    quantity: int
    tax_rate: float = 0.1

    @computed_field
    @property
    def subtotal(self) -> float:
        return self.price * self.quantity

    @computed_field
    @property
    def tax(self) -> float:
        return self.subtotal * self.tax_rate

    @computed_field
    @property
    def total(self) -> float:
        return self.subtotal + self.tax
```

Custom JSON Serialization:
```python
from pydantic import BaseModel, field_serializer
from datetime import datetime
from decimal import Decimal

class Transaction(BaseModel):
    id: int
    amount: Decimal
    created_at: datetime

    @field_serializer("amount")
    def serialize_amount(self, amount: Decimal) -> str:
        return f"${amount:.2f}"

    @field_serializer("created_at")
    def serialize_datetime(self, dt: datetime) -> str:
        return dt.isoformat()
```

TypeAdapter for Dynamic Validation:
```python
from pydantic import TypeAdapter
from typing import Any

# Validate arbitrary data without a model
int_adapter = TypeAdapter(int)
validated_int = int_adapter.validate_python("42")  # Returns 42

# Validate complex types
list_adapter = TypeAdapter(list[dict[str, int]])
validated_list = list_adapter.validate_json('[{"a": 1}, {"b": 2}]')

# Validate with custom types
UserListAdapter = TypeAdapter(list[User])
users = UserListAdapter.validate_python(raw_data)
```

---

## ORM Reference

### SQLAlchemy 2.0 Complete Patterns

Declarative Models with Type Hints:
```python
from datetime import datetime, timezone

from sqlalchemy import DateTime, ForeignKey, String
from sqlalchemy.orm import (
    DeclarativeBase,
    Mapped,
    mapped_column,
    relationship,
)

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    name: Mapped[str] = mapped_column(String(100))
    # datetime.utcnow() is deprecated since Python 3.12; store timezone-aware UTC instead
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )

    # Relationships
    posts: Mapped[list["Post"]] = relationship(back_populates="author")

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str]
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))

    author: Mapped["User"] = relationship(back_populates="posts")
```

Advanced Queries:
```python
from sqlalchemy import select, func, and_, or_
from sqlalchemy.orm import selectinload, joinedload

# Eager loading
async def get_user_with_posts(db: AsyncSession, user_id: int) -> User | None:
    result = await db.execute(
        select(User)
        .options(selectinload(User.posts))
        .where(User.id == user_id)
    )
    return result.scalar_one_or_none()

# Aggregations
async def get_post_counts_by_user(db: AsyncSession) -> list[tuple[str, int]]:
    result = await db.execute(
        select(User.name, func.count(Post.id).label("post_count"))
        .join(Post, isouter=True)
        .group_by(User.id)
        .order_by(func.count(Post.id).desc())
    )
    return result.all()

# Complex filtering
async def search_posts(
    db: AsyncSession,
    search: str | None = None,
    author_id: int | None = None,
    limit: int = 20,
) -> list[Post]:
    query = select(Post).options(joinedload(Post.author))

    conditions = []
    if search:
        conditions.append(
            or_(
                Post.title.ilike(f"%{search}%"),
                Post.content.ilike(f"%{search}%"),
            )
        )
    if author_id:
        conditions.append(Post.author_id == author_id)

    if conditions:
        query = query.where(and_(*conditions))

    result = await db.execute(query.limit(limit))
    return result.scalars().unique().all()
```

Upsert (Insert or Update):
```python
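from datetime import datetime, timezone

from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert

async def upsert_user(db: AsyncSession, user_data: dict) -> User:
    stmt = insert(User).values(**user_data)
    stmt = stmt.on_conflict_do_update(
        index_elements=[User.email],
        set_={
            "name": stmt.excluded.name,
            # Assumes the users table has an updated_at column
            "updated_at": datetime.now(timezone.utc),
        },
    )
    await db.execute(stmt)
    await db.commit()

    # Await the execute() call first, then read the row from its result
    result = await db.execute(
        select(User).where(User.email == user_data["email"])
    )
    return result.scalar_one()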
```

---

## Testing Reference

### pytest Complete Patterns

Conftest Configuration:
```python
# conftest.py
from collections.abc import AsyncGenerator

import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

@pytest.fixture(scope="session")
def event_loop():
    """Create event loop for async tests."""
    import asyncio
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()

@pytest_asyncio.fixture(scope="session")
async def engine():
    """Create test database engine."""
    engine = create_async_engine(
        "sqlite+aiosqlite:///:memory:",
        echo=True,
    )
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield engine
    await engine.dispose()

@pytest_asyncio.fixture
async def db_session(engine) -> AsyncGenerator[AsyncSession, None]:
    """Create isolated database session for each test."""
    async_session = sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
    )
    async with async_session() as session:
        async with session.begin():
            yield session
            await session.rollback()

@pytest_asyncio.fixture
async def async_client(db_session) -> AsyncGenerator[AsyncClient, None]:
    """Create async HTTP client for API testing."""
    def get_db_override():
        return db_session

    app.dependency_overrides[get_db] = get_db_override

    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test",
    ) as client:
        yield client

    app.dependency_overrides.clear()
```

Advanced Fixtures:
```python
@pytest.fixture
def user_factory(db_session):
    """Factory for creating test users."""
    created_users = []

    async def _create(**kwargs) -> User:
        defaults = {
            "name": f"User {len(created_users)}",
            "email": f"user{len(created_users)}@test.com",
        }
        user = User(**(defaults | kwargs))
        db_session.add(user)
        await db_session.flush()
        created_users.append(user)
        return user

    return _create

@pytest.fixture
def mock_external_api(mocker):
    """Mock external API calls."""
    return mocker.patch(
        "app.services.external_api.fetch_data",
        return_value={"status": "ok", "data": []},
    )
```

Hypothesis Property-Based Testing:
```python
from hypothesis import given, strategies as st

# Hypothesis has no pydantic extra; build the model from explicit strategies instead.
user_create_strategy = st.builds(
    UserCreate,
    name=st.text(min_size=1),
    email=st.emails(),
)

@given(user_create_strategy)
def test_user_create_validation(user_data: UserCreate):
    """Test that any generated UserCreate can be processed."""
    assert user_data.name
    assert "@" in user_data.email

@given(st.lists(st.integers(min_value=0, max_value=100), min_size=1))
def test_calculate_average(numbers: list[int]):
    """Property: average is always between min and max."""
    avg = calculate_average(numbers)
    assert min(numbers) <= avg <= max(numbers)
```
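
`calculate_average` is not defined in this reference. A minimal sketch, assuming it is a plain arithmetic mean over a non-empty list (the body below is an assumption for illustration only):

```python
def calculate_average(numbers: list[int]) -> float:
    """Arithmetic mean of a non-empty list (assumed behaviour, not from the source)."""
    if not numbers:
        raise ValueError("numbers must not be empty")
    return sum(numbers) / len(numbers)


# The same bound the property test checks, on one concrete input.
sample = [0, 50, 100]
average = calculate_average(sample)
print(min(sample) <= average <= max(sample))
```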

---

## Type Hints Reference

### Modern Type Patterns

Generic Classes:
```python
from typing import Generic, TypeVar

T = TypeVar("T")
K = TypeVar("K")

class Cache(Generic[K, T]):
    def __init__(self, max_size: int = 100):
        self._cache: dict[K, T] = {}
        self._max_size = max_size

    def get(self, key: K) -> T | None:
        return self._cache.get(key)

    def set(self, key: K, value: T) -> None:
        if len(self._cache) >= self._max_size:
            oldest_key = next(iter(self._cache))
            del self._cache[oldest_key]
        self._cache[key] = value

# Usage
user_cache: Cache[int, User] = Cache(max_size=1000)
```

TypeVar with Bounds:
```python
from typing import TypeVar
from pydantic import BaseModel

ModelT = TypeVar("ModelT", bound=BaseModel)

def validate_and_create(model_class: type[ModelT], data: dict) -> ModelT:
    return model_class.model_validate(data)
```

Self Type:
```python
from typing import Self

class Builder:
    def __init__(self):
        self._config: dict = {}

    def with_option(self, key: str, value: str) -> Self:
        self._config[key] = value
        return self

    def build(self) -> dict:
        return self._config.copy()

# Subclassing works correctly
class AdvancedBuilder(Builder):
    def with_advanced_option(self, value: int) -> Self:
        self._config["advanced"] = value
        return self
```

---

## Context7 Integration

Library ID Resolution:
```python
# Step 1: Resolve library ID
library_id = await mcp__context7__resolve_library_id("fastapi")
# Returns: /tiangolo/fastapi

# Step 2: Get documentation
docs = await mcp__context7__get_library_docs(
    context7CompatibleLibraryID="/tiangolo/fastapi",
    topic="dependency injection async",
    tokens=5000,
)
```

Available Libraries:
| Library | Context7 ID | Topics |
|---------|-------------|--------|
| FastAPI | /tiangolo/fastapi | async, dependencies, security, websockets |
| Django | /django/django | views, models, forms, admin |
| Pydantic | /pydantic/pydantic | validation, serialization, settings |
| SQLAlchemy | /sqlalchemy/sqlalchemy | orm, async, queries, migrations |
| pytest | /pytest-dev/pytest | fixtures, markers, plugins |
| numpy | /numpy/numpy | arrays, broadcasting, ufuncs |
| pandas | /pandas-dev/pandas | dataframe, series, io |
| polars | /pola-rs/polars | lazy, expressions, streaming |

---

Last Updated: 2025-12-07
Version: 1.0.0

```

### examples.md

```markdown
# Python Production-Ready Code Examples

## Complete FastAPI Application

### Project Structure
```
fastapi_app/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── config.py
│   ├── database.py
│   ├── dependencies.py
│   ├── models/
│   │   ├── __init__.py
│   │   └── user.py
│   ├── schemas/
│   │   ├── __init__.py
│   │   └── user.py
│   ├── repositories/
│   │   ├── __init__.py
│   │   └── user_repository.py
│   ├── services/
│   │   ├── __init__.py
│   │   └── user_service.py
│   └── api/
│       ├── __init__.py
│       └── v1/
│           ├── __init__.py
│           ├── router.py
│           └── endpoints/
│               └── users.py
├── tests/
│   ├── conftest.py
│   ├── test_users.py
│   └── test_services.py
├── pyproject.toml
└── Dockerfile
```

### Main Application Entry

```python
# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app.config import get_settings
from app.database import init_db, close_db
from app.api.v1.router import api_router

settings = get_settings()

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    await init_db()
    yield
    # Shutdown
    await close_db()

app = FastAPI(
    title=settings.app_name,
    version="1.0.0",
    lifespan=lifespan,
    docs_url="/api/docs",
    redoc_url="/api/redoc",
)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Include API router
app.include_router(api_router, prefix="/api/v1")

@app.get("/health")
async def health_check():
    return {"status": "healthy", "version": "1.0.0"}
```

### Configuration

```python
# app/config.py
from functools import lru_cache
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
    )

    # Application
    app_name: str = "FastAPI App"
    debug: bool = False
    environment: str = "development"

    # Database
    database_url: str = "postgresql+asyncpg://user:pass@localhost/db"
    db_pool_size: int = 5
    db_max_overflow: int = 10
    db_pool_timeout: int = 30

    # Security
    secret_key: str
    access_token_expire_minutes: int = 30
    refresh_token_expire_days: int = 7
    algorithm: str = "HS256"

    # CORS
    cors_origins: list[str] = ["http://localhost:3000"]

    # Redis (optional)
    redis_url: str | None = None

@lru_cache
def get_settings() -> Settings:
    return Settings()
```

### Database Setup

```python
# app/database.py
from collections.abc import AsyncGenerator

from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession,
    AsyncEngine,
)
from sqlalchemy.orm import DeclarativeBase

from app.config import get_settings

settings = get_settings()

class Base(DeclarativeBase):
    pass

engine: AsyncEngine | None = None
async_session_factory: async_sessionmaker[AsyncSession] | None = None

async def init_db():
    global engine, async_session_factory

    engine = create_async_engine(
        settings.database_url,
        pool_size=settings.db_pool_size,
        max_overflow=settings.db_max_overflow,
        pool_timeout=settings.db_pool_timeout,
        pool_pre_ping=True,
        echo=settings.debug,
    )

    async_session_factory = async_sessionmaker(
        engine,
        class_=AsyncSession,
        expire_on_commit=False,
        autoflush=False,
    )

    # Create tables (for development only)
    if settings.debug:
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

async def close_db():
    global engine
    if engine:
        await engine.dispose()

# get_db yields, so it is an async generator rather than a coroutine returning a session.
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    if async_session_factory is None:
        raise RuntimeError("Database not initialized")

    async with async_session_factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
```

### SQLAlchemy Models

```python
# app/models/user.py
from datetime import datetime
from sqlalchemy import String, Boolean, DateTime, func
from sqlalchemy.orm import Mapped, mapped_column

from app.database import Base

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    email: Mapped[str] = mapped_column(
        String(255), unique=True, index=True, nullable=False
    )
    hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
    name: Mapped[str] = mapped_column(String(100), nullable=False)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    is_superuser: Mapped[bool] = mapped_column(Boolean, default=False)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
    )

    def __repr__(self) -> str:
        return f"<User(id={self.id}, email={self.email})>"
```

### Pydantic Schemas

```python
# app/schemas/user.py
from datetime import datetime
from pydantic import BaseModel, ConfigDict, EmailStr, Field

class UserBase(BaseModel):
    email: EmailStr
    name: str = Field(min_length=1, max_length=100)

class UserCreate(UserBase):
    password: str = Field(min_length=8, max_length=100)

class UserUpdate(BaseModel):
    name: str | None = Field(None, min_length=1, max_length=100)
    password: str | None = Field(None, min_length=8, max_length=100)

class UserResponse(UserBase):
    model_config = ConfigDict(from_attributes=True)

    id: int
    is_active: bool
    created_at: datetime
    updated_at: datetime

class UserListResponse(BaseModel):
    users: list[UserResponse]
    total: int
    page: int
    size: int

class Token(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"

class TokenPayload(BaseModel):
    sub: int
    exp: datetime
    type: str  # "access" or "refresh"
```

### Repository Pattern

```python
# app/repositories/user_repository.py
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate

class UserRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def get_by_id(self, user_id: int) -> User | None:
        result = await self.session.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()

    async def get_by_email(self, email: str) -> User | None:
        result = await self.session.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()

    async def get_multi(
        self,
        skip: int = 0,
        limit: int = 100,
        is_active: bool | None = None,
    ) -> tuple[list[User], int]:
        query = select(User)
        count_query = select(func.count(User.id))

        if is_active is not None:
            query = query.where(User.is_active == is_active)
            count_query = count_query.where(User.is_active == is_active)

        # Get total count
        total_result = await self.session.execute(count_query)
        total = total_result.scalar_one()

        # Get users
        query = query.offset(skip).limit(limit).order_by(User.created_at.desc())
        result = await self.session.execute(query)
        users = result.scalars().all()

        return list(users), total

    async def create(self, user_create: UserCreate, hashed_password: str) -> User:
        user = User(
            email=user_create.email,
            name=user_create.name,
            hashed_password=hashed_password,
        )
        self.session.add(user)
        await self.session.flush()
        await self.session.refresh(user)
        return user

    async def update(self, user: User, user_update: UserUpdate) -> User:
        update_data = user_update.model_dump(exclude_unset=True)
        for field, value in update_data.items():
            setattr(user, field, value)
        await self.session.flush()
        await self.session.refresh(user)
        return user

    async def delete(self, user: User) -> None:
        await self.session.delete(user)
        await self.session.flush()

    async def deactivate(self, user: User) -> User:
        user.is_active = False
        await self.session.flush()
        await self.session.refresh(user)
        return user
```

### Service Layer

```python
# app/services/user_service.py
from datetime import datetime, timedelta, timezone
from jose import jwt
from passlib.context import CryptContext

from app.config import get_settings
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate, Token, TokenPayload
from app.repositories.user_repository import UserRepository

settings = get_settings()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

class UserService:
    def __init__(self, repository: UserRepository):
        self.repository = repository

    @staticmethod
    def hash_password(password: str) -> str:
        return pwd_context.hash(password)

    @staticmethod
    def verify_password(plain_password: str, hashed_password: str) -> bool:
        return pwd_context.verify(plain_password, hashed_password)

    @staticmethod
    def create_token(user_id: int, token_type: str, expires_delta: timedelta) -> str:
        expire = datetime.now(timezone.utc) + expires_delta
        payload = TokenPayload(
            sub=user_id,
            exp=expire,
            type=token_type,
        )
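        # python-jose rejects a non-string "sub" claim on decode,
        # so store the user ID as a string in the token.
        claims = payload.model_dump()
        claims["sub"] = str(claims["sub"])
        return jwt.encode(
            claims,
            settings.secret_key,
            algorithm=settings.algorithm,
        )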

    def create_tokens(self, user: User) -> Token:
        access_token = self.create_token(
            user.id,
            "access",
            timedelta(minutes=settings.access_token_expire_minutes),
        )
        refresh_token = self.create_token(
            user.id,
            "refresh",
            timedelta(days=settings.refresh_token_expire_days),
        )
        return Token(access_token=access_token, refresh_token=refresh_token)

    async def authenticate(self, email: str, password: str) -> User | None:
        user = await self.repository.get_by_email(email)
        if not user:
            return None
        if not self.verify_password(password, user.hashed_password):
            return None
        if not user.is_active:
            return None
        return user

    async def register(self, user_create: UserCreate) -> User:
        hashed_password = self.hash_password(user_create.password)
        return await self.repository.create(user_create, hashed_password)

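    async def update(self, user: User, user_update: UserUpdate) -> User:
        if user_update.password:
            # User stores the hash in `hashed_password`; it has no `password` column.
            user.hashed_password = self.hash_password(user_update.password)
        # Pass only the remaining fields on, so the plaintext never reaches the model.
        remaining = user_update.model_dump(exclude_unset=True, exclude={"password"})
        return await self.repository.update(user, UserUpdate(**remaining))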
```

### API Endpoints

```python
# app/api/v1/endpoints/users.py
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import get_db
from app.dependencies import get_current_user, get_current_active_superuser
from app.models.user import User
from app.schemas.user import (
    UserCreate,
    UserUpdate,
    UserResponse,
    UserListResponse,
    Token,
)
from app.repositories.user_repository import UserRepository
from app.services.user_service import UserService

router = APIRouter(prefix="/users", tags=["users"])

def get_user_service(db: AsyncSession = Depends(get_db)) -> UserService:
    repository = UserRepository(db)
    return UserService(repository)

@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def register(
    user_create: UserCreate,
    service: UserService = Depends(get_user_service),
):
    """Register a new user."""
    existing = await service.repository.get_by_email(user_create.email)
    if existing:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )
    user = await service.register(user_create)
    return user

@router.post("/login", response_model=Token)
async def login(
    email: str,
    password: str,
    service: UserService = Depends(get_user_service),
):
    """Authenticate and get tokens."""
    user = await service.authenticate(email, password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
        )
    return service.create_tokens(user)

@router.get("/me", response_model=UserResponse)
async def get_current_user_info(
    current_user: User = Depends(get_current_user),
):
    """Get current user information."""
    return current_user

@router.patch("/me", response_model=UserResponse)
async def update_current_user(
    user_update: UserUpdate,
    current_user: User = Depends(get_current_user),
    service: UserService = Depends(get_user_service),
):
    """Update current user."""
    return await service.update(current_user, user_update)

@router.get("", response_model=UserListResponse)
async def list_users(
    page: int = Query(1, ge=1),
    size: int = Query(20, ge=1, le=100),
    is_active: bool | None = None,
    current_user: User = Depends(get_current_active_superuser),
    service: UserService = Depends(get_user_service),
):
    """List all users (admin only)."""
    skip = (page - 1) * size
    users, total = await service.repository.get_multi(
        skip=skip, limit=size, is_active=is_active
    )
    return UserListResponse(users=users, total=total, page=page, size=size)

@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: int,
    current_user: User = Depends(get_current_active_superuser),
    service: UserService = Depends(get_user_service),
):
    """Get user by ID (admin only)."""
    user = await service.repository.get_by_id(user_id)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    return user

@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def deactivate_user(
    user_id: int,
    current_user: User = Depends(get_current_active_superuser),
    service: UserService = Depends(get_user_service),
):
    """Deactivate user (admin only)."""
    user = await service.repository.get_by_id(user_id)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    await service.repository.deactivate(user)
```
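
The `skip` value in `list_users` is derived from a 1-based page number. A small sketch of that arithmetic follows; `total_pages` is a hypothetical helper, not part of the application above, that a client could compute from the `total` and `size` fields of `UserListResponse`:

```python
import math


def page_to_offset(page: int, size: int) -> int:
    """Convert a 1-based page number to a row offset, as list_users does."""
    return (page - 1) * size


def total_pages(total: int, size: int) -> int:
    """Number of pages needed for `total` rows (hypothetical helper)."""
    return math.ceil(total / size)


print(page_to_offset(3, 20))  # 40
print(total_pages(45, 20))    # 3
```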

### Dependencies

```python
# app/dependencies.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import get_settings
from app.database import get_db
from app.models.user import User
from app.repositories.user_repository import UserRepository
from app.schemas.user import TokenPayload

settings = get_settings()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/users/login")

async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = jwt.decode(
            token, settings.secret_key, algorithms=[settings.algorithm]
        )
        token_data = TokenPayload(**payload)

        if token_data.type != "access":
            raise credentials_exception

    except JWTError:
        raise credentials_exception

    repository = UserRepository(db)
    user = await repository.get_by_id(token_data.sub)

    if user is None:
        raise credentials_exception

    return user

async def get_current_active_user(
    current_user: User = Depends(get_current_user),
) -> User:
    if not current_user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Inactive user",
        )
    return current_user

async def get_current_active_superuser(
    current_user: User = Depends(get_current_active_user),
) -> User:
    if not current_user.is_superuser:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Insufficient permissions",
        )
    return current_user
```

---

## Complete pytest Test Suite

```python
# tests/conftest.py
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool

from app.database import Base, get_db
from app.main import app
from app.config import get_settings

settings = get_settings()

@pytest.fixture(scope="session")
def event_loop():
    import asyncio
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()

@pytest_asyncio.fixture(scope="session")
async def engine():
    engine = create_async_engine(
        "sqlite+aiosqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield engine
    await engine.dispose()

@pytest_asyncio.fixture
async def db_session(engine):
    async_session = sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
    )
    async with async_session() as session:
        yield session
        await session.rollback()

@pytest_asyncio.fixture
async def async_client(db_session):
    async def override_get_db():
        yield db_session

    app.dependency_overrides[get_db] = override_get_db

    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test",
    ) as client:
        yield client

    app.dependency_overrides.clear()

@pytest.fixture
def user_data():
    return {
        "email": "test@example.com",
        "name": "Test User",
        "password": "password123",
    }
```

```python
# tests/test_users.py
import pytest
from httpx import AsyncClient

@pytest.mark.asyncio
async def test_register_user(async_client: AsyncClient, user_data: dict):
    response = await async_client.post("/api/v1/users/register", json=user_data)

    assert response.status_code == 201
    data = response.json()
    assert data["email"] == user_data["email"]
    assert data["name"] == user_data["name"]
    assert "id" in data
    assert "password" not in data

@pytest.mark.asyncio
async def test_register_duplicate_email(async_client: AsyncClient, user_data: dict):
    # First registration
    await async_client.post("/api/v1/users/register", json=user_data)

    # Second registration with same email
    response = await async_client.post("/api/v1/users/register", json=user_data)

    assert response.status_code == 400
    assert "already registered" in response.json()["detail"]

@pytest.mark.asyncio
@pytest.mark.parametrize(
    "invalid_data,expected_detail",
    [
        ({"email": "invalid", "name": "Test", "password": "pass123"}, "email"),
        ({"email": "test@example.com", "name": "", "password": "pass123"}, "name"),
        ({"email": "test@example.com", "name": "Test", "password": "short"}, "password"),
    ],
    ids=["invalid_email", "empty_name", "short_password"],
)
async def test_register_validation(
    async_client: AsyncClient,
    invalid_data: dict,
    expected_detail: str,
):
    response = await async_client.post("/api/v1/users/register", json=invalid_data)

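    assert response.status_code == 422
    # Each validation error carries a "loc" path such as ["body", "email"].
    errors = response.json()["detail"]
    assert any(expected_detail in error["loc"] for error in errors)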

@pytest.mark.asyncio
async def test_login_success(async_client: AsyncClient, user_data: dict):
    # Register user first
    await async_client.post("/api/v1/users/register", json=user_data)

    # Login
    response = await async_client.post(
        "/api/v1/users/login",
        params={"email": user_data["email"], "password": user_data["password"]},
    )

    assert response.status_code == 200
    data = response.json()
    assert "access_token" in data
    assert "refresh_token" in data
    assert data["token_type"] == "bearer"

@pytest.mark.asyncio
async def test_get_current_user(async_client: AsyncClient, user_data: dict):
    # Register and login
    await async_client.post("/api/v1/users/register", json=user_data)
    login_response = await async_client.post(
        "/api/v1/users/login",
        params={"email": user_data["email"], "password": user_data["password"]},
    )
    token = login_response.json()["access_token"]

    # Get current user
    response = await async_client.get(
        "/api/v1/users/me",
        headers={"Authorization": f"Bearer {token}"},
    )

    assert response.status_code == 200
    data = response.json()
    assert data["email"] == user_data["email"]
```

---

## Async Patterns Examples

### Task Groups (Python 3.11+)

```python
import asyncio

async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(0.1)  # Simulate API call
    return {"id": user_id, "name": f"User {user_id}"}

async def fetch_all_users(user_ids: list[int]) -> list[dict]:
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_user(uid)) for uid in user_ids]

    return [task.result() for task in tasks]

# Exception handling with TaskGroup
async def fetch_with_error_handling(user_ids: list[int]) -> tuple[list[dict], list[Exception]]:
    results = []
    errors = []

    async def safe_fetch(user_id: int):
        try:
            result = await fetch_user(user_id)
            results.append(result)
        except Exception as e:
            errors.append(e)

    async with asyncio.TaskGroup() as tg:
        for uid in user_ids:
            tg.create_task(safe_fetch(uid))

    return results, errors
```

### Semaphore for Rate Limiting

```python
import asyncio
from contextlib import asynccontextmanager

import httpx

class RateLimiter:
    def __init__(self, max_concurrent: int = 10):
        self._semaphore = asyncio.Semaphore(max_concurrent)

    @asynccontextmanager
    async def acquire(self):
        async with self._semaphore:
            yield

rate_limiter = RateLimiter(max_concurrent=5)

async def rate_limited_fetch(url: str) -> dict:
    async with rate_limiter.acquire():
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            return response.json()
```
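
Note that a semaphore caps the number of requests in flight at once; it does not limit requests per second. A self-contained sketch that checks this cap, using `asyncio.sleep` as a stand-in for the HTTP call (the limit of 3 and the helper names are illustrative assumptions):

```python
import asyncio

MAX_CONCURRENT = 3  # illustrative limit, not from the source


async def worker(semaphore: asyncio.Semaphore, state: dict[str, int]) -> None:
    async with semaphore:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stand-in for the HTTP request
        state["active"] -= 1


async def run_workers(count: int) -> int:
    """Run `count` workers and return the highest concurrency observed."""
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(worker(semaphore, state) for _ in range(count)))
    return state["peak"]


peak = asyncio.run(run_workers(10))
print(peak <= MAX_CONCURRENT)
```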

### Async Generator Streaming

```python
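from collections.abc import AsyncGenerator

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.user import User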

async def stream_large_data(
    db: AsyncSession,
    batch_size: int = 1000,
) -> AsyncGenerator[list[User], None]:
    offset = 0
    while True:
        result = await db.execute(
            select(User)
            .offset(offset)
            .limit(batch_size)
            .order_by(User.id)
        )
        users = result.scalars().all()

        if not users:
            break

        yield users
        offset += batch_size

# Usage
async def process_all_users(db: AsyncSession):
    async for batch in stream_large_data(db):
        for user in batch:
            await process_user(user)
```
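
The same offset-and-batch loop can be tried without a database. A minimal sketch, assuming an in-memory list in place of the query (`stream_in_batches` and `collect_batch_sizes` are hypothetical names used only for this illustration):

```python
import asyncio
from collections.abc import AsyncGenerator


async def stream_in_batches(
    rows: list[int],
    batch_size: int = 1000,
) -> AsyncGenerator[list[int], None]:
    """Yield `rows` in slices of at most `batch_size` (stand-in for the query)."""
    offset = 0
    while True:
        batch = rows[offset : offset + batch_size]
        if not batch:
            break
        yield batch
        offset += batch_size


async def collect_batch_sizes() -> list[int]:
    return [len(batch) async for batch in stream_in_batches(list(range(2500)))]


print(asyncio.run(collect_batch_sizes()))  # [1000, 1000, 500]
```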

---

## Docker Production Dockerfile

```dockerfile
# Dockerfile
FROM python:3.13-slim AS builder

WORKDIR /app

# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY pyproject.toml poetry.lock ./
RUN pip install poetry && \
    poetry config virtualenvs.create false && \
    poetry install --only main --no-root --no-interaction --no-ansi

FROM python:3.13-slim AS runtime

WORKDIR /app

# Create non-root user
RUN addgroup --system --gid 1001 appgroup && \
    adduser --system --uid 1001 --gid 1001 appuser

# Copy dependencies from builder
COPY --from=builder /usr/local/lib/python3.13/site-packages /usr/local/lib/python3.13/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin

# Copy application code
COPY --chown=appuser:appgroup . .

# Switch to non-root user
USER appuser

# Expose port
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import httpx; httpx.get('http://localhost:8000/health').raise_for_status()"

# Run application
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
```

---

## pyproject.toml Complete Configuration

```toml
[tool.poetry]
name = "fastapi-app"
version = "1.0.0"
description = "Production FastAPI Application"
authors = ["Developer <dev@example.com>"]

[tool.poetry.dependencies]
python = "^3.13"
fastapi = "^0.115.0"
uvicorn = {extras = ["standard"], version = "^0.32.0"}
pydantic = "^2.9.0"
pydantic-settings = "^2.6.0"
sqlalchemy = {extras = ["asyncio"], version = "^2.0.0"}
asyncpg = "^0.30.0"
python-jose = {extras = ["cryptography"], version = "^3.3.0"}
passlib = {extras = ["bcrypt"], version = "^1.7.4"}
httpx = "^0.28.0"

[tool.poetry.group.dev.dependencies]
pytest = "^8.3.0"
pytest-asyncio = "^0.24.0"
pytest-cov = "^6.0.0"
aiosqlite = "^0.20.0"
ruff = "^0.8.0"
mypy = "^1.13.0"

[tool.ruff]
line-length = 100
target-version = "py313"

[tool.ruff.lint]
select = ["E", "F", "I", "N", "W", "UP", "B", "C4", "SIM"]
ignore = ["E501"]

[tool.ruff.lint.isort]
known-first-party = ["app"]

[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
testpaths = ["tests"]
addopts = "-v --tb=short --cov=app --cov-report=term-missing"

[tool.mypy]
python_version = "3.13"
strict = true
plugins = ["pydantic.mypy"]

[tool.coverage.run]
source = ["app"]
omit = ["tests/*"]

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
```

---

Last Updated: 2025-12-07
Version: 1.0.0

```
