# kube-audit-kit
Performs read-only Kubernetes security audits by exporting resources, sanitizing metadata, grouping applications by topology, and generating PSS/NSA-compliant audit reports. Use when the user requests auditing Kubernetes clusters, Namespaces, security reviews, or configuration analysis.
## Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
## Install command

```bash
npx @skill-hub/cli install crazygit-kube-audit-kit
```
## Repository

[crazygit/kube-audit-kit](https://github.com/crazygit/kube-audit-kit)

## Best for
- Primary workflow: DevOps.
- Technical facets: Full Stack, DevOps, Security.
- Target audience: everyone.
- License: Unknown.
## Original source

- Catalog source: SkillHub Club.
- Repository owner: crazygit.

This is still a mirrored public skill entry. Review the repository before installing into production workflows.
## What it helps with
- Install kube-audit-kit into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://www.skillhub.club/skills/crazygit-kube-audit-kit before adding kube-audit-kit to shared team environments
- Use kube-audit-kit for development workflows
## Works across

- Favorites: 0
- Sub-skills: 0
- Aggregator: No
## Original source / Raw SKILL.md
---
name: kube-audit-kit
description: Performs read-only Kubernetes security audits by exporting resources, sanitizing metadata, grouping applications by topology, and generating PSS/NSA-compliant audit reports. Use when the user requests auditing Kubernetes clusters, Namespaces, security reviews, or configuration analysis.
user-invocable: true
allowed-tools: Read, Write, Bash(python:*), Bash(uv:*), Bash(kubectl:*), Bash(export:*)
examples:
- "Run a security audit for the payment namespace in prod-cluster"
- "Check whether the backend apps in staging meet PSS standards"
- "Analyze sensitive data leakage risk for all resources in the development namespace"
- "Generate a full audit report for the default namespace in test-cluster"
- "review k8s cluster security configuration"
- "kubernetes security audit for production workload"
author: crazygit
repository: https://github.com/crazygit/kube-audit-kit
---
# Kube Audit Kit - Read-Only Kubernetes Security Audit Toolkit
This Skill uses a standardized, scripted workflow to export Kubernetes cluster resources in **read-only** mode, sanitize them, group applications, and perform a deep security audit. The entire process strictly follows the **read-only** principle and does not modify any cluster state.
## Core Principles
- **Read-only**: only `get/list` operations, never `apply/patch/delete`
- **Full coverage**: dynamically discover all resource types without hardcoding lists
- **Scripted**: core logic runs through Python scripts for stability
## Quick Start
### Prerequisites
1. **Environment setup**:
```bash
uv sync
```
See [SETUP.md](SETUP.md) for details.
2. **Verify kubectl**:
```bash
kubectl config get-contexts
```
### Run an audit
When a user requests an audit, follow these steps strictly:
**Set the output directory first** (important!):
```bash
# Set the output directory to output/ under the current working directory
# This ensures output files are generated in the user's working directory, not the SKILL install directory
export KUBE_AUDIT_OUTPUT="$(pwd)/output"
```
**Use the progress checklist**:
```
Audit progress:
- [ ] Step 1: Export - Dynamic discovery and full resource export
- [ ] Step 2: Sanitize - Remove metadata and status fields
- [ ] Step 3: Group - Associate applications by workload topology
- [ ] Step 4: Audit - Dual-layer security audit
```
#### Step 1: Export
```bash
# Keep the environment variable effective for each command
export KUBE_AUDIT_OUTPUT="$(pwd)/output" && \
uv run python scripts/export.py --context <context> --namespace <namespace>
```
Output: `{OUTPUT_BASE}/export/`
#### Step 2: Sanitize
```bash
export KUBE_AUDIT_OUTPUT="$(pwd)/output" && \
uv run python scripts/sanitize.py --context <context> --namespace <namespace>
```
Output: `{OUTPUT_BASE}/sanitize/`, `{OUTPUT_BASE}/sanitize_fields/`
#### Step 3: Group
```bash
export KUBE_AUDIT_OUTPUT="$(pwd)/output" && \
uv run python scripts/group_apps.py --context <context> --namespace <namespace>
```
Output: `{OUTPUT_BASE}/group/`, `{OUTPUT_BASE}/ungrouped_resources.txt`
#### Step 4: Audit
**Phase 1 - Script-based static scan**:
```bash
export KUBE_AUDIT_OUTPUT="$(pwd)/output" && \
uv run python scripts/audit.py --context <context> --namespace <namespace>
```
Output:
- `{OUTPUT_BASE}/audit/audit_results.json` - structured audit results
- `{OUTPUT_BASE}/audit/configmap_to_secret.csv` - ConfigMap sensitive data
- `{OUTPUT_BASE}/audit/secret_to_configmap.csv` - Secret non-sensitive data
- `{OUTPUT_BASE}/audit/rbac_issues.csv` - RBAC audit results
- `{OUTPUT_BASE}/audit/network_security.csv` - network security audit results
- `{OUTPUT_BASE}/audit/hostpath_mounts.csv` - hostPath mount findings
- `{OUTPUT_BASE}/audit/security_policies.csv` - seccomp/AppArmor results
- `{OUTPUT_BASE}/audit/pdb_and_secrets.csv` - PDB/Secret/ServiceAccount results
**Phase 2 - AI expert deep review**:
AI independently reviews results without relying on phase 1 output:
1. **Independent analysis**: traverse `{OUTPUT_BASE}/group/*/` and read all original YAML files
2. **Deep review**: identify risks not covered by script rules
- business logic risks (e.g., plaintext private keys, hardcoded passwords)
- architecture risks (e.g., missing NetworkPolicy, overly broad RBAC)
- configuration drift risks (e.g., `latest` images, missing resource limits)
3. **Supplement findings**: if sensitive data was missed, append to the CSV files
4. **Report summary**: merge phase 1 findings with AI analysis into `{OUTPUT_BASE}/audit/audit_report.md`
**Report template**: see `audit_report_template.md` in the same directory.
**Key requirements**:
- Must read original YAML files, not just audit_results.json
- Every application must have specific analysis; avoid vague statements like "not reviewed"
- If script misses sensitive data, update the CSV files to keep data complete
## Output Structure
```
output/{context}/{namespace}/
├── export/ # raw export data
├── sanitize/ # sanitized data
├── sanitize_fields/ # sanitization records
├── group/ # application grouping
│ └── {app_name}/
│ ├── *.yaml # grouped resource files
│ └── config_usage.json # CM/Secret usage record
├── ungrouped_resources.txt # orphan resources
└── audit/ # audit results
├── audit_results.json # static analysis results
├── configmap_to_secret.csv # sensitive data in ConfigMaps
├── secret_to_configmap.csv # non-sensitive data in Secrets
├── rbac_issues.csv # RBAC audit results
├── network_security.csv # network security audit results
├── hostpath_mounts.csv # hostPath mount findings
├── security_policies.csv # seccomp/AppArmor results
├── pdb_and_secrets.csv # PDB/Secret/ServiceAccount results
└── audit_report.md # final AI-generated report
```
## Reference Docs
- **[QUICKSTART.md](QUICKSTART.md)**: 30-second quick start
- **[WORKFLOW.md](WORKFLOW.md)**: full workflow and implementation details
- **[SETUP.md](SETUP.md)**: environment setup and dependency installation
- **[EXAMPLES.md](EXAMPLES.md)**: output examples and typical scenarios
## User Interaction Conventions
### Planning Phase
```
Received. Target: Context `{ctx}`, Namespace `{ns}`.
Execution plan:
1. Set the output directory environment variable: export KUBE_AUDIT_OUTPUT="$(pwd)/output"
2. [Export] Dynamic discovery and full resource export → scripts/export.py
3. [Sanitize] Remove metadata and status fields → scripts/sanitize.py
4. [Group] Associate applications by workload topology → scripts/group_apps.py
5. [Audit] Dual-layer security audit (static scan + AI expert review) → scripts/audit.py
Output directory: $(pwd)/output/{ctx}/{ns}/
Start?
```
### Execution Phase
Output a summary after each step (each command must include the environment variable):
```
✅ [Export completed] Scanned 32 resource types, exported 150 YAMLs
Output: output/{ctx}/{ns}/export/
```
### Results Phase
```
✅ [Audit completed] Static report and AI expert analysis merged
📊 Audit stats:
- Applications: 12
- Critical risks: X (see audit_results.json)
- Warning risks: Y
- Info recommendations: Z
📁 Output directory: output/{ctx}/{ns}/
📄 Full audit report: output/{ctx}/{ns}/audit/audit_report.md
⚠️ Security reminder: the output/ directory contains decrypted Secret data. Please delete it securely after the audit!
```
## Path Conventions
**{OUTPUT_BASE}** = `output/{context}/{namespace}/`
### Output path mechanism
All paths are computed by `get_output_paths()` in `scripts/utils.py`, with the following precedence:
1. **Environment variable `KUBE_AUDIT_OUTPUT`** (recommended)
- Set in SKILL.md before running: `export KUBE_AUDIT_OUTPUT="$(pwd)/output"`
- Ensures output files are created in the **user's working directory**
- Avoids writing to the SKILL installation directory
2. **Current working directory** (fallback)
- If the environment variable is not set, use `Path.cwd() / "output"`
- Note: when a SKILL runs, cwd may be the SKILL directory
### Why the environment variable?
When the SKILL is invoked, the Agent switches to the SKILL installation directory to run scripts. Using `Path.cwd()` directly would write to the wrong location.
By setting `KUBE_AUDIT_OUTPUT="$(pwd)/output"` before each command, you ensure:
- `$(pwd)` resolves to the user's working directory
- Python scripts read the environment variable and write to the intended path
- Output always lands in the user's working directory, regardless of where the SKILL is called
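The precedence above can be sketched as follows. This is a hypothetical reconstruction, not the actual `get_output_paths()` from `scripts/utils.py`, whose signature and return shape may differ:

```python
import os
from pathlib import Path


def get_output_paths(context: str, namespace: str) -> Path:
    """Resolve {OUTPUT_BASE} = <base>/<context>/<namespace>.

    Precedence (sketch of the mechanism described above):
    1. KUBE_AUDIT_OUTPUT environment variable (recommended)
    2. Path.cwd() / "output" (fallback; cwd may be the SKILL directory)
    """
    env_base = os.environ.get("KUBE_AUDIT_OUTPUT")
    base = Path(env_base) if env_base else Path.cwd() / "output"
    return base / context / namespace
```

With `KUBE_AUDIT_OUTPUT="$(pwd)/output"` exported before each command, the environment-variable branch always wins and output lands in the user's working directory.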
## Key Design Decisions
### Volume vs EnvVar distinction
ConfigMaps/Secrets usage determines whether sensitive data is scanned:
- **Volume mount**: skip sensitive scanning (treated as application config files)
- **EnvVar reference**: scan for sensitive data (may include passwords/keys)
`config_usage.json` records the usage type for each ConfigMap/Secret.
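The classification can be sketched from a Pod spec as follows. This is a hypothetical reconstruction of the logic behind `config_usage.json`; field names follow the Kubernetes Pod API:

```python
def classify_config_usage(pod_spec: dict) -> dict:
    """Classify how each ConfigMap/Secret is consumed by a Pod spec.

    Returns e.g. {"ConfigMap/app-config": {"EnvVar", "Volume"}}.
    Hypothetical sketch; the real group_apps.py may differ.
    """
    usage: dict = {}

    def mark(kind: str, name: str, how: str) -> None:
        usage.setdefault(f"{kind}/{name}", set()).add(how)

    # volumes -> "Volume" (skipped by the sensitive-data scan)
    for vol in pod_spec.get("volumes", []):
        if "configMap" in vol:
            mark("ConfigMap", vol["configMap"]["name"], "Volume")
        if "secret" in vol:
            mark("Secret", vol["secret"]["secretName"], "Volume")

    # env / envFrom -> "EnvVar" (scanned for sensitive data)
    for container in pod_spec.get("containers", []):
        for env in container.get("env", []):
            ref = env.get("valueFrom", {})
            if "configMapKeyRef" in ref:
                mark("ConfigMap", ref["configMapKeyRef"]["name"], "EnvVar")
            if "secretKeyRef" in ref:
                mark("Secret", ref["secretKeyRef"]["name"], "EnvVar")
        for src in container.get("envFrom", []):
            if "configMapRef" in src:
                mark("ConfigMap", src["configMapRef"]["name"], "EnvVar")
            if "secretRef" in src:
                mark("Secret", src["secretRef"]["name"], "EnvVar")
    return usage
```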
### Permission error handling
Scripts use a fault-tolerant approach:
- If a single resource type is denied, skip it and show a warning
- Other resource types continue normally
- The final report notes which checks are missing due to insufficient permissions
Use a dedicated audit service account (see [SETUP.md](SETUP.md)).
## Security Reminder
**Warning**: the `output/` directory contains decrypted Secret data.
**After the audit**:
- Keep `audit_report.md` (it does not contain sensitive data)
- Securely delete other directories or store them encrypted
- Do not commit `output/` to version control
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### SETUP.md
```markdown
# Kube Audit Kit - Environment Setup
This document describes the environment preparation needed before using Kube Audit Kit.
## Dependency Management
This project uses **uv** for Python dependency and virtual environment management.
### Install uv
```bash
# macOS/Linux
curl -LsSf https://astral.sh/uv/install.sh | sh
# or use pip
pip install uv
```
### Install project dependencies
```bash
# Run in the project root
uv sync
# Activate the virtual environment (optional, uv handles this automatically)
source .venv/bin/activate
```
### Verify the environment
```bash
# Check the Python version
python --version # should be >=3.14
# Check dependencies
uv pip list
```
## Dependency Notes
Core dependencies:
- **pyyaml**: YAML file parsing
- **rich**: terminal output styling
- **kubectl**: Kubernetes CLI (system dependency)
## Kubernetes Permission Requirements
### Recommended setup: dedicated audit service account
Using a dedicated audit account follows the least-privilege principle. See `examples/audit-service-account.yaml`:
- Default uses `Role/RoleBinding` to bind `audit-service-account`
- For cluster-wide RBAC audits, enable `ClusterRole/ClusterRoleBinding` per the comments in the example file
### Apply the audit account configuration
```bash
kubectl apply -f examples/audit-service-account.yaml
# Get the audit token
kubectl -n <namespace> create token audit-service-account --duration=1h
# Configure kubectl context to use this token
kubectl config set-credentials audit-service-account --token=<token>
kubectl config set-context audit-context --cluster=<cluster> --user=audit-service-account
```
### Behavior when permissions are insufficient
If the audit account lacks permission for a resource type (such as Secret):
- ⚠️ Skip exporting that resource type (show a friendly warning)
- ✅ Other resources are exported and audited normally
- ❌ The final report lacks results for that resource type
## kubectl Configuration
### Verify kubectl availability
```bash
kubectl version --client
```
### List available Contexts
```bash
kubectl config get-contexts
```
### Switch Context
```bash
kubectl config use-context <context-name>
```
## Path Conventions
All output paths are computed by `get_output_paths()` in `scripts/utils.py`.
**Base output directory**:
```
{OUTPUT_BASE} = output/{context}/{namespace}
```
**Full directory structure**:
```
output/
└── {context}/
└── {namespace}/
├── export/ # raw export data
├── sanitize/ # sanitized data
├── sanitize_fields/ # sanitization records
├── group/ # application grouping
│ ├── app-1/
│ └── app-2/
├── ungrouped_resources.txt
└── audit/ # audit results
├── audit_results.json
├── configmap_to_secret.csv
├── secret_to_configmap.csv
├── rbac_issues.csv
├── network_security.csv
├── hostpath_mounts.csv
├── security_policies.csv
├── pdb_and_secrets.csv
└── audit_report.md
```
## How to Run Scripts
### Recommended: use uv run
```bash
uv run python scripts/export.py --context <ctx> --namespace <ns>
```
### Or within the activated virtual environment
```bash
source .venv/bin/activate
python scripts/export.py --context <ctx> --namespace <ns>
```
## Security Notes
**Important**: the `output/` directory contains decrypted Secret data.
**After the audit**:
- ✅ You may keep the audit report (it does not contain sensitive data)
- ⚠️ Other directories should be securely deleted or encrypted
- ❌ Do not commit `output/` to version control
**Suggested cleanup commands**:
```bash
# Clean up sensitive data after the audit
rm -rf output/<context>/<namespace>/export
rm -rf output/<context>/<namespace>/sanitize
rm -rf output/<context>/<namespace>/group
```
Keep the final audit report:
```bash
# Keep only the report
mv output/<context>/<namespace>/audit/audit_report.md .
rm -rf output/<context>/<namespace>
```
```
### QUICKSTART.md
```markdown
# 30-Second Quick Start
This guide helps you complete a Kubernetes security audit in the shortest possible time.
## Pre-checks
```bash
# 1. Confirm Python version >= 3.14
python --version
# 2. Confirm kubectl is configured
kubectl config get-contexts
# 3. Confirm target Context and Namespace
kubectl config current-context
```
## Complete the audit in three steps
### Step 1: Install dependencies (first run)
```bash
uv sync
```
### Step 2: Run the audit workflow
```bash
# Replace <context> and <namespace> with your actual values
export CTX="your-cluster-context"
export NS="your-namespace"
# Set the output directory (important: ensure output goes to current working directory)
export KUBE_AUDIT_OUTPUT="$(pwd)/output"
# 1. Export resources
uv run python scripts/export.py --context $CTX --namespace $NS
# 2. Sanitize resources
uv run python scripts/sanitize.py --context $CTX --namespace $NS
# 3. Group applications
uv run python scripts/group_apps.py --context $CTX --namespace $NS
# 4. Static audit
uv run python scripts/audit.py --context $CTX --namespace $NS
```
### Step 3: View the report
```bash
# After AI deep analysis completes (performed by the Agent), view the final report
cat output/$CTX/$NS/audit/audit_report.md
```
**Done!** 🎉
## One-click script
Create a script `audit-k8s.sh`:
```bash
#!/bin/bash
CTX=${1:-"your-context"}
NS=${2:-"default"}
# Set output directory
export KUBE_AUDIT_OUTPUT="$(pwd)/output"
uv run python scripts/export.py --context $CTX --namespace $NS && \
uv run python scripts/sanitize.py --context $CTX --namespace $NS && \
uv run python scripts/group_apps.py --context $CTX --namespace $NS && \
uv run python scripts/audit.py --context $CTX --namespace $NS && \
echo "✅ Audit complete! Report path: output/$CTX/$NS/audit/audit_report.md"
```
Usage:
```bash
chmod +x audit-k8s.sh
./audit-k8s.sh prod-cluster backend
```
## Expected Output
```
✅ [Export completed] Scanned 32 resource types, exported 150 YAMLs
✅ [Sanitize completed] Sanitized 150 files
✅ [Group completed] Identified 12 applications
✅ [Audit phase 1 completed] Script-based static scan finished
✅ [Audit completed] Static report and AI expert analysis merged
```
## Post-audit Cleanup
```bash
# Keep the report, delete sensitive data
mv output/$CTX/$NS/audit/audit_report.md .
rm -rf output/$CTX/$NS
```
## Need help?
- **Detailed workflow**: [WORKFLOW.md](WORKFLOW.md)
- **Environment setup**: [SETUP.md](SETUP.md)
- **Output examples**: [EXAMPLES.md](EXAMPLES.md)
```
### WORKFLOW.md
```markdown
# Kube Audit Kit - Detailed Workflow
This document describes the full workflow and implementation details of Kube Audit Kit.
## Workflow Overview
When a user initiates an audit request, the system executes the following four steps in order:
```
Export → Sanitize → Group → Audit
```
## Step 1: Dynamic Discovery and Full Export
**Script**: `scripts/export.py`
**Command format**:
```bash
# Set the output directory environment variable (ensure output goes to the user's working directory)
export KUBE_AUDIT_OUTPUT="$(pwd)/output" && \
uv run python scripts/export.py --context <context> --namespace <namespace>
```
> **Important**: the `KUBE_AUDIT_OUTPUT` environment variable ensures output files are generated in the user's working directory rather than the SKILL installation directory. Every command must set this variable.
### Execution Flow
1. **Pre-check**: verify `kubectl` is available
2. **Dynamic discovery**: call `kubectl api-resources` to get all namespaced resources
3. **Exclude list filtering**: remove resources with no persistence value:
- `events`, `endpoints`, `endpointslices`
- `replicasets`, `controllerrevisions`
- `*review` resources (tokenreviews, localsubjectaccessreviews, etc.)
- `bindings`, `componentstatuses`, `events.events.k8s.io`
4. **Batch export**: run `kubectl get -o yaml` for each resource type
5. **Parse and save**: save each resource as an individual YAML file
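The discovery-and-filter step can be sketched as follows. This is a hypothetical reconstruction in Python; the real `scripts/export.py` may structure it differently:

```python
import subprocess

# Resource types with no persistence value (the exclude list above);
# group-qualified names like "events.events.k8s.io" are matched on the
# short name, so listing the plural alone is enough.
EXCLUDED = {
    "events", "endpoints", "endpointslices", "replicasets",
    "controllerrevisions", "bindings", "componentstatuses",
}


def keep_resource(name: str) -> bool:
    """True if a `kubectl api-resources -o name` entry should be exported."""
    short = name.split(".", 1)[0]  # "deployments.apps" -> "deployments"
    return short not in EXCLUDED and not short.endswith("reviews")


def discover_resource_types(context: str) -> list:
    """Dynamically discover namespaced, listable resource types."""
    out = subprocess.run(
        ["kubectl", "--context", context, "api-resources",
         "--namespaced=true", "--verbs=list", "-o", "name"],
        capture_output=True, text=True, check=True,
    ).stdout
    return [name for name in out.split() if keep_resource(name)]
```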
### Output Structure
```
{OUTPUT_BASE}/export/
├── pods/
│ ├── pod-a.yaml
│ └── pod-b.yaml
├── deployments/
│ └── deployment-x.yaml
└── ... (other resource types)
```
### Fault Tolerance
- **Permission errors**: skip unauthorized resource types, warn but continue
- **Single resource failures**: log errors without aborting the overall export
## Step 2: Deep Resource Sanitization
**Script**: `scripts/sanitize.py`
**Command format**:
```bash
export KUBE_AUDIT_OUTPUT="$(pwd)/output" && \
uv run python scripts/sanitize.py --context <context> --namespace <namespace>
```
### Sanitization Rules
Recursively remove the following fields:
1. **status**: root-level `status` field
2. **metadata**:
- `uid`, `resourceVersion`, `creationTimestamp`, `generation`
- `managedFields`, `ownerReferences`, `selfLink`
3. **annotations**:
- `kubectl.kubernetes.io/last-applied-configuration`
- `deployment.kubernetes.io/revision`
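These rules can be sketched as follows. This is a simplified, non-recursive reconstruction; the actual `scripts/sanitize.py` also walks nested objects:

```python
SANITIZE_METADATA = {
    "uid", "resourceVersion", "creationTimestamp", "generation",
    "managedFields", "ownerReferences", "selfLink",
}
SANITIZE_ANNOTATIONS = {
    "kubectl.kubernetes.io/last-applied-configuration",
    "deployment.kubernetes.io/revision",
}


def sanitize(resource: dict) -> list:
    """Strip volatile fields in place; return the removed field paths.

    Hypothetical sketch of the rules listed above; the removed paths
    are what gets written to sanitize_fields/ as the sanitization record.
    """
    removed = []
    if resource.pop("status", None) is not None:
        removed.append("status")
    meta = resource.get("metadata", {})
    for field in SANITIZE_METADATA & meta.keys():
        meta.pop(field)
        removed.append(f"metadata.{field}")
    annotations = meta.get("annotations", {})
    for key in SANITIZE_ANNOTATIONS & annotations.keys():
        annotations.pop(key)
        removed.append(f"metadata.annotations.{key}")
    return removed
```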
### Output Structure
```
{OUTPUT_BASE}/
├── sanitize/ # sanitized YAML
│ ├── pods/
│ └── deployments/
└── sanitize_fields/ # sanitization records
├── pods/
└── deployments/
```
## Step 3: Intelligent Application Grouping
**Script**: `scripts/group_apps.py`
**Command format**:
```bash
export KUBE_AUDIT_OUTPUT="$(pwd)/output" && \
uv run python scripts/group_apps.py --context <context> --namespace <namespace>
```
### Grouping Logic
1. **Anchor resources**: Deployment, StatefulSet, DaemonSet, CronJob, Job
2. **Association rules**:
- **Service**: match via label selector
- **Pod/PodMetrics**: match via labels
- **Ingress**: reference via Service name
- **ConfigMap/Secret**: parse from Pod spec and mark usage type
- `volumes` → mark as **"Volume"**
- `env`/`envFrom` → mark as **"EnvVar"**
- **PVC**: match via volume claim name
- **ServiceAccount**: match via `serviceAccountName`
- **RBAC**: associate via RoleBinding/ClusterRoleBinding
- **HPA**: match via `scaleTargetRef`
- **PDB**: match via selector
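The Service rule, for example, can be sketched as follows. This is a hypothetical reconstruction; `scripts/group_apps.py` applies the full rule set above:

```python
def selector_matches(selector: dict, labels: dict) -> bool:
    """True if every key/value in the Service selector appears in the labels."""
    return all(labels.get(k) == v for k, v in selector.items())


def group_services(workloads: list, services: list) -> dict:
    """Associate Services with anchor workloads via label selectors.

    Matches each Service's spec.selector against the workload's Pod
    template labels (sketch of one association rule).
    """
    groups = {w["metadata"]["name"]: [] for w in workloads}
    for svc in services:
        selector = svc.get("spec", {}).get("selector") or {}
        for w in workloads:
            pod_labels = w["spec"]["template"]["metadata"].get("labels") or {}
            if selector and selector_matches(selector, pod_labels):
                groups[w["metadata"]["name"]].append(svc["metadata"]["name"])
    return groups
```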
### Output Structure
```
{OUTPUT_BASE}/group/
├── app-1/ # named by Workload
│ ├── Deployment_app-1.yaml
│ ├── Service_app-1-svc.yaml
│ ├── ConfigMap_*.yaml
│ ├── Secret_*.yaml
│ └── config_usage.json # usage type record
└── ungrouped_resources.txt # orphaned resources
```
## Step 4: Dual-Layer Security Audit
### Phase 1: Script-based Static Scan
**Script**: `scripts/audit.py`
**Command format**:
```bash
export KUBE_AUDIT_OUTPUT="$(pwd)/output" && \
uv run python scripts/audit.py --context <context> --namespace <namespace>
```
### Scan Rules
**1. Sensitive data scan**
- **Skip logic**: skip ConfigMap/Secret when only used as Volume (treated as internal files)
- **Targets**: EnvVar references or orphaned resources
- **Keywords**: `password`, `token`, `secret`, `key`, `auth`, `access`, `credential`
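The keyword rule can be sketched as follows (hypothetical reconstruction; only applied when the ConfigMap is consumed via EnvVar or is orphaned, per the skip logic above):

```python
SENSITIVE_KEYWORDS = (
    "password", "token", "secret", "key", "auth", "access", "credential",
)


def find_sensitive_keys(data: dict) -> list:
    """Return data keys whose names match a sensitive keyword (case-insensitive)."""
    return [
        k for k in data
        if any(word in k.lower() for word in SENSITIVE_KEYWORDS)
    ]
```

A ConfigMap key like `db-password` under an EnvVar reference would be flagged into `configmap_to_secret.csv`.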
**2. Workload security baseline**
| Level | Check | Path |
| -------- | ----------------------- | ---------------------------------------- |
| Critical | privileged | `securityContext.privileged` |
| Critical | capabilities.add | `securityContext.capabilities.add` |
| Critical | hostNetwork | `hostNetwork` |
| Critical | hostPID | `hostPID` |
| Critical | hostIPC | `hostIPC` |
| Warning | capabilities.drop | `securityContext.capabilities.drop` |
| Warning | runAsNonRoot | `securityContext.runAsNonRoot` |
| Warning | readOnlyRootFilesystem | `securityContext.readOnlyRootFilesystem` |
| Info | resources | `resources.requests/limits` |
| Info | probes | `livenessProbe/readinessProbe` |
| Info | image tag | `image` (checks latest) |
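A few rows of this table can be sketched as checks. This is a hypothetical reconstruction; `scripts/audit.py` implements the full matrix:

```python
def baseline_findings(workload: dict) -> dict:
    """Evaluate a subset of the baseline rules from the table above.

    Returns findings bucketed by level, in the message format used
    by audit_results.json (sketch, not the full rule set).
    """
    findings = {"Critical": [], "Warning": [], "Info": []}
    kind = workload.get("kind", "Workload")
    name = workload["metadata"]["name"]
    pod = workload["spec"]["template"]["spec"]

    # Critical: host namespace sharing
    for flag in ("hostNetwork", "hostPID", "hostIPC"):
        if pod.get(flag):
            findings["Critical"].append(f"[{kind}/{name}] {flag}: true")

    for c in pod.get("containers", []):
        sc = c.get("securityContext") or {}
        loc = f"[{kind}/{name}/container:{c['name']}]"
        if sc.get("privileged"):
            findings["Critical"].append(f"{loc} privileged mode enabled")
        if sc.get("capabilities", {}).get("add"):
            findings["Critical"].append(
                f"{loc} added Capabilities: {sc['capabilities']['add']}")
        if sc.get("runAsNonRoot") is not True:
            findings["Warning"].append(f"{loc} missing runAsNonRoot: true")
        image = c.get("image", "")
        if image.endswith(":latest") or ":" not in image:
            findings["Info"].append(f"{loc} image uses latest tag")
    return findings
```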
### Output Structure
```
{OUTPUT_BASE}/audit/
├── audit_results.json # structured audit results
├── configmap_to_secret.csv # ConfigMap sensitive data
├── secret_to_configmap.csv # Secret non-sensitive data
├── rbac_issues.csv # RBAC audit results
├── network_security.csv # network security audit results
├── hostpath_mounts.csv # hostPath mount findings
├── security_policies.csv # seccomp/AppArmor results
└── pdb_and_secrets.csv # PDB/Secret/ServiceAccount results
```
### Phase 2: AI Expert Deep Review
**Review principle**: AI **independently reviews** the original YAML files without relying on phase 1 results.
**Execution flow**:
1. **Independent analysis**: traverse each application directory and read all YAML files
2. **Deep review**: identify risks not covered by scripts (business logic, architecture, configuration drift)
3. **Supplement findings**: if the script missed sensitive data, append to the CSV files
4. **Report summary**: combine phase 1 (script findings) with phase 2 (AI analysis) into the final report
**Output file**: `{OUTPUT_BASE}/audit/audit_report.md`
## Full Execution Example
```bash
# 1. Environment setup
uv sync
# 2. Export resources
export KUBE_AUDIT_OUTPUT="$(pwd)/output" && \
uv run python scripts/export.py --context prod-cluster --namespace backend
# 3. Sanitize resources
export KUBE_AUDIT_OUTPUT="$(pwd)/output" && \
uv run python scripts/sanitize.py --context prod-cluster --namespace backend
# 4. Group applications
export KUBE_AUDIT_OUTPUT="$(pwd)/output" && \
uv run python scripts/group_apps.py --context prod-cluster --namespace backend
# 5. Static audit
export KUBE_AUDIT_OUTPUT="$(pwd)/output" && \
uv run python scripts/audit.py --context prod-cluster --namespace backend
# 6. AI deep analysis (performed by the Agent)
# The Agent reads YAML and audit results to produce the final report
```
> **Note**: every command must set the `KUBE_AUDIT_OUTPUT` environment variable to ensure output goes to the current working directory.
## Console Output Conventions
### Planning Phase
```
Received. Target: Context `{ctx}`, Namespace `{ns}`.
Execution plan:
1. Set the output directory environment variable: export KUBE_AUDIT_OUTPUT="$(pwd)/output"
2. [Export] Dynamic discovery and full resource export → scripts/export.py
3. [Sanitize] Remove metadata and status fields → scripts/sanitize.py
4. [Group] Associate applications by workload topology → scripts/group_apps.py
5. [Audit] Dual-layer security audit (static scan + AI expert review) → scripts/audit.py
Output directory: $(pwd)/output/{ctx}/{ns}/
Start?
```
### Execution Phase
```
✅ [Export completed] Scanned 32 resource types, exported 150 YAMLs
Output: output/{ctx}/{ns}/export/
✅ [Sanitize completed] Sanitized 150 files, records saved
Output: output/{ctx}/{ns}/sanitize/, output/{ctx}/{ns}/sanitize_fields/
✅ [Group completed] Identified 12 apps, generated reference map, 5 orphan resources
Output: output/{ctx}/{ns}/group/, output/{ctx}/{ns}/ungrouped_resources.txt
✅ [Audit phase 1 completed] Script-based static scan finished
Output: output/{ctx}/{ns}/audit/audit_results.json
```
### Results Phase
```
✅ [Audit completed] Static report and AI expert analysis merged
📊 Audit stats:
- Applications: 12
- Critical risks: X
- Warning risks: Y
- Info recommendations: Z
📁 Output directory: output/{ctx}/{ns}/
📄 Full audit report: output/{ctx}/{ns}/audit/audit_report.md
⚠️ Security reminder: the output/ directory contains decrypted Secret data. Please delete it securely after the audit!
```
```
### EXAMPLES.md
```markdown
# Kube Audit Kit - Output Examples
This document shows typical outputs of Kube Audit Kit to help you understand the expected results.
## Directory Structure Example
```
output/prod-cluster/backend/
├── export/
│ ├── deployments/
│ │ ├── payment-api.yaml
│ │ └── frontend.yaml
│ ├── services/
│ │ ├── payment-api-svc.yaml
│ │ └── frontend-svc.yaml
│ ├── configmaps/
│ │ └── app-config.yaml
│ └── secrets/
│ └── db-credentials.yaml
├── sanitize/
│ └── [sanitized YAML, same structure as export]
├── sanitize_fields/
│ ├── deployments/
│ │ └── payment-api.txt
│ └── secrets/
│ └── db-credentials.txt
├── group/
│ ├── payment-api/
│ │ ├── Deployment_payment-api.yaml
│ │ ├── Service_payment-api-svc.yaml
│ │ ├── ConfigMap_app-config.yaml
│ │ ├── Secret_db-credentials.yaml
│ │ └── config_usage.json
│ └── frontend/
│ ├── Deployment_frontend.yaml
│ ├── Service_frontend-svc.yaml
│ └── config_usage.json
├── ungrouped_resources.txt
└── audit/
├── audit_results.json
├── configmap_to_secret.csv
├── secret_to_configmap.csv
├── rbac_issues.csv
├── network_security.csv
├── hostpath_mounts.csv
├── security_policies.csv
├── pdb_and_secrets.csv
└── audit_report.md
```
## Sanitization Record Example
### sanitize_fields/secrets/db-credentials.txt
```
metadata.uid = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
metadata.resourceVersion = "12345678"
metadata.creationTimestamp = "2024-01-15T10:30:00Z"
metadata.annotations.kubectl.kubernetes.io/last-applied-configuration = '{"apiVersion":"v1","kind":"Secret","metadata":{"name":"db-credentials","namespace":"backend"},"type":"Opaque","data":{"password":"****","username":"****"}}'
```
## config_usage.json Example
```json
{
"ConfigMap/app-config": ["EnvVar", "Volume"],
"Secret/db-credentials": ["EnvVar"],
"ConfigMap/nginx-conf": ["Volume"]
}
```
**Notes**:
- `["EnvVar"]` - used only as environment variables (sensitive data is scanned)
- `["Volume"]` - used only as volume mounts (not scanned, treated as config files)
- `["EnvVar", "Volume"]` - used in both ways (scanned)
## audit_results.json Example
```json
[
{
"AppName": "payment-api",
"Critical": [
"[Deployment/payment-api/container:api] privileged mode enabled (privileged: true)",
"[Deployment/payment-api/container:api] added dangerous Capabilities: ['NET_ADMIN', 'SYS_ADMIN']"
],
"Warning": [
"[Deployment/payment-api/container:api] missing runAsNonRoot: true",
"[Deployment/payment-api/container:api] missing readOnlyRootFilesystem: true",
"[Deployment/payment-api/container:api] capabilities.drop is not [\"ALL\"]"
],
"Info": [
"[Deployment/payment-api/container:api] incomplete resource Limits/Requests",
"[Deployment/payment-api/container:api] missing livenessProbe",
"[Deployment/payment-api/container:api] missing readinessProbe",
"[Deployment/payment-api/container:api] image uses latest tag"
]
},
{
"AppName": "frontend",
"Critical": [],
"Warning": [
"[Deployment/frontend/container:nginx] missing runAsNonRoot: true"
],
"Info": [
"[Deployment/frontend/container:nginx] missing livenessProbe"
]
}
]
```
## CSV File Examples
### configmap_to_secret.csv
Potential sensitive data found in ConfigMaps:
```csv
AppName,ConfigMap,SensitiveKeys,Usage
payment-api,app-config,db-password,EnvVar
payment-api,app-config,api-secret-key,EnvVar
frontend,app-config,admin-token,EnvVar
```
### secret_to_configmap.csv
Potential non-sensitive data found in Secrets (consider moving to ConfigMaps):
```csv
AppName,Secret,NonSensitiveKeys
payment-api,app-secret,"nginx-conf;log-level"
frontend,theme-config,"css-vars;font-settings"
```
## ungrouped_resources.txt Example
Orphan resources that could not be associated with any application:
```
ConfigMap/global-config
Secret/global-certificates
NetworkPolicy/default-deny
IngressController/nginx-ingress-controller
```
## audit_report.md Structure Example
```markdown
# Kubernetes Security Audit Report
**Cluster**: prod-cluster
**Namespace**: backend
**Audit time**: 2024-01-15 10:30:00 UTC
**Audit scope**: 12 applications, 150 resources
---
## Execution Summary
### Application Stats
- **Total workloads**: 15 (Deployment: 10, StatefulSet: 3, DaemonSet: 2)
- **Total associated resources**: 150
- **Orphan resources**: 5
### Risk Stats
| Level | Count | Severity |
|------|------|----------|
| **Critical** | 8 | 🔴 High - fix immediately |
| **Warning** | 23 | 🟡 Warning - plan to fix |
| **Info** | 45 | 🔵 Recommendation - optimize |
### Sensitive Data Risks
- **Sensitive data found in ConfigMaps**: 3 (see `configmap_to_secret.csv`)
- **Potential non-sensitive data in Secrets**: 2 (see `secret_to_configmap.csv`)
---
## Detailed Audit Report
### 1. payment-api
**Resource type**: Deployment
**Pod replicas**: 3
#### Automated Scan Results
**🔴 Critical risks (2)**:
- `[container:api] privileged mode enabled (privileged: true)`
- `[container:api] added dangerous Capabilities: ['NET_ADMIN', 'SYS_ADMIN']`
**🟡 Warning risks (3)**:
- `[container:api] missing runAsNonRoot: true`
- `[container:api] missing readOnlyRootFilesystem: true`
- `[container:api] capabilities.drop is not ["ALL"]`
**🔵 Info recommendations (4)**:
- `[container:api] incomplete resource Limits/Requests`
- `[container:api] missing livenessProbe`
- `[container:api] missing readinessProbe`
- `[container:api] image uses latest tag`
#### RBAC Permission Audit
✅ Using dedicated ServiceAccount: `payment-api-sa`
⚠️ Bound to ClusterRole `cluster-admin` (excessive permissions)
#### Network Security Audit
✅ NetworkPolicy configured
⚠️ All egress traffic allowed
#### Host Mount Detection
✅ No hostPath mounts
#### Security Policy Checks
❌ seccomp profile not configured
❌ AppArmor profile not configured
#### High Availability Configuration
❌ PodDisruptionBudget not configured
✅ HPA configured (min: 3, max: 10)
#### AI Deep Analysis
**Business logic risks**:
- 🔴 Secret `db-credentials` injects database password via environment variables; consider using External Secrets Operator
- 🔴 ConfigMap `app-config` contains `api-secret-key` that may be a JWT secret, move to Secret
**Architecture risks**:
- 🟡 Service uses LoadBalancer; for internal environments, consider ClusterIP
- 🟡 Missing Pod anti-affinity; replicas may schedule on the same node
**Configuration drift risks**:
- 🔴 Image tag uses `latest`, risk of inconsistent versions and rollbacks
- 🟡 Resource limits not set, risk of resource exhaustion
---
## Comprehensive Remediation Plan
### P0 Priority (fix immediately)
1. **Remove privileged mode** - payment-api
- Remove `privileged: true`
- Remove `NET_ADMIN` and `SYS_ADMIN` capabilities
2. **Move sensitive config** - payment-api
- Move sensitive data in ConfigMaps to Secrets
- Consider External Secrets Operator
3. **Fix image tags** - all apps
- Use semantic version tags instead of `latest`
### P1 Priority (fix this week)
1. **Configure security contexts** - all apps
- Set `runAsNonRoot: true`
- Set `readOnlyRootFilesystem: true`
2. **Configure health checks** - all apps
- Add `livenessProbe`
- Add `readinessProbe`
3. **Set resource limits** - all apps
- Configure `resources.requests`
- Configure `resources.limits`
### P2 Priority (planned improvements)
1. **Configure seccomp/AppArmor** - high-risk apps
2. **Optimize RBAC permissions** - remove cluster-admin binding
3. **Configure PodDisruptionBudget** - critical apps
4. **Add Pod anti-affinity** - multi-replica apps
---
## Appendix
### Related Files
- **Full audit results**: `audit_results.json`
- **Sensitive data list**: `configmap_to_secret.csv`
- **Config optimization suggestions**: `secret_to_configmap.csv`
- **Orphan resources**: `ungrouped_resources.txt`
### Audit Methodology
- **Static scan**: automated checks based on PSS/NSA standards
- **AI analysis**: deep analysis of business logic and architecture risks
### Reference Standards
- Pod Security Standards (PSS)
- NSA/CISA Kubernetes Hardening Guidance
- CIS Kubernetes Benchmark
---
**Audit tool**: [Kube Audit Kit](https://github.com/crazygit/kube-audit-kit)
**Audit version**: v1.0.0
```
## Typical Scenario Outputs
### Scenario 1: Healthy application
**audit_results.json**:
```json
{
"AppName": "healthy-app",
"Critical": [],
"Warning": [],
"Info": [
"[Deployment/healthy-app/container:app] consider adding startupProbe"
]
}
```
**Conclusion**: ✅ Application configuration is sound; only minor improvements are needed
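The risk-summary table at the top of the report is simply a per-severity count across all entries in `audit_results.json`. A minimal sketch of that rollup (the findings data here is invented for illustration):

```python
from collections import Counter

# Invented audit_results.json content for illustration.
audit_results = [
    {"AppName": "healthy-app", "Critical": [], "Warning": [],
     "Info": ["consider adding startupProbe"]},
    {"AppName": "legacy-app", "Critical": ["privileged: true"],
     "Warning": ["missing runAsNonRoot"], "Info": []},
]

# Count findings per severity across all apps.
totals = Counter()
for app in audit_results:
    for severity in ("Critical", "Warning", "Info"):
        totals[severity] += len(app[severity])

print(dict(totals))  # -> {'Critical': 1, 'Warning': 1, 'Info': 1}
```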
### Scenario 2: High-risk application
**audit_results.json**:
```json
{
"AppName": "legacy-app",
"Critical": [
"[Deployment/legacy-app/container:main] privileged: true",
"[Deployment/legacy-app/container:main] hostNetwork: true",
"[Deployment/legacy-app/container:main] hostPID: true"
],
"Warning": [
"[Deployment/legacy-app/container:main] missing runAsNonRoot"
],
"Info": []
}
```
**Conclusion**: 🔴 Severe security risk; disable the workload and refactor it immediately
### Scenario 3: Configuration confusion
**configmap_to_secret.csv**:
```csv
AppName,ConfigMap,SensitiveKeys,Usage
legacy-app,app-config,"password;api-key;secret-key",EnvVar
```
**secret_to_configmap.csv**:
```csv
AppName,Secret,NonSensitiveKeys
legacy-app,app-secret,"nginx-conf;log-level;debug-mode"
```
**Conclusion**: 🟡 ConfigMap/Secret usage is reversed and should be swapped
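The misplacement checks behind these two CSVs boil down to a keyword heuristic over ConfigMap and Secret key names. A standalone sketch (the keyword list mirrors the audit script; the sample data is invented):

```python
# Keyword heuristic used to spot misplaced configuration data.
SENSITIVE_KEYWORDS = ["password", "token", "secret", "key",
                      "auth", "access", "credential"]

def looks_sensitive(key: str) -> bool:
    """A key counts as sensitive if any keyword appears in its lowercase form."""
    return any(kw in key.lower() for kw in SENSITIVE_KEYWORDS)

# Hypothetical sample data for illustration.
configmap_data = {"api-key": "abc123", "log-level": "info"}
secret_data = {"db-password": "hunter2", "nginx-conf": "worker_processes 2;"}

cm_to_secret = [k for k in configmap_data if looks_sensitive(k)]   # belongs in a Secret
secret_to_cm = [k for k in secret_data if not looks_sensitive(k)]  # belongs in a ConfigMap
print(cm_to_secret, secret_to_cm)  # -> ['api-key'] ['nginx-conf']
```

Keyword matching is deliberately coarse; it trades false positives for not missing obvious credential-shaped keys.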
## Related Docs
- **Quick start**: [QUICKSTART.md](QUICKSTART.md)
- **Workflow**: [WORKFLOW.md](WORKFLOW.md)
```
### scripts/export.py
```python
import argparse
import subprocess
import os
import sys
import yaml
from typing import List, Dict, Any, Optional, Set
from pathlib import Path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
import utils
import output
BLACKLIST_RESOURCES: Set[str] = {
"events",
"endpoints",
"endpointslices",
"replicasets",
"controllerrevisions",
"tokenreviews",
"localsubjectaccessreviews",
"selfsubjectaccessreviews",
"selfsubjectrulesreviews",
"subjectaccessreviews",
"componentstatuses",
"bindings",
"events.events.k8s.io"
}
def run_command(command: str, allow_failure: bool = False) -> Optional[str]:
"""
Run a shell command and return output.
allow_failure: if True, return None on failure without raising (but log stderr).
"""
try:
result = subprocess.run(
command,
shell=True,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
if allow_failure:
if e.stderr and e.stderr.strip():
stderr_lower = e.stderr.lower()
if "forbidden" in stderr_lower or "unauthorized" in stderr_lower:
                    # Commands look like "kubectl get <resource> ...", so the
                    # resource type is the third token, not the last one.
                    parts = command.split()
                    resource = parts[2] if len(parts) > 2 else "resource"
                    output.warning(f"Permission denied for {resource} - skipping")
                    output.info("  Tip: Secrets can be excluded from the audit by not granting Secret read permissions")
                    output.info("  All other security checks will continue normally")
else:
output.warning(f"Command failed: {command}")
output.info(f"Error: {e.stderr.strip()}")
return None
else:
output.error(f"Critical command failed: {command}")
if e.stderr:
output.info(f"Error: {e.stderr.strip()}")
raise e
def validate_context(context: str) -> bool:
"""
Validate that the Kubernetes context exists.
"""
cmd = "kubectl config get-contexts -o name"
try:
result = run_command(cmd)
if result is None:
return False
contexts = result.splitlines()
if context not in contexts:
output.error(f"Context '{context}' not found.")
output.info("Available contexts:")
for ctx in contexts:
output.info(f" - {ctx}")
return False
return True
except subprocess.CalledProcessError:
output.error("Failed to list Kubernetes contexts.")
return False
def validate_namespace(context: str, namespace: str) -> bool:
"""
Validate that the Kubernetes namespace exists.
"""
cmd = f"kubectl get namespace {namespace} --context {context} -o name"
try:
result = run_command(cmd, allow_failure=True)
if result is None:
output.error(f"Namespace '{namespace}' not found in context '{context}'.")
output.info("Available namespaces:")
list_cmd = f"kubectl get namespaces --context {context} -o jsonpath='{{.items[*].metadata.name}}'"
ns_result = run_command(list_cmd, allow_failure=True)
if ns_result:
for ns in ns_result.split():
output.info(f" - {ns}")
return False
return True
except subprocess.CalledProcessError:
output.error(f"Failed to validate namespace '{namespace}'.")
return False
def get_namespaced_resources(context: str) -> List[str]:
"""Fetch namespaced resource types."""
output.info(f"Discovering API resources in context '{context}'...")
cmd = f"kubectl api-resources --verbs=list --namespaced -o name --context {context}"
try:
result = run_command(cmd)
except subprocess.CalledProcessError:
output.error("Failed to list API resources. Please check connectivity and permissions.")
sys.exit(1)
if result is None:
return []
resources: List[str] = result.splitlines()
valid_resources: List[str] = []
for r in resources:
name = r.split('.')[0]
if name not in BLACKLIST_RESOURCES and r not in BLACKLIST_RESOURCES:
valid_resources.append(r)
output.success(f"Found {len(valid_resources)} valid resource types to scan.")
return valid_resources
def export_resources(context: str, namespace: str, resources: List[str]) -> int:
paths = utils.get_output_paths(context, namespace)
base_dir = paths["export"]
total_exported = 0
for resource_type in resources:
cmd = f"kubectl get {resource_type} -n {namespace} --context {context} -o yaml"
yaml_output = run_command(cmd, allow_failure=True)
if not yaml_output:
continue
try:
data: Dict[str, Any] = yaml.safe_load(yaml_output)
except yaml.YAMLError as e:
output.warning(f"Failed to parse YAML for {resource_type}: {e}")
continue
items: List[Dict[str, Any]] = data.get("items", [])
if not items:
continue
dir_name = resource_type
save_dir = os.path.join(base_dir, dir_name)
os.makedirs(save_dir, exist_ok=True)
output.info(f"Exporting {len(items)} {resource_type}...")
for item in items:
metadata: Dict[str, Any] = item.get("metadata", {})
name: Optional[str] = metadata.get("name")
if not name:
continue
if "apiVersion" not in item and "apiVersion" in data:
item["apiVersion"] = data["apiVersion"]
file_path = os.path.join(save_dir, f"{name}.yaml")
with open(file_path, "w") as f:
yaml.dump(item, f, default_flow_style=False, sort_keys=False)
total_exported += 1
return total_exported
def main() -> None:
parser = argparse.ArgumentParser(description="Export K8s resources using kubectl.")
parser.add_argument("--context", required=True, help="K8s context")
parser.add_argument("--namespace", required=True, help="K8s namespace")
args = parser.parse_args()
try:
run_command("kubectl version --client")
except subprocess.CalledProcessError:
output.error("kubectl not found or not working.")
sys.exit(1)
if not validate_context(args.context):
sys.exit(1)
if not validate_namespace(args.context, args.namespace):
sys.exit(1)
resources = get_namespaced_resources(args.context)
output.header(f"Starting export for namespace '{args.namespace}'")
count = export_resources(args.context, args.namespace, resources)
console = output.console
console.print()
output.success(f"Export complete! Total files: {count}")
paths = utils.get_output_paths(args.context, args.namespace)
output.info(f"Data saved to: {paths['export']}")
if __name__ == "__main__":
main()
```
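`get_namespaced_resources` filters kubectl's discovery output against the blacklist by both the bare name (the part before the first dot) and the fully qualified name. A self-contained sketch of that filter, using a trimmed blacklist and invented discovery output:

```python
# Trimmed-down blacklist for illustration; the script's full set is larger.
BLACKLIST_RESOURCES = {"events", "endpoints", "replicasets", "events.events.k8s.io"}

def filter_resources(resources):
    """Keep a resource unless its bare name or full name is blacklisted."""
    valid = []
    for r in resources:
        name = r.split(".")[0]  # "events.events.k8s.io" -> "events"
        if name not in BLACKLIST_RESOURCES and r not in BLACKLIST_RESOURCES:
            valid.append(r)
    return valid

# Hypothetical `kubectl api-resources -o name` output.
discovered = ["pods", "events.events.k8s.io", "deployments.apps", "replicasets.apps"]
print(filter_resources(discovered))  # -> ['pods', 'deployments.apps']
```

Checking the bare name catches group-qualified variants such as `replicasets.apps` without having to enumerate every API group in the blacklist.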
### scripts/sanitize.py
```python
import os
import yaml
import sys
import argparse
from pathlib import Path
from typing import Dict, Any, List, Tuple, Union
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
import utils
import output
METADATA_FIELDS_TO_REMOVE: List[str] = [
"uid",
"resourceVersion",
"creationTimestamp",
"generation",
"managedFields",
"ownerReferences",
"selfLink",
]
ANNO_FIELDS_TO_REMOVE: List[str] = [
"kubectl.kubernetes.io/last-applied-configuration",
"kubectl.kubernetes.io/restartedAt",
"deployment.kubernetes.io/revision",
]
def clean_metadata_dict(metadata: Dict[str, Any], path_prefix: str) -> List[str]:
"""
Sanitize a single metadata dict and return removed field logs.
"""
removed_logs: List[str] = []
for field in METADATA_FIELDS_TO_REMOVE:
if field in metadata:
val = metadata.pop(field)
removed_logs.append(f"{path_prefix}.{field} = {val}")
if "annotations" in metadata:
annotations = metadata["annotations"]
if isinstance(annotations, dict):
for field in ANNO_FIELDS_TO_REMOVE:
if field in annotations:
val = annotations.pop(field)
removed_logs.append(f"{path_prefix}.annotations.{field} = {val}")
if not annotations:
metadata.pop("annotations")
elif annotations is None:
metadata.pop("annotations")
return removed_logs
def recursive_sanitize(data: Any, path: str = "") -> List[str]:
"""
Recursively traverse and sanitize all 'metadata' keys.
"""
logs: List[str] = []
if isinstance(data, dict):
if "metadata" in data and isinstance(data["metadata"], dict):
current_path = f"{path}.metadata" if path else "metadata"
logs.extend(clean_metadata_dict(data["metadata"], current_path))
for key, value in data.items():
if key == "metadata":
continue
new_path = f"{path}.{key}" if path else key
logs.extend(recursive_sanitize(value, new_path))
elif isinstance(data, list):
for idx, item in enumerate(data):
new_path = f"{path}[{idx}]"
logs.extend(recursive_sanitize(item, new_path))
return logs
def sanitize_resource(data: Dict[str, Any]) -> Tuple[Dict[str, Any], List[str]]:
"""
Sanitize a K8s resource.
"""
removed_fields: List[str] = []
if "status" in data:
removed_fields.append(f"status = {data.pop('status')}")
removed_fields.extend(recursive_sanitize(data))
return data, removed_fields
def process_directory(context: str, namespace: str) -> None:
paths = utils.get_output_paths(context, namespace)
export_dir = paths["export"]
sanitize_dir = paths["sanitize"]
records_dir = paths["sanitize_fields"]
if not export_dir.exists():
output.error(f"Export directory {export_dir} does not exist.")
return
count: int = 0
for root, dirs, files in os.walk(export_dir):
for file in files:
if not file.endswith(".yaml"):
continue
rel_path: Path = Path(root).relative_to(export_dir)
source_file: Path = Path(root) / file
with open(source_file, "r", encoding="utf-8") as f:
try:
docs: List[Any] = list(yaml.safe_load_all(f))
except Exception as e:
output.error(f"Error reading {source_file}: {e}")
continue
sanitized_docs: List[Dict[str, Any]] = []
all_removed_fields: List[str] = []
for doc in docs:
if not doc:
continue
if isinstance(doc, dict):
sanitized_doc, removed = sanitize_resource(doc)
sanitized_docs.append(sanitized_doc)
all_removed_fields.extend(removed)
else:
output.warning(f"Skipping non-dict document in {source_file}")
target_yaml_path: Path = sanitize_dir / rel_path / file
target_record_path: Path = (
records_dir / rel_path / file.replace(".yaml", ".txt")
)
target_yaml_path.parent.mkdir(parents=True, exist_ok=True)
target_record_path.parent.mkdir(parents=True, exist_ok=True)
with open(target_yaml_path, "w", encoding="utf-8") as f:
if len(sanitized_docs) == 1:
yaml.dump(sanitized_docs[0], f, sort_keys=False, allow_unicode=True)
else:
yaml.dump_all(
sanitized_docs, f, sort_keys=False, allow_unicode=True
)
with open(target_record_path, "w", encoding="utf-8") as f:
f.write("\n".join(all_removed_fields))
count += 1
output.info(f"Sanitized: {rel_path}/{file}")
console = output.console
console.print()
output.success(f"Total processed: {count} files.")
def main() -> None:
parser = argparse.ArgumentParser(description="Sanitize exported K8s resources.")
parser.add_argument("--context", required=True, help="K8s context")
parser.add_argument("--namespace", required=True, help="K8s namespace")
args = parser.parse_args()
process_directory(args.context, args.namespace)
if __name__ == "__main__":
main()
```
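To see what sanitization actually strips, here is a compressed stand-alone version of the same logic (drop `status`, then recursively prune volatile `metadata` fields) applied to an invented Deployment:

```python
# Same volatile fields the sanitize script removes from every metadata block.
METADATA_FIELDS_TO_REMOVE = ["uid", "resourceVersion", "creationTimestamp",
                             "generation", "managedFields", "ownerReferences", "selfLink"]

def sanitize(resource: dict) -> dict:
    """Drop status plus volatile metadata fields, recursing into nested objects."""
    resource.pop("status", None)

    def walk(node):
        if isinstance(node, dict):
            meta = node.get("metadata")
            if isinstance(meta, dict):
                for field in METADATA_FIELDS_TO_REMOVE:
                    meta.pop(field, None)
            for value in node.values():
                walk(value)
        elif isinstance(node, list):
            for item in node:
                walk(item)

    walk(resource)
    return resource

# Invented Deployment fragment for illustration.
deployment = {
    "kind": "Deployment",
    "metadata": {"name": "demo", "uid": "1234", "resourceVersion": "99"},
    "spec": {"template": {"metadata": {"creationTimestamp": None,
                                       "labels": {"app": "demo"}}}},
    "status": {"replicas": 3},
}
print(sanitize(deployment))
```

Note that the nested Pod template's `metadata` is cleaned too; that is why the script recurses instead of only touching the top level.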
### scripts/group_apps.py
```python
import os
import yaml
import json
import argparse
import shutil
import sys
from pathlib import Path
from typing import Dict, Any, List, Set, Optional, Tuple, Union
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
import utils
import output
ResourceData = Dict[str, Any]
ResourceKey = Tuple[str, str]
ResourceIndex = Dict[ResourceKey, ResourceData]
WORKLOAD_KINDS = {"Deployment", "StatefulSet", "DaemonSet", "Job", "CronJob"}
def load_resources(context: str, namespace: str) -> Tuple[ResourceIndex, List[Path]]:
"""
Load all resources under the sanitize directory.
Returns: (index dict, file path list)
"""
paths = utils.get_output_paths(context, namespace)
sanitize_dir = paths["sanitize"]
index: ResourceIndex = {}
paths_list: List[Path] = []
if not sanitize_dir.exists():
output.error(f"Sanitize directory {sanitize_dir} does not exist.")
return {}, []
output.info(f"Loading resources from {sanitize_dir}...")
for root, _, files in os.walk(sanitize_dir):
for file in files:
if not file.endswith(".yaml"):
continue
file_path = Path(root) / file
try:
with open(file_path, 'r', encoding='utf-8') as f:
docs = list(yaml.safe_load_all(f))
for doc in docs:
if not doc or not isinstance(doc, dict):
continue
kind = doc.get("kind")
metadata = doc.get("metadata", {})
name = metadata.get("name")
if kind and name:
index[(kind, name)] = doc
paths_list.append(file_path)
except Exception as e:
output.warning(f"Error loading {file_path}: {e}")
output.success(f"Loaded {len(index)} resources.")
return index, paths_list
def match_labels(selector: Dict[str, str], labels: Dict[str, str]) -> bool:
"""Check whether labels satisfy the selector."""
if not selector or not isinstance(selector, dict):
return False
for k, v in selector.items():
if labels.get(k) != v:
return False
return True
def get_pod_template_spec(workload: ResourceData) -> Optional[Dict[str, Any]]:
"""Extract the Pod template spec from a workload."""
kind = workload.get("kind")
spec = workload.get("spec", {})
if kind == "CronJob":
result = spec.get("jobTemplate", {}).get("spec", {}).get("template")
else:
result = spec.get("template")
if result is None or isinstance(result, dict):
return result
return None
def find_associated_resources(
anchor_key: ResourceKey,
anchor_data: ResourceData,
index: ResourceIndex
) -> Tuple[List[ResourceKey], Dict[str, List[str]]]:
"""
Find all resources associated with the anchor.
"""
associated: List[ResourceKey] = []
config_usage: Dict[str, List[str]] = {}
kind, name = anchor_key
pod_template = get_pod_template_spec(anchor_data)
if not pod_template:
return [], {}
pod_meta = pod_template.get("metadata", {})
pod_labels = pod_meta.get("labels", {})
pod_spec = pod_template.get("spec", {})
spec = anchor_data.get("spec", {})
selector_dict: Dict[str, str] = {}
if kind == "CronJob":
selector_dict = spec.get("jobTemplate", {}).get("spec", {}).get("selector", {}).get("matchLabels", {})
else:
selector_obj = spec.get("selector", {})
if isinstance(selector_obj, dict):
selector_dict = selector_obj.get("matchLabels", selector_obj)
for (r_kind, r_name), r_data in index.items():
if r_kind == "Service":
s_selector = r_data.get("spec", {}).get("selector", {})
if s_selector and match_labels(s_selector, pod_labels):
associated.append((r_kind, r_name))
elif r_kind == "PodDisruptionBudget":
p_selector = r_data.get("spec", {}).get("selector", {}).get("matchLabels", {})
if p_selector and match_labels(p_selector, pod_labels):
associated.append((r_kind, r_name))
elif r_kind == "Pod":
r_labels = r_data.get("metadata", {}).get("labels", {})
if selector_dict and match_labels(selector_dict, r_labels):
associated.append((r_kind, r_name))
if ("PodMetrics", r_name) in index:
associated.append(("PodMetrics", r_name))
elif r_kind == "HorizontalPodAutoscaler":
target = r_data.get("spec", {}).get("scaleTargetRef", {})
if target.get("kind") == kind and target.get("name") == name:
associated.append((r_kind, r_name))
for vol in pod_spec.get("volumes", []):
if "configMap" in vol:
cm_name = vol["configMap"].get("name")
if cm_name:
key = ("ConfigMap", cm_name)
if key in index:
associated.append(key)
config_usage.setdefault(f"ConfigMap/{cm_name}", []).append("Volume")
if "secret" in vol:
sec_name = vol["secret"].get("secretName")
if sec_name:
key = ("Secret", sec_name)
if key in index:
associated.append(key)
config_usage.setdefault(f"Secret/{sec_name}", []).append("Volume")
if "persistentVolumeClaim" in vol:
pvc_name = vol["persistentVolumeClaim"].get("claimName")
if pvc_name:
key = ("PersistentVolumeClaim", pvc_name)
if key in index:
associated.append(key)
for container in pod_spec.get("containers", []) + pod_spec.get("initContainers", []):
for env in container.get("env", []):
val_from = env.get("valueFrom", {})
if "configMapKeyRef" in val_from:
cm_name = val_from["configMapKeyRef"].get("name")
if cm_name:
key = ("ConfigMap", cm_name)
if key in index:
associated.append(key)
config_usage.setdefault(f"ConfigMap/{cm_name}", []).append("EnvVar")
if "secretKeyRef" in val_from:
sec_name = val_from["secretKeyRef"].get("name")
if sec_name:
key = ("Secret", sec_name)
if key in index:
associated.append(key)
config_usage.setdefault(f"Secret/{sec_name}", []).append("EnvVar")
for env_from in container.get("envFrom", []):
if "configMapRef" in env_from:
cm_name = env_from["configMapRef"].get("name")
if cm_name:
key = ("ConfigMap", cm_name)
if key in index:
associated.append(key)
config_usage.setdefault(f"ConfigMap/{cm_name}", []).append("EnvVar")
if "secretRef" in env_from:
sec_name = env_from["secretRef"].get("name")
if sec_name:
key = ("Secret", sec_name)
if key in index:
associated.append(key)
config_usage.setdefault(f"Secret/{sec_name}", []).append("EnvVar")
sa_name = pod_spec.get("serviceAccountName", "default")
sa_key = ("ServiceAccount", sa_name)
if sa_key in index:
associated.append(sa_key)
for (r_kind, r_name), r_data in index.items():
if r_kind in ["RoleBinding", "ClusterRoleBinding"]:
subjects = r_data.get("subjects", [])
for sub in subjects:
if sub.get("kind") == "ServiceAccount" and sub.get("name") == sa_name:
associated.append((r_kind, r_name))
role_ref = r_data.get("roleRef", {})
role_kind = role_ref.get("kind")
role_name = role_ref.get("name")
if role_kind and role_name:
role_key = (role_kind, role_name)
if role_key in index:
associated.append(role_key)
associated_services = {name for (k, name) in associated if k == "Service"}
if associated_services:
for (r_kind, r_name), r_data in index.items():
if r_kind == "Ingress":
rules = r_data.get("spec", {}).get("rules", [])
for rule in rules:
http = rule.get("http", {})
paths = http.get("paths", [])
for path in paths:
backend = path.get("backend", {})
svc = backend.get("service", {})
svc_name = svc.get("name")
if svc_name and svc_name in associated_services:
associated.append((r_kind, r_name))
break
for (r_kind, r_name), r_data in index.items():
if r_kind == "PodMetrics" and (r_kind, r_name) not in associated:
if r_name.startswith(name + "-"):
associated.append((r_kind, r_name))
return list(set(associated)), config_usage
def save_group(
context: str,
namespace: str,
anchor_name: str,
resources: List[Tuple[ResourceKey, ResourceData]],
config_usage: Dict[str, List[str]]
) -> None:
paths = utils.get_output_paths(context, namespace)
group_dir = paths["group"] / anchor_name
if group_dir.exists():
shutil.rmtree(group_dir)
group_dir.mkdir(parents=True, exist_ok=True)
for (kind, name), data in resources:
file_path = group_dir / f"{kind}_{name}.yaml"
with open(file_path, 'w', encoding='utf-8') as f:
yaml.dump(data, f, sort_keys=False, allow_unicode=True)
usage_path = group_dir / "config_usage.json"
final_usage = {k: list(set(v)) for k, v in config_usage.items()}
with open(usage_path, 'w', encoding='utf-8') as f:
json.dump(final_usage, f, indent=2, ensure_ascii=False)
def main() -> None:
parser = argparse.ArgumentParser(description="Smart group K8s resources into applications.")
parser.add_argument("--context", required=True, help="K8s context")
parser.add_argument("--namespace", required=True, help="K8s namespace")
args = parser.parse_args()
context = args.context
namespace = args.namespace
index, _ = load_resources(context, namespace)
    if not index:
        return
grouped_keys: Set[ResourceKey] = set()
anchors: List[ResourceKey] = []
for key in index.keys():
if key[0] in WORKLOAD_KINDS:
anchors.append(key)
for anchor_key in anchors:
anchor_kind, anchor_name = anchor_key
anchor_data = index[anchor_key]
assoc_keys, config_usage = find_associated_resources(anchor_key, anchor_data, index)
assoc_keys.append(anchor_key)
group_resources: List[Tuple[ResourceKey, ResourceData]] = []
for key in set(assoc_keys):
group_resources.append((key, index[key]))
grouped_keys.add(key)
save_group(context, namespace, anchor_name, group_resources, config_usage)
all_keys = set(index.keys())
orphan_keys = all_keys - grouped_keys
paths = utils.get_output_paths(context, namespace)
orphan_file = paths["orphan"]
orphan_file.parent.mkdir(parents=True, exist_ok=True)
with open(orphan_file, 'w', encoding='utf-8') as f:
for kind, name in sorted(list(orphan_keys)):
f.write(f"{kind}/{name}\n")
console = output.console
console.print()
output.success(f"Grouping complete. Apps: {len(anchors)}, Orphans: {len(orphan_keys)}")
if __name__ == "__main__":
main()
```
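Grouping hinges on Kubernetes equality-based label-selector semantics: a selector matches a Pod only when every one of its key/value pairs is present in the Pod's labels, and an empty selector is treated as non-matching here. A standalone sketch of the `match_labels` rule with invented labels:

```python
def match_labels(selector: dict, labels: dict) -> bool:
    """Empty selectors never match; otherwise every selector pair must be present."""
    if not selector:
        return False
    return all(labels.get(k) == v for k, v in selector.items())

# Hypothetical Pod labels for illustration.
pod_labels = {"app": "payment-api", "tier": "backend"}

print(match_labels({"app": "payment-api"}, pod_labels))                 # True
print(match_labels({"app": "payment-api", "env": "prod"}, pod_labels))  # False
```

Treating an empty selector as a non-match is a deliberate safety choice: in the Kubernetes API an empty `podSelector` selects everything, which would wrongly attach every resource to every app during grouping.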
### scripts/audit.py
```python
import os
import yaml
import json
import csv
import argparse
import sys
from typing import Dict, Any, List, Set
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
import utils
import output
SENSITIVE_KEYWORDS = [
"password",
"token",
"secret",
"key",
"auth",
"access",
"credential",
]
WORKLOAD_KINDS = {"Deployment", "StatefulSet", "DaemonSet", "Job", "CronJob"}
RBAC_KINDS = {"Role", "ClusterRole", "RoleBinding", "ClusterRoleBinding"}
NETWORK_KINDS = {"NetworkPolicy", "Service", "Ingress"}
SEC_CONFIG_KINDS = {"PodDisruptionBudget", "ServiceAccount"}
class Auditor:
def __init__(self, context: str, namespace: str) -> None:
self.context = context
self.namespace = namespace
self.paths = utils.get_output_paths(context, namespace)
self.group_dir = self.paths["group"]
self.audit_report: List[Dict[str, Any]] = []
self.cm_to_secret_rows: List[Dict[str, str]] = []
self.secret_to_cm_rows: List[Dict[str, str]] = []
self.global_config_usage: Dict[str, Set[str]] = {}
self.rbac_issues: List[Dict[str, str]] = []
self.network_findings: List[Dict[str, str]] = []
self.hostpath_mounts: List[Dict[str, str]] = []
self.security_policies: List[Dict[str, str]] = []
self.pdb_and_secrets: List[Dict[str, str]] = []
def run(self) -> None:
if not self.group_dir.exists():
output.error(f"Group directory {self.group_dir} does not exist.")
return
output.info(f"Starting security audit for {self.context}/{self.namespace}...")
self.audit_configs()
self.audit_workloads()
self.audit_rbac()
self.audit_network_security()
self.audit_host_mounts()
self.audit_security_policies()
self.audit_pod_disruption_and_secrets()
self.save_reports()
def audit_configs(self) -> None:
"""Audit ConfigMap and Secret usage and contents."""
for app_dir in self.group_dir.iterdir():
if not app_dir.is_dir():
continue
usage_path = app_dir / "config_usage.json"
if usage_path.exists():
with open(usage_path, "r") as f:
usage = json.load(f)
for res_id, types in usage.items():
self.global_config_usage.setdefault(res_id, set()).update(types)
for app_dir in self.group_dir.iterdir():
if not app_dir.is_dir():
continue
for yaml_file in app_dir.glob("*.yaml"):
with open(yaml_file, "r") as f:
try:
data = yaml.safe_load(f)
except Exception:
continue
if not data:
continue
kind = data.get("kind")
name = data.get("metadata", {}).get("name")
res_id = f"{kind}/{name}"
if kind == "ConfigMap":
self.check_configmap(app_dir.name, name, data, res_id)
elif kind == "Secret":
self.check_secret(app_dir.name, name, data, res_id)
def check_configmap(
self, app_name: str, name: str, data: Dict[str, Any], res_id: str
) -> None:
usage = self.global_config_usage.get(res_id, set())
if usage == {"Volume"}:
return
found_sensitive = []
content = data.get("data", {}) or {}
for key, value in content.items():
if any(kw in key.lower() for kw in SENSITIVE_KEYWORDS):
found_sensitive.append(key)
elif isinstance(value, str) and self.is_high_entropy(value):
found_sensitive.append(f"{key} (high entropy)")
if found_sensitive:
self.cm_to_secret_rows.append(
{
"AppName": app_name,
"ConfigMap": name,
"SensitiveKeys": ", ".join(found_sensitive),
"Usage": ", ".join(list(usage)) if usage else "Orphan",
}
)
def check_secret(self, app_name: str, name: str, data: Dict[str, Any], res_id: str) -> None:
content = data.get("data", {}) or {}
non_sensitive_keys = []
for key in content.keys():
if not any(kw in key.lower() for kw in SENSITIVE_KEYWORDS):
non_sensitive_keys.append(key)
if len(non_sensitive_keys) > 0:
self.secret_to_cm_rows.append(
{
"AppName": app_name,
"Secret": name,
"NonSensitiveKeys": ", ".join(non_sensitive_keys),
}
)
    def is_high_entropy(self, text: str) -> bool:
        """
        Lightweight heuristic for secret-like values: flag strings of 20+
        characters that mix upper case, lower case, and digits. This is a
        character-class proxy, not a true Shannon-entropy measurement, so
        expect occasional false positives.
        """
        if len(text) < 20:
            return False
        has_upper = any(c.isupper() for c in text)
        has_lower = any(c.islower() for c in text)
        has_digit = any(c.isdigit() for c in text)
        return has_upper and has_lower and has_digit
def audit_workloads(self) -> None:
"""Audit workload security configuration."""
for app_dir in self.group_dir.iterdir():
if not app_dir.is_dir():
continue
app_findings: Dict[str, Any] = {
"AppName": app_dir.name,
"Critical": [],
"Warning": [],
"Info": [],
}
for yaml_file in app_dir.glob("*.yaml"):
with open(yaml_file, "r") as f:
try:
data = yaml.safe_load(f)
except Exception:
continue
if not data or data.get("kind") not in WORKLOAD_KINDS:
continue
self.check_workload_security(data, app_findings)
if (
app_findings["Critical"]
or app_findings["Warning"]
or app_findings["Info"]
):
self.audit_report.append(app_findings)
def check_workload_security(
self, data: Dict[str, Any], findings: Dict[str, List[str]]
) -> None:
kind = data.get("kind")
name = data.get("metadata", {}).get("name")
spec = data.get("spec", {})
if kind == "CronJob":
pod_template = (
spec.get("jobTemplate", {}).get("spec", {}).get("template", {})
)
else:
pod_template = spec.get("template", {})
pod_spec = pod_template.get("spec", {})
containers = pod_spec.get("containers", []) + pod_spec.get("initContainers", [])
for field in ["hostNetwork", "hostPID", "hostIPC"]:
if pod_spec.get(field) is True:
findings["Critical"].append(f"[{kind}/{name}] enabled {field}")
for container in containers:
c_name = container.get("name")
sc = container.get("securityContext", {})
if sc.get("privileged") is True:
findings["Critical"].append(
f"[{kind}/{name}/container:{c_name}] privileged mode enabled (privileged: true)"
)
caps = sc.get("capabilities", {})
if "add" in caps:
findings["Critical"].append(
f"[{kind}/{name}/container:{c_name}] added dangerous Capabilities: {caps['add']}"
)
if caps.get("drop") != ["ALL"] and caps.get("drop") != ["all"]:
findings["Warning"].append(
f"[{kind}/{name}/container:{c_name}] did not drop ALL capabilities"
)
if (
pod_spec.get("securityContext", {}).get("runAsNonRoot") is not True
and sc.get("runAsNonRoot") is not True
):
findings["Warning"].append(
f"[{kind}/{name}/container:{c_name}] missing runAsNonRoot: true"
)
if sc.get("readOnlyRootFilesystem") is not True:
findings["Warning"].append(
f"[{kind}/{name}/container:{c_name}] missing readOnlyRootFilesystem: true"
)
resources = container.get("resources", {})
if not resources.get("limits") or not resources.get("requests"):
findings["Info"].append(
f"[{kind}/{name}/container:{c_name}] incomplete resource Limits/Requests"
)
probes = ["livenessProbe", "readinessProbe", "startupProbe"]
for p in probes:
if p not in container:
findings["Info"].append(
f"[{kind}/{name}/container:{c_name}] missing {p}"
)
image = container.get("image", "")
if ":" not in image or image.endswith(":latest"):
findings["Info"].append(
f"[{kind}/{name}/container:{c_name}] image tag not pinned ({image})"
)
def audit_rbac(self) -> None:
"""Audit RBAC permissions and check for over-privilege."""
roles: Dict[str, Dict[str, Any]] = {}
for app_dir in self.group_dir.iterdir():
if not app_dir.is_dir():
continue
for yaml_file in app_dir.glob("*.yaml"):
with open(yaml_file, "r") as f:
try:
data = yaml.safe_load(f)
except Exception:
continue
if not data or data.get("kind") not in {"Role", "ClusterRole"}:
continue
name = data.get("metadata", {}).get("name")
kind = data.get("kind")
key = f"{kind}/{name}"
roles[key] = {
"app": app_dir.name,
"kind": kind,
"name": name,
"rules": data.get("rules", []),
}
for key, role_info in roles.items():
for rule in role_info["rules"]:
verbs = rule.get("verbs", [])
resources = rule.get("resources", [])
api_groups = rule.get("apiGroups", [])
non_resource_urls = rule.get("nonResourceURLs", [])
issues = []
if "*" in verbs:
issues.append("verbs include wildcard [*]")
if "*" in resources:
issues.append("resources include wildcard [*]")
if "*" in api_groups:
issues.append("apiGroups include wildcard [*]")
if "*" in non_resource_urls:
issues.append("nonResourceURLs include wildcard [*]")
if role_info["kind"] == "ClusterRole" and role_info["name"] in [
"cluster-admin",
"admin",
"edit",
"view",
]:
issues.append(f"uses high-privilege system role [{role_info['name']}]")
if issues:
self.rbac_issues.append(
{
"AppName": role_info["app"],
"Kind": role_info["kind"],
"Name": role_info["name"],
"Issues": "; ".join(issues),
"Resources": ", ".join(resources) if resources else "N/A",
"Verbs": ", ".join(verbs) if verbs else "N/A",
}
)
def audit_network_security(self) -> None:
"""Audit network security: NetworkPolicy, Service, Ingress."""
apps_with_policies: Set[str] = set()
all_apps: Set[str] = set()
for app_dir in self.group_dir.iterdir():
if not app_dir.is_dir():
continue
all_apps.add(app_dir.name)
for yaml_file in app_dir.glob("*.yaml"):
with open(yaml_file, "r") as f:
try:
data = yaml.safe_load(f)
except Exception:
continue
if not data:
continue
kind = data.get("kind")
name = data.get("metadata", {}).get("name")
if kind == "NetworkPolicy":
apps_with_policies.add(app_dir.name)
spec = data.get("spec", {})
pod_selector = spec.get("podSelector", {})
if not pod_selector:
self.network_findings.append(
{
"AppName": app_dir.name,
"Type": "NetworkPolicy",
"Name": name,
"Issue": "PodSelector is empty and may affect all Pods",
"Severity": "Warning",
}
)
ingress = spec.get("ingress", [])
egress = spec.get("egress", [])
if not ingress and not egress:
self.network_findings.append(
{
"AppName": app_dir.name,
"Type": "NetworkPolicy",
"Name": name,
"Issue": "no ingress or egress rules",
"Severity": "Warning",
}
)
elif kind == "Service":
spec = data.get("spec", {})
svc_type = spec.get("type", "ClusterIP")
if svc_type in ["LoadBalancer", "NodePort"]:
self.network_findings.append(
{
"AppName": app_dir.name,
"Type": "Service",
"Name": name,
"Issue": f"Service type is {svc_type}, directly exposed to external network",
"Severity": "Warning",
"Details": f"Ports: {spec.get('ports', [])}",
}
)
external_ips = spec.get("externalIPs", [])
if external_ips:
self.network_findings.append(
{
"AppName": app_dir.name,
"Type": "Service",
"Name": name,
"Issue": f"externalIPs configured: {external_ips}",
"Severity": "Warning",
}
)
elif kind == "Ingress":
spec = data.get("spec", {})
rules = spec.get("rules", [])
tls = spec.get("tls", [])
hosts = [r.get("host", "") for r in rules if r.get("host")]
if hosts and not tls:
self.network_findings.append(
{
"AppName": app_dir.name,
"Type": "Ingress",
"Name": name,
"Issue": f"hosts exposed without TLS: {hosts}",
"Severity": "Warning",
}
)
if hosts:
self.network_findings.append(
{
"AppName": app_dir.name,
"Type": "Ingress",
"Name": name,
"Issue": f"exposed externally: {', '.join(hosts)}",
"Severity": "Info",
}
)
apps_without_policies = all_apps - apps_with_policies
for app in apps_without_policies:
self.network_findings.append(
{
"AppName": app,
"Type": "NetworkPolicy",
"Name": "N/A",
"Issue": "application has no NetworkPolicy protection",
"Severity": "Warning",
}
)
def audit_host_mounts(self) -> None:
"""Detect hostPath mounts."""
for app_dir in self.group_dir.iterdir():
if not app_dir.is_dir():
continue
for yaml_file in app_dir.glob("*.yaml"):
with open(yaml_file, "r") as f:
try:
data = yaml.safe_load(f)
except Exception:
continue
if not data or data.get("kind") not in WORKLOAD_KINDS:
continue
kind = data.get("kind")
name = data.get("metadata", {}).get("name")
spec = data.get("spec", {})
if kind == "CronJob":
pod_spec = (
spec.get("jobTemplate", {})
.get("spec", {})
.get("template", {})
.get("spec", {})
)
else:
pod_spec = spec.get("template", {}).get("spec", {})
volumes = pod_spec.get("volumes", [])
for vol in volumes:
if "hostPath" in vol:
host_path = vol["hostPath"]
path = host_path.get("path", "")
path_type = host_path.get("type", "")
severity = "Critical"
if any(
dangerous in path.lower()
for dangerous in [
"var/run/docker.sock",
"var/lib/kubelet",
"var/lib/docker",
"etc",
"root",
]
):
severity = "Critical"
elif path_type in ["Socket", "BlockDevice"]:
severity = "Critical"
else:
severity = "Warning"
self.hostpath_mounts.append(
{
"AppName": app_dir.name,
"Workload": f"{kind}/{name}",
"VolumeName": vol.get("name", ""),
"HostPath": path,
"Type": path_type,
"Severity": severity,
}
)
def audit_security_policies(self) -> None:
"""Check seccomp/AppArmor security policy configuration."""
for app_dir in self.group_dir.iterdir():
if not app_dir.is_dir():
continue
for yaml_file in app_dir.glob("*.yaml"):
with open(yaml_file, "r") as f:
try:
data = yaml.safe_load(f)
except Exception:
continue
if not data or data.get("kind") not in WORKLOAD_KINDS:
continue
kind = data.get("kind")
name = data.get("metadata", {}).get("name")
annotations = data.get("metadata", {}).get("annotations", {})
spec = data.get("spec", {})
if kind == "CronJob":
pod_spec = (
spec.get("jobTemplate", {})
.get("spec", {})
.get("template", {})
.get("spec", {})
)
else:
pod_spec = spec.get("template", {}).get("spec", {})
pod_sc = pod_spec.get("securityContext", {})
seccomp_profile = pod_sc.get("seccompProfile", {})
if seccomp_profile:
profile_type = seccomp_profile.get("type", "")
if profile_type == "Unconfined":
self.security_policies.append(
{
"AppName": app_dir.name,
"Workload": f"{kind}/{name}",
"PolicyType": "seccomp",
"Status": "Unconfined",
"Severity": "Warning",
}
)
elif profile_type == "RuntimeDefault":
self.security_policies.append(
{
"AppName": app_dir.name,
"Workload": f"{kind}/{name}",
"PolicyType": "seccomp",
"Status": "RuntimeDefault",
"Severity": "OK",
}
)
else:
has_seccomp = any(
k.startswith("seccomp.security.alpha.kubernetes.io/")
or k.startswith("container.seccomp.security.alpha.kubernetes.io/")
for k in annotations.keys()
)
if has_seccomp:
self.security_policies.append(
{
"AppName": app_dir.name,
"Workload": f"{kind}/{name}",
"PolicyType": "seccomp (legacy)",
"Status": "Configured via annotation",
"Severity": "OK",
}
)
else:
self.security_policies.append(
{
"AppName": app_dir.name,
"Workload": f"{kind}/{name}",
"PolicyType": "seccomp",
"Status": "Not configured",
"Severity": "Warning",
}
)
apparmor_key = "container.apparmor.security.beta.kubernetes.io/"
has_apparmor = any(
k.startswith(apparmor_key) for k in annotations.keys()
)
if has_apparmor:
for k, v in annotations.items():
if k.startswith(apparmor_key):
container = k.replace(apparmor_key, "")
if v == "unconfined":
self.security_policies.append(
{
"AppName": app_dir.name,
"Workload": f"{kind}/{name}",
"PolicyType": f"AppArmor ({container})",
"Status": "Unconfined",
"Severity": "Warning",
}
)
else:
self.security_policies.append(
{
"AppName": app_dir.name,
"Workload": f"{kind}/{name}",
"PolicyType": f"AppArmor ({container})",
"Status": v,
"Severity": "OK",
}
)
else:
self.security_policies.append(
{
"AppName": app_dir.name,
"Workload": f"{kind}/{name}",
"PolicyType": "AppArmor",
"Status": "Not configured",
"Severity": "Info",
}
)
def audit_pod_disruption_and_secrets(self) -> None:
"""Check PodDisruptionBudget and Secret/ServiceAccount configuration."""
apps_with_workloads: Set[str] = set()
apps_with_pdb: Set[str] = set()
for app_dir in self.group_dir.iterdir():
if not app_dir.is_dir():
continue
for yaml_file in app_dir.glob("*.yaml"):
with open(yaml_file, "r") as f:
try:
data = yaml.safe_load(f)
except Exception:
continue
if not data:
continue
kind = data.get("kind")
name = data.get("metadata", {}).get("name")
if kind in WORKLOAD_KINDS:
apps_with_workloads.add(app_dir.name)
elif kind == "PodDisruptionBudget":
apps_with_pdb.add(app_dir.name)
spec = data.get("spec", {})
min_available = spec.get("minAvailable")
max_unavailable = spec.get("maxUnavailable")
if min_available:
self.pdb_and_secrets.append(
{
"AppName": app_dir.name,
"Type": "PDB",
"Name": name,
"Check": "minAvailable",
"Value": str(min_available),
"Severity": "OK",
}
)
elif max_unavailable:
self.pdb_and_secrets.append(
{
"AppName": app_dir.name,
"Type": "PDB",
"Name": name,
"Check": "maxUnavailable",
"Value": str(max_unavailable),
"Severity": "OK",
}
)
elif kind == "Secret":
secret_type = data.get("type", "Opaque")
issues = []
if secret_type == "Opaque":
content = data.get("data", {}) or {}
for key in content.keys():
if "docker" in key.lower() or "registry" in key.lower():
issues.append(
"consider using kubernetes.io/dockerconfigjson type"
)
break
self.pdb_and_secrets.append(
{
"AppName": app_dir.name,
"Type": "Secret",
"Name": name,
"Check": "Type",
"Value": secret_type,
"Severity": "OK" if not issues else "Warning",
"Notes": "; ".join(issues) if issues else "",
}
)
elif kind == "ServiceAccount":
automount_token = data.get("automountServiceAccountToken")
if automount_token is False:
self.pdb_and_secrets.append(
{
"AppName": app_dir.name,
"Type": "ServiceAccount",
"Name": name,
"Check": "automountServiceAccountToken",
"Value": "false",
"Severity": "OK",
}
)
elif automount_token is True:
self.pdb_and_secrets.append(
{
"AppName": app_dir.name,
"Type": "ServiceAccount",
"Name": name,
"Check": "automountServiceAccountToken",
"Value": "true",
"Severity": "Warning",
"Notes": "explicitly enabled token automount",
}
)
else:
self.pdb_and_secrets.append(
{
"AppName": app_dir.name,
"Type": "ServiceAccount",
"Name": name,
"Check": "automountServiceAccountToken",
"Value": "default",
"Severity": "Info",
}
)
apps_needing_pdb = apps_with_workloads - apps_with_pdb
for app in apps_needing_pdb:
self.pdb_and_secrets.append(
{
"AppName": app,
"Type": "PDB",
"Name": "N/A",
"Check": "Missing",
"Value": "No PodDisruptionBudget",
"Severity": "Info",
"Notes": "recommend configuring a PDB for stateful workloads",
}
)
def save_reports(self) -> None:
if "audit_dir" in self.paths:
self.paths["audit_dir"].mkdir(parents=True, exist_ok=True)
with open(self.paths["csv_cm_to_sec"], "w", newline="") as f:
writer = csv.DictWriter(
f, fieldnames=["AppName", "ConfigMap", "SensitiveKeys", "Usage"]
)
writer.writeheader()
writer.writerows(self.cm_to_secret_rows)
with open(self.paths["csv_sec_to_cm"], "w", newline="") as f:
writer = csv.DictWriter(
f, fieldnames=["AppName", "Secret", "NonSensitiveKeys"]
)
writer.writeheader()
writer.writerows(self.secret_to_cm_rows)
if self.rbac_issues:
with open(self.paths["csv_rbac"], "w", newline="") as f:
writer = csv.DictWriter(
f,
fieldnames=["AppName", "Kind", "Name", "Issues", "Resources", "Verbs"],
)
writer.writeheader()
writer.writerows(self.rbac_issues)
if self.network_findings:
fieldnames_set: Set[str] = set()
for row in self.network_findings:
fieldnames_set.update(row.keys())
fieldnames = sorted(fieldnames_set)
with open(self.paths["csv_network"], "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(self.network_findings)
if self.hostpath_mounts:
with open(self.paths["csv_hostpath"], "w", newline="") as f:
writer = csv.DictWriter(
f,
fieldnames=[
"AppName",
"Workload",
"VolumeName",
"HostPath",
"Type",
"Severity",
],
)
writer.writeheader()
writer.writerows(self.hostpath_mounts)
if self.security_policies:
with open(self.paths["csv_secpol"], "w", newline="") as f:
writer = csv.DictWriter(
f,
fieldnames=["AppName", "Workload", "PolicyType", "Status", "Severity"],
)
writer.writeheader()
writer.writerows(self.security_policies)
if self.pdb_and_secrets:
with open(self.paths["csv_pdb_sec"], "w", newline="") as f:
writer = csv.DictWriter(
f,
fieldnames=[
"AppName",
"Type",
"Name",
"Check",
"Value",
"Severity",
"Notes",
],
)
writer.writeheader()
writer.writerows(self.pdb_and_secrets)
json_path = self.paths["audit_dir"] / "audit_results.json"
extended_report = []
for app_report in self.audit_report:
app_name = app_report.get("AppName")
extended = app_report.copy()
extended["RBAC_Issues"] = [
r for r in self.rbac_issues if r.get("AppName") == app_name
]
extended["Network_Findings"] = [
r for r in self.network_findings if r.get("AppName") == app_name
]
extended["Host_Mounts"] = [
r for r in self.hostpath_mounts if r.get("AppName") == app_name
]
extended["Security_Policies"] = [
r for r in self.security_policies if r.get("AppName") == app_name
]
extended["PDB_and_Secrets"] = [
r for r in self.pdb_and_secrets if r.get("AppName") == app_name
]
extended_report.append(extended)
with open(json_path, "w") as f:
json.dump(extended_report, f, indent=2, ensure_ascii=False)
console = output.console
console.print()
output.success("Audit complete!")
output.info(f" - JSON Report: {json_path}")
output.info(f" - CM to Secret: {self.paths['csv_cm_to_sec']}")
output.info(f" - Secret to CM: {self.paths['csv_sec_to_cm']}")
if self.rbac_issues:
output.info(f" - RBAC Issues: {self.paths['csv_rbac']}")
if self.network_findings:
output.info(f" - Network Security: {self.paths['csv_network']}")
if self.hostpath_mounts:
output.info(f" - HostPath Mounts: {self.paths['csv_hostpath']}")
if self.security_policies:
output.info(f" - Security Policies: {self.paths['csv_secpol']}")
if self.pdb_and_secrets:
output.info(f" - PDB and Secrets: {self.paths['csv_pdb_sec']}")
def main() -> None:
parser = argparse.ArgumentParser(description="Security audit for K8s resources.")
parser.add_argument("--context", required=True, help="K8s context")
parser.add_argument("--namespace", required=True, help="K8s namespace")
args = parser.parse_args()
auditor = Auditor(args.context, args.namespace)
auditor.run()
if __name__ == "__main__":
main()
```
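The hostPath triage in `audit_host_mounts` can be read as a small pure function. A minimal sketch of that logic follows; the standalone function name is illustrative and not part of the script itself:

```python
def classify_hostpath_severity(path: str, path_type: str) -> str:
    """Mirror of the audit's hostPath triage: well-known dangerous host
    paths and raw socket/block-device mounts are Critical, any other
    hostPath mount is still a Warning."""
    dangerous = [
        "var/run/docker.sock",  # container runtime socket -> node takeover
        "var/lib/kubelet",
        "var/lib/docker",
        "etc",   # substring match, as in the audit script
        "root",
    ]
    if any(d in path.lower() for d in dangerous):
        return "Critical"
    if path_type in ("Socket", "BlockDevice"):
        return "Critical"
    return "Warning"

print(classify_hostpath_severity("/var/run/docker.sock", ""))  # Critical
print(classify_hostpath_severity("/data/cache", "Directory"))  # Warning
```

Note that the dangerous-path check is a plain substring match, so paths merely containing `etc` or `root` are also escalated; that is a deliberate bias toward false positives in a security audit.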
### scripts/utils.py
```python
from pathlib import Path
from typing import Dict
import os
def get_output_paths(context: str, namespace: str) -> Dict[str, Path]:
"""
Get output paths for all stages.
Output structure: output/{context}/{namespace}/{stage}/
Output path priority:
1. KUBE_AUDIT_OUTPUT env var (set by SKILL.md)
2. Current working directory (Path.cwd())
"""
# Use env var if set (SKILL.md sets this to user's working directory)
if "KUBE_AUDIT_OUTPUT" in os.environ:
base = Path(os.environ["KUBE_AUDIT_OUTPUT"])
else:
# Fallback to current working directory
base = Path.cwd() / "output"
base_output = base / context / namespace
return {
"export": base_output / "export",
"sanitize": base_output / "sanitize",
"sanitize_fields": base_output / "sanitize_fields",
"group": base_output / "group",
"orphan": base_output / "ungrouped_resources.txt",
"audit_dir": base_output / "audit",
"audit": base_output / "audit" / "audit_report.md",
"csv_cm_to_sec": base_output / "audit" / "configmap_to_secret.csv",
"csv_sec_to_cm": base_output / "audit" / "secret_to_configmap.csv",
"csv_rbac": base_output / "audit" / "rbac_issues.csv",
"csv_network": base_output / "audit" / "network_security.csv",
"csv_hostpath": base_output / "audit" / "hostpath_mounts.csv",
"csv_secpol": base_output / "audit" / "security_policies.csv",
"csv_pdb_sec": base_output / "audit" / "pdb_and_secrets.csv",
}
```
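The resolution order described in the `get_output_paths` docstring can be exercised with a short sketch (the context and namespace values here are illustrative):

```python
import os
from pathlib import Path

# Same resolution order as get_output_paths(): the KUBE_AUDIT_OUTPUT
# env var wins; otherwise fall back to ./output under the cwd.
os.environ["KUBE_AUDIT_OUTPUT"] = "/tmp/audits"
base = (
    Path(os.environ["KUBE_AUDIT_OUTPUT"])
    if "KUBE_AUDIT_OUTPUT" in os.environ
    else Path.cwd() / "output"
)
# Paths are then namespaced per context and namespace, e.g. the report:
report = base / "prod-cluster" / "payment" / "audit" / "audit_report.md"
print(report)  # /tmp/audits/prod-cluster/payment/audit/audit_report.md
```

Because every stage writes under `output/{context}/{namespace}/`, audits of different clusters or namespaces never overwrite each other's artifacts.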