Back to skills
SkillHub ClubRun DevOpsFull StackBackendSecurity

moai-security-api

API security patterns - authentication, authorization, rate limiting, OWASP

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars
0
Hot score
74
Updated
March 20, 2026
Overall rating
C0.8
Composite score
0.8
Best-practice grade
C64.8

Install command

npx @skill-hub/cli install jg-chalk-io-nora-livekit-moai-security-api
apisecurityauthenticationowaspjwt

Repository

jg-chalk-io/Nora-LiveKit

Skill path: .claude/skills/moai-security-api

API security patterns - authentication, authorization, rate limiting, OWASP

Open repository

Best for

Primary workflow: Run DevOps.

Technical facets: Full Stack, Backend, Security.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: jg-chalk-io.

This is still a mirrored public skill entry. Review the repository before installing into production workflows.

What it helps with

  • Install moai-security-api into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/jg-chalk-io/Nora-LiveKit before adding moai-security-api to shared team environments
  • Use moai-security-api for development workflows

Works across

Claude CodeCodex CLIGemini CLIOpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: moai-security-api
version: 4.0.0
updated: 2025-11-20
status: stable
description: API security patterns - authentication, authorization, rate limiting, OWASP
allowed-tools: [Read, Bash, WebSearch, WebFetch]
---

# API Security Expert

**Secure API Design & Implementation**

> **Focus**: Authentication, Authorization, Rate Limiting, OWASP API Top 10  
> **Stack**: OAuth 2.0, JWT, API Keys, CORS

---

## Overview

Comprehensive patterns for securing RESTful and GraphQL APIs.

### Core Security Layers

1.  **Authentication**: Who are you? (OAuth, JWT, API keys)
2.  **Authorization**: What can you do? (RBAC, ABAC)
3.  **Rate Limiting**: Prevent abuse (token bucket, sliding window)
4.  **Input Validation**: Prevent injection attacks

---

## Quick Start

### 1. JWT Authentication

Issue and verify JWT tokens for API access.

**Key Concepts**:

- Token structure: `Header.Payload.Signature`
- Short-lived tokens (<1 hour)
- Refresh token rotation

See: [examples.md](./examples.md#jwt-authentication) for implementation

### 2. Role-Based Access Control (RBAC)

Enforce permissions based on user roles.

**Pattern**: Decorator/middleware checks user role before allowing access.

See: [examples.md](./examples.md#rbac-implementation) for code

### 3. Rate Limiting

Prevent API abuse with token bucket algorithm.

**Common Limits**:

- Public endpoints: 100 req/min
- Authenticated: 1000 req/min
- Admin: Unlimited

See: [examples.md](./examples.md#rate-limiting) for implementation

### 4. CORS Configuration

Restrict cross-origin requests to trusted domains.

**Critical**: Never use `allow_origins=["*"]` in production.

See: [examples.md](./examples.md#cors-setup) for configuration

---

## OWASP API Security Top 10 (2023)

| #   | Vulnerability                                   | Mitigation                       |
| --- | ----------------------------------------------- | -------------------------------- |
| 1   | Broken Object Level Authorization               | Validate user owns resource      |
| 2   | Broken Authentication                           | OAuth 2.0, MFA                   |
| 3   | Broken Object Property Level Authorization      | Don't expose internal fields     |
| 4   | Unrestricted Resource Consumption               | Rate limiting, pagination        |
| 5   | Broken Function Level Authorization             | Verify permissions per endpoint  |
| 6   | Unrestricted Access to Sensitive Business Flows | CAPTCHA, anomaly detection       |
| 7   | Server Side Request Forgery (SSRF)              | Validate URLs, block private IPs |
| 8   | Security Misconfiguration                       | Disable debug, remove defaults   |
| 9   | Improper Inventory Management                   | Document endpoints, versioning   |
| 10  | Unsafe Consumption of APIs                      | Validate third-party responses   |

See: [reference.md](./reference.md#owasp-details) for detailed mitigations

---

## Best Practices

1.  **HTTPS Only**: Enforce TLS 1.3+
2.  **Short-Lived Tokens**: JWT expiry <1 hour
3.  **API Versioning**: `/v1/users`, `/v2/users`
4.  **Logging**: Log auth failures, suspicious patterns
5.  **Error Messages**: Don't leak system details

---

## Validation Checklist

- [ ] **Auth**: JWT/OAuth implemented?
- [ ] **AuthZ**: RBAC/ABAC enforced?
- [ ] **Rate Limiting**: Configured per endpoint?
- [ ] **CORS**: Restricted to trusted origins?
- [ ] **HTTPS**: TLS 1.3+ enforced?
- [ ] **Input**: Pydantic/Zod validation used?

---

## Related Skills

- `moai-security-auth`: Authentication patterns
- `moai-security-devsecops`: Security testing
- `moai-domain-backend`: API design

---

## Additional Resources

- [examples.md](./examples.md): Code implementations
- [reference.md](./reference.md): Complete API reference

---

**Last Updated**: 2025-11-20


---

## Referenced Files

> The following files are referenced in this skill and included for context.

### examples.md

```markdown
# API Security Examples

Practical implementation examples for secure API development.

---

## JWT Authentication

### Python/FastAPI Implementation

```python
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer
from datetime import datetime, timedelta
import jwt

SECRET_KEY = "your-secret-key-from-env"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

security = HTTPBearer()

def create_access_token(data: dict) -> str:
    """Create JWT access token."""
    to_encode = data.copy()
    expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def verify_token(credentials = Depends(security)) -> str:
    """Verify JWT token and return user_id."""
    try:
        token = credentials.credentials
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id: str = payload.get("sub")
        if user_id is None:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid authentication credentials"
            )
        return user_id
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired"
        )
    except jwt.InvalidTokenError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token"
        )

# Usage in endpoint
from fastapi import FastAPI

app = FastAPI()

@app.post("/login")
async def login(username: str, password: str):
    # Verify credentials (simplified)
    if username == "admin" and password == "secret":
        access_token = create_access_token(data={"sub": username})
        return {"access_token": access_token, "token_type": "bearer"}
    raise HTTPException(status_code=401, detail="Invalid credentials")

@app.get("/protected")
async def protected_route(user_id: str = Depends(verify_token)):
    return {"message": f"Hello {user_id}"}
```

### Node.js/Express Implementation

```javascript
const jwt = require("jsonwebtoken");
const express = require("express");

const SECRET_KEY = process.env.JWT_SECRET;
const app = express();

// Create token
function createToken(userId) {
  return jwt.sign({ sub: userId }, SECRET_KEY, { expiresIn: "1h" });
}

// Verify middleware
function verifyToken(req, res, next) {
  const authHeader = req.headers["authorization"];
  const token = authHeader && authHeader.split(" ")[1];

  if (!token) {
    return res.status(401).json({ error: "No token provided" });
  }

  jwt.verify(token, SECRET_KEY, (err, decoded) => {
    if (err) {
      return res.status(401).json({ error: "Invalid token" });
    }
    req.userId = decoded.sub;
    next();
  });
}

// Routes
app.post("/login", (req, res) => {
  const { username, password } = req.body;
  // Verify credentials
  if (username === "admin" && password === "secret") {
    const token = createToken(username);
    res.json({ access_token: token });
  } else {
    res.status(401).json({ error: "Invalid credentials" });
  }
});

app.get("/protected", verifyToken, (req, res) => {
  res.json({ message: `Hello ${req.userId}` });
});
```

---

## RBAC Implementation

### Decorator Pattern (Python)

```python
from functools import wraps
from fastapi import HTTPException, status

def require_role(*required_roles):
    """Decorator to enforce role-based access control."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, user_id: str = Depends(verify_token), **kwargs):
            # Fetch user role from database
            user = await get_user(user_id)

            if user.role not in required_roles:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Requires one of roles: {required_roles}"
                )

            return await func(*args, user_id=user_id, **kwargs)
        return wrapper
    return decorator

# Usage
@app.delete("/users/{user_id}")
@require_role("admin", "moderator")
async def delete_user(user_id: int, current_user: str = Depends(verify_token)):
    # Only admins and moderators can delete users
    await User.delete(user_id)
    return {"status": "deleted"}

@app.get("/analytics")
@require_role("admin")
async def get_analytics(current_user: str = Depends(verify_token)):
    # Only admins can view analytics
    return await Analytics.get_all()
```

### Middleware Pattern (Node.js)

```javascript
function requireRole(...roles) {
  return (req, res, next) => {
    const userRole = req.user.role; // Set by verifyToken middleware

    if (!roles.includes(userRole)) {
      return res.status(403).json({
        error: `Requires one of roles: ${roles.join(", ")}`,
      });
    }

    next();
  };
}

// Usage
app.delete(
  "/users/:id",
  verifyToken,
  requireRole("admin", "moderator"),
  (req, res) => {
    // Delete user
    res.json({ status: "deleted" });
  }
);

app.get("/analytics", verifyToken, requireRole("admin"), (req, res) => {
  // Return analytics
  res.json({ data: [] });
});
```

---

## Rate Limiting

### Token Bucket Algorithm (Python)

```python
import time
from collections import defaultdict
from fastapi import Request, HTTPException

class RateLimiter:
    def __init__(self, requests_per_minute=60):
        self.rate = requests_per_minute / 60.0  # Tokens per second
        self.max_tokens = requests_per_minute
        self.tokens = defaultdict(lambda: self.max_tokens)
        self.last_check = defaultdict(time.time)

    def allow_request(self, identifier: str) -> bool:
        """Check if request is allowed under rate limit."""
        now = time.time()
        time_passed = now - self.last_check[identifier]

        # Refill tokens based on time passed
        self.tokens[identifier] += time_passed * self.rate
        self.tokens[identifier] = min(self.tokens[identifier], self.max_tokens)

        self.last_check[identifier] = now

        # Try to consume one token
        if self.tokens[identifier] >= 1:
            self.tokens[identifier] -= 1
            return True

        return False

# Create rate limiter
rate_limiter = RateLimiter(requests_per_minute=100)

# Middleware
from fastapi import Request

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    # Use IP address as identifier
    client_ip = request.client.host

    if not rate_limiter.allow_request(client_ip):
        raise HTTPException(
            status_code=429,
            detail="Too many requests. Please try again later.",
            headers={"Retry-After": "60"}
        )

    response = await call_next(request)
    return response
```

### Redis-Based Rate Limiting (Node.js)

```javascript
const redis = require("redis");
const client = redis.createClient();

async function rateLimitMiddleware(req, res, next) {
  const identifier = req.ip;
  const key = `rate_limit:${identifier}`;
  const limit = 100; // requests per minute
  const window = 60; // seconds

  const current = await client.incr(key);

  if (current === 1) {
    // First request, set expiry
    await client.expire(key, window);
  }

  if (current > limit) {
    return res.status(429).json({
      error: "Too many requests",
      retry_after: window,
    });
  }

  next();
}

app.use(rateLimitMiddleware);
```

---

## CORS Setup

### Secure CORS Configuration (FastAPI)

```python
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

# Production configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "https://app.example.com",
        "https://admin.example.com"
    ],  # Specific domains only
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
    max_age=3600,  # Cache preflight for 1 hour
)

# Development configuration (separate file)
if os.getenv("ENV") == "development":
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["http://localhost:3000"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
```

### Express CORS (Node.js)

```javascript
const cors = require("cors");

// Production
const corsOptions = {
  origin: ["https://app.example.com", "https://admin.example.com"],
  credentials: true,
  methods: ["GET", "POST", "PUT", "DELETE"],
  allowedHeaders: ["Authorization", "Content-Type"],
  maxAge: 3600,
};

app.use(cors(corsOptions));

// Development
if (process.env.NODE_ENV === "development") {
  app.use(
    cors({
      origin: "http://localhost:3000",
      credentials: true,
    })
  );
}
```

---

## Input Validation

### Pydantic Models (Python)

```python
from pydantic import BaseModel, EmailStr, Field, constr, validator

class UserCreate(BaseModel):
    email: EmailStr
    password: constr(min_length=8, max_length=100)
    age: int = Field(ge=18, le=120, description="User age")
    username: constr(regex=r'^[a-zA-Z0-9_-]{3,20}$')

    @validator('password')
    def password_strength(cls, v):
        if not any(c.isupper() for c in v):
            raise ValueError('Password must contain uppercase')
        if not any(c.isdigit() for c in v):
            raise ValueError('Password must contain digit')
        return v

@app.post("/users")
async def create_user(user: UserCreate):
    # Input is automatically validated
    hashed_password = hash_password(user.password)
    new_user = await User.create(
        email=user.email,
        password=hashed_password,
        age=user.age
    )
    return {"id": new_user.id}
```

### Zod Validation (TypeScript)

```typescript
import { z } from "zod";

const userCreateSchema = z.object({
  email: z.string().email(),
  password: z
    .string()
    .min(8, "Password must be at least 8 characters")
    .max(100)
    .refine((pwd) => /[A-Z]/.test(pwd), "Must contain uppercase")
    .refine((pwd) => /[0-9]/.test(pwd), "Must contain digit"),
  age: z.number().int().min(18).max(120),
  username: z.string().regex(/^[a-zA-Z0-9_-]{3,20}$/),
});

type UserCreate = z.infer<typeof userCreateSchema>;

app.post("/users", async (req, res) => {
  try {
    const userData = userCreateSchema.parse(req.body);
    const hashedPassword = await hashPassword(userData.password);
    const newUser = await User.create({
      ...userData,
      password: hashedPassword,
    });
    res.json({ id: newUser.id });
  } catch (error) {
    if (error instanceof z.ZodError) {
      res.status(400).json({ errors: error.errors });
    } else {
      res.status(500).json({ error: "Internal server error" });
    }
  }
});
```

---

**See also**: [reference.md](./reference.md) for complete API reference and advanced patterns.

```

### reference.md

```markdown
# API Security Reference

Complete reference guide for OWASP API Security Top 10 and advanced patterns.

---

## OWASP API Security Top 10 (2023) - Detailed

### 1. Broken Object Level Authorization (BOLA)

**Description**: API exposes endpoints that handle object identifiers without proper authorization checks.

**Attack Example**:

```http
GET /api/users/123/orders
Authorization: Bearer <user_456_token>

# User 456 shouldn't see User 123's orders!
```

**Mitigation**:

```python
@app.get("/users/{user_id}/orders")
async def get_user_orders(user_id: int, current_user: User = Depends(get_current_user)):
    # Verify user owns this resource
    if current_user.id != user_id and not current_user.is_admin:
        raise HTTPException(403, "Access denied")

    return await Order.get_by_user(user_id)
```

**Prevention Checklist**:

- [ ] Validate user owns requested resource
- [ ] Implement object-level permission checks
- [ ] Use UUIDs instead of sequential IDs
- [ ] Log unauthorized access attempts

---

### 2. Broken Authentication

**Description**: Weak authentication mechanisms allowing attackers to compromise tokens or credentials.

**Common Issues**:

- Weak password policies
- No MFA
- Long-lived tokens
- Credentials in URLs

**Mitigation**:

```python
# Strong password policy
PASSWORD_REQUIREMENTS = {
    "min_length": 12,
    "require_uppercase": True,
    "require_lowercase": True,
    "require_digits": True,
    "require_special": True,
}

# Short-lived JWT
ACCESS_TOKEN_EXPIRE_MINUTES = 15  # Not 24 hours!
REFRESH_TOKEN_EXPIRE_DAYS = 7

# MFA enforcement
@app.post("/login")
async def login(username: str, password: str):
    user = await authenticate(username, password)
    if user.mfa_enabled:
        otp_token = generate_otp_challenge(user)
        return {"requires_mfa": True, "otp_token": otp_token}
    # ... issue tokens
```

---

### 3. Broken Object Property Level Authorization

**Description**: API returns more data than necessary, exposing sensitive fields.

**Attack Example**:

```json
// Response exposes internal fields
{
  "id": 123,
  "name": "John Doe",
  "email": "[email protected]",
  "password_hash": "$2b$12$...", // ❌ Should not be exposed
  "is_admin": true, // ❌ Internal field
  "created_at": "2025-01-01"
}
```

**Mitigation**:

```python
from pydantic import BaseModel

class UserPublic(BaseModel):
    """Public user representation - excludes sensitive fields."""
    id: int
    name: str
    email: str
    created_at: datetime

    class Config:
        from_attributes = True

@app.get("/users/{user_id}", response_model=UserPublic)
async def get_user(user_id: int):
    user = await User.get(user_id)
    return user  # Pydantic automatically filters fields
```

---

### 4. Unrestricted Resource Consumption

**Description**: No limits on API usage, leading to DoS or excessive billing.

**Attack Scenarios**:

- No pagination (fetch 1M records)
- No rate limiting
- Expensive operations without throttling
- Large file uploads

**Mitigation**:

```python
from fastapi import Query

@app.get("/users")
async def list_users(
    page: int = Query(1, ge=1),
    limit: int = Query(20, ge=1, le=100)  # Max 100 per request
):
    offset = (page - 1) * limit
    users = await User.get_paginated(offset=offset, limit=limit)
    return {
        "users": users,
        "page": page,
        "limit": limit,
        "total": await User.count()
    }

# File upload limits
@app.post("/upload")
async def upload_file(file: UploadFile):
    MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB

    size = 0
    chunks = []
    async for chunk in file.stream():
        size += len(chunk)
        if size > MAX_FILE_SIZE:
            raise HTTPException(413, "File too large")
        chunks.append(chunk)

    # Process file...
```

---

### 5. Broken Function Level Authorization

**Description**: Regular users can access admin/privileged functions.

**Attack Example**:

```http
DELETE /api/admin/users/123
Authorization: Bearer <regular_user_token>

# Regular user shouldn't access admin endpoints!
```

**Mitigation**:

```python
# Route-level protection
admin_router = APIRouter(prefix="/admin", dependencies=[Depends(require_admin)])

@admin_router.delete("/users/{user_id}")
async def delete_user(user_id: int):
    # Only reachable if user is admin
    await User.delete(user_id)
    return {"status": "deleted"}

# Function-level check
def require_admin(current_user: User = Depends(get_current_user)):
    if not current_user.is_admin:
        raise HTTPException(403, "Admin access required")
    return current_user
```

---

### 6. Unrestricted Access to Sensitive Business Flows

**Description**: Lack of flow control allows automated attacks (credential stuffing, scalping).

**Scenarios**:

- Mass account creation
- Automated ticket purchasing
- Credential stuffing attacks

**Mitigation**:

```python
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

# Stricter limits on sensitive endpoints
@app.post("/register")
@limiter.limit("5/hour")  # Only 5 registrations per hour per IP
async def register(user_data: UserCreate):
    # Add CAPTCHA verification
    if not verify_captcha(user_data.captcha_token):
        raise HTTPException(400, "Invalid CAPTCHA")

    # Detect suspicious patterns
    if await is_suspicious_activity(user_data.email):
        await send_alert_to_security_team()
        raise HTTPException(429, "Too many requests")

    # Proceed with registration...
```

---

### 7. Server Side Request Forgery (SSRF)

**Description**: Attacker tricks server into making requests to internal services.

**Attack Example**:

```http
POST /api/fetch-url
{
  "url": "http://169.254.169.254/latest/meta-data/iam/security-credentials/"
}

# Accesses AWS metadata service!
```

**Mitigation**:

```python
import ipaddress
from urllib.parse import urlparse

BLOCKED_IP_RANGES = [
    ipaddress.ip_network("10.0.0.0/8"),      # Private
    ipaddress.ip_network("172.16.0.0/12"),   # Private
    ipaddress.ip_network("192.168.0.0/16"),  # Private
    ipaddress.ip_network("169.254.0.0/16"),  # Link-local (AWS metadata)
    ipaddress.ip_network("127.0.0.0/8"),     # Loopback
]

ALLOWED_DOMAINS = ["api.trusted-partner.com", "cdn.example.com"]

async def safe_fetch_url(url: str):
    """Safely fetch URL with SSRF protection."""
    # Parse and validate URL
    parsed = urlparse(url)

    # Check protocol
    if parsed.scheme not in ["http", "https"]:
        raise ValueError("Invalid protocol")

    # Check domain allowlist
    if parsed.hostname not in ALLOWED_DOMAINS:
        raise ValueError("Domain not allowed")

    # Resolve IP and check against blocklist
    import socket
    ip = socket.gethostbyname(parsed.hostname)
    ip_obj = ipaddress.ip_address(ip)

    for blocked_range in BLOCKED_IP_RANGES:
        if ip_obj in blocked_range:
            raise ValueError("IP address blocked")

    # Fetch with timeout
    async with httpx.AsyncClient(timeout=5.0) as client:
        response = await client.get(url)
        return response.text
```

---

### 8. Security Misconfiguration

**Common Issues**:

- Debug mode enabled in production
- Default credentials
- Verbose error messages
- Missing security headers

**Mitigation**:

```python
# Production configuration
app = FastAPI(
    debug=False,  # Never True in production
    docs_url=None,  # Disable Swagger in production
    redoc_url=None,
)

# Security headers middleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from starlette.middleware.httpsredirect import HTTPSRedirectMiddleware

app.add_middleware(TrustedHostMiddleware, allowed_hosts=["example.com", "*.example.com"])
app.add_middleware(HTTPSRedirectMiddleware)

@app.middleware("http")
async def add_security_headers(request, call_next):
    response = await call_next(request)
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["X-XSS-Protection"] = "1; mode=block"
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    return response

# Generic error responses
@app.exception_handler(Exception)
async def generic_exception_handler(request, exc):
    # Log detailed error internally
    logger.error(f"Unhandled exception: {exc}", exc_info=True)

    # Return generic message to client
    return JSONResponse(
        status_code=500,
        content={"detail": "Internal server error"}
    )
```

---

### 9. Improper Inventory Management

**Issues**:

- Undocumented endpoints
- Deprecated versions still active
- No API versioning

**Mitigation**:

```python
# API versioning
from fastapi import APIRouter

v1_router = APIRouter(prefix="/v1")
v2_router = APIRouter(prefix="/v2")

# Deprecation warnings
@v1_router.get("/users", deprecated=True)
async def get_users_v1():
    return {
        "warning": "This endpoint is deprecated. Use /v2/users instead.",
        "users": []
    }

@v2_router.get("/users")
async def get_users_v2():
    return {"users": []}

app.include_router(v1_router)
app.include_router(v2_router)

# OpenAPI documentation with versions
app = FastAPI(
    title="My API",
    version="2.0.0",
    description="API v2 - v1 deprecated, will be removed 2026-01-01"
)
```

---

### 10. Unsafe Consumption of APIs

**Description**: Blindly trusting third-party API responses.

**Mitigation**:

```python
from pydantic import BaseModel, ValidationError

class ThirdPartyResponse(BaseModel):
    """Validate external API responses."""
    user_id: int
    name: str
    email: str

async def consume_third_party_api(url: str):
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            response = await client.get(url)
            response.raise_for_status()

            # Validate response structure
            data = ThirdPartyResponse(**response.json())

            # Additional business logic validation
            if data.user_id <= 0:
                raise ValueError("Invalid user_id")

            return data

        except httpx.HTTPError as e:
            logger.error(f"Third-party API error: {e}")
            raise HTTPException(502, "External service unavailable")
        except ValidationError as e:
            logger.error(f"Invalid third-party response: {e}")
            raise HTTPException(502, "Invalid external response")
```

---

## Advanced Security Patterns

### API Key Rotation

```python
class APIKey(BaseModel):
    key: str
    created_at: datetime
    expires_at: datetime
    last_used: datetime | None

async def rotate_api_key(old_key: str) -> str:
    """Generate new API key while deprecating old one."""
    # Mark old key as deprecated (grace period: 30 days)
    await APIKey.update(
        key=old_key,
        status="deprecated",
        expires_at=datetime.utcnow() + timedelta(days=30)
    )

    # Generate new key
    new_key = secrets.token_urlsafe(32)
    await APIKey.create(
        key=new_key,
        created_at=datetime.utcnow(),
        expires_at=datetime.utcnow() + timedelta(days=365)
    )

    return new_key
```

### Request Signing

```python
import hmac
import hashlib

def sign_request(payload: dict, secret: str) -> str:
    """Sign request payload with HMAC."""
    message = json.dumps(payload, sort_keys=True).encode()
    signature = hmac.new(
        secret.encode(),
        message,
        hashlib.sha256
    ).hexdigest()
    return signature

def verify_request_signature(payload: dict, signature: str, secret: str) -> bool:
    """Verify request signature."""
    expected_signature = sign_request(payload, secret)
    return hmac.compare_digest(expected_signature, signature)
```

---

## Security Testing

### Automated Security Scanning

```bash
# Install security tools
pip install bandit safety

# Scan for security issues
bandit -r app/
safety check --json

# API security testing
pip install owasp-zap-api-python

# Run ZAP scan
zap-cli quick-scan --self-contained http://localhost:8000
```

---

**See also**: [SKILL.md](./SKILL.md) for overview and quick start guide.

```

moai-security-api | SkillHub