moai-security-api
API security patterns - authentication, authorization, rate limiting, OWASP
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install jg-chalk-io-nora-livekit-moai-security-api
Repository
Skill path: .claude/skills/moai-security-api
API security patterns - authentication, authorization, rate limiting, OWASP
Open repositoryBest for
Primary workflow: Run DevOps.
Technical facets: Full Stack, Backend, Security.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: jg-chalk-io.
This is still a mirrored public skill entry. Review the repository before installing into production workflows.
What it helps with
- Install moai-security-api into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/jg-chalk-io/Nora-LiveKit before adding moai-security-api to shared team environments
- Use moai-security-api for development workflows
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: moai-security-api
version: 4.0.0
updated: 2025-11-20
status: stable
description: API security patterns - authentication, authorization, rate limiting, OWASP
allowed-tools: [Read, Bash, WebSearch, WebFetch]
---
# API Security Expert
**Secure API Design & Implementation**
> **Focus**: Authentication, Authorization, Rate Limiting, OWASP API Top 10
> **Stack**: OAuth 2.0, JWT, API Keys, CORS
---
## Overview
Comprehensive patterns for securing RESTful and GraphQL APIs.
### Core Security Layers
1. **Authentication**: Who are you? (OAuth, JWT, API keys)
2. **Authorization**: What can you do? (RBAC, ABAC)
3. **Rate Limiting**: Prevent abuse (token bucket, sliding window)
4. **Input Validation**: Prevent injection attacks
---
## Quick Start
### 1. JWT Authentication
Issue and verify JWT tokens for API access.
**Key Concepts**:
- Token structure: `Header.Payload.Signature`
- Short-lived tokens (<1 hour)
- Refresh token rotation
See: [examples.md](./examples.md#jwt-authentication) for implementation
### 2. Role-Based Access Control (RBAC)
Enforce permissions based on user roles.
**Pattern**: Decorator/middleware checks user role before allowing access.
See: [examples.md](./examples.md#rbac-implementation) for code
### 3. Rate Limiting
Prevent API abuse with token bucket algorithm.
**Common Limits**:
- Public endpoints: 100 req/min
- Authenticated: 1000 req/min
- Admin: Unlimited
See: [examples.md](./examples.md#rate-limiting) for implementation
### 4. CORS Configuration
Restrict cross-origin requests to trusted domains.
**Critical**: Never use `allow_origins=["*"]` in production.
See: [examples.md](./examples.md#cors-setup) for configuration
---
## OWASP API Security Top 10 (2023)
| # | Vulnerability | Mitigation |
| --- | ----------------------------------------------- | -------------------------------- |
| 1 | Broken Object Level Authorization | Validate user owns resource |
| 2 | Broken Authentication | OAuth 2.0, MFA |
| 3 | Broken Object Property Level Authorization | Don't expose internal fields |
| 4 | Unrestricted Resource Consumption | Rate limiting, pagination |
| 5 | Broken Function Level Authorization | Verify permissions per endpoint |
| 6 | Unrestricted Access to Sensitive Business Flows | CAPTCHA, anomaly detection |
| 7 | Server Side Request Forgery (SSRF) | Validate URLs, block private IPs |
| 8 | Security Misconfiguration | Disable debug, remove defaults |
| 9 | Improper Inventory Management | Document endpoints, versioning |
| 10 | Unsafe Consumption of APIs | Validate third-party responses |
See: [reference.md](./reference.md#owasp-details) for detailed mitigations
---
## Best Practices
1. **HTTPS Only**: Enforce TLS 1.3+
2. **Short-Lived Tokens**: JWT expiry <1 hour
3. **API Versioning**: `/v1/users`, `/v2/users`
4. **Logging**: Log auth failures, suspicious patterns
5. **Error Messages**: Don't leak system details
---
## Validation Checklist
- [ ] **Auth**: JWT/OAuth implemented?
- [ ] **AuthZ**: RBAC/ABAC enforced?
- [ ] **Rate Limiting**: Configured per endpoint?
- [ ] **CORS**: Restricted to trusted origins?
- [ ] **HTTPS**: TLS 1.3+ enforced?
- [ ] **Input**: Pydantic/Zod validation used?
---
## Related Skills
- `moai-security-auth`: Authentication patterns
- `moai-security-devsecops`: Security testing
- `moai-domain-backend`: API design
---
## Additional Resources
- [examples.md](./examples.md): Code implementations
- [reference.md](./reference.md): Complete API reference
---
**Last Updated**: 2025-11-20
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### examples.md
```markdown
# API Security Examples
Practical implementation examples for secure API development.
---
## JWT Authentication
### Python/FastAPI Implementation
```python
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer
from datetime import datetime, timedelta
import jwt
SECRET_KEY = "your-secret-key-from-env"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
security = HTTPBearer()
def create_access_token(data: dict) -> str:
"""Create JWT access token."""
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(credentials = Depends(security)) -> str:
"""Verify JWT token and return user_id."""
try:
token = credentials.credentials
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials"
)
return user_id
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token has expired"
)
except jwt.InvalidTokenError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token"
)
# Usage in endpoint
from fastapi import FastAPI
app = FastAPI()
@app.post("/login")
async def login(username: str, password: str):
# Verify credentials (simplified)
if username == "admin" and password == "secret":
access_token = create_access_token(data={"sub": username})
return {"access_token": access_token, "token_type": "bearer"}
raise HTTPException(status_code=401, detail="Invalid credentials")
@app.get("/protected")
async def protected_route(user_id: str = Depends(verify_token)):
return {"message": f"Hello {user_id}"}
```
### Node.js/Express Implementation
```javascript
const jwt = require("jsonwebtoken");
const express = require("express");
const SECRET_KEY = process.env.JWT_SECRET;
const app = express();
// Create token
function createToken(userId) {
return jwt.sign({ sub: userId }, SECRET_KEY, { expiresIn: "1h" });
}
// Verify middleware
function verifyToken(req, res, next) {
const authHeader = req.headers["authorization"];
const token = authHeader && authHeader.split(" ")[1];
if (!token) {
return res.status(401).json({ error: "No token provided" });
}
jwt.verify(token, SECRET_KEY, (err, decoded) => {
if (err) {
return res.status(401).json({ error: "Invalid token" });
}
req.userId = decoded.sub;
next();
});
}
// Routes
app.post("/login", (req, res) => {
const { username, password } = req.body;
// Verify credentials
if (username === "admin" && password === "secret") {
const token = createToken(username);
res.json({ access_token: token });
} else {
res.status(401).json({ error: "Invalid credentials" });
}
});
app.get("/protected", verifyToken, (req, res) => {
res.json({ message: `Hello ${req.userId}` });
});
```
---
## RBAC Implementation
### Decorator Pattern (Python)
```python
from functools import wraps
from fastapi import HTTPException, status
def require_role(*required_roles):
"""Decorator to enforce role-based access control."""
def decorator(func):
@wraps(func)
async def wrapper(*args, user_id: str = Depends(verify_token), **kwargs):
# Fetch user role from database
user = await get_user(user_id)
if user.role not in required_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Requires one of roles: {required_roles}"
)
return await func(*args, user_id=user_id, **kwargs)
return wrapper
return decorator
# Usage
@app.delete("/users/{user_id}")
@require_role("admin", "moderator")
async def delete_user(user_id: int, current_user: str = Depends(verify_token)):
# Only admins and moderators can delete users
await User.delete(user_id)
return {"status": "deleted"}
@app.get("/analytics")
@require_role("admin")
async def get_analytics(current_user: str = Depends(verify_token)):
# Only admins can view analytics
return await Analytics.get_all()
```
### Middleware Pattern (Node.js)
```javascript
function requireRole(...roles) {
return (req, res, next) => {
const userRole = req.user.role; // Set by verifyToken middleware
if (!roles.includes(userRole)) {
return res.status(403).json({
error: `Requires one of roles: ${roles.join(", ")}`,
});
}
next();
};
}
// Usage
app.delete(
"/users/:id",
verifyToken,
requireRole("admin", "moderator"),
(req, res) => {
// Delete user
res.json({ status: "deleted" });
}
);
app.get("/analytics", verifyToken, requireRole("admin"), (req, res) => {
// Return analytics
res.json({ data: [] });
});
```
---
## Rate Limiting
### Token Bucket Algorithm (Python)
```python
import time
from collections import defaultdict
from fastapi import Request, HTTPException
class RateLimiter:
def __init__(self, requests_per_minute=60):
self.rate = requests_per_minute / 60.0 # Tokens per second
self.max_tokens = requests_per_minute
self.tokens = defaultdict(lambda: self.max_tokens)
self.last_check = defaultdict(time.time)
def allow_request(self, identifier: str) -> bool:
"""Check if request is allowed under rate limit."""
now = time.time()
time_passed = now - self.last_check[identifier]
# Refill tokens based on time passed
self.tokens[identifier] += time_passed * self.rate
self.tokens[identifier] = min(self.tokens[identifier], self.max_tokens)
self.last_check[identifier] = now
# Try to consume one token
if self.tokens[identifier] >= 1:
self.tokens[identifier] -= 1
return True
return False
# Create rate limiter
rate_limiter = RateLimiter(requests_per_minute=100)
# Middleware
from fastapi import Request
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
# Use IP address as identifier
client_ip = request.client.host
if not rate_limiter.allow_request(client_ip):
raise HTTPException(
status_code=429,
detail="Too many requests. Please try again later.",
headers={"Retry-After": "60"}
)
response = await call_next(request)
return response
```
### Redis-Based Rate Limiting (Node.js)
```javascript
const redis = require("redis");
const client = redis.createClient();
async function rateLimitMiddleware(req, res, next) {
const identifier = req.ip;
const key = `rate_limit:${identifier}`;
const limit = 100; // requests per minute
const window = 60; // seconds
const current = await client.incr(key);
if (current === 1) {
// First request, set expiry
await client.expire(key, window);
}
if (current > limit) {
return res.status(429).json({
error: "Too many requests",
retry_after: window,
});
}
next();
}
app.use(rateLimitMiddleware);
```
---
## CORS Setup
### Secure CORS Configuration (FastAPI)
```python
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
# Production configuration
app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://app.example.com",
"https://admin.example.com"
], # Specific domains only
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
max_age=3600, # Cache preflight for 1 hour
)
# Development configuration (separate file)
if os.getenv("ENV") == "development":
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
```
### Express CORS (Node.js)
```javascript
const cors = require("cors");
// Production
const corsOptions = {
origin: ["https://app.example.com", "https://admin.example.com"],
credentials: true,
methods: ["GET", "POST", "PUT", "DELETE"],
allowedHeaders: ["Authorization", "Content-Type"],
maxAge: 3600,
};
app.use(cors(corsOptions));
// Development
if (process.env.NODE_ENV === "development") {
app.use(
cors({
origin: "http://localhost:3000",
credentials: true,
})
);
}
```
---
## Input Validation
### Pydantic Models (Python)
```python
from pydantic import BaseModel, EmailStr, Field, constr, validator
class UserCreate(BaseModel):
email: EmailStr
password: constr(min_length=8, max_length=100)
age: int = Field(ge=18, le=120, description="User age")
username: constr(regex=r'^[a-zA-Z0-9_-]{3,20}$')
@validator('password')
def password_strength(cls, v):
if not any(c.isupper() for c in v):
raise ValueError('Password must contain uppercase')
if not any(c.isdigit() for c in v):
raise ValueError('Password must contain digit')
return v
@app.post("/users")
async def create_user(user: UserCreate):
# Input is automatically validated
hashed_password = hash_password(user.password)
new_user = await User.create(
email=user.email,
password=hashed_password,
age=user.age
)
return {"id": new_user.id}
```
### Zod Validation (TypeScript)
```typescript
import { z } from "zod";
const userCreateSchema = z.object({
email: z.string().email(),
password: z
.string()
.min(8, "Password must be at least 8 characters")
.max(100)
.refine((pwd) => /[A-Z]/.test(pwd), "Must contain uppercase")
.refine((pwd) => /[0-9]/.test(pwd), "Must contain digit"),
age: z.number().int().min(18).max(120),
username: z.string().regex(/^[a-zA-Z0-9_-]{3,20}$/),
});
type UserCreate = z.infer<typeof userCreateSchema>;
app.post("/users", async (req, res) => {
try {
const userData = userCreateSchema.parse(req.body);
const hashedPassword = await hashPassword(userData.password);
const newUser = await User.create({
...userData,
password: hashedPassword,
});
res.json({ id: newUser.id });
} catch (error) {
if (error instanceof z.ZodError) {
res.status(400).json({ errors: error.errors });
} else {
res.status(500).json({ error: "Internal server error" });
}
}
});
```
---
**See also**: [reference.md](./reference.md) for complete API reference and advanced patterns.
```
### reference.md
```markdown
# API Security Reference
Complete reference guide for OWASP API Security Top 10 and advanced patterns.
---
## OWASP API Security Top 10 (2023) - Detailed
### 1. Broken Object Level Authorization (BOLA)
**Description**: API exposes endpoints that handle object identifiers without proper authorization checks.
**Attack Example**:
```http
GET /api/users/123/orders
Authorization: Bearer <user_456_token>
# User 456 shouldn't see User 123's orders!
```
**Mitigation**:
```python
@app.get("/users/{user_id}/orders")
async def get_user_orders(user_id: int, current_user: User = Depends(get_current_user)):
# Verify user owns this resource
if current_user.id != user_id and not current_user.is_admin:
raise HTTPException(403, "Access denied")
return await Order.get_by_user(user_id)
```
**Prevention Checklist**:
- [ ] Validate user owns requested resource
- [ ] Implement object-level permission checks
- [ ] Use UUIDs instead of sequential IDs
- [ ] Log unauthorized access attempts
---
### 2. Broken Authentication
**Description**: Weak authentication mechanisms allowing attackers to compromise tokens or credentials.
**Common Issues**:
- Weak password policies
- No MFA
- Long-lived tokens
- Credentials in URLs
**Mitigation**:
```python
# Strong password policy
PASSWORD_REQUIREMENTS = {
"min_length": 12,
"require_uppercase": True,
"require_lowercase": True,
"require_digits": True,
"require_special": True,
}
# Short-lived JWT
ACCESS_TOKEN_EXPIRE_MINUTES = 15 # Not 24 hours!
REFRESH_TOKEN_EXPIRE_DAYS = 7
# MFA enforcement
@app.post("/login")
async def login(username: str, password: str):
user = await authenticate(username, password)
if user.mfa_enabled:
otp_token = generate_otp_challenge(user)
return {"requires_mfa": True, "otp_token": otp_token}
# ... issue tokens
```
---
### 3. Broken Object Property Level Authorization
**Description**: API returns more data than necessary, exposing sensitive fields.
**Attack Example**:
```json
// Response exposes internal fields
{
"id": 123,
"name": "John Doe",
"email": "[email protected]",
"password_hash": "$2b$12$...", // ❌ Should not be exposed
"is_admin": true, // ❌ Internal field
"created_at": "2025-01-01"
}
```
**Mitigation**:
```python
from pydantic import BaseModel
class UserPublic(BaseModel):
"""Public user representation - excludes sensitive fields."""
id: int
name: str
email: str
created_at: datetime
class Config:
from_attributes = True
@app.get("/users/{user_id}", response_model=UserPublic)
async def get_user(user_id: int):
user = await User.get(user_id)
return user # Pydantic automatically filters fields
```
---
### 4. Unrestricted Resource Consumption
**Description**: No limits on API usage, leading to DoS or excessive billing.
**Attack Scenarios**:
- No pagination (fetch 1M records)
- No rate limiting
- Expensive operations without throttling
- Large file uploads
**Mitigation**:
```python
from fastapi import Query
@app.get("/users")
async def list_users(
page: int = Query(1, ge=1),
limit: int = Query(20, ge=1, le=100) # Max 100 per request
):
offset = (page - 1) * limit
users = await User.get_paginated(offset=offset, limit=limit)
return {
"users": users,
"page": page,
"limit": limit,
"total": await User.count()
}
# File upload limits
@app.post("/upload")
async def upload_file(file: UploadFile):
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
size = 0
chunks = []
async for chunk in file.stream():
size += len(chunk)
if size > MAX_FILE_SIZE:
raise HTTPException(413, "File too large")
chunks.append(chunk)
# Process file...
```
---
### 5. Broken Function Level Authorization
**Description**: Regular users can access admin/privileged functions.
**Attack Example**:
```http
DELETE /api/admin/users/123
Authorization: Bearer <regular_user_token>
# Regular user shouldn't access admin endpoints!
```
**Mitigation**:
```python
# Route-level protection
admin_router = APIRouter(prefix="/admin", dependencies=[Depends(require_admin)])
@admin_router.delete("/users/{user_id}")
async def delete_user(user_id: int):
# Only reachable if user is admin
await User.delete(user_id)
return {"status": "deleted"}
# Function-level check
def require_admin(current_user: User = Depends(get_current_user)):
if not current_user.is_admin:
raise HTTPException(403, "Admin access required")
return current_user
```
---
### 6. Unrestricted Access to Sensitive Business Flows
**Description**: Lack of flow control allows automated attacks (credential stuffing, scalping).
**Scenarios**:
- Mass account creation
- Automated ticket purchasing
- Credential stuffing attacks
**Mitigation**:
```python
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
# Stricter limits on sensitive endpoints
@app.post("/register")
@limiter.limit("5/hour") # Only 5 registrations per hour per IP
async def register(user_data: UserCreate):
# Add CAPTCHA verification
if not verify_captcha(user_data.captcha_token):
raise HTTPException(400, "Invalid CAPTCHA")
# Detect suspicious patterns
if await is_suspicious_activity(user_data.email):
await send_alert_to_security_team()
raise HTTPException(429, "Too many requests")
# Proceed with registration...
```
---
### 7. Server Side Request Forgery (SSRF)
**Description**: Attacker tricks server into making requests to internal services.
**Attack Example**:
```http
POST /api/fetch-url
{
"url": "http://169.254.169.254/latest/meta-data/iam/security-credentials/"
}
# Accesses AWS metadata service!
```
**Mitigation**:
```python
import ipaddress
from urllib.parse import urlparse
BLOCKED_IP_RANGES = [
ipaddress.ip_network("10.0.0.0/8"), # Private
ipaddress.ip_network("172.16.0.0/12"), # Private
ipaddress.ip_network("192.168.0.0/16"), # Private
ipaddress.ip_network("169.254.0.0/16"), # Link-local (AWS metadata)
ipaddress.ip_network("127.0.0.0/8"), # Loopback
]
ALLOWED_DOMAINS = ["api.trusted-partner.com", "cdn.example.com"]
async def safe_fetch_url(url: str):
"""Safely fetch URL with SSRF protection."""
# Parse and validate URL
parsed = urlparse(url)
# Check protocol
if parsed.scheme not in ["http", "https"]:
raise ValueError("Invalid protocol")
# Check domain allowlist
if parsed.hostname not in ALLOWED_DOMAINS:
raise ValueError("Domain not allowed")
# Resolve IP and check against blocklist
import socket
ip = socket.gethostbyname(parsed.hostname)
ip_obj = ipaddress.ip_address(ip)
for blocked_range in BLOCKED_IP_RANGES:
if ip_obj in blocked_range:
raise ValueError("IP address blocked")
# Fetch with timeout
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(url)
return response.text
```
---
### 8. Security Misconfiguration
**Common Issues**:
- Debug mode enabled in production
- Default credentials
- Verbose error messages
- Missing security headers
**Mitigation**:
```python
# Production configuration
app = FastAPI(
debug=False, # Never True in production
docs_url=None, # Disable Swagger in production
redoc_url=None,
)
# Security headers middleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from starlette.middleware.httpsredirect import HTTPSRedirectMiddleware
app.add_middleware(TrustedHostMiddleware, allowed_hosts=["example.com", "*.example.com"])
app.add_middleware(HTTPSRedirectMiddleware)
@app.middleware("http")
async def add_security_headers(request, call_next):
response = await call_next(request)
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
return response
# Generic error responses
@app.exception_handler(Exception)
async def generic_exception_handler(request, exc):
# Log detailed error internally
logger.error(f"Unhandled exception: {exc}", exc_info=True)
# Return generic message to client
return JSONResponse(
status_code=500,
content={"detail": "Internal server error"}
)
```
---
### 9. Improper Inventory Management
**Issues**:
- Undocumented endpoints
- Deprecated versions still active
- No API versioning
**Mitigation**:
```python
# API versioning
from fastapi import APIRouter
v1_router = APIRouter(prefix="/v1")
v2_router = APIRouter(prefix="/v2")
# Deprecation warnings
@v1_router.get("/users", deprecated=True)
async def get_users_v1():
return {
"warning": "This endpoint is deprecated. Use /v2/users instead.",
"users": []
}
@v2_router.get("/users")
async def get_users_v2():
return {"users": []}
app.include_router(v1_router)
app.include_router(v2_router)
# OpenAPI documentation with versions
app = FastAPI(
title="My API",
version="2.0.0",
description="API v2 - v1 deprecated, will be removed 2026-01-01"
)
```
---
### 10. Unsafe Consumption of APIs
**Description**: Blindly trusting third-party API responses.
**Mitigation**:
```python
from pydantic import BaseModel, ValidationError
class ThirdPartyResponse(BaseModel):
"""Validate external API responses."""
user_id: int
name: str
email: str
async def consume_third_party_api(url: str):
async with httpx.AsyncClient(timeout=10.0) as client:
try:
response = await client.get(url)
response.raise_for_status()
# Validate response structure
data = ThirdPartyResponse(**response.json())
# Additional business logic validation
if data.user_id <= 0:
raise ValueError("Invalid user_id")
return data
except httpx.HTTPError as e:
logger.error(f"Third-party API error: {e}")
raise HTTPException(502, "External service unavailable")
except ValidationError as e:
logger.error(f"Invalid third-party response: {e}")
raise HTTPException(502, "Invalid external response")
```
---
## Advanced Security Patterns
### API Key Rotation
```python
class APIKey(BaseModel):
key: str
created_at: datetime
expires_at: datetime
last_used: datetime | None
async def rotate_api_key(old_key: str) -> str:
"""Generate new API key while deprecating old one."""
# Mark old key as deprecated (grace period: 30 days)
await APIKey.update(
key=old_key,
status="deprecated",
expires_at=datetime.utcnow() + timedelta(days=30)
)
# Generate new key
new_key = secrets.token_urlsafe(32)
await APIKey.create(
key=new_key,
created_at=datetime.utcnow(),
expires_at=datetime.utcnow() + timedelta(days=365)
)
return new_key
```
### Request Signing
```python
import hmac
import hashlib
def sign_request(payload: dict, secret: str) -> str:
"""Sign request payload with HMAC."""
message = json.dumps(payload, sort_keys=True).encode()
signature = hmac.new(
secret.encode(),
message,
hashlib.sha256
).hexdigest()
return signature
def verify_request_signature(payload: dict, signature: str, secret: str) -> bool:
"""Verify request signature."""
expected_signature = sign_request(payload, secret)
return hmac.compare_digest(expected_signature, signature)
```
---
## Security Testing
### Automated Security Scanning
```bash
# Install security tools
pip install bandit safety
# Scan for security issues
bandit -r app/
safety check --json
# API security testing
pip install owasp-zap-api-python
# Run ZAP scan
zap-cli quick-scan --self-contained http://localhost:8000
```
---
**See also**: [SKILL.md](./SKILL.md) for overview and quick start guide.
```