database-query-and-export
Query SQLite, PostgreSQL, and MySQL databases and export results to CSV/JSON. Use when: (1) Extracting data for reports, (2) Database backup and migration, (3) Data analysis workflows, or (4) Automated database queries.
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install besoeasy-open-skills-database-query-and-export
Repository
Skill path: skills/database-query-and-export
Best for
Primary workflow: Analyze Data & AI.
Technical facets: Full Stack, Backend, Data / AI.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: besoeasy.
This is a mirrored public skill entry; review the repository before installing it into production workflows.
What it helps with
- Install database-query-and-export into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/besoeasy/open-skills before adding database-query-and-export to shared team environments
- Use database-query-and-export for development workflows
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: database-query-and-export
description: "Query SQLite, PostgreSQL, and MySQL databases and export results to CSV/JSON. Use when: (1) Extracting data for reports, (2) Database backup and migration, (3) Data analysis workflows, or (4) Automated database queries."
---
# Database Query and Export
Query relational databases (SQLite, PostgreSQL, MySQL) and export results to CSV, JSON, or other formats. Essential for data extraction, reporting, backup automation, and analytics pipelines.
## When to use
- Use case 1: When the user asks to query a database and export results
- Use case 2: When you need to extract data for analysis or reporting
- Use case 3: For backup and data migration workflows
- Use case 4: When building automated database monitoring and alerts
## Required tools / APIs
- **SQLite** — Lightweight file-based database (often pre-installed)
- **PostgreSQL client** — For PostgreSQL databases
- **MySQL client** — For MySQL/MariaDB databases
- No external API required
Install options:
```bash
# Ubuntu/Debian
sudo apt-get install -y sqlite3 postgresql-client mysql-client
# macOS
brew install sqlite3 postgresql mysql-client
# Node.js (database drivers)
npm install better-sqlite3 # SQLite
npm install pg # PostgreSQL
npm install mysql2 # MySQL
```
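Before running any of the skills below, it can help to confirm which clients are actually on the PATH. A minimal check (tool names as used in the install commands above):

```bash
# Report which database clients are installed (prints one line per tool)
for tool in sqlite3 psql mysql; do
  if command -v "$tool" >/dev/null 2>&1; then
    echo "$tool: available"
  else
    echo "$tool: missing"
  fi
done
```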
## Skills
### query_sqlite_to_json
Query SQLite database and export to JSON format.
```bash
# Basic query to JSON
sqlite3 database.db "SELECT * FROM users LIMIT 10;" -json
# With pretty formatting using jq
sqlite3 database.db "SELECT * FROM users WHERE active=1;" -json | jq '.'
# Export entire table to JSON file
sqlite3 database.db "SELECT * FROM orders;" -json > orders.json
# Query with WHERE clause
sqlite3 database.db "SELECT id, name, email FROM users WHERE created_at > '2024-01-01';" -json
```
**Node.js:**
```javascript
const Database = require('better-sqlite3');
function querySQLiteToJSON(dbPath, query) {
const db = new Database(dbPath, { readonly: true });
const rows = db.prepare(query).all();
db.close();
return rows;
}
// Usage
// const users = querySQLiteToJSON('./database.db', 'SELECT * FROM users LIMIT 10');
// console.log(JSON.stringify(users, null, 2));
```
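When a Node runtime is not available, Python's standard-library `sqlite3` module covers the same read-only JSON export. A minimal sketch (the function name mirrors this skill; table and column names in the usage comment are illustrative):

```python
import json
import sqlite3

def query_sqlite_to_json(db_path: str, query: str) -> str:
    """Run a read-only query and return the rows as a JSON string."""
    # mode=ro opens the database read-only, mirroring { readonly: true } above
    conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
    conn.row_factory = sqlite3.Row  # rows become name-addressable
    try:
        rows = [dict(row) for row in conn.execute(query)]
    finally:
        conn.close()
    return json.dumps(rows, indent=2, default=str)

# Usage
# print(query_sqlite_to_json("./database.db", "SELECT * FROM users LIMIT 10"))
```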
### query_sqlite_to_csv
Query SQLite database and export to CSV format.
```bash
# Basic query to CSV
sqlite3 database.db <<EOF
.mode csv
.headers on
SELECT * FROM users LIMIT 10;
EOF
# Export to CSV file
sqlite3 database.db <<EOF
.mode csv
.headers on
.output users.csv
SELECT id, name, email, created_at FROM users WHERE active=1;
EOF
# Query multiple tables with JOIN
sqlite3 database.db <<EOF
.mode csv
.headers on
SELECT u.name, o.order_id, o.total
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE o.created_at > '2024-01-01';
EOF
```
**Node.js:**
```javascript
const Database = require('better-sqlite3');
const fs = require('fs');
function querySQLiteToCSV(dbPath, query, outputPath) {
const db = new Database(dbPath, { readonly: true });
const rows = db.prepare(query).all();
db.close();
if (rows.length === 0) {
return 'No results';
}
// Generate CSV
const headers = Object.keys(rows[0]).join(',');
const csvRows = rows.map(row =>
Object.values(row).map(val =>
typeof val === 'string' && (val.includes(',') || val.includes('"') || val.includes('\n')) ? `"${val.replace(/"/g, '""')}"` : val
).join(',')
);
const csv = [headers, ...csvRows].join('\n');
if (outputPath) {
fs.writeFileSync(outputPath, csv);
return `Exported ${rows.length} rows to ${outputPath}`;
}
return csv;
}
// Usage
// querySQLiteToCSV('./database.db', 'SELECT * FROM users LIMIT 10', './users.csv');
```
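A Python equivalent can lean on the standard-library `csv` module, which quotes embedded commas, quotes, and newlines automatically instead of escaping by hand. A minimal sketch (function name mirrors this skill):

```python
import csv
import sqlite3

def query_sqlite_to_csv(db_path: str, query: str, output_path: str) -> int:
    """Run a read-only query and write the rows to a CSV file.

    Returns the number of data rows written; csv.writer handles
    embedded commas, quotes, and newlines for us.
    """
    conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
    try:
        cursor = conn.execute(query)
        headers = [col[0] for col in cursor.description]  # column names
        rows = cursor.fetchall()
    finally:
        conn.close()
    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(headers)
        writer.writerows(rows)
    return len(rows)

# Usage
# query_sqlite_to_csv("./database.db", "SELECT * FROM users LIMIT 10", "./users.csv")
```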
### query_postgresql
Query PostgreSQL database and export results.
```bash
# Set connection string (alternative: use individual flags)
export PGHOST=localhost
export PGPORT=5432
export PGDATABASE=mydb
export PGUSER=postgres
export PGPASSWORD=mypassword
# Query to JSON (using psql with formatted output)
psql -t -A -F"," -c "SELECT row_to_json(t) FROM (SELECT * FROM users LIMIT 10) t;"
# Query to CSV
psql -c "COPY (SELECT * FROM users WHERE active=true) TO STDOUT WITH CSV HEADER;" > users.csv
# Query with connection string
psql "postgresql://user:password@localhost:5432/mydb" -c "SELECT * FROM users LIMIT 5;"
# Query to formatted table
psql -c "SELECT id, name, email FROM users ORDER BY created_at DESC LIMIT 10;"
```
**Node.js:**
```javascript
const { Pool } = require('pg');
async function queryPostgreSQL(connectionString, query) {
const pool = new Pool({ connectionString });
try {
const result = await pool.query(query);
return result.rows;
} finally {
await pool.end();
}
}
// Usage
// const connStr = 'postgresql://user:password@localhost:5432/mydb';
// queryPostgreSQL(connStr, 'SELECT * FROM users LIMIT 10')
// .then(rows => console.log(JSON.stringify(rows, null, 2)));
```
### query_mysql
Query MySQL/MariaDB database and export results.
```bash
# Query to CSV with headers (batch mode emits tab-separated values; convert tabs to commas — simple data only)
mysql -h localhost -u root -p'mypassword' -D mydb \
-e "SELECT * FROM users WHERE active=1;" \
--batch \
| tr '\t' ',' > users.csv
# Query to tab-separated rows (--silent trims table decoration; pipe through a converter for JSON)
mysql -h localhost -u root -p'mypassword' -D mydb \
-e "SELECT * FROM users LIMIT 10;" \
--batch --silent
# Export entire table to CSV
mysql -h localhost -u root -p'mypassword' -D mydb \
-e "SELECT * FROM orders INTO OUTFILE '/tmp/orders.csv'
FIELDS TERMINATED BY ','
ENCLOSED BY '\"'
LINES TERMINATED BY '\n';"
# Query with timeout
mysql -h localhost -u root -p'mypassword' -D mydb \
--connect-timeout=10 \
-e "SELECT COUNT(*) as total FROM users;"
```
**Node.js:**
```javascript
const mysql = require('mysql2/promise');
async function queryMySQL(config, query) {
const connection = await mysql.createConnection({
host: config.host || 'localhost',
user: config.user,
password: config.password,
database: config.database,
connectTimeout: 10000
});
try {
const [rows] = await connection.execute(query);
return rows;
} finally {
await connection.end();
}
}
// Usage
// const config = {
// host: 'localhost',
// user: 'root',
// password: 'mypassword',
// database: 'mydb'
// };
// queryMySQL(config, 'SELECT * FROM users LIMIT 10')
// .then(rows => console.log(JSON.stringify(rows, null, 2)));
```
### advanced_sqlite_export_with_error_handling
Production-ready SQLite export with validation and error handling.
```bash
#!/bin/bash
DB_PATH="database.db"
QUERY="SELECT * FROM users WHERE active=1;"
OUTPUT_FILE="users.csv"
# Check if database exists
if [ ! -f "$DB_PATH" ]; then
echo "Error: Database file not found: $DB_PATH" >&2
exit 1
fi
# Check if table exists
if ! sqlite3 "$DB_PATH" "SELECT name FROM sqlite_master WHERE type='table' AND name='users';" | grep -q "users"; then
echo "Error: Table 'users' not found in database" >&2
exit 1
fi
# Execute query and export to CSV
if sqlite3 "$DB_PATH" <<EOF > "$OUTPUT_FILE"
.mode csv
.headers on
$QUERY
EOF
then
ROW_COUNT=$(wc -l < "$OUTPUT_FILE")
echo "Success: Exported $((ROW_COUNT - 1)) rows to $OUTPUT_FILE"
else
echo "Error: Query failed" >&2
exit 1
fi
```
**Node.js:**
```javascript
const Database = require('better-sqlite3');
const fs = require('fs');
async function exportSQLiteWithValidation(options) {
const { dbPath, query, outputPath, format = 'json' } = options;
// Validate database exists
if (!fs.existsSync(dbPath)) {
throw new Error(`Database file not found: ${dbPath}`);
}
let db;
try {
db = new Database(dbPath, { readonly: true, timeout: 10000 });
// Prepare and execute query
const stmt = db.prepare(query);
const rows = stmt.all();
if (rows.length === 0) {
return { success: true, rowCount: 0, message: 'No rows returned' };
}
// Export based on format
let output;
if (format === 'json') {
output = JSON.stringify(rows, null, 2);
} else if (format === 'csv') {
const headers = Object.keys(rows[0]).join(',');
const csvRows = rows.map(row =>
Object.values(row).map(val =>
typeof val === 'string' && val.includes(',') ? `"${val.replace(/"/g, '""')}"` : val
).join(',')
);
output = [headers, ...csvRows].join('\n');
} else {
throw new Error(`Unsupported format: ${format}`);
}
// Write to file
fs.writeFileSync(outputPath, output);
return {
success: true,
rowCount: rows.length,
outputPath,
format,
message: `Exported ${rows.length} rows to ${outputPath}`
};
} catch (err) {
throw new Error(`Database export failed: ${err.message}`);
} finally {
if (db) db.close();
}
}
// Usage
// exportSQLiteWithValidation({
// dbPath: './database.db',
// query: 'SELECT * FROM users WHERE active=1',
// outputPath: './users.json',
// format: 'json'
// }).then(result => console.log(result));
```
## Rate limits / Best practices
- ✅ **Use readonly connections** — Open databases in readonly mode when only querying
- ✅ **Set connection timeouts** — Use 10-30 second timeouts to prevent hanging
- ✅ **Validate inputs** — Check that database files/tables exist before querying
- ✅ **Escape user inputs** — Use parameterized queries to prevent SQL injection
- ✅ **Handle large datasets** — Use LIMIT/OFFSET for pagination on large tables
- ✅ **Close connections** — Always close database connections after queries
- ⚠️ **Secure credentials** — Store database passwords in environment variables, never hardcode
- ⚠️ **Export file permissions** — Ensure export directories have proper write permissions
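The pagination advice above can be sketched with LIMIT/OFFSET batches so large tables never load into memory at once (for very deep tables, keyset pagination on an indexed column avoids the cost of large offsets). Table and column names are illustrative; note the table name is interpolated, not bound, so it must come from trusted input:

```python
import sqlite3

def iter_rows_paginated(db_path: str, table: str, batch_size: int = 1000):
    """Yield rows from `table` in fixed-size batches via LIMIT/OFFSET."""
    conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
    try:
        offset = 0
        while True:
            # Identifiers cannot be bound as parameters, so `table` must be trusted
            rows = conn.execute(
                f"SELECT * FROM {table} ORDER BY rowid LIMIT ? OFFSET ?",
                (batch_size, offset),
            ).fetchall()
            if not rows:
                break
            yield rows
            offset += batch_size
    finally:
        conn.close()

# Usage
# for batch in iter_rows_paginated("./database.db", "orders", batch_size=500):
#     process(batch)
```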
## Agent prompt
```text
You have database query and export capability. When a user asks to query a database:
1. Identify the database type (SQLite, PostgreSQL, MySQL) from:
- File extension (.db, .sqlite, .sqlite3 = SQLite)
- Connection string (postgresql://, mysql://)
- User specification
2. For SQLite:
- Use `sqlite3 database.db "QUERY" -json` for JSON output
- Use `.mode csv` with `.headers on` for CSV output
- Always check if the database file exists first
3. For PostgreSQL:
- Use `psql` with connection string or environment variables
- Use `COPY ... TO STDOUT WITH CSV HEADER` for CSV export
- Export JSON using `row_to_json()` function
4. For MySQL:
- Use `mysql` with `-e` flag for queries
- Use `--batch --silent` for CSV-like output
- Set connection timeout with `--connect-timeout=10`
5. Always:
- Validate database/table exists before querying
- Use readonly connections when only reading
- Handle errors gracefully with clear messages
- Sanitize outputs (escape commas in CSV, quote strings)
6. For large datasets:
- Add LIMIT clause to queries
- Use pagination with OFFSET for very large tables
- Warn user if result set is likely to be huge
```
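Step 1 of the prompt above (identifying the database type) can be sketched as a small helper; the extensions and URL schemes mirror the list in the prompt, with `postgres://` and `mariadb://` added as common aliases:

```python
def detect_database_type(target: str) -> str:
    """Guess the database type from a file path or connection string."""
    lowered = target.lower()
    if lowered.startswith(("postgresql://", "postgres://")):
        return "postgresql"
    if lowered.startswith(("mysql://", "mariadb://")):
        return "mysql"
    if lowered.endswith((".db", ".sqlite", ".sqlite3")):
        return "sqlite"
    return "unknown"  # fall back to asking the user

# detect_database_type("data/app.sqlite3")       -> "sqlite"
# detect_database_type("postgresql://u:p@h/db")  -> "postgresql"
```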
## Troubleshooting
**Error: "unable to open database file"**
- Symptom: SQLite cannot find or access the database file
- Solution: Check file path is correct and file has read permissions
**Error: "connection refused"**
- Symptom: Cannot connect to PostgreSQL or MySQL server
- Solution: Verify host/port are correct, database service is running, and firewall allows connections
**Error: "authentication failed"**
- Symptom: Database rejects username/password
- Solution: Verify credentials are correct, user has necessary privileges
**Error: "table does not exist"**
- Symptom: Query references non-existent table
- Solution: List available tables first (`sqlite3 db.db ".tables"` or `\dt` in psql)
**CSV output has broken formatting:**
- Symptom: Commas in data break CSV columns
- Solution: Properly escape values with commas using quotes, escape existing quotes
**Query takes too long:**
- Symptom: Query hangs or runs for minutes
- Solution: Add LIMIT clause, optimize query with indexes, increase timeout
## See also
- [../json-and-csv-data-transformation/SKILL.md](../json-and-csv-data-transformation/SKILL.md) — Transform exported data between formats
- [../file-tracker/SKILL.md](../file-tracker/SKILL.md) — Track database file changes over time
- [../chat-logger/SKILL.md](../chat-logger/SKILL.md) — Example of SQLite usage for logging
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### ../json-and-csv-data-transformation/SKILL.md
```markdown
---
name: json-and-csv-data-transformation
description: "Transform data between JSON, CSV, and other formats with filtering, mapping, and flattening. Use when: (1) Converting API responses to CSV, (2) Processing data pipelines, (3) Extracting specific fields, or (4) Flattening nested structures."
---
# JSON and CSV Data Transformation
Transform data between JSON, CSV, and other formats. Filter, map, flatten nested objects, and reshape data for analysis, reporting, and API integration.
## When to use
- Use case 1: When the user asks to convert data between JSON and CSV formats
- Use case 2: When you need to filter, extract, or transform specific fields from data
- Use case 3: For flattening nested JSON structures into tabular format
- Use case 4: When processing API responses for analysis or reporting
## Required tools / APIs
- **jq** — Command-line JSON processor (essential for JSON manipulation)
- **csvkit** — Suite of CSV tools (csvjson, csvcut, csvgrep, etc.)
- No external API required
Install options:
```bash
# Ubuntu/Debian
sudo apt-get install -y jq csvkit
# macOS
brew install jq csvkit
# Node.js (native support, no packages needed for basic operations)
# For advanced CSV parsing: npm install csv-parse csv-stringify
```
## Skills
### json_to_csv
Convert JSON array to CSV format.
```bash
# Simple JSON array to CSV
echo '[{"name":"Alice","age":30},{"name":"Bob","age":25}]' | jq -r '(.[0] | keys_unsorted) as $keys | ($keys | @csv), (map([.[ $keys[] ]]) | .[] | @csv)'
# JSON file to CSV file
jq -r '(.[0] | keys_unsorted) as $keys | ($keys | @csv), (map([.[ $keys[] ]]) | .[] | @csv)' data.json > output.csv
# JSON to CSV with specific fields
jq -r '.[] | [.id, .name, .email] | @csv' users.json
# Using csvkit (simpler syntax)
cat data.json | in2csv -f json > output.csv
```
**Node.js:**
```javascript
function jsonToCSV(jsonArray) {
if (!Array.isArray(jsonArray) || jsonArray.length === 0) {
return '';
}
// Get headers from first object
const headers = Object.keys(jsonArray[0]);
// Escape CSV values
const escape = (val) => {
if (val === null || val === undefined) return '';
const str = String(val);
if (str.includes(',') || str.includes('"') || str.includes('\n')) {
return `"${str.replace(/"/g, '""')}"`;
}
return str;
};
// Build CSV
const headerRow = headers.join(',');
const dataRows = jsonArray.map(obj =>
headers.map(header => escape(obj[header])).join(',')
);
return [headerRow, ...dataRows].join('\n');
}
// Usage
// const data = [
// { name: 'Alice', age: 30, city: 'New York' },
// { name: 'Bob', age: 25, city: 'San Francisco' }
// ];
// console.log(jsonToCSV(data));
```
### csv_to_json
Convert CSV to JSON array.
```bash
# CSV to JSON
csvjson data.csv
# CSV to JSON with pretty printing
csvjson data.csv | jq '.'
# CSV to JSON array of objects
csvjson --stream data.csv
# CSV file to JSON file
csvjson input.csv > output.json
# Using pure jq (skip the header row first; assumes no quoted commas in the data)
tail -n +2 data.csv | jq -Rn '[inputs | split(",") | {name: .[0], age: .[1], city: .[2]}]'
```
**Node.js:**
```javascript
function csvToJSON(csvString) {
const lines = csvString.trim().split('\n');
if (lines.length < 2) return [];
// Parse CSV value (handle quotes)
const parseCSVValue = (val) => {
val = val.trim();
if (val.startsWith('"') && val.endsWith('"')) {
return val.slice(1, -1).replace(/""/g, '"');
}
return val;
};
// Split CSV line (basic implementation)
const splitCSVLine = (line) => {
const result = [];
let current = '';
let inQuotes = false;
for (let i = 0; i < line.length; i++) {
const char = line[i];
if (char === '"') {
inQuotes = !inQuotes;
current += char;
} else if (char === ',' && !inQuotes) {
result.push(parseCSVValue(current));
current = '';
} else {
current += char;
}
}
result.push(parseCSVValue(current));
return result;
};
const headers = splitCSVLine(lines[0]);
const data = lines.slice(1).map(line => {
const values = splitCSVLine(line);
const obj = {};
headers.forEach((header, i) => {
obj[header] = values[i] || '';
});
return obj;
});
return data;
}
// Usage
// const csv = `name,age,city
// Alice,30,New York
// Bob,25,"San Francisco"`;
// console.log(JSON.stringify(csvToJSON(csv), null, 2));
```
### filter_and_extract_json
Filter and extract specific fields from JSON.
```bash
# Extract specific fields
jq '.[] | {name: .name, email: .email}' users.json
# Filter by condition
jq '.[] | select(.age > 25)' users.json
# Filter and extract
jq '[.[] | select(.active == true) | {id: .id, name: .name}]' data.json
# Extract nested fields
jq '.[] | {name: .name, street: .address.street, city: .address.city}' data.json
# Get array of single field
jq '.[].name' users.json
# Filter with multiple conditions
jq '.[] | select(.age > 20 and .country == "USA")' users.json
# Map and transform values
jq '.[] | .price = (.price * 1.1)' products.json
```
**Node.js:**
```javascript
function filterAndExtractJSON(data, options) {
const { filter, extract } = options;
let result = Array.isArray(data) ? data : [data];
// Apply filter function
if (filter) {
result = result.filter(filter);
}
// Extract specific fields
if (extract) {
result = result.map(item => {
const extracted = {};
extract.forEach(field => {
// Support nested fields with dot notation
const value = field.split('.').reduce((obj, key) => obj?.[key], item);
extracted[field] = value;
});
return extracted;
});
}
return result;
}
// Usage
// const users = [
// { id: 1, name: 'Alice', age: 30, address: { city: 'NYC' } },
// { id: 2, name: 'Bob', age: 25, address: { city: 'SF' } },
// { id: 3, name: 'Charlie', age: 35, address: { city: 'LA' } }
// ];
//
// const result = filterAndExtractJSON(users, {
// filter: user => user.age > 25,
// extract: ['name', 'age', 'address.city']
// });
// console.log(result);
```
### flatten_nested_json
Flatten nested JSON objects into flat structure.
```bash
# Flatten nested JSON with jq
jq '[.[] | {id: .id, name: .name, street: .address.street, city: .address.city, zip: .address.zip}]' users.json
# Flatten all nested scalar fields generically (dot-joined keys)
jq '[.[] | [paths(scalars) as $p | {($p | map(tostring) | join(".")): getpath($p)}] | add]' data.json
# Extract every leaf value from a deeply nested structure
jq 'recurse | select(type != "object" and type != "array")' complex.json
```
**Node.js:**
```javascript
function flattenJSON(obj, prefix = '', separator = '.') {
const flattened = {};
for (const key in obj) {
const value = obj[key];
const newKey = prefix ? `${prefix}${separator}${key}` : key;
if (value !== null && typeof value === 'object' && !Array.isArray(value)) {
// Recursively flatten nested objects
Object.assign(flattened, flattenJSON(value, newKey, separator));
} else if (Array.isArray(value)) {
// Convert arrays to string or flatten each item
flattened[newKey] = JSON.stringify(value);
} else {
flattened[newKey] = value;
}
}
return flattened;
}
// Usage
// const nested = {
// id: 1,
// name: 'Alice',
// address: {
// street: '123 Main St',
// city: 'NYC',
// coordinates: { lat: 40.7, lon: -74.0 }
// },
// tags: ['user', 'active']
// };
// console.log(flattenJSON(nested));
// Output: {
// id: 1,
// name: 'Alice',
// 'address.street': '123 Main St',
// 'address.city': 'NYC',
// 'address.coordinates.lat': 40.7,
// 'address.coordinates.lon': -74.0,
// tags: '["user","active"]'
// }
```
### transform_csv_data
Transform and manipulate CSV data.
```bash
# Select specific columns
csvcut -c name,email,age users.csv
# Filter rows by value (regex matches two-digit ages, i.e. 30-99)
csvgrep -c age -r "^[3-9][0-9]$" users.csv
# Sort CSV
csvsort -c age -r users.csv # reverse sort by age
# Remove duplicate rows (sort first so uniq can collapse non-adjacent duplicates)
csvcut -c name,email users.csv | csvsort -c name,email | uniq
# Combine: filter, select columns, sort
csvgrep -c country -m "USA" users.csv | csvcut -c name,age | csvsort -c age
# Add calculated column (requires csvpy or awk)
awk -F',' 'BEGIN{OFS=","} NR==1{print $0,"total"} NR>1{print $0,$2*$3}' data.csv
# Merge two CSV files by column
csvjoin -c id users.csv orders.csv
```
**Node.js:**
```javascript
function transformCSV(csvData, transformations) {
const { selectColumns, filterRows, sortBy } = transformations;
// Parse CSV to objects
const data = csvToJSON(csvData);
let result = data;
// Filter rows
if (filterRows) {
result = result.filter(filterRows);
}
// Select columns
if (selectColumns) {
result = result.map(row => {
const selected = {};
selectColumns.forEach(col => {
selected[col] = row[col];
});
return selected;
});
}
// Sort
if (sortBy) {
const { column, reverse } = sortBy;
result.sort((a, b) => {
const aVal = a[column];
const bVal = b[column];
const comparison = aVal > bVal ? 1 : aVal < bVal ? -1 : 0;
return reverse ? -comparison : comparison;
});
}
// Convert back to CSV
return jsonToCSV(result);
}
// Usage
// const csv = `name,age,country
// Alice,30,USA
// Bob,25,Canada
// Charlie,35,USA`;
//
// const transformed = transformCSV(csv, {
// filterRows: row => row.country === 'USA',
// selectColumns: ['name', 'age'],
// sortBy: { column: 'age', reverse: true }
// });
// console.log(transformed);
```
### aggregate_and_group_json
Aggregate and group JSON data (similar to SQL GROUP BY).
```bash
# Group by field and count
jq 'group_by(.country) | map({country: .[0].country, count: length})' users.json
# Sum values by group
jq 'group_by(.category) | map({category: .[0].category, total: map(.price) | add})' products.json
# Average by group
jq 'group_by(.department) | map({department: .[0].department, avg_salary: (map(.salary) | add / length)})' employees.json
# Multiple aggregations
jq 'group_by(.region) | map({
region: .[0].region,
count: length,
total_sales: map(.sales) | add,
avg_sales: (map(.sales) | add / length)
})' sales.json
```
**Node.js:**
```javascript
function groupAndAggregate(data, groupBy, aggregations) {
// Group data
const grouped = {};
data.forEach(item => {
const key = item[groupBy];
if (!grouped[key]) grouped[key] = [];
grouped[key].push(item);
});
// Apply aggregations
return Object.entries(grouped).map(([key, items]) => {
const result = { [groupBy]: key };
aggregations.forEach(agg => {
if (agg.type === 'count') {
result[agg.name] = items.length;
} else if (agg.type === 'sum') {
result[agg.name] = items.reduce((sum, item) => sum + (item[agg.field] || 0), 0);
} else if (agg.type === 'avg') {
const sum = items.reduce((s, item) => s + (item[agg.field] || 0), 0);
result[agg.name] = items.length > 0 ? sum / items.length : 0;
} else if (agg.type === 'min') {
result[agg.name] = Math.min(...items.map(item => item[agg.field] || Infinity));
} else if (agg.type === 'max') {
result[agg.name] = Math.max(...items.map(item => item[agg.field] || -Infinity));
}
});
return result;
});
}
// Usage
// const sales = [
// { region: 'East', product: 'A', amount: 100 },
// { region: 'East', product: 'B', amount: 200 },
// { region: 'West', product: 'A', amount: 150 },
// { region: 'West', product: 'B', amount: 250 }
// ];
//
// const result = groupAndAggregate(sales, 'region', [
// { name: 'count', type: 'count' },
// { name: 'total_amount', type: 'sum', field: 'amount' },
// { name: 'avg_amount', type: 'avg', field: 'amount' }
// ]);
// console.log(result);
```
## Rate limits / Best practices
- ✅ **Stream large files** — Use jq with `-c` flag and process line by line for large datasets
- ✅ **Validate data** — Check JSON/CSV format before transformation
- ✅ **Handle missing fields** — Use default values for null/undefined fields
- ✅ **Memory management** — For files >100MB, use streaming parsers
- ✅ **Type conversion** — Be aware of number/string conversions in CSV
- ✅ **Preserve data types** — JSON maintains types, CSV converts everything to strings
- ⚠️ **Character encoding** — Ensure UTF-8 encoding for international characters
- ⚠️ **Quote escaping** — Properly escape quotes in CSV values
## Agent prompt
```text
You have JSON and CSV data transformation capability. When a user asks to transform data:
1. Identify the input format:
- JSON: Look for {...} or [...]
- CSV: Look for comma-separated values with headers
2. For JSON to CSV:
- Use jq with @csv filter: `jq -r '... | @csv'`
- Or csvkit: `in2csv -f json`
- Node.js: Convert array of objects to CSV string
3. For CSV to JSON:
- Use csvjson from csvkit: `csvjson file.csv`
- Node.js: Parse CSV headers and data rows into objects
4. For filtering/extracting:
- Use jq select(): `jq '.[] | select(.age > 25)'`
- Use csvkit csvgrep: `csvgrep -c column -m value`
- Node.js: Use Array.filter() and map()
5. For flattening:
- Flatten nested JSON objects into dot notation
- Convert nested structures to tabular format
- Handle arrays by stringifying or creating separate rows
6. For aggregation:
- Use jq group_by(): `jq 'group_by(.field) | map({...})'`
- CSV: Convert to JSON, aggregate, convert back
- Node.js: Implement grouping and aggregation functions
Always:
- Preserve data integrity (no data loss)
- Handle edge cases (empty values, special characters)
- Validate output format matches expected structure
- For large files (>100MB), recommend streaming approaches
```
## Troubleshooting
**Error: "parse error: Invalid numeric literal"**
- Symptom: jq fails to parse JSON
- Solution: Validate JSON format with `jq empty file.json`, fix syntax errors
**CSV columns not aligned:**
- Symptom: Data appears in wrong columns after transformation
- Solution: Check for unescaped commas in data, ensure quotes are properly escaped
**Empty output from jq:**
- Symptom: jq returns no results
- Solution: Check filter expression syntax, verify data structure matches filter
**Special characters broken in CSV:**
- Symptom: Non-ASCII characters appear garbled
- Solution: Convert from the source encoding to UTF-8, e.g. `iconv -f LATIN1 -t UTF-8 file.csv`
**Memory error with large files:**
- Symptom: Process runs out of memory
- Solution: Use streaming mode: `jq -c` or Node.js streams for line-by-line processing
**JSON doesn't convert to flat CSV:**
- Symptom: Nested objects create complex CSV structure
- Solution: Flatten JSON first before converting to CSV
## See also
- [../database-query-and-export/SKILL.md](../database-query-and-export/SKILL.md) — Export database results as JSON/CSV
- [../web-search-api/SKILL.md](../web-search-api/SKILL.md) — Transform API responses to desired format
- [../using-web-scraping/SKILL.md](../using-web-scraping/SKILL.md) — Process scraped data into structured formats
```
### ../file-tracker/SKILL.md
```markdown
---
name: file-tracker
description: "Log all file changes (write, edit, delete) to a SQLite database for debugging and audit. Use when: (1) Tracking code changes, (2) Debugging issues, (3) Auditing file modifications, or (4) The user asks to track file changes."
---
# File Tracker
Log every file change (write, edit, delete) to a SQLite database for debugging, audit trails, and version history tracking. Works with any file operation system or code editor.
## When to use
- Tracking file modifications during development
- Creating audit trails for file changes
- Debugging what files were modified and when
- Building version history without git
- User asks to track or review file changes
## Required tools / APIs
- Python standard library (sqlite3, datetime, os)
- Any programming language with SQLite support
No external APIs or services required.
## Database Schema
```sql
CREATE TABLE IF NOT EXISTS file_changes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
action TEXT NOT NULL, -- 'write', 'edit', 'delete', 'rename'
file_path TEXT NOT NULL,
old_content TEXT, -- for edits/deletes
new_content TEXT, -- for writes/edits
file_size INTEGER, -- size in bytes
metadata TEXT, -- JSON: user, session_id, etc.
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_file_path ON file_changes(file_path);
CREATE INDEX idx_timestamp ON file_changes(timestamp);
CREATE INDEX idx_action ON file_changes(action);
-- Automatic purge: delete records older than 1 year
DELETE FROM file_changes WHERE created_at < datetime('now', '-1 year');
```
**Fields:**
- `id` - Auto-incrementing primary key
- `timestamp` - ISO 8601 timestamp of the change
- `action` - Type of operation: 'write', 'edit', 'delete', 'rename'
- `file_path` - Absolute or relative path to the file
- `old_content` - Previous content (for edits) or deleted content (for deletes)
- `new_content` - New content (for writes/edits)
- `file_size` - File size in bytes after operation
- `metadata` - JSON field for additional context (user, session, tools)
- `created_at` - Database insertion timestamp
## Basic Implementation
### Python
**Initialize database:**
```python
import sqlite3
from datetime import datetime
from pathlib import Path
import json
import os
# Configure database path (customize as needed)
DB_PATH = Path.home() / ".file_tracker" / "changes.db"
def init_db():
"""Initialize database and create tables."""
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(DB_PATH))
conn.execute("""
CREATE TABLE IF NOT EXISTS file_changes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
action TEXT NOT NULL,
file_path TEXT NOT NULL,
old_content TEXT,
new_content TEXT,
file_size INTEGER,
metadata TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_file_path ON file_changes(file_path)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_timestamp ON file_changes(timestamp)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_action ON file_changes(action)")
conn.commit()
conn.close()
def purge_old_changes():
"""Delete file change records older than 1 year to keep the database size sane."""
conn = sqlite3.connect(str(DB_PATH))
conn.execute("DELETE FROM file_changes WHERE created_at < datetime('now', '-1 year')")
conn.commit()
conn.close()
# Initialize on import and purge old records
init_db()
purge_old_changes()
```
**Log file changes:**
```python
def log_file_change(
action: str,
file_path: str,
old_content: str = None,
new_content: str = None,
metadata: dict = None
):
"""Log a file change to the database."""
conn = sqlite3.connect(str(DB_PATH))
try:
# Get file size if file exists
file_size = None
if os.path.exists(file_path) and action != "delete":
file_size = os.path.getsize(file_path)
conn.execute(
"""INSERT INTO file_changes
(timestamp, action, file_path, old_content, new_content, file_size, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(
datetime.utcnow().isoformat(),
action,
file_path,
old_content[:5000] if old_content else None, # Truncate large content
new_content[:5000] if new_content else None,
file_size,
json.dumps(metadata) if metadata else None
)
)
conn.commit()
finally:
conn.close()
# Usage examples
log_file_change("write", "/path/to/file.py", new_content="print('Hello')")
log_file_change("edit", "/path/to/file.py", old_content="print('Hello')", new_content="print('Hi')")
log_file_change("delete", "/path/to/file.py", old_content="print('Hi')")
log_file_change("write", "/path/to/config.json", new_content='{"key": "value"}',
metadata={"user": "john", "session": "sess_123"})
```
**Tracked file operations:**
```python
def tracked_write(file_path: str, content: str, metadata: dict = None):
"""Write file and log the change."""
with open(file_path, 'w') as f:
f.write(content)
log_file_change("write", file_path, new_content=content, metadata=metadata)
def tracked_edit(file_path: str, old_content: str, new_content: str, metadata: dict = None):
"""Edit file and log the change."""
with open(file_path, 'w') as f:
f.write(new_content)
log_file_change("edit", file_path, old_content=old_content,
new_content=new_content, metadata=metadata)
def tracked_delete(file_path: str, metadata: dict = None):
"""Delete file and log the change."""
with open(file_path, 'r') as f:
old_content = f.read()
os.remove(file_path)
log_file_change("delete", file_path, old_content=old_content, metadata=metadata)
# Usage
tracked_write("example.txt", "Hello, World!")
tracked_edit("example.txt", "Hello, World!", "Hello, Python!")
tracked_delete("example.txt")
```
**Query file history:**
```python
def get_file_history(file_path: str, limit: int = 20):
"""Get change history for a specific file."""
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
cursor = conn.execute(
"""SELECT timestamp, action, old_content, new_content, file_size
FROM file_changes
WHERE file_path = ?
ORDER BY timestamp DESC
LIMIT ?""",
(file_path, limit)
)
results = cursor.fetchall()
conn.close()
return results
def get_recent_changes(limit: int = 50):
"""Get recent file changes across all files."""
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
cursor = conn.execute(
"""SELECT timestamp, action, file_path, file_size
FROM file_changes
ORDER BY timestamp DESC
LIMIT ?""",
(limit,)
)
results = cursor.fetchall()
conn.close()
return results
def search_file_changes(pattern: str):
"""Search for files matching a pattern."""
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
cursor = conn.execute(
"""SELECT timestamp, action, file_path
FROM file_changes
WHERE file_path LIKE ?
ORDER BY timestamp DESC""",
(f"%{pattern}%",)
)
results = cursor.fetchall()
conn.close()
return results
def get_changes_by_action(action: str, limit: int = 50):
"""Get all changes of a specific type (write, edit, delete)."""
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
cursor = conn.execute(
"""SELECT timestamp, file_path, file_size
FROM file_changes
WHERE action = ?
ORDER BY timestamp DESC
LIMIT ?""",
(action, limit)
)
results = cursor.fetchall()
conn.close()
return results
# Usage
history = get_file_history("/path/to/file.py")
for change in history:
print(f"[{change['timestamp']}] {change['action']}: {change['file_size']} bytes")
recent = get_recent_changes(10)
print(f"Last {len(recent)} file changes")
edits = get_changes_by_action("edit", limit=20)
print(f"Found {len(edits)} file edits")
```
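Because `old_content` and `new_content` are stored for edits, a unified diff can be reconstructed for any logged change. A minimal sketch using the standard-library `difflib` (the `diff_change` helper name is our own, not part of the tracker above):

```python
import difflib

def diff_change(old_content: str, new_content: str, file_path: str = "") -> str:
    """Build a unified diff between the stored old and new content of a change."""
    diff = difflib.unified_diff(
        (old_content or "").splitlines(keepends=True),
        (new_content or "").splitlines(keepends=True),
        fromfile=f"a/{file_path}",
        tofile=f"b/{file_path}",
    )
    return "".join(diff)

# Usage with a row returned by get_file_history():
# print(diff_change(row["old_content"], row["new_content"], "/path/to/file.py"))
```

Note that diffs against truncated content (the 5000-char cap) will only cover the stored prefix.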
### Node.js
```javascript
import sqlite3 from "sqlite3";
import { promisify } from "util";
import path from "path";
import os from "os";
import fs from "fs/promises";
const DB_PATH = path.join(os.homedir(), ".file_tracker", "changes.db");
// Initialize database
// Ensure the parent directory exists; sqlite3 fails with SQLITE_CANTOPEN otherwise
await fs.mkdir(path.dirname(DB_PATH), { recursive: true });
const db = new sqlite3.Database(DB_PATH);
const run = promisify(db.run.bind(db));
const all = promisify(db.all.bind(db));
await run(`
CREATE TABLE IF NOT EXISTS file_changes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
action TEXT NOT NULL,
file_path TEXT NOT NULL,
old_content TEXT,
new_content TEXT,
file_size INTEGER,
metadata TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
`);
// Log file change
async function logFileChange(action, filePath, oldContent = null, newContent = null, metadata = null) {
let fileSize = null;
try {
if (action !== "delete") {
const stats = await fs.stat(filePath);
fileSize = stats.size;
}
} catch (err) {
// File might not exist
}
await run(
`INSERT INTO file_changes (timestamp, action, file_path, old_content, new_content, file_size, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?)`,
[
new Date().toISOString(),
action,
filePath,
      oldContent ? oldContent.slice(0, 5000) : null, // Truncate large content
      newContent ? newContent.slice(0, 5000) : null,
fileSize,
metadata ? JSON.stringify(metadata) : null,
]
);
}
// Tracked file operations
async function trackedWrite(filePath, content, metadata = null) {
await fs.writeFile(filePath, content);
await logFileChange("write", filePath, null, content, metadata);
}
async function trackedEdit(filePath, oldContent, newContent, metadata = null) {
await fs.writeFile(filePath, newContent);
await logFileChange("edit", filePath, oldContent, newContent, metadata);
}
// Query history
async function getFileHistory(filePath, limit = 20) {
return await all(
`SELECT timestamp, action, old_content, new_content, file_size
FROM file_changes
WHERE file_path = ?
ORDER BY timestamp DESC
LIMIT ?`,
[filePath, limit]
);
}
// Usage
await trackedWrite("example.txt", "Hello, World!");
const history = await getFileHistory("example.txt");
console.log(history);
```
## Bash Quick Queries
```bash
# View recent file changes
sqlite3 ~/.file_tracker/changes.db "SELECT timestamp, action, file_path FROM file_changes ORDER BY timestamp DESC LIMIT 20"
# Get history for a specific file
sqlite3 ~/.file_tracker/changes.db "SELECT timestamp, action FROM file_changes WHERE file_path='/path/to/file' ORDER BY timestamp DESC"
# Count changes by action type
sqlite3 ~/.file_tracker/changes.db "SELECT action, COUNT(*) as count FROM file_changes GROUP BY action"
# Find all Python file changes
sqlite3 ~/.file_tracker/changes.db "SELECT timestamp, action, file_path FROM file_changes WHERE file_path LIKE '%.py' ORDER BY timestamp DESC"
# Export file history to JSON
sqlite3 -json ~/.file_tracker/changes.db "SELECT * FROM file_changes WHERE file_path='/path/to/file' ORDER BY timestamp ASC" > file_history.json
```
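The same exports work from Python with the standard-library `csv` module, which matches this skill's CSV/JSON export focus. A sketch (the `export_query_to_csv` helper name is our own):

```python
import csv
import sqlite3

def export_query_to_csv(db_path: str, query: str, params: tuple, out_path: str) -> int:
    """Run a query and write the rows, with a header row, to a CSV file."""
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.execute(query, params)
        headers = [col[0] for col in cursor.description]
        rows = cursor.fetchall()
    finally:
        conn.close()
    with open(out_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(headers)
        writer.writerows(rows)
    return len(rows)

# Usage
# export_query_to_csv(
#     str(DB_PATH),
#     "SELECT timestamp, action, file_path FROM file_changes WHERE file_path LIKE ?",
#     ("%.py",),
#     "python_changes.csv",
# )
```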
## Integration Examples
### Context Manager Pattern
```python
class FileChangeTracker:
"""Context manager to automatically track file changes."""
def __init__(self, file_path: str, action: str = "edit", metadata: dict = None):
self.file_path = file_path
self.action = action
self.metadata = metadata
self.old_content = None
def __enter__(self):
if os.path.exists(self.file_path):
with open(self.file_path, 'r') as f:
self.old_content = f.read()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None: # No exception
new_content = None
if os.path.exists(self.file_path):
with open(self.file_path, 'r') as f:
new_content = f.read()
log_file_change(
self.action,
self.file_path,
old_content=self.old_content,
new_content=new_content,
metadata=self.metadata
)
# Usage
with FileChangeTracker("config.json", action="edit"):
# Modify file
with open("config.json", 'w') as f:
f.write('{"updated": true}')
# Change is automatically logged on exit
```
### Decorator Pattern
```python
def track_file_operation(action: str):
"""Decorator to track file operations."""
def decorator(func):
def wrapper(file_path, *args, **kwargs):
# Read old content if file exists
old_content = None
if os.path.exists(file_path) and action in ["edit", "delete"]:
with open(file_path, 'r') as f:
old_content = f.read()
# Execute operation
result = func(file_path, *args, **kwargs)
# Read new content
new_content = None
if os.path.exists(file_path) and action in ["write", "edit"]:
with open(file_path, 'r') as f:
new_content = f.read()
# Log change
log_file_change(action, file_path, old_content, new_content)
return result
return wrapper
return decorator
# Usage
@track_file_operation("write")
def create_file(path, content):
with open(path, 'w') as f:
f.write(content)
@track_file_operation("edit")
def update_file(path, new_content):
with open(path, 'w') as f:
f.write(new_content)
```
## Agent Prompt
```text
You have file change tracking capability. All file operations are logged to a SQLite database.
When user asks to:
- Review file change history
- Track what files were modified
- Find when a file was changed
- Audit file operations
Use the SQLite database at ~/.file_tracker/changes.db with this schema:
- file_changes table (id, timestamp, action, file_path, old_content, new_content, file_size, metadata)
After performing file operations (write, edit, delete), always log them:
- Write: log_file_change("write", file_path, new_content=content)
- Edit: log_file_change("edit", file_path, old_content=old, new_content=new)
- Delete: log_file_change("delete", file_path, old_content=content)
Query examples:
1. File history: SELECT * FROM file_changes WHERE file_path = ? ORDER BY timestamp DESC
2. Recent changes: SELECT * FROM file_changes ORDER BY timestamp DESC LIMIT 50
3. Search files: SELECT * FROM file_changes WHERE file_path LIKE '%pattern%'
Always log file operations for audit trail and debugging purposes.
```
## Best Practices
1. **Truncate large content** (e.g., 5000 chars) to avoid database bloat
2. **Use indexes** on file_path, timestamp, and action for fast queries
3. **Store full paths** for clarity and uniqueness
4. **Log metadata** (user, session) for context
5. **Regular cleanup** of old entries to manage database size
6. **Privacy**: avoid storing sensitive file content
7. **Compression**: consider compressing old_content/new_content for large text files
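Practice 7 can be sketched with the standard-library `zlib`, storing compressed bytes in a BLOB column instead of raw text (switching the content columns to BLOB is an assumption, not part of the schema above):

```python
import zlib

def compress_content(text: str) -> bytes:
    """Compress file content before storing it in a BLOB column."""
    return zlib.compress(text.encode("utf-8"))

def decompress_content(blob: bytes) -> str:
    """Restore the original text when reading it back."""
    return zlib.decompress(blob).decode("utf-8")

# Round trip: repetitive source text compresses well
original = "def main():\n    pass\n" * 200
blob = compress_content(original)
assert decompress_content(blob) == original
assert len(blob) < len(original)
```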
## Troubleshooting
**Database getting too large:**
- Truncate old entries: `DELETE FROM file_changes WHERE timestamp < '2024-01-01'`
- Run VACUUM: `sqlite3 changes.db "VACUUM"`
- Limit content stored (already truncated to 5000 chars)
**Missing file changes:**
- Ensure log_file_change() is called after every file operation
- Check file permissions for database writes
- Verify DB_PATH is accessible
**Query performance slow:**
- Ensure indexes exist (file_path, timestamp, action)
- Use LIMIT on queries
- Consider archiving old entries
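Archiving can be sketched by moving old rows into a separate database file before deleting them, using SQLite's `ATTACH` (the helper name and archive path are our own; the table matches the schema above):

```python
import sqlite3

def archive_old_changes(db_path: str, archive_path: str, cutoff: str):
    """Move file_changes rows older than the cutoff timestamp into an archive database."""
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("ATTACH DATABASE ? AS archive", (archive_path,))
        # Create the archive table with the same columns (empty SELECT copies structure only)
        conn.execute(
            "CREATE TABLE IF NOT EXISTS archive.file_changes AS "
            "SELECT * FROM file_changes WHERE 0"
        )
        conn.execute(
            "INSERT INTO archive.file_changes "
            "SELECT * FROM file_changes WHERE timestamp < ?",
            (cutoff,),
        )
        conn.execute("DELETE FROM file_changes WHERE timestamp < ?", (cutoff,))
        conn.commit()
        conn.execute("VACUUM")  # Reclaim space in the main database
    finally:
        conn.close()

# Usage
# archive_old_changes(str(DB_PATH), "/backups/changes-archive.db", "2024-01-01")
```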
## See also
- [../chat-logger/SKILL.md](../chat-logger/SKILL.md) — Log chat messages
- [../generate-report/SKILL.md](../generate-report/SKILL.md) — Generate HTML reports
```
### ../chat-logger/SKILL.md
```markdown
---
name: chat-logger
description: "Log all chat messages to a SQLite database for searchable history and audit. Use when: (1) Building chat history, (2) Auditing conversations, (3) Searching past messages, or (4) User asks to log chats."
---
# Chat Logger
Log all incoming and outgoing chat messages to a SQLite database for searchable history, analytics, and auditing. Works with any chat system or agent framework.
## When to use
- Building a searchable chat history system
- Auditing and reviewing past conversations
- Creating analytics on chat interactions
- Debugging chat flows and responses
- User asks to track or search conversation history
## Required tools / APIs
- Python standard library (sqlite3, datetime, json)
- Any programming language with SQLite support
No external APIs or services required.
## Database Schema
```sql
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
session_id TEXT,
sender TEXT NOT NULL, -- 'user', 'assistant', or identifier
content TEXT,
metadata TEXT, -- JSON: channel, tools_used, etc.
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_timestamp ON messages(timestamp);
CREATE INDEX IF NOT EXISTS idx_session ON messages(session_id);
CREATE INDEX IF NOT EXISTS idx_sender ON messages(sender);
-- Automatic purge: delete records older than 1 year
DELETE FROM messages WHERE created_at < datetime('now', '-1 year');
```
**Fields:**
- `id` - Auto-incrementing primary key
- `timestamp` - ISO 8601 timestamp of the message
- `session_id` - Optional session/conversation identifier
- `sender` - Message sender ('user', 'assistant', or custom ID)
- `content` - Message text content
- `metadata` - JSON field for additional data (channel, tools, context)
- `created_at` - Database insertion timestamp
## Basic Implementation
### Python
**Initialize database:**
```python
import sqlite3
from datetime import datetime
from pathlib import Path
import json
# Configure database path
DB_PATH = Path.home() / ".chat_logs" / "messages.db"
def init_db():
"""Initialize database and create tables."""
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(DB_PATH))
conn.execute("""
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
session_id TEXT,
sender TEXT NOT NULL,
content TEXT,
metadata TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_timestamp ON messages(timestamp)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_session ON messages(session_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_sender ON messages(sender)")
conn.commit()
conn.close()
def purge_old_messages():
"""Delete messages older than 1 year to keep the database size sane."""
conn = sqlite3.connect(str(DB_PATH))
conn.execute("DELETE FROM messages WHERE created_at < datetime('now', '-1 year')")
conn.commit()
conn.close()
# Initialize on import and purge old records
init_db()
purge_old_messages()
```
**Log messages:**
```python
def log_message(sender: str, content: str, session_id: str = None, metadata: dict = None):
"""Log a chat message to the database."""
conn = sqlite3.connect(str(DB_PATH))
try:
conn.execute(
"""INSERT INTO messages (timestamp, session_id, sender, content, metadata)
VALUES (?, ?, ?, ?, ?)""",
(
datetime.utcnow().isoformat(),
session_id,
sender,
content[:10000] if content else None, # Truncate long messages
json.dumps(metadata) if metadata else None
)
)
conn.commit()
finally:
conn.close()
# Usage examples
log_message("user", "Hello, how are you?", session_id="session_123")
log_message("assistant", "I'm doing well, thank you!", session_id="session_123")
log_message("user", "Help me deploy a website", session_id="session_456",
metadata={"channel": "web", "ip": "192.168.1.1"})
```
**Query messages:**
```python
def get_recent_messages(limit: int = 50):
"""Get recent messages."""
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
cursor = conn.execute(
"SELECT * FROM messages ORDER BY timestamp DESC LIMIT ?",
(limit,)
)
results = cursor.fetchall()
conn.close()
return results
def get_session_history(session_id: str):
"""Get all messages from a specific session."""
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
cursor = conn.execute(
"SELECT * FROM messages WHERE session_id = ? ORDER BY timestamp ASC",
(session_id,)
)
results = cursor.fetchall()
conn.close()
return results
def search_messages(query: str, limit: int = 20):
"""Search message content."""
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
cursor = conn.execute(
"SELECT * FROM messages WHERE content LIKE ? ORDER BY timestamp DESC LIMIT ?",
(f"%{query}%", limit)
)
results = cursor.fetchall()
conn.close()
return results
# Usage
messages = get_recent_messages(10)
for msg in messages:
    print(f"[{msg['timestamp']}] {msg['sender']}: {(msg['content'] or '')[:100]}")
# Search
results = search_messages("deploy website")
print(f"Found {len(results)} messages about deploying websites")
```
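`LIKE '%...%'` scans the whole table on every search. For large logs, SQLite's FTS5 extension (included in most Python builds) gives indexed full-text search; a sketch that mirrors `content` into an FTS table, assuming your SQLite build includes FTS5:

```python
import sqlite3

def search_messages_fts(db_path: str, query: str, limit: int = 20):
    """Full-text search over message content via an FTS5 mirror table."""
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        conn.execute(
            "CREATE VIRTUAL TABLE IF NOT EXISTS messages_fts USING fts5(content)"
        )
        # Index any rows not yet mirrored (simple, not incremental)
        conn.execute(
            "INSERT INTO messages_fts(rowid, content) "
            "SELECT id, content FROM messages "
            "WHERE id NOT IN (SELECT rowid FROM messages_fts)"
        )
        conn.commit()
        return conn.execute(
            """SELECT m.timestamp, m.sender, m.content
               FROM messages_fts JOIN messages m ON m.id = messages_fts.rowid
               WHERE messages_fts MATCH ?
               ORDER BY rank LIMIT ?""",
            (query, limit),
        ).fetchall()
    finally:
        conn.close()

# Usage
# results = search_messages_fts(str(DB_PATH), "deploy website")
```

In production you would keep the FTS table in sync from `log_message()` rather than rebuilding it per query.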
### Node.js
```javascript
import sqlite3 from "sqlite3";
import { promisify } from "util";
import path from "path";
import os from "os";
import fs from "fs/promises";
const DB_PATH = path.join(os.homedir(), ".chat_logs", "messages.db");
// Ensure the parent directory exists; sqlite3 fails with SQLITE_CANTOPEN otherwise
await fs.mkdir(path.dirname(DB_PATH), { recursive: true });
const db = new sqlite3.Database(DB_PATH);
const run = promisify(db.run.bind(db));
const all = promisify(db.all.bind(db));
await run(`
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
session_id TEXT,
sender TEXT NOT NULL,
content TEXT,
metadata TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
`);
// Log message
async function logMessage(sender, content, sessionId = null, metadata = null) {
await run(
`INSERT INTO messages (timestamp, session_id, sender, content, metadata)
VALUES (?, ?, ?, ?, ?)`,
[
new Date().toISOString(),
sessionId,
sender,
      content ? content.slice(0, 10000) : null, // Truncate long messages
metadata ? JSON.stringify(metadata) : null,
]
);
}
// Query messages
async function getRecentMessages(limit = 50) {
return await all(
`SELECT * FROM messages ORDER BY timestamp DESC LIMIT ?`,
[limit]
);
}
// Usage
await logMessage("user", "Hello!", "session_123");
await logMessage("assistant", "Hi there!", "session_123");
const messages = await getRecentMessages(10);
console.log(messages);
```
## Bash Quick Queries
```bash
# View recent messages
sqlite3 ~/.chat_logs/messages.db "SELECT timestamp, sender, substr(content, 1, 80) FROM messages ORDER BY timestamp DESC LIMIT 20"
# Search for specific content
sqlite3 ~/.chat_logs/messages.db "SELECT * FROM messages WHERE content LIKE '%docker%' ORDER BY timestamp DESC"
# Count messages by sender
sqlite3 ~/.chat_logs/messages.db "SELECT sender, COUNT(*) as count FROM messages GROUP BY sender"
# Export session to JSON
sqlite3 -json ~/.chat_logs/messages.db "SELECT * FROM messages WHERE session_id='session_123' ORDER BY timestamp ASC" > conversation.json
```
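For analytics beyond one-off shell queries, aggregates are simple in Python. A sketch counting messages per calendar day (the `messages_per_day` helper name is our own):

```python
import sqlite3

def messages_per_day(db_path: str):
    """Count logged messages grouped by calendar day of their ISO timestamp."""
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute(
            """SELECT substr(timestamp, 1, 10) AS day, COUNT(*) AS count
               FROM messages
               GROUP BY day
               ORDER BY day"""
        ).fetchall()
    finally:
        conn.close()

# Usage
# for day, count in messages_per_day(str(DB_PATH)):
#     print(f"{day}: {count} messages")
```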
## Integration Examples
### Generic Chat Application
```python
class ChatLogger:
"""Simple chat logger that can wrap any chat system."""
def __init__(self, db_path: str = None):
self.db_path = db_path or str(Path.home() / ".chat_logs" / "messages.db")
self._init_db()
def _init_db(self):
# Same as init_db() above
pass
def log_user_message(self, content: str, session_id: str = None, **metadata):
return log_message("user", content, session_id, metadata)
def log_assistant_message(self, content: str, session_id: str = None, **metadata):
return log_message("assistant", content, session_id, metadata)
def get_conversation(self, session_id: str):
return get_session_history(session_id)
# Usage in any chat system
logger = ChatLogger()
# In your chat handler
def handle_message(user_input, session_id):
logger.log_user_message(user_input, session_id=session_id)
# Process message...
response = generate_response(user_input)
logger.log_assistant_message(response, session_id=session_id)
return response
```
### Decorator Pattern
```python
def with_logging(session_id: str = None):
"""Decorator to automatically log chat interactions."""
def decorator(func):
def wrapper(user_message, *args, **kwargs):
# Log user message
log_message("user", user_message, session_id=session_id)
# Call original function
response = func(user_message, *args, **kwargs)
# Log assistant response
log_message("assistant", response, session_id=session_id)
return response
return wrapper
return decorator
# Usage
@with_logging(session_id="session_123")
def chat_handler(message):
return f"You said: {message}"
```
## Agent Prompt
```text
You have chat logging capability. All conversations are logged to a SQLite database.
When user asks to:
- Search past conversations
- Find specific messages
- Review conversation history
- Export chat logs
Use the SQLite database at ~/.chat_logs/messages.db with this schema:
- messages table (id, timestamp, session_id, sender, content, metadata)
Query examples:
1. Recent history: SELECT * FROM messages ORDER BY timestamp DESC LIMIT 50
2. Search content: SELECT * FROM messages WHERE content LIKE '%keyword%'
3. Session history: SELECT * FROM messages WHERE session_id = ? ORDER BY timestamp ASC
Always use SQL queries to retrieve information and present results clearly to the user.
```
## Best Practices
1. **Truncate long messages** to avoid database bloat (e.g., 10,000 chars)
2. **Use indexes** on timestamp, session_id, and sender for fast queries
3. **Store metadata as JSON** for flexibility
4. **Use ISO 8601 timestamps** for consistency
5. **Session IDs** help organize conversations
6. **Privacy considerations**: be mindful of storing sensitive data
7. **Regular backups**: SQLite files are easy to backup/restore
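For practice 7, the `sqlite3` module exposes SQLite's online backup API, which safely copies a live database even while other connections are writing (unlike copying the file directly):

```python
import sqlite3

def backup_db(src_path: str, dest_path: str):
    """Copy a live SQLite database using the online backup API."""
    src = sqlite3.connect(src_path)
    dest = sqlite3.connect(dest_path)
    try:
        src.backup(dest)
    finally:
        dest.close()
        src.close()

# Usage (destination path is illustrative)
# backup_db(str(DB_PATH), "/backups/messages-backup.db")
```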
## Troubleshooting
**Database locked error:**
- Close all connections properly with `conn.close()`
- Use connection pooling for high traffic
**Large database file:**
- Run `VACUUM` to compact: `sqlite3 messages.db "VACUUM"`
- Archive old messages periodically
**Query performance:**
- Ensure indexes are created (timestamp, session_id, sender)
- Use LIMIT on queries
- Consider pagination for large result sets
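The pagination suggestion can be sketched with keyset pagination on `id`, which stays fast as the table grows, unlike large `OFFSET` values (the helper name is our own):

```python
import sqlite3

def get_messages_page(db_path: str, before_id: int = None, page_size: int = 50):
    """Fetch one page of messages, newest first, keyed on the id column."""
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        if before_id is None:
            cursor = conn.execute(
                "SELECT * FROM messages ORDER BY id DESC LIMIT ?", (page_size,)
            )
        else:
            cursor = conn.execute(
                "SELECT * FROM messages WHERE id < ? ORDER BY id DESC LIMIT ?",
                (before_id, page_size),
            )
        return cursor.fetchall()
    finally:
        conn.close()

# Usage: pass the smallest id of the previous page to fetch the next one
# page1 = get_messages_page(str(DB_PATH))
# page2 = get_messages_page(str(DB_PATH), before_id=page1[-1]["id"])
```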
## See also
- [../file-tracker/SKILL.md](../file-tracker/SKILL.md) — Track file modifications
- [../web-search-api/SKILL.md](../web-search-api/SKILL.md) — Search external content
```