doro-email-to-calendar
Imported from https://github.com/openclaw/skills.
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Stars: 3,133
Hot score: 99
Updated: March 20, 2026
Overall rating: C (4.0)
Composite score: 4.0
Best-practice grade: F (19.6)
Install command: `npx @skill-hub/cli install openclaw-skills-doro-email-to-calendar`
Repository: openclaw/skills
Skill path: skills/a2mus/doro-email-to-calendar
Best for
Primary workflow: Ship Full Stack.
Technical facets: Full Stack.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: openclaw.
This is a mirrored public skill entry. Review the repository before installing it into production workflows.
What it helps with
- Install doro-email-to-calendar into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/openclaw/skills before adding doro-email-to-calendar to shared team environments
- Use doro-email-to-calendar for development workflows
Works across
Claude Code, Codex CLI, Gemini CLI, OpenCode
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: doro-email-to-calendar
version: 1.13.1
description: Extract calendar events from emails and create calendar entries. Supports two modes: (1) Direct inbox monitoring - scans all emails for events, or (2) Forwarded emails - processes emails you forward to a dedicated address. Features smart onboarding, event tracking, pending invite reminders, undo support, silent activity logging, deadline detection with separate reminder events, email notifications for action-required items, and provider abstraction for future extensibility.
---

> **CRITICAL RULES - READ BEFORE PROCESSING ANY EMAIL**
>
> 1. **NEVER CALL `gog` DIRECTLY** - ALWAYS use wrapper scripts (`create_event.sh`, `email_read.sh`, etc.). Direct `gog` calls bypass tracking and cause duplicates. THIS IS NON-NEGOTIABLE.
> 2. **IGNORE CALENDAR NOTIFICATIONS** - DO NOT process emails from `[email protected]` (Accepted:, Declined:, Tentative:, etc.). These are responses to existing invites, NOT new events. Run `process_calendar_replies.sh` to archive them.
> 3. **ALWAYS ASK BEFORE CREATING** - Never create calendar events without explicit user confirmation in the current conversation
> 4. **CHECK IF ALREADY PROCESSED** - Before processing any email, check `processed_emails` in index.json
> 5. **READ CONFIG FIRST** - Load and apply `ignore_patterns` and `auto_create_patterns` before presenting events
> 6. **READ MEMORY.MD** - Check for user preferences stored from previous sessions
> 7. **INCLUDE ALL CONFIGURED ATTENDEES** - When creating/updating/deleting events, always include attendees from config with `--attendees` flag (and `--send-updates all` if supported)
> 8. **CHECK TRACKED EVENTS FIRST** - Use `lookup_event.sh --email-id` to find existing events before calendar search (faster, more reliable)
> 9. **TRACK ALL CREATED EVENTS** - The `create_event.sh` script automatically tracks events; use tracked IDs for updates/deletions
> 10. **SHOW DAY-OF-WEEK** - Always include the day of week when presenting events for user verification

> ⛔ **FORBIDDEN: DO NOT USE `gog` COMMANDS DIRECTLY** ⛔
>
> **WRONG:** `gog calendar create ...` or `gog gmail ...`
> **RIGHT:** `"$SCRIPTS_DIR/create_event.sh" ...` or `"$SCRIPTS_DIR/email_read.sh" ...`
>
> Direct CLI calls bypass event tracking, break duplicate detection, and cause duplicate events.
> ALL operations MUST go through the wrapper scripts in `scripts/`.

# Email to Calendar Skill

Extract calendar events and action items from emails, present them for review, and create/update calendar events with duplicate detection and undo support.

**First-time setup:** See [SETUP.md](SETUP.md) for configuration options and smart onboarding.

## Reading Email Content

**IMPORTANT:** Before you can extract events, you must read the email body. Use the wrapper scripts.

```bash
SCRIPTS_DIR="$HOME/.openclaw/workspace/skills/email-to-calendar/scripts"

# Get a single email by ID (PREFERRED)
"$SCRIPTS_DIR/email_read.sh" --email-id "<messageId>"

# Search with body content included
"$SCRIPTS_DIR/email_search.sh" --query "in:inbox is:unread" --max 20 --include-body
```

**Note on stale forwards:** Don't use `newer_than:1d` because it checks the email's original date header, not when it was received. Process all UNREAD emails and rely on the "already processed" check.

## Workflow

### 0. Pre-Processing Checks (MANDATORY)

```bash
SCRIPTS_DIR="$HOME/.openclaw/workspace/skills/email-to-calendar/scripts"
CONFIG_FILE="$HOME/.config/email-to-calendar/config.json"
INDEX_FILE="$HOME/.openclaw/workspace/memory/email-extractions/index.json"

# Start activity logging
"$SCRIPTS_DIR/activity_log.sh" start-session

# Check email mode
EMAIL_MODE=$(jq -r '.email_mode // "forwarded"' "$CONFIG_FILE")

# Check if email was already processed
EMAIL_ID="<the email message ID>"
if jq -e ".extractions[] | select(.email_id == \"$EMAIL_ID\")" "$INDEX_FILE" > /dev/null 2>&1; then
  "$SCRIPTS_DIR/activity_log.sh" log-skip --email-id "$EMAIL_ID" --subject "Subject" --reason "Already processed"
  exit 0
fi

# Load ignore/auto-create patterns
IGNORE_PATTERNS=$(jq -r '.event_rules.ignore_patterns[]' "$CONFIG_FILE")
AUTO_CREATE_PATTERNS=$(jq -r '.event_rules.auto_create_patterns[]' "$CONFIG_FILE")
```

### 1. Find Emails to Process

**DIRECT mode:** Scan all unread emails for event indicators (dates, times, meeting keywords).

**FORWARDED mode:** Only process emails with forwarded indicators (Fwd:, forwarded message headers).

### 2. Extract Events (Agent does this directly)

Read the email and extract events as structured data. Include for each event:

- **title**: Descriptive name (max 80 chars)
- **date**: Event date(s)
- **day_of_week**: For verification
- **time**: Start/end times (default: 9 AM - 5 PM)
- **is_multi_day**: Whether it spans multiple days
- **is_recurring**: Whether it repeats (and pattern)
- **confidence**: high/medium/low
- **urls**: Any URLs found in the email (REQUIRED - always look for registration links, info pages, ticketing sites, etc.)
- **deadline_date**: RSVP/registration/ticket deadline date (if found)
- **deadline_action**: What user needs to do (e.g., "RSVP", "get tickets", "register")
- **deadline_url**: Direct link for taking action (often same as event URL)

**URL Extraction Rule:** ALWAYS scan the email for URLs and include the most relevant one at the BEGINNING of the event description.

### 2.1 Deadline Detection

Scan the email for deadline patterns that indicate action is required before the event:

**Common Deadline Patterns:**

- "RSVP by [date]", "Please RSVP by [date]"
- "Register by [date]", "Registration closes [date]"
- "Tickets available until [date]", "Get tickets by [date]"
- "Early bird ends [date]", "Early registration deadline [date]"
- "Must respond by [date]", "Respond by [date]"
- "Sign up by [date]", "Sign up deadline [date]"
- "Deadline: [date]", "Due by [date]"
- "Last day to [action]: [date]"

When a deadline is found:

1. Extract the deadline date
2. Determine the required action (RSVP, register, buy tickets, etc.)
3. Find the URL for taking that action
4. Flag the event for special handling (see sections below)

### 3. Present Items to User and WAIT

Apply event rules, then present with numbered selection:

```
I found the following potential events:

1. ~~ELAC Meeting (Feb 2, Monday at 8:15 AM)~~ - SKIP (matches ignore pattern)
2. **Team Offsite (Feb 2-6, Sun-Thu)** - PENDING
3. **Staff Development Day (Feb 12, Wednesday)** - AUTO-CREATE

Reply with numbers to create (e.g., '2, 3'), 'all', or 'none'.
```

**STOP AND WAIT for user response.**

After presenting, record pending invites for follow-up reminders:

```bash
# Record pending invites using add_pending.sh
"$SCRIPTS_DIR/add_pending.sh" \
  --email-id "$EMAIL_ID" \
  --email-subject "$EMAIL_SUBJECT" \
  --events-json '[{"title":"Event Name","date":"2026-02-15","time":"14:00","status":"pending"}]'
```

### 4. Check for Duplicates (MANDATORY)

**ALWAYS check before creating any event:**

```bash
# Step 1: Check local tracking first (fast)
TRACKED=$("$SCRIPTS_DIR/lookup_event.sh" --email-id "$EMAIL_ID")
if [ "$(echo "$TRACKED" | jq 'length')" -gt 0 ]; then
  EXISTING_EVENT_ID=$(echo "$TRACKED" | jq -r '.[0].event_id')
fi

# Step 2: If not found, try summary match
if [ -z "$EXISTING_EVENT_ID" ]; then
  TRACKED=$("$SCRIPTS_DIR/lookup_event.sh" --summary "$EVENT_TITLE")
fi

# Step 3: Fall back to calendar search using wrapper script
if [ -z "$EXISTING_EVENT_ID" ]; then
  "$SCRIPTS_DIR/calendar_search.sh" --calendar-id "$CALENDAR_ID" --from "${EVENT_DATE}T00:00:00" --to "${EVENT_DATE}T23:59:59"
fi
```

Use LLM semantic matching for fuzzy duplicates (e.g., "Team Offsite" vs "Team Offsite 5-6pm").

### 5. Create or Update Calendar Events

**Use create_event.sh (recommended)** - handles date parsing, tracking, and changelog:

```bash
# Create new event
"$SCRIPTS_DIR/create_event.sh" \
  "$CALENDAR_ID" \
  "Event Title" \
  "February 11, 2026" \
  "9:00 AM" \
  "5:00 PM" \
  "Description" \
  "$ATTENDEE_EMAILS" \
  "" \
  "$EMAIL_ID"

# Update existing event (pass event_id as 8th parameter)
"$SCRIPTS_DIR/create_event.sh" \
  "$CALENDAR_ID" \
  "Updated Title" \
  "February 11, 2026" \
  "10:00 AM" \
  "6:00 PM" \
  "Updated description" \
  "$ATTENDEE_EMAILS" \
  "$EXISTING_EVENT_ID" \
  "$EMAIL_ID"
```

For direct gog commands and advanced options, see [references/gog-commands.md](references/gog-commands.md).

### 6. Email Disposition (Automatic)

Email disposition (mark as read and/or archive) is handled **automatically** by `create_event.sh` based on config settings. No manual step needed - emails are dispositioned after event creation.
To manually disposition an email:

```bash
"$SCRIPTS_DIR/disposition_email.sh" --email-id "$EMAIL_ID"
```

To process calendar reply emails (accepts, declines, tentatives):

```bash
"$SCRIPTS_DIR/process_calendar_replies.sh"            # Process all
"$SCRIPTS_DIR/process_calendar_replies.sh" --dry-run  # Preview only
```

```bash
# End activity session
"$SCRIPTS_DIR/activity_log.sh" end-session
```

## Event Creation Rules

### Date/Time Handling

- **Single-day events**: Default 9:00 AM - 5:00 PM
- **Multi-day events** (e.g., Feb 2-6): Use `--rrule "RRULE:FREQ=DAILY;COUNT=N"`
- **Events with specific times**: Use exact time from email

### Event Descriptions

**Format event descriptions in this order:**

1. **ACTION WARNING** (if deadline exists): `*** ACTION REQUIRED: [ACTION] BY [DATE] ***`
2. **Event Link** (if URL found): `Event Link: [URL]`
3. **Event Details**: Information extracted from the email

**Example WITH deadline:**

```
*** ACTION REQUIRED: GET TICKETS BY FEB 15 ***

Event Link: https://example.com/tickets

Spring Concert at Downtown Theater
Doors open at 7 PM
VIP meet & greet available
```

**Example WITHOUT deadline:**

```
Event Link: https://example.com/event

Spring Concert at Downtown Theater
Doors open at 7 PM
```

### Duplicate Detection

Consider it a duplicate if:

- Same date AND similar title (semantic matching) AND overlapping time

Always update existing events rather than creating duplicates.

### Creating Deadline Events

When an event has a deadline (RSVP, registration, ticket purchase, etc.), create TWO calendar events:

**1. Main Event** (as normal, but with warning in description):

```bash
"$SCRIPTS_DIR/create_event.sh" \
  "$CALENDAR_ID" \
  "Spring Concert" \
  "March 1, 2026" \
  "7:00 PM" \
  "10:00 PM" \
  "*** ACTION REQUIRED: GET TICKETS BY FEB 15 ***

Event Link: https://example.com/tickets

Spring Concert at Downtown Theater
Doors open at 7 PM" \
  "$ATTENDEE_EMAILS" \
  "" \
  "$EMAIL_ID"
```

**2. Deadline Reminder Event** (separate event on the deadline date):

```bash
# Use create_event.sh for deadline reminders too (ensures tracking)
"$SCRIPTS_DIR/create_event.sh" \
  "$CALENDAR_ID" \
  "DEADLINE: Get tickets for Spring Concert" \
  "2026-02-15" \
  "09:00" \
  "09:30" \
  "Action required: Get tickets

Event Link: https://example.com/tickets

Main event: Spring Concert on March 1, 2026" \
  "" \
  "" \
  "$EMAIL_ID"
```

**Deadline Event Properties:**

- **Title format**: `DEADLINE: [Action] for [Event Name]`
- **Date**: The deadline date
- **Time**: 9:00 AM (30 minute duration)
- **Reminders**: Email 1 day before + popup 1 hour before
- **Description**: Action required, URL, reference to main event

### Email Notifications for Deadlines

When creating events with deadlines, send a notification email to alert the user:

```bash
# Load config
CONFIG_FILE="$HOME/.config/email-to-calendar/config.json"
USER_EMAIL=$(jq -r '.deadline_notifications.email_recipient // .gmail_account' "$CONFIG_FILE")
NOTIFICATIONS_ENABLED=$(jq -r '.deadline_notifications.enabled // false' "$CONFIG_FILE")

# Send notification if enabled (using wrapper script)
if [ "$NOTIFICATIONS_ENABLED" = "true" ]; then
  "$SCRIPTS_DIR/email_send.sh" \
    --to "$USER_EMAIL" \
    --subject "ACTION REQUIRED: Get tickets for Spring Concert by Feb 15" \
    --body "A calendar event has been created that requires your action.

Event: Spring Concert
Date: March 1, 2026
Deadline: February 15, 2026
Action Required: Get tickets
Link: https://example.com/tickets

Calendar events created:
- Main event: Spring Concert (March 1)
- Deadline reminder: DEADLINE: Get tickets for Spring Concert (Feb 15)

---
This notification was sent by the email-to-calendar skill."
fi
```

**When to send notifications:**

- Only when `deadline_notifications.enabled` is `true` in config
- Only for events that have action-required deadlines
- Include the deadline date, action, URL, and event details

## Activity Log

```bash
# Start session
"$SCRIPTS_DIR/activity_log.sh" start-session

# Log skipped emails
"$SCRIPTS_DIR/activity_log.sh" log-skip --email-id "abc" --subject "Newsletter" --reason "No events"

# Log events
"$SCRIPTS_DIR/activity_log.sh" log-event --email-id "def" --title "Meeting" --action created

# End session
"$SCRIPTS_DIR/activity_log.sh" end-session

# Show recent activity
"$SCRIPTS_DIR/activity_log.sh" show --last 3
```

## Changelog and Undo

Changes can be undone within 24 hours:

```bash
# List recent changes
"$SCRIPTS_DIR/changelog.sh" list --last 10

# List undoable changes
"$SCRIPTS_DIR/undo.sh" list

# Undo most recent change
"$SCRIPTS_DIR/undo.sh" last

# Undo specific change
"$SCRIPTS_DIR/undo.sh" --change-id "chg_20260202_143000_001"
```

## Pending Invites

Events not immediately actioned are tracked for reminders:

```bash
# Add pending invites (after presenting events to user)
"$SCRIPTS_DIR/add_pending.sh" \
  --email-id "$EMAIL_ID" \
  --email-subject "Party Invite" \
  --events-json '[{"title":"Birthday Party","date":"2026-02-15","time":"14:00","status":"pending"}]'

# List pending invites (JSON)
"$SCRIPTS_DIR/list_pending.sh"

# Human-readable summary
"$SCRIPTS_DIR/list_pending.sh" --summary

# Update reminder tracking
"$SCRIPTS_DIR/list_pending.sh" --summary --update-reminded

# Auto-dismiss after 3 ignored reminders
"$SCRIPTS_DIR/list_pending.sh" --summary --auto-dismiss
```

## Event Tracking

```bash
# Look up by email ID
"$SCRIPTS_DIR/lookup_event.sh" --email-id "19c1c86dcc389443"

# Look up by summary
"$SCRIPTS_DIR/lookup_event.sh" --summary "Staff Development"

# List all tracked events
"$SCRIPTS_DIR/lookup_event.sh" --list

# Validate events exist (removes orphans)
"$SCRIPTS_DIR/lookup_event.sh" --email-id "abc" --validate
```

## File Locations

| File | Purpose |
|------|---------|
| `~/.config/email-to-calendar/config.json` | User configuration |
| `~/.openclaw/workspace/memory/email-extractions/` | Extracted data |
| `~/.openclaw/workspace/memory/email-extractions/index.json` | Processing index |
| `~/.openclaw/workspace/memory/email-to-calendar/events.json` | Event tracking |
| `~/.openclaw/workspace/memory/email-to-calendar/pending_invites.json` | Pending invites |
| `~/.openclaw/workspace/memory/email-to-calendar/activity.json` | Activity log |
| `~/.openclaw/workspace/memory/email-to-calendar/changelog.json` | Change history |
| `~/.openclaw/workspace/skills/email-to-calendar/scripts/` | Utility scripts |
| `~/.openclaw/workspace/skills/email-to-calendar/MEMORY.md` | User preferences |

## References

- **Setup Guide**: [SETUP.md](SETUP.md) - Configuration and onboarding
- **CLI Reference**: [references/gog-commands.md](references/gog-commands.md) - Detailed gog CLI usage
- **Extraction Patterns**: [references/extraction-patterns.md](references/extraction-patterns.md) - Date/time parsing
- **Workflow Example**: [references/workflow-example.md](references/workflow-example.md) - Complete example

## Notes

### Date Parsing

Handles common formats:

- January 15, 2026, Wednesday January 15
- 01/15/2026, 15/01/2026
- Date ranges like "Feb 2-6"

### Time Zones

All times assumed local timezone. Time zone info preserved in descriptions.

---

## Referenced Files

> The following files are referenced in this skill and included for context.

### SETUP.md

```markdown
# Email-to-Calendar Setup Guide

This skill uses **smart onboarding** - it auto-detects your Gmail accounts and calendars, then presents sensible defaults. You can accept all defaults with one click or customize specific settings.

> **Tool Flexibility:** This guide uses `gog` CLI as the reference implementation for
> Gmail and Google Calendar access. If your agent has alternative tools (MCP servers,
> other CLIs, or direct API access), those can be used instead - the workflow and
> configuration concepts remain the same.

## Quick Start

On first use, the skill will:

1. **Detect your Gmail accounts** via `gog auth status`
2. **List available calendars** via `gog calendar list`
3. **Suggest smart defaults** based on your email pattern

You'll see something like:

```
Here's my suggested configuration (change any you disagree with):

1. Gmail Account: [email protected] ← (detected)
2. Calendar: primary ← (detected)
3. Email Mode: Direct (scan your inbox) ← (guessed: personal email)
4. Attendees: Disabled
5. Whole-day events: Timed (9 AM - 5 PM)
6. Multi-day events: Daily recurring
7. Ignore patterns: (none)
8. Auto-create patterns: (none)
9. Email handling: Mark as read and archive (recommended)

Also auto-process calendar replies? (Y/n)

Type numbers to change (e.g., "3, 7") or press Enter to accept all defaults.
```

**Just press Enter** to accept all defaults, or type numbers to change specific settings.

## Configuration File

The skill stores settings in `~/.config/email-to-calendar/config.json`.
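A defensive way to read these settings is to let `jq`'s `//` (alternative) operator supply the documented defaults for any missing keys. A minimal sketch - the inline sample config below is illustrative, not a file the skill ships:

```shell
# Illustrative config; a real install reads ~/.config/email-to-calendar/config.json
CONFIG_FILE=$(mktemp)
cat > "$CONFIG_FILE" << 'EOF'
{ "email_mode": "forwarded", "calendar_id": "primary" }
EOF

# jq's // operator falls back to the documented default when a key is absent
EMAIL_MODE=$(jq -r '.email_mode // "direct"' "$CONFIG_FILE")
CALENDAR_ID=$(jq -r '.calendar_id // "primary"' "$CONFIG_FILE")
MARK_READ=$(jq -r '.email_handling.mark_read // true' "$CONFIG_FILE")  # missing object -> default

echo "mode=$EMAIL_MODE calendar=$CALENDAR_ID mark_read=$MARK_READ"
rm -f "$CONFIG_FILE"
```

The same pattern appears throughout the skill's workflow snippets, so missing optional keys never abort processing.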
### Full Schema

```json
{
  "provider": "gog",
  "email_mode": "direct",
  "gmail_account": "[email protected]",
  "calendar_id": "primary",
  "attendees": {
    "enabled": true,
    "emails": ["[email protected]", "[email protected]"]
  },
  "whole_day_events": {
    "style": "timed",
    "start_time": "09:00",
    "end_time": "17:00"
  },
  "multi_day_events": { "style": "daily_recurring" },
  "event_rules": {
    "ignore_patterns": ["fundraiser", "meeting"],
    "auto_create_patterns": ["holiday", "No School"]
  },
  "email_handling": { "mark_read": true, "archive": false },
  "deadline_notifications": {
    "enabled": true,
    "email_recipient": "[email protected]"
  }
}
```

### Configuration Options

| Setting | Type | Default | Description |
|---------|------|---------|-------------|
| `provider` | string | `"gog"` | Email/calendar provider backend (currently only "gog" supported) |
| `email_mode` | `"direct"` / `"forwarded"` | `"direct"` | Direct scans your inbox; Forwarded only processes forwarded emails |
| `gmail_account` | string | (auto-detected) | Gmail account to monitor |
| `calendar_id` | string | `"primary"` | Calendar to create events in |
| `attendees.enabled` | boolean | `false` | Whether to add attendees to events |
| `attendees.emails` | string[] | `[]` | Email addresses to invite |
| `whole_day_events.style` | `"timed"` / `"all_day"` | `"timed"` | How to create whole-day events |
| `whole_day_events.start_time` | string | `"09:00"` | Start time for timed events |
| `whole_day_events.end_time` | string | `"17:00"` | End time for timed events |
| `multi_day_events.style` | `"daily_recurring"` / `"all_day_span"` | `"daily_recurring"` | How to handle multi-day events |
| `event_rules.ignore_patterns` | string[] | `[]` | Event types to always skip |
| `event_rules.auto_create_patterns` | string[] | `[]` | Event types to auto-create |
| `email_handling.mark_read` | boolean | `true` | Mark processed emails as read |
| `email_handling.archive` | boolean | `true` | Archive processed emails |
| `email_handling.auto_dispose_calendar_replies` | boolean | `true` | Auto-process calendar reply emails (accepts, declines, tentatives) |
| `deadline_notifications.enabled` | boolean | `false` | Send email notifications for events with deadlines |
| `deadline_notifications.email_recipient` | string | (gmail_account) | Email address to send notifications to |
| `agent_name` | string | `"Ripurapu"` | Agent name shown in event descriptions ("Created by X") |

### Email Mode Detection

The skill guesses the best mode based on your email pattern:

| Email Pattern | Suggested Mode | Reason |
|---------------|----------------|--------|
| `[email protected]` | Direct | Personal inbox |
| `[email protected]` | Direct | Personal inbox |
| `service@*`, `bot@*`, `agent@*` | Forwarded | Service/agent account |

### Event Style Options

**Whole-day Events:**

- `"timed"`: Creates events 9 AM - 5 PM (or custom times)
- `"all_day"`: Creates Google Calendar all-day events

**Multi-day Events (e.g., Feb 2-6):**

- `"daily_recurring"`: Creates separate 9-5 events for each day
- `"all_day_span"`: Creates a single event spanning all days

## Example Configurations

### Family Calendar (School Events)

```json
{
  "provider": "gog",
  "email_mode": "direct",
  "gmail_account": "[email protected]",
  "calendar_id": "primary",
  "attendees": { "enabled": true, "emails": ["[email protected]", "[email protected]"] },
  "whole_day_events": { "style": "timed", "start_time": "09:00", "end_time": "17:00" },
  "multi_day_events": { "style": "daily_recurring" },
  "event_rules": {
    "ignore_patterns": ["fundraiser", "PTA meeting", "volunteer request"],
    "auto_create_patterns": ["No School", "holiday", "Staff Development Day"]
  },
  "email_handling": { "mark_read": true, "archive": true, "auto_dispose_calendar_replies": true },
  "deadline_notifications": { "enabled": true, "email_recipient": "[email protected]" }
}
```

### Work Calendar

```json
{
  "provider": "gog",
  "email_mode": "direct",
  "gmail_account": "[email protected]",
  "calendar_id": "primary",
  "attendees": { "enabled": false, "emails": [] },
  "whole_day_events": { "style": "timed", "start_time": "08:00", "end_time": "18:00" },
  "multi_day_events": { "style": "all_day_span" },
  "event_rules": {
    "ignore_patterns": ["newsletter", "announcement"],
    "auto_create_patterns": ["deadline", "review"]
  },
  "email_handling": { "mark_read": true, "archive": true, "auto_dispose_calendar_replies": true },
  "deadline_notifications": { "enabled": true, "email_recipient": "[email protected]" }
}
```

### Personal Calendar (Minimal)

```json
{
  "provider": "gog",
  "email_mode": "direct",
  "gmail_account": "[email protected]",
  "calendar_id": "primary",
  "attendees": { "enabled": false, "emails": [] },
  "whole_day_events": { "style": "all_day" },
  "multi_day_events": { "style": "all_day_span" },
  "event_rules": { "ignore_patterns": [], "auto_create_patterns": [] },
  "email_handling": { "mark_read": true, "archive": true, "auto_dispose_calendar_replies": true }
}
```

## Manual Configuration

If you prefer to skip the interactive setup:

```bash
mkdir -p ~/.config/email-to-calendar
cat > ~/.config/email-to-calendar/config.json << 'EOF'
{
  "provider": "gog",
  "email_mode": "direct",
  "gmail_account": "[email protected]",
  "calendar_id": "primary",
  "attendees": { "enabled": false, "emails": [] },
  "whole_day_events": { "style": "timed", "start_time": "09:00", "end_time": "17:00" },
  "multi_day_events": { "style": "daily_recurring" },
  "event_rules": { "ignore_patterns": [], "auto_create_patterns": [] },
  "email_handling": { "mark_read": true, "archive": true, "auto_dispose_calendar_replies": true },
  "deadline_notifications": { "enabled": false, "email_recipient": "[email protected]" }
}
EOF
```

## Prerequisites

This skill requires:

- **Email access** - ability to read unread emails and get message bodies
- **Calendar access** - ability to create, update, and delete calendar events
- `jq` for JSON parsing
- `python3` for date parsing and scripts
- `bash` for shell scripts
**Reference implementation:** The `gog` CLI tool provides Gmail and Google Calendar access. Other tools (MCP servers, direct API) work equally well if they provide the same capabilities.

## Troubleshooting

### Config not found

The skill will auto-detect and suggest defaults. Just accept or customize.

### Events not being created

1. Check that `gog` is authenticated: `gog auth status`
2. Verify calendar ID is correct: `gog calendar list`
3. Check config file: `cat ~/.config/email-to-calendar/config.json`

### Wrong calendar

List available calendars:

```bash
gog calendar list
```

Update `calendar_id` in config to use a specific calendar.

### See what was processed

```bash
~/.openclaw/workspace/skills/email-to-calendar/scripts/activity_log.sh show --last 5
```

### Undo a recent event

```bash
~/.openclaw/workspace/skills/email-to-calendar/scripts/undo.sh list
~/.openclaw/workspace/skills/email-to-calendar/scripts/undo.sh last
```
```

### references/gog-commands.md

```markdown
# gog Calendar CLI Reference

This document contains detailed reference information for the `gog` CLI commands used by the email-to-calendar skill.

## Calendar Operations

### Creating Events

```bash
gog calendar create <calendar_id> \
  --summary "Event Title" \
  --from "2026-02-11T09:00:00" \
  --to "2026-02-11T17:00:00" \
  --description "Event description" \
  --attendees "[email protected],[email protected]" \
  --send-updates all
```

### Updating Events

```bash
# Update event details
gog calendar update <calendar_id> <event_id> \
  --summary "Updated Title" \
  --from "2026-01-15T09:00:00" \
  --to "2026-01-15T17:00:00"

# Replace all attendees
gog calendar update <calendar_id> <event_id> --attendees "[email protected]"

# Add attendees while preserving existing ones
gog calendar update <calendar_id> <event_id> --add-attendee "[email protected]"

# Clear recurrence
gog calendar update <calendar_id> <event_id> --rrule " "
```

### Deleting Events

```bash
gog calendar delete <calendar_id> <event_id>
```

### Listing Events

```bash
# List events in a date range
gog calendar events <calendar_id> \
  --from "2026-02-01T00:00:00" \
  --to "2026-02-28T23:59:59" \
  --json
```

## Recurrence Patterns (--rrule flag)

Uses standard RFC 5545 RRULE syntax. The `--rrule` flag accepts RRULE strings.
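For a multi-day span, the `COUNT` value can be derived from the inclusive date range. A sketch assuming GNU `date` (BSD/macOS `date` needs different flags):

```shell
# Feb 2-6 is an inclusive range: difference in days, plus one
start="2026-02-02"
end="2026-02-06"
days=$(( ( $(date -d "$end" +%s) - $(date -d "$start" +%s) ) / 86400 + 1 ))
echo "RRULE:FREQ=DAILY;COUNT=$days"   # -> RRULE:FREQ=DAILY;COUNT=5
```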
### Common Patterns

| Pattern | RRULE |
|---------|-------|
| Daily for N days | `RRULE:FREQ=DAILY;COUNT=N` |
| Daily (forever) | `RRULE:FREQ=DAILY` |
| Weekly | `RRULE:FREQ=WEEKLY` |
| Every weekday | `RRULE:FREQ=WEEKLY;BYDAY=MO,TU,WE,TH,FR` |
| Every Tuesday | `RRULE:FREQ=WEEKLY;BYDAY=TU` |
| Monthly on specific day | `RRULE:FREQ=MONTHLY;BYMONTHDAY=19` |
| First Monday of month | `RRULE:FREQ=MONTHLY;BYDAY=1MO` |
| Last Friday of month | `RRULE:FREQ=MONTHLY;BYDAY=-1FR` |
| Yearly | `RRULE:FREQ=YEARLY` |
| Until a date | `RRULE:FREQ=WEEKLY;UNTIL=20261231T235959Z` |

### Multi-Day Events

For events spanning multiple consecutive days (e.g., Feb 2-6), create a daily recurring event:

```bash
# Feb 2-6 = 5 days
gog calendar create "$CALENDAR_ID" \
  --summary "Multi-Day Event" \
  --from "2026-02-02T09:00:00" \
  --to "2026-02-02T17:00:00" \
  --rrule "RRULE:FREQ=DAILY;COUNT=5"
```

### Day of Week Codes

| Day | Code |
|-----|------|
| Monday | MO |
| Tuesday | TU |
| Wednesday | WE |
| Thursday | TH |
| Friday | FR |
| Saturday | SA |
| Sunday | SU |

## Key Flags

| Flag | Description | Values |
|------|-------------|--------|
| `--attendees` | Comma-separated attendee emails | `email1,email2` |
| `--send-updates` | Notify attendees of changes | `all`, `externalOnly`, `none` |
| `--rrule` | Recurrence rule (RFC 5545) | `RRULE:FREQ=...` |
| `--reminder` | Add reminder | `email:1d`, `popup:30m` |
| `--guests-can-invite` | Allow guests to invite others | flag |
| `--guests-can-modify` | Allow guests to modify event | flag |
| `--guests-can-see-others` | Allow guests to see other attendees | flag |
| `--json` | Output as JSON | flag |

### Reminder Format

```bash
--reminder "email:1d"   # Email 1 day before
--reminder "popup:30m"  # Popup 30 minutes before
--reminder "popup:1h"   # Popup 1 hour before
```

## Advanced Attendee Syntax

Mark attendees as optional or add comments:

```bash
--attendees "[email protected],[email protected];optional,[email protected];comment=FYI only"
```

| Modifier | Example |
|----------|---------|
| Optional | `[email protected];optional` |
| Comment | `[email protected];comment=FYI only` |
| Response status | `[email protected];responseStatus=accepted` |

## Note on `--send-updates`

The `--send-updates` flag is only available in tonimelisma's gogcli fork. Without this flag, attendees won't receive email notifications for event changes.

To enable:

1. Install gogcli from: https://github.com/tonimelisma/gogcli
2. Use the `feat/calendar-send-updates` branch

The `create_event.sh` script auto-detects support and uses it when available.

## Gmail Operations

### Send an Email

```bash
gog gmail send \
  --account "[email protected]" \
  --to "[email protected]" \
  --subject "Subject line" \
  --body "Email body text"
```

**Flags:**

| Flag | Description | Required |
|------|-------------|----------|
| `--account` | Gmail account to send from | Yes |
| `--to` | Recipient email address | Yes |
| `--subject` | Email subject line | Yes |
| `--body` | Email body text | Yes |
| `--cc` | CC recipients (comma-separated) | No |
| `--bcc` | BCC recipients (comma-separated) | No |

**Example - Deadline notification:**

```bash
gog gmail send \
  --account "[email protected]" \
  --to "[email protected]" \
  --subject "ACTION REQUIRED: RSVP for Team Offsite by Feb 10" \
  --body "A calendar event has been created that requires your action.

Event: Team Offsite
Date: February 15-17, 2026
Deadline: February 10, 2026
Action Required: RSVP
Link: https://example.com/rsvp

---
Sent by email-to-calendar skill"
```

### Get a Single Email

```bash
gog gmail get <messageId> --account "[email protected]"
```

### Search Emails

```bash
# Search with body content
gog gmail messages search "in:inbox is:unread" \
  --max 20 \
  --include-body \
  --account "[email protected]"

# Search forwarded emails
gog gmail messages search "in:inbox is:unread subject:Fwd OR subject:FW" \
  --max 10 \
  --include-body \
  --account "[email protected]"
```

### Modify Email Labels

```bash
# Mark as read
gog gmail modify <messageId> --remove-labels UNREAD --account "[email protected]"

# Archive (remove from inbox)
gog gmail modify <messageId> --remove-labels INBOX --account "[email protected]"

# Both
gog gmail modify <messageId> --remove-labels UNREAD,INBOX --account "[email protected]"
```

## Common Mistakes

- **WRONG:** `gog gmail messages get <id>` - This command does not exist
- **CORRECT:** `gog gmail get <id>` - Use this to read a single email

## References

- [RFC 5545 - iCalendar RRULE](https://icalendar.org/iCalendar-RFC-5545/3-8-5-3-recurrence-rule.html)
- [Google Calendar API Recurrence](https://developers.google.com/calendar/api/concepts/events-calendars#recurrence_rules)
```

### references/extraction-patterns.md

```markdown
# Email Extraction Patterns

This document describes the patterns used for extracting calendar events and action items from forwarded emails.
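Before the detailed pattern lists below, the keyword checks can be approximated with a case-insensitive `grep` pass. This is only a sketch; the skill's `extract_events.py` is more thorough:

```shell
# Illustrative keyword scan over an email body
body="Let's sync on Thursday February 5 at 3:00 PM. Please RSVP by Feb 3."

has_event=no
has_deadline=no
# Primary event keywords (subset of the list below)
echo "$body" | grep -Eiq 'meeting|call|sync|standup|appointment|event' && has_event=yes
# Deadline phrases (subset of the patterns below)
echo "$body" | grep -Eiq 'rsvp by|register by|deadline|due by' && has_deadline=yes

echo "event=$has_event deadline=$has_deadline"   # -> event=yes deadline=yes
```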
## Event Detection Patterns The extraction script looks for these patterns to identify potential calendar events: ### Primary Event Keywords - meeting, call, sync, standup, review, demo, interview - appointment, event, conference, workshop, webinar, training - meet (as a verb) ### Event Phrases - "Meeting on [date] at [time]" - "Let's meet [date] at [time]" - "Join us on [date] for [purpose]" - "You are invited to [event] on [date]" - "Please attend [event] on [date]" - "Mark your calendar for [date]" - "Save the date: [date]" - "When: [date]" / "Date: [date]" / "Time: [time]" ## Date Parsing The script recognizes these date formats: ### With Year - January 15, 2026 - 15 January 2026 - 01/15/2026 (US format) - 15/01/2026 (EU format) - 2026-01-15 (ISO format) ### Without Year (defaults to current year) - Wednesday January 15 - January 15 - Next Tuesday - This Friday ### Relative Dates - Today, Tomorrow - Next week, Next Monday - In 3 days ## Time Parsing ### 12-hour format - 2:30 PM, 2:30 pm - 2 PM, 2pm - 9:00 AM - 5:00 PM (ranges) ### 24-hour format - 14:30 - 09:00-17:00 ### Full Day Indicators - "all day", "full day", "whole day" - "9am to 5pm", "9:00-17:00" - "business hours", "work day" ## Action Item Detection ### Action Keywords - Action:, Task:, Todo:, To-do:, Follow-up:, Followup: - Please [do something] - Kindly [do something] - Need to, Needs to, We need to - Should, Must, Will need to ### Bullet Points - `- [ ] Task description` - `* Task description` - `• Task description` ### Deadline Detection Action items are checked for associated deadlines: - "by [date]" - "due [date]" - "before [date]" - "deadline: [date]" ## Header Filtering The script automatically filters out email headers to focus on actual content: ### Filtered Headers - From:, Date:, Subject:, To:, Cc:, Bcc: - "---------- Forwarded message ----------" - "---------- Original message ----------" - "On [date] [person] wrote:" - "Sent from my [device]" ## Duplicate Detection When checking for 
existing calendar events, duplicates are identified by:

1. **Same Date**: Events on the same calendar day
2. **Similar Title**: 2+ keywords match between titles
3. **Overlapping Time**: Within 1 hour of each other

If a duplicate is found, the existing event is updated rather than creating a new one.

## Edge Cases

### Cancellations

If an email contains cancellation language:
- "Cancelled", "Canceled", "Postponed", "Rescheduled"
- "No longer happening", "Won't take place"

The script should:
1. Search for the existing event
2. Either delete it or update title with "CANCELLED" prefix

### Recurring Events

Currently, the script extracts each occurrence as a separate event. Recurring patterns ("every Monday", "weekly") are noted but not expanded.

### Time Zones

The script assumes all times are in the user's local timezone. Time zone information in emails is noted in the description but not used for conversion.

### All-Day Events

When detected, all-day events are created with:
- Start: 9:00 AM
- End: 5:00 PM
- Duration: 8 hours

This provides a visual block in the calendar while maintaining flexibility.
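The three duplicate heuristics above (same calendar day, 2+ shared title keywords, start times within one hour) condense into a small matching function. This is a simplified, hypothetical sketch of that logic — `is_duplicate` is illustrative, not the code shipped in `check_duplicate.sh`:

```python
from datetime import datetime


def is_duplicate(new_title, new_start, existing_title, existing_start):
    """Apply the three duplicate heuristics: same calendar day,
    2+ shared title keywords, and starts within one hour."""
    # 1. Same calendar day
    if new_start.date() != existing_start.date():
        return False
    # 2. Similar title: at least 2 keywords in common (case-insensitive)
    new_words = set(new_title.lower().split())
    existing_words = set(existing_title.lower().split())
    if len(new_words & existing_words) < 2:
        return False
    # 3. Overlapping time: within 1 hour of each other
    return abs((new_start - existing_start).total_seconds()) <= 3600


a = datetime(2026, 2, 5, 15, 0)
b = datetime(2026, 2, 5, 15, 30)
print(is_duplicate("Website project kickoff meeting", a,
                   "Kickoff meeting website", b))  # True
```

All three checks must pass; failing any one lets the new event through as distinct, which matches the "update rather than create" policy described above.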
## Output Format

### Event Object

```json
{
  "type": "event",
  "title": "Meeting title",
  "date": { "month": "January", "day": 15, "year": 2026, "month_num": 1 },
  "time": { "hour": 14, "minute": 30 },
  "is_full_day": false,
  "source_text": "Original text that matched",
  "context": "Surrounding text for reference",
  "raw_line": "The exact line from the email"
}
```

### Action Object

```json
{
  "type": "action",
  "text": "Action description",
  "deadline": { "month": "January", "day": 17, "year": 2026, "month_num": 1 },
  "source_line": "The exact line from the email"
}
```

## Testing

To test extraction on a sample email:

```bash
echo 'Your email content here' | python3 scripts/extract_events.py -
```

Or from a file:

```bash
python3 scripts/extract_events.py email.txt
```
```

### references/workflow-example.md

```markdown
# Email-to-Calendar Workflow Example

## Scenario: User Forwards an Email

**User sends:**

```
---------- Forwarded message ----------
From: Sarah <[email protected]>
Date: Mon, Feb 2, 2026 at 10:00 AM
Subject: Project kickoff meeting
To: [email protected]

Hi everyone,

Let's schedule a kickoff meeting for the new website project on Thursday February 5 at 3:00 PM in Conference Room B.

Action items before the meeting:
- Review the requirements document
- Prepare your team's capacity estimates
- Submit any questions by Wednesday

Thanks!
Sarah
```

## Step 1: Detect Forwarded Email

You recognize the forwarded email patterns:
- Subject starts with "Fwd:" (implied)
- Contains "---------- Forwarded message ----------"
- Has email headers (From:, Date:, Subject:, To:)

## Step 2: Extract Items

Run extraction:

```bash
python3 scripts/extract_events.py email.txt
```

**Result:**

```json
{
  "events": [
    {
      "type": "event",
      "title": "kickoff meeting for the new website project",
      "date": {"month": "February", "day": 5, "year": 2026, "month_num": 2},
      "time": {"hour": 15, "minute": 0},
      "is_full_day": false
    }
  ],
  "actions": [
    {"type": "action", "text": "Review the requirements document", "deadline": null},
    {"type": "action", "text": "Prepare your team's capacity estimates", "deadline": null},
    {"type": "action", "text": "Submit any questions by Wednesday",
     "deadline": {"month": "February", "day": 4, "year": 2026, "month_num": 2}}
  ]
}
```

## Step 3: Store to Memory

Save to dated file:

```bash
EXTRACTION_FILE="memory/email-extractions/2026-02-02-143022.json"
python3 scripts/extract_events.py email.txt > "$EXTRACTION_FILE"
```

Update index:

```json
{
  "extractions": [
    {
      "file": "memory/email-extractions/2026-02-02-143022.json",
      "date": "2026-02-02T14:30:22",
      "status": "pending_review"
    }
  ]
}
```

## Step 4: Present to User

**Your response:**

> I found 1 event and 3 action items in that email:
>
> **Events:**
> 1. **Kickoff meeting for the new website project** - Thursday February 5 at 3:00 PM - Conference Room B
>
> **Action Items:**
> 1. **Review the requirements document** - No deadline
> 2. **Prepare your team's capacity estimates** - No deadline
> 3. **Submit any questions by Wednesday** - Due February 4
>
> Which events should I create calendar entries for? Reply with numbers (e.g., "1") or "all" or "none".
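The Step 3 index update can be scripted rather than edited by hand. A minimal sketch assuming the index layout shown in the example — `record_extraction` and the file name are hypothetical helpers, not part of the skill:

```python
import json
from pathlib import Path


def record_extraction(index_path, extraction_file, timestamp):
    """Append a pending_review entry to the extraction index,
    creating the index file if it does not exist yet."""
    path = Path(index_path)
    if path.exists():
        index = json.loads(path.read_text())
    else:
        index = {"extractions": []}
    index["extractions"].append({
        "file": extraction_file,
        "date": timestamp,
        "status": "pending_review",
    })
    path.write_text(json.dumps(index, indent=2))
    return index


idx = record_extraction(
    "extraction-index.json",  # hypothetical path
    "memory/email-extractions/2026-02-02-143022.json",
    "2026-02-02T14:30:22",
)
print(idx["extractions"][-1]["status"])  # pending_review
```

Read-modify-write keeps earlier pending entries intact, so later "what do I need to review?" queries can walk the whole list.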
## Step 5: User Responds

**User replies:** "1"

## Step 6: Check for Duplicates

```bash
bash scripts/check_duplicate.sh "$CALENDAR_ID" \
  "kickoff meeting for the new website project" \
  "February 5, 2026" \
  "3:00 PM"
```

**Result:** `null` (no duplicate found)

## Step 7: Create Calendar Event

```bash
# Read attendees from config
CONFIG_FILE="$HOME/.config/email-to-calendar/config.json"
ATTENDEE_EMAILS=$(jq -r '.attendees.emails | join(",")' "$CONFIG_FILE")
CALENDAR_ID=$(jq -r '.calendar_id' "$CONFIG_FILE")

gog calendar create "$CALENDAR_ID" \
  --summary "Kickoff meeting - Website Project" \
  --from "2026-02-05T15:00:00" \
  --to "2026-02-05T16:00:00" \
  --description "Kickoff meeting for the new website project
Location: Conference Room B
From: Sarah <[email protected]>

Action items to complete before meeting:
- Review the requirements document
- Prepare your team's capacity estimates
- Submit any questions by Wednesday February 4" \
  --attendees "$ATTENDEE_EMAILS" \
  --send-updates all
```

## Step 8: Confirm and Update Status

**Your response:**

> Created calendar event:
> - **Title:** Kickoff meeting - Website Project
> - **Date:** Thursday, February 5, 2026
> - **Time:** 3:00 PM - 4:00 PM
> - **Description:** Full details including location and action items
> - **Attendees:** Invited as configured

Update extraction status:

```json
{
  "file": "memory/email-extractions/2026-02-02-143022.json",
  "date": "2026-02-02T14:30:22",
  "status": "processed",
  "events_created": ["event_id_123"]
}
```

## Alternative: Duplicate Found

If check_duplicate.sh returns an existing event:

```json
{
  "id": "abc123xyz",
  "summary": "Website Project Kickoff",
  "start": {"dateTime": "2026-02-05T15:00:00"}
}
```

**Your response:**

> I found an existing event "Website Project Kickoff" on February 5 at 3:00 PM. Should I:
> 1. Update it with new information from this email
> 2. Skip (keep existing event as-is)
> 3.
Create a separate event anyway

If user chooses "1", update the existing event with the new event ID.

## Reviewing Pending Items

Later, user asks: "What emails do I need to review?"

Check index file and present any pending extractions:

> You have 1 pending email extraction from February 2:
>
> **Events:**
> 1. **Kickoff meeting** - February 5 at 3:00 PM
>
> **Action Items:**
> 1. Review requirements document
> 2. Prepare capacity estimates
> 3. Submit questions by Wednesday
>
> Should I create the calendar event for item 1?
```

---

## Skill Companion Files

> Additional files collected from the skill directory layout.

### _meta.json

```json
{
  "owner": "a2mus",
  "slug": "doro-email-to-calendar",
  "displayName": "Doro Email To Calendar",
  "latest": {
    "version": "1.0.0",
    "publishedAt": 1772001753347,
    "commit": "https://github.com/openclaw/skills/commit/4a8eed520265365e911823d4f7a99edd249827db"
  },
  "history": []
}
```

### scripts/activity_log.sh

```bash
#!/bin/bash
# Record activity for silent audit trail
# Usage: activity_log.sh <action> [options]
#
# Actions:
#   start-session   Start a new processing session
#   log-skip        --email-id <id> --subject <sub> --reason <reason>
#   log-event       --email-id <id> --title <title> --action <created|auto_ignored|pending>
#   end-session     Finalize the current session
#   show [--last N] Show recent activity (default: last session)
#
# Logs to ~/.openclaw/workspace/memory/email-to-calendar/activity.json
# This creates a silent audit trail - use 'show' to display on request

SCRIPT_DIR="$(dirname "$0")"
UTILS_DIR="$SCRIPT_DIR/utils"

# Parse action
ACTION="${1:-}"
shift 2>/dev/null || true

# Build arguments array for Python script
ARGS=("$ACTION")
while [[ $# -gt 0 ]]; do
  case "$1" in
    --email-id|--subject|--title|--reason|--action|--last)
      ARGS+=("$1" "$2")
      shift 2
      ;;
    *)
      shift
      ;;
  esac
done

# Validate required arguments for each action
case "$ACTION" in
  log-skip)
    has_email_id=false
    has_reason=false
    for ((i=0; i<${#ARGS[@]}; i++)); do
      case "${ARGS[$i]}" in
        --email-id) has_email_id=true ;;
        --reason) has_reason=true ;;
      esac
    done
    if ! $has_email_id || ! $has_reason; then
      echo "Error: --email-id and --reason are required for log-skip" >&2
      exit 1
    fi
    ;;
  log-event)
    has_email_id=false
    has_title=false
    for ((i=0; i<${#ARGS[@]}; i++)); do
      case "${ARGS[$i]}" in
        --email-id) has_email_id=true ;;
        --title) has_title=true ;;
      esac
    done
    if ! $has_email_id || ! $has_title; then
      echo "Error: --email-id and --title are required for log-event" >&2
      exit 1
    fi
    ;;
  start-session|end-session|show)
    # No required arguments
    ;;
  "")
    echo "Usage: activity_log.sh <action> [options]"
    echo ""
    echo "Actions:"
    echo "  start-session   Start a new processing session"
    echo "  log-skip        --email-id <id> --subject <sub> --reason <reason>"
    echo "  log-event       --email-id <id> --title <title> --action <action> [--reason <reason>]"
    echo "  end-session     Finalize the current session"
    echo "  show [--last N] Show recent activity (default: last session)"
    exit 1
    ;;
esac

# Delegate to Python implementation
python3 "$UTILS_DIR/activity_ops.py" "${ARGS[@]}"
```

### scripts/add_pending.sh

```bash
#!/bin/bash
# Add a pending invite with events to track
#
# Usage: add_pending.sh --email-id <id> --email-subject <subject> --events-json <json>
#
# Arguments:
#   --email-id      The email message ID (required)
#   --email-subject The email subject line (optional)
#   --events-json   JSON array of events with keys: title, date, time, status (required)
#
# Example:
#   add_pending.sh --email-id "19c2b6cde1cf74e2" \
#     --email-subject "Birthday Party Invite" \
#     --events-json '[{"title":"Birthday Party","date":"2026-02-15","time":"14:00","status":"pending"}]'
#
# Returns JSON: {"success": true, "invite_id": "inv_20260205_001"}

SCRIPT_DIR="$(dirname "$0")"
UTILS_DIR="$SCRIPT_DIR/utils"

EMAIL_ID=""
EMAIL_SUBJECT=""
EVENTS_JSON=""

# Parse arguments
while [[ $# -gt 0 ]]; do
  case "$1" in
    --email-id)
      EMAIL_ID="$2"
      shift 2
      ;;
    --email-subject)
      EMAIL_SUBJECT="$2"
      shift 2
      ;;
    --events-json)
      EVENTS_JSON="$2"
      shift 2
      ;;
    *)
      shift
      ;;
  esac
done

# Validate required arguments
if [ -z "$EMAIL_ID" ]; then
  echo "Error: --email-id is required" >&2
  echo "Usage: add_pending.sh --email-id <id> --email-subject <subject> --events-json <json>" >&2
  exit 1
fi

if [ -z "$EVENTS_JSON" ]; then
  echo "Error: --events-json is required" >&2
  exit 1
fi

# Build args array (avoids eval quoting issues)
ARGS=(add --email-id "$EMAIL_ID" --events-json "$EVENTS_JSON")
if [ -n "$EMAIL_SUBJECT" ]; then
  ARGS+=(--email-subject "$EMAIL_SUBJECT")
fi

python3 "$UTILS_DIR/pending_ops.py" "${ARGS[@]}"
```

### scripts/calendar_delete.sh

```bash
#!/bin/bash
# Delete a calendar event using provider abstraction
# Usage: calendar_delete.sh --event-id <id> [--calendar-id <id>] [--provider <provider>]
#
# Returns JSON with success status

SCRIPT_DIR="$(dirname "$0")"
UTILS_DIR="$SCRIPT_DIR/utils"

EVENT_ID=""
CALENDAR_ID=""
PROVIDER=""

while [[ $# -gt 0 ]]; do
  case "$1" in
    --event-id)
      EVENT_ID="$2"
      shift 2
      ;;
    --calendar-id)
      CALENDAR_ID="$2"
      shift 2
      ;;
    --provider)
      PROVIDER="$2"
      shift 2
      ;;
    *)
      shift
      ;;
  esac
done

if [ -z "$EVENT_ID" ]; then
  echo "Usage: calendar_delete.sh --event-id <id> [--calendar-id <id>] [--provider <provider>]" >&2
  exit 1
fi

ARGS=(delete --event-id "$EVENT_ID")
if [ -n "$CALENDAR_ID" ]; then
  ARGS+=(--calendar-id "$CALENDAR_ID")
fi
if [ -n "$PROVIDER" ]; then
  ARGS+=(--provider "$PROVIDER")
fi

python3 "$UTILS_DIR/calendar_ops.py" "${ARGS[@]}"
```

### scripts/calendar_search.sh

```bash
#!/bin/bash
# Search calendar events using provider abstraction
# Usage: calendar_search.sh [--calendar-id <id>] --from <datetime> --to <datetime> [--provider <provider>]
#
# Dates should be in ISO format (e.g., 2026-02-03T00:00:00)
# Returns JSON with list of events

SCRIPT_DIR="$(dirname "$0")"
UTILS_DIR="$SCRIPT_DIR/utils"

CALENDAR_ID=""
FROM_DT=""
TO_DT=""
PROVIDER=""

while [[ $# -gt 0 ]]; do
  case "$1" in
    --calendar-id)
      CALENDAR_ID="$2"
      shift 2
      ;;
    --from)
      FROM_DT="$2"
      shift 2
      ;;
    --to)
      TO_DT="$2"
      shift 2
      ;;
    --provider)
      PROVIDER="$2"
shift 2 ;; *) shift ;; esac done if [ -z "$FROM_DT" ] || [ -z "$TO_DT" ]; then echo "Usage: calendar_search.sh [--calendar-id <id>] --from <datetime> --to <datetime> [--provider <provider>]" >&2 exit 1 fi # Build args array (avoids eval quoting issues) ARGS=(search --from "$FROM_DT" --to "$TO_DT") if [ -n "$CALENDAR_ID" ]; then ARGS+=(--calendar-id "$CALENDAR_ID") fi if [ -n "$PROVIDER" ]; then ARGS+=(--provider "$PROVIDER") fi python3 "$UTILS_DIR/calendar_ops.py" "${ARGS[@]}" ``` ### scripts/changelog.sh ```bash #!/bin/bash # Record event changes for audit trail and undo support # Usage: changelog.sh <action> [options] # # Actions: # log-create --event-id <id> --calendar-id <cal> --summary <s> --start <t> --end <t> [--email-id <id>] # log-update --event-id <id> --calendar-id <cal> --before-json <json> --after-json <json> [--email-id <id>] # log-delete --event-id <id> --calendar-id <cal> --before-json <json> # list [--last N] List recent changes (default: 10) # get --change-id <id> Get details of a specific change # can-undo --change-id <id> Check if a change can still be undone # # Logs to ~/.openclaw/workspace/memory/email-to-calendar/changelog.json # Changes older than 24 hours have can_undo=false SCRIPT_DIR="$(dirname "$0")" UTILS_DIR="$SCRIPT_DIR/utils" # Parse action ACTION="${1:-}" shift 2>/dev/null || true # Build arguments array for Python script ARGS=("$ACTION") while [[ $# -gt 0 ]]; do case "$1" in --event-id|--calendar-id|--summary|--start|--end|--email-id|--before-json|--after-json|--change-id|--last) ARGS+=("$1" "$2") shift 2 ;; *) shift ;; esac done # Validate required arguments for each action case "$ACTION" in log-create) has_event_id=false has_summary=false for ((i=0; i<${#ARGS[@]}; i++)); do case "${ARGS[$i]}" in --event-id) has_event_id=true ;; --summary) has_summary=true ;; esac done if ! $has_event_id || ! 
$has_summary; then echo "Error: --event-id and --summary are required for log-create" >&2 exit 1 fi ;; log-update) has_event_id=false for ((i=0; i<${#ARGS[@]}; i++)); do if [[ "${ARGS[$i]}" == "--event-id" ]]; then has_event_id=true break fi done if ! $has_event_id; then echo "Error: --event-id is required for log-update" >&2 exit 1 fi ;; log-delete) has_event_id=false for ((i=0; i<${#ARGS[@]}; i++)); do if [[ "${ARGS[$i]}" == "--event-id" ]]; then has_event_id=true break fi done if ! $has_event_id; then echo "Error: --event-id is required for log-delete" >&2 exit 1 fi ;; get|can-undo) has_change_id=false for ((i=0; i<${#ARGS[@]}; i++)); do if [[ "${ARGS[$i]}" == "--change-id" ]]; then has_change_id=true break fi done if ! $has_change_id; then echo "Error: --change-id is required for $ACTION" >&2 exit 1 fi ;; list) # No required arguments ;; "") echo "Usage: changelog.sh <action> [options]" echo "" echo "Actions:" echo " log-create --event-id <id> --calendar-id <cal> --summary <s> --start <t> --end <t>" echo " log-update --event-id <id> --calendar-id <cal> --before-json <json> --after-json <json>" echo " log-delete --event-id <id> --calendar-id <cal> --before-json <json>" echo " list [--last N] List recent changes (default: 10)" echo " get --change-id <id> Get details of a specific change" echo " can-undo --change-id <id> Check if a change can still be undone" exit 1 ;; esac # Delegate to Python implementation python3 "$UTILS_DIR/changelog_ops.py" "${ARGS[@]}" ``` ### scripts/check_duplicate.sh ```bash #!/bin/bash # Check for duplicate calendar events # Usage: check_duplicate.sh <calendar_id> <event_title> <date> [time] [--provider <provider>] CALENDAR_ID="${1:-primary}" EVENT_TITLE="$2" DATE="$3" TIME="${4:-}" PROVIDER="" # Parse optional --provider flag shift 4 2>/dev/null || true while [[ $# -gt 0 ]]; do case "$1" in --provider) PROVIDER="$2" shift 2 ;; *) shift ;; esac done if [ -z "$EVENT_TITLE" ] || [ -z "$DATE" ]; then echo "Usage: check_duplicate.sh 
<calendar_id> <event_title> <date> [time] [--provider <provider>]" >&2 exit 1 fi # Parse date using shared parser SCRIPT_DIR="$(dirname "$0")" if [[ "$DATE" =~ ^[0-9]{4}-[0-9]{2}-[0-9]{2}$ ]]; then ISO_DATE="$DATE" else ISO_DATE=$(python3 "$SCRIPT_DIR/utils/date_parser.py" date "$DATE" 2>/dev/null) fi if [ -z "$ISO_DATE" ]; then echo "Could not parse date: $DATE" >&2 exit 1 fi # Calculate search range (day before to day after) START_DATE=$(date -d "$ISO_DATE -1 day" '+%Y-%m-%dT00:00:00Z' 2>/dev/null || date -v-1d -j -f "%Y-%m-%d" "$ISO_DATE" "+%Y-%m-%dT00:00:00Z") END_DATE=$(date -d "$ISO_DATE +2 days" '+%Y-%m-%dT00:00:00Z' 2>/dev/null || date -v+2d -j -f "%Y-%m-%d" "$ISO_DATE" "+%Y-%m-%dT00:00:00Z") # Search for events using calendar_search.sh # Build args array (avoids eval quoting issues) SEARCH_ARGS=(--calendar-id "$CALENDAR_ID" --from "$START_DATE" --to "$END_DATE") if [ -n "$PROVIDER" ]; then SEARCH_ARGS+=(--provider "$PROVIDER") fi events_result=$("$SCRIPT_DIR/calendar_search.sh" "${SEARCH_ARGS[@]}" 2>/dev/null) events=$(echo "$events_result" | jq -r '.data // []' 2>/dev/null) if [ -z "$events" ] || [ "$events" = "[]" ]; then echo "null" exit 0 fi # Check for duplicates by title similarity # Extract title keywords (first 5 words, normalized) TITLE_KEYWORDS=$(echo "$EVENT_TITLE" | tr '[:upper:]' '[:lower:]' | tr -c '[:alnum:]' ' ' | awk '{print $1, $2, $3, $4, $5}') # Use Python to check for duplicates echo "$events" | python3 -c " import json import sys import re events = json.load(sys.stdin) title_keywords = '$TITLE_KEYWORDS'.lower().split() search_date = '$ISO_DATE' time_str = '$TIME' for event in events: event_title = event.get('summary', '').lower() event_start = event.get('start', {}).get('dateTime', event.get('start', {}).get('date', '')) # Check if same date if search_date in event_start: # Check title similarity with improved logic for short titles matches = sum(1 for kw in title_keywords if kw in event_title) total_keywords = len(title_keywords) if 
total_keywords == 0: continue elif total_keywords <= 2: # Short titles: require ALL keywords to match if matches == total_keywords: print(json.dumps(event)) sys.exit(0) else: # Longer titles: require at least 50% match if matches >= (total_keywords + 1) // 2: print(json.dumps(event)) sys.exit(0) print('null') " ``` ### scripts/create_event.sh ```bash #!/bin/bash # Create or update a calendar event with automatic tracking and changelog # Usage: create_event.sh <calendar_id> <title> <date> <start_time> <end_time> <description> <attendee_email> [event_id] [email_id] [--provider <provider>] # # If event_id is provided, updates existing event. Otherwise creates new one. # Captures the event ID from JSON output and stores it in events.json tracking. # Records changes to changelog.json for undo support. # Returns the event ID on success for reference. SCRIPT_DIR="$(dirname "$0")" UTILS_DIR="$SCRIPT_DIR/utils" CALENDAR_ID="${1:-primary}" TITLE="$2" DATE="$3" START_TIME="$4" END_TIME="$5" DESCRIPTION="$6" ATTENDEE_EMAIL="$7" EXISTING_EVENT_ID="${8:-}" EMAIL_ID="${9:-}" PROVIDER="" # Parse optional --provider flag from remaining args shift 9 2>/dev/null || true while [[ $# -gt 0 ]]; do case "$1" in --provider) PROVIDER="$2" shift 2 ;; *) shift ;; esac done if [ -z "$TITLE" ] || [ -z "$DATE" ]; then echo "Usage: create_event.sh <calendar_id> <title> <date> <start_time> <end_time> <description> <attendee_email> [event_id]" >&2 exit 1 fi # Get agent name from config for attribution (default: "Ripurapu") CONFIG_FILE="$HOME/.config/email-to-calendar/config.json" AGENT_NAME=$(jq -r '.agent_name // "Ripurapu"' "$CONFIG_FILE" 2>/dev/null) if [ -z "$AGENT_NAME" ] || [ "$AGENT_NAME" = "null" ]; then AGENT_NAME="Ripurapu" fi # Append agent attribution to description if [ -n "$DESCRIPTION" ]; then DESCRIPTION="$DESCRIPTION --- Created by $AGENT_NAME (AI assistant)" else DESCRIPTION="Created by $AGENT_NAME (AI assistant)" fi # Parse date to ISO format using shared parser 
ISO_DATE=$(python3 "$SCRIPT_DIR/utils/date_parser.py" date "$DATE" 2>/dev/null) if [ -z "$ISO_DATE" ]; then echo "Could not parse date: $DATE" >&2 exit 1 fi # Parse times using shared parser START_PARSED=$(python3 "$SCRIPT_DIR/utils/date_parser.py" time "$START_TIME" 2>/dev/null) END_PARSED=$(python3 "$SCRIPT_DIR/utils/date_parser.py" time "$END_TIME" 2>/dev/null) # Default times if not provided if [ -z "$START_PARSED" ]; then START_PARSED="09:00" fi if [ -z "$END_PARSED" ]; then END_PARSED="17:00" fi # Build ISO datetime strings START_ISO="${ISO_DATE}T${START_PARSED}:00" END_ISO="${ISO_DATE}T${END_PARSED}:00" # Variable to track if this is a new creation (for changelog) IS_NEW_EVENT=false BEFORE_STATE="" # Function to get current event state for changelog (before update) get_event_state() { local event_id="$1" local cal_id="$2" # Try to get current state from tracking first (faster) local tracked=$("$SCRIPT_DIR/lookup_event.sh" --event-id "$event_id" 2>/dev/null) if [ "$(echo "$tracked" | jq 'length' 2>/dev/null)" -gt 0 ]; then echo "$tracked" | jq -c '.[0] | {summary: .summary, start: .start, end: null}' fi } # Function to create a new event using calendar_ops.py create_new_event() { IS_NEW_EVENT=true # Build args array (avoids eval quoting issues) local -a CREATE_ARGS=(create --summary "$TITLE" --from "$START_ISO" --to "$END_ISO" --calendar-id "$CALENDAR_ID") if [ -n "$DESCRIPTION" ]; then CREATE_ARGS+=(--description "$DESCRIPTION") fi if [ -n "$ATTENDEE_EMAIL" ]; then CREATE_ARGS+=(--attendees "$ATTENDEE_EMAIL") fi if [ -n "$PROVIDER" ]; then CREATE_ARGS+=(--provider "$PROVIDER") fi RESULT=$(python3 "$UTILS_DIR/calendar_ops.py" "${CREATE_ARGS[@]}" 2>&1) # Extract event ID from JSON response (nested in data.id) EVENT_ID=$(echo "$RESULT" | jq -r '.data.id // empty' 2>/dev/null) } # Check if this is an update or create if [ -n "$EXISTING_EVENT_ID" ]; then # Get before state for changelog BEFORE_STATE=$(get_event_state "$EXISTING_EVENT_ID" "$CALENDAR_ID") # Update 
existing event using calendar_ops.py echo "Updating existing event: $EXISTING_EVENT_ID" >&2 # Build args array (avoids eval quoting issues) UPDATE_ARGS=(update --event-id "$EXISTING_EVENT_ID" --summary "$TITLE" --from "$START_ISO" --to "$END_ISO" --calendar-id "$CALENDAR_ID") if [ -n "$DESCRIPTION" ]; then UPDATE_ARGS+=(--description "$DESCRIPTION") fi if [ -n "$ATTENDEE_EMAIL" ]; then UPDATE_ARGS+=(--add-attendees "$ATTENDEE_EMAIL") fi if [ -n "$PROVIDER" ]; then UPDATE_ARGS+=(--provider "$PROVIDER") fi RESULT=$(python3 "$UTILS_DIR/calendar_ops.py" "${UPDATE_ARGS[@]}" 2>&1) # Self-healing: Check if event was deleted externally (404/410 error) if echo "$RESULT" | jq -e '.error_type == "not_found"' > /dev/null 2>&1 || echo "$RESULT" | grep -qiE "404|not found|410|gone|does not exist|deleted"; then echo "Event $EXISTING_EVENT_ID no longer exists, removing from tracking and creating new" >&2 "$SCRIPT_DIR/delete_tracked_event.sh" --event-id "$EXISTING_EVENT_ID" BEFORE_STATE="" # Clear before state since we're creating new # Fall back to creating a new event create_new_event else EVENT_ID="$EXISTING_EVENT_ID" fi else # Create new event create_new_event fi # Output the result echo "$RESULT" # Track the event and log to changelog if we have an ID if [ -n "$EVENT_ID" ]; then # Build args array (avoids eval quoting issues) TRACK_ARGS=(--event-id "$EVENT_ID" --calendar-id "$CALENDAR_ID" --summary "$TITLE" --start "$START_ISO") if [ -n "$EMAIL_ID" ]; then TRACK_ARGS+=(--email-id "$EMAIL_ID") fi "$SCRIPT_DIR/track_event.sh" "${TRACK_ARGS[@]}" >&2 echo "Event ID: $EVENT_ID" >&2 # Log to changelog for undo support if [ "$IS_NEW_EVENT" = true ]; then # Log create CHANGE_ID=$("$SCRIPT_DIR/changelog.sh" log-create \ --event-id "$EVENT_ID" \ --calendar-id "$CALENDAR_ID" \ --summary "$TITLE" \ --start "$START_ISO" \ --end "$END_ISO" \ --email-id "$EMAIL_ID" 2>/dev/null) || true if [ -n "$CHANGE_ID" ]; then echo "Change logged: $CHANGE_ID (can undo within 24 hours)" >&2 fi elif [ -n 
"$BEFORE_STATE" ]; then # Log update with before/after state AFTER_STATE=$(cat << EOF {"summary": "$TITLE", "start": "$START_ISO", "end": "$END_ISO"} EOF ) CHANGE_ID=$("$SCRIPT_DIR/changelog.sh" log-update \ --event-id "$EVENT_ID" \ --calendar-id "$CALENDAR_ID" \ --before-json "$BEFORE_STATE" \ --after-json "$AFTER_STATE" \ --email-id "$EMAIL_ID" 2>/dev/null) || true if [ -n "$CHANGE_ID" ]; then echo "Change logged: $CHANGE_ID (can undo within 24 hours)" >&2 fi fi # Also update pending_invites.json to mark this event as created if [ -n "$EMAIL_ID" ]; then "$SCRIPT_DIR/update_invite_status.sh" \ --email-id "$EMAIL_ID" \ --event-title "$TITLE" \ --status created \ --event-id "$EVENT_ID" 2>/dev/null || true # Auto-disposition email based on config (mark read and/or archive) "$SCRIPT_DIR/disposition_email.sh" --email-id "$EMAIL_ID" 2>/dev/null || true fi fi ``` ### scripts/delete_tracked_event.sh ```bash #!/bin/bash # Delete a tracked event from the tracking file (after deleting from calendar) # Usage: delete_tracked_event.sh --event-id <id> # # This removes the event from events.json tracking SCRIPT_DIR="$(dirname "$0")" UTILS_DIR="$SCRIPT_DIR/utils" EVENTS_FILE="$HOME/.openclaw/workspace/memory/email-to-calendar/events.json" # Parse arguments EVENT_ID="" while [[ $# -gt 0 ]]; do case $1 in --event-id) EVENT_ID="$2" shift 2 ;; *) echo "Unknown option: $1" >&2 exit 1 ;; esac done if [ -z "$EVENT_ID" ]; then echo "Error: --event-id is required" >&2 exit 1 fi if [ ! 
-f "$EVENTS_FILE" ]; then echo "Warning: No events file found" >&2 exit 0 fi # Delegate to Python implementation python3 "$UTILS_DIR/event_tracking.py" delete --event-id "$EVENT_ID" ``` ### scripts/disposition_email.sh ```bash #!/bin/bash # Disposition an email (mark read and/or archive) based on config # Usage: disposition_email.sh --email-id <id> [options] # # Options: # --email-id <id> Email message ID (required) # --mark-read Force mark as read (override config) # --no-mark-read Skip marking as read (override config) # --archive Force archive (override config) # --no-archive Skip archiving (override config) # --provider <name> Provider to use (default: from config) # # Returns JSON with success status and actions taken SCRIPT_DIR="$(dirname "$0")" UTILS_DIR="$SCRIPT_DIR/utils" EMAIL_ID="" MARK_READ="" NO_MARK_READ="" ARCHIVE="" NO_ARCHIVE="" PROVIDER="" while [[ $# -gt 0 ]]; do case "$1" in --email-id) EMAIL_ID="$2" shift 2 ;; --mark-read) MARK_READ="true" shift ;; --no-mark-read) NO_MARK_READ="true" shift ;; --archive) ARCHIVE="true" shift ;; --no-archive) NO_ARCHIVE="true" shift ;; --provider) PROVIDER="$2" shift 2 ;; *) shift ;; esac done if [ -z "$EMAIL_ID" ]; then echo "Usage: disposition_email.sh --email-id <id> [--mark-read] [--no-mark-read] [--archive] [--no-archive] [--provider <provider>]" >&2 exit 1 fi # Build args array (avoids eval quoting issues) ARGS=(disposition --email-id "$EMAIL_ID") if [ "$MARK_READ" = "true" ]; then ARGS+=(--mark-read true) fi if [ "$NO_MARK_READ" = "true" ]; then ARGS+=(--no-mark-read) fi if [ "$ARCHIVE" = "true" ]; then ARGS+=(--archive true) fi if [ "$NO_ARCHIVE" = "true" ]; then ARGS+=(--no-archive) fi if [ -n "$PROVIDER" ]; then ARGS+=(--provider "$PROVIDER") fi python3 "$UTILS_DIR/disposition_ops.py" "${ARGS[@]}" ``` ### scripts/email_modify.sh ```bash #!/bin/bash # Modify an email (add/remove labels) using provider abstraction # Usage: email_modify.sh --email-id <id> [--remove-labels <labels>] [--add-labels <labels>] 
[--provider <provider>] # # Labels should be comma-separated (e.g., "UNREAD,INBOX") # Returns JSON with success status SCRIPT_DIR="$(dirname "$0")" UTILS_DIR="$SCRIPT_DIR/utils" EMAIL_ID="" REMOVE_LABELS="" ADD_LABELS="" PROVIDER="" while [[ $# -gt 0 ]]; do case "$1" in --email-id) EMAIL_ID="$2" shift 2 ;; --remove-labels) REMOVE_LABELS="$2" shift 2 ;; --add-labels) ADD_LABELS="$2" shift 2 ;; --provider) PROVIDER="$2" shift 2 ;; *) shift ;; esac done if [ -z "$EMAIL_ID" ]; then echo "Usage: email_modify.sh --email-id <id> [--remove-labels <labels>] [--add-labels <labels>] [--provider <provider>]" >&2 exit 1 fi # Build args array (avoids eval quoting issues) ARGS=(modify --email-id "$EMAIL_ID") if [ -n "$REMOVE_LABELS" ]; then ARGS+=(--remove-labels "$REMOVE_LABELS") fi if [ -n "$ADD_LABELS" ]; then ARGS+=(--add-labels "$ADD_LABELS") fi if [ -n "$PROVIDER" ]; then ARGS+=(--provider "$PROVIDER") fi python3 "$UTILS_DIR/email_ops.py" "${ARGS[@]}" ``` ### scripts/email_read.sh ```bash #!/bin/bash # Read an email by ID using provider abstraction # Usage: email_read.sh --email-id <id> [--provider <provider>] # # Returns JSON with email content SCRIPT_DIR="$(dirname "$0")" UTILS_DIR="$SCRIPT_DIR/utils" EMAIL_ID="" PROVIDER="" while [[ $# -gt 0 ]]; do case "$1" in --email-id) EMAIL_ID="$2" shift 2 ;; --provider) PROVIDER="$2" shift 2 ;; *) shift ;; esac done if [ -z "$EMAIL_ID" ]; then echo "Usage: email_read.sh --email-id <id> [--provider <provider>]" >&2 exit 1 fi ARGS=(read --email-id "$EMAIL_ID") if [ -n "$PROVIDER" ]; then ARGS+=(--provider "$PROVIDER") fi python3 "$UTILS_DIR/email_ops.py" "${ARGS[@]}" ``` ### scripts/email_search.sh ```bash #!/bin/bash # Search emails using provider abstraction # Usage: email_search.sh --query <query> [--max <n>] [--include-body] [--provider <provider>] # # Returns JSON with list of matching emails SCRIPT_DIR="$(dirname "$0")" UTILS_DIR="$SCRIPT_DIR/utils" QUERY="" MAX="20" INCLUDE_BODY="" PROVIDER="" while [[ $# -gt 0 ]]; do case 
"$1" in --query) QUERY="$2" shift 2 ;; --max) MAX="$2" shift 2 ;; --include-body) INCLUDE_BODY="true" shift ;; --provider) PROVIDER="$2" shift 2 ;; *) shift ;; esac done if [ -z "$QUERY" ]; then echo "Usage: email_search.sh --query <query> [--max <n>] [--include-body] [--provider <provider>]" >&2 exit 1 fi # Build args array (avoids eval quoting issues) ARGS=(search --query "$QUERY" --max "$MAX") if [ "$INCLUDE_BODY" = "true" ]; then ARGS+=(--include-body true) fi if [ -n "$PROVIDER" ]; then ARGS+=(--provider "$PROVIDER") fi python3 "$UTILS_DIR/email_ops.py" "${ARGS[@]}" ``` ### scripts/email_send.sh ```bash #!/bin/bash # Send an email using provider abstraction # Usage: email_send.sh --to <email> --subject <subject> --body <body> [--provider <provider>] # # Returns JSON with success status SCRIPT_DIR="$(dirname "$0")" UTILS_DIR="$SCRIPT_DIR/utils" TO="" SUBJECT="" BODY="" PROVIDER="" while [[ $# -gt 0 ]]; do case "$1" in --to) TO="$2" shift 2 ;; --subject) SUBJECT="$2" shift 2 ;; --body) BODY="$2" shift 2 ;; --provider) PROVIDER="$2" shift 2 ;; *) shift ;; esac done if [ -z "$TO" ] || [ -z "$SUBJECT" ]; then echo "Usage: email_send.sh --to <email> --subject <subject> --body <body> [--provider <provider>]" >&2 exit 1 fi ARGS=(send --to "$TO" --subject "$SUBJECT" --body "$BODY") if [ -n "$PROVIDER" ]; then ARGS+=(--provider "$PROVIDER") fi python3 "$UTILS_DIR/email_ops.py" "${ARGS[@]}" ``` ### scripts/list_pending.sh ```bash #!/bin/bash # List all pending invites that haven't been actioned # Returns JSON array of pending events with their details # # Usage: list_pending.sh [options] # --summary Output a human-readable summary instead of JSON # --update-reminded Update last_reminded timestamp and increment reminder_count # --auto-dismiss Auto-dismiss events that have been reminded 3+ times without response # # Features: # - Shows day-of-week for verification # - Tracks reminder_count and last_reminded # - Auto-dismisses after 3 ignored reminders # - Batched 
presentation format # # Logs to ~/.openclaw/workspace/memory/email-to-calendar/pending_invites.json SCRIPT_DIR="$(dirname "$0")" UTILS_DIR="$SCRIPT_DIR/utils" PENDING_FILE="$HOME/.openclaw/workspace/memory/email-to-calendar/pending_invites.json" # Check if file exists if [ ! -f "$PENDING_FILE" ]; then # Check if --summary flag is present for arg in "$@"; do if [ "$arg" = "--summary" ]; then echo "No pending invites found." exit 0 fi done echo "[]" exit 0 fi # Delegate to Python implementation python3 "$UTILS_DIR/pending_ops.py" "$@" ``` ### scripts/lookup_event.sh ```bash #!/bin/bash # Look up a tracked event by email_id, event_id, or summary # Usage: lookup_event.sh --email-id <id> | --event-id <id> | --summary <text> | --list [--validate] # # Options: # --validate Check if the calendar event still exists, remove orphaned entries # # Returns JSON with the event details if found, or empty array [] if not SCRIPT_DIR="$(dirname "$0")" UTILS_DIR="$SCRIPT_DIR/utils" EVENTS_FILE="$HOME/.openclaw/workspace/memory/email-to-calendar/events.json" # Parse arguments SEARCH_TYPE="" SEARCH_VALUE="" VALIDATE="false" while [[ $# -gt 0 ]]; do case $1 in --email-id) SEARCH_TYPE="email_id" SEARCH_VALUE="$2" shift 2 ;; --event-id) SEARCH_TYPE="event_id" SEARCH_VALUE="$2" shift 2 ;; --summary) SEARCH_TYPE="summary" SEARCH_VALUE="$2" shift 2 ;; --list) SEARCH_TYPE="list" shift ;; --validate) VALIDATE="true" shift ;; *) echo "Unknown option: $1" >&2 echo "Usage: lookup_event.sh --email-id <id> | --event-id <id> | --summary <text> | --list [--validate]" >&2 exit 1 ;; esac done if [ -z "$SEARCH_TYPE" ]; then echo "Error: Must specify --email-id, --event-id, --summary, or --list" >&2 exit 1 fi if [ ! 
-f "$EVENTS_FILE" ]; then echo "[]" exit 0 fi # Delegate to Python implementation python3 "$UTILS_DIR/event_tracking.py" lookup \ --type "$SEARCH_TYPE" \ --value "$SEARCH_VALUE" \ --validate "$VALIDATE" \ --script-dir "$SCRIPT_DIR" ``` ### scripts/process_calendar_replies.sh ```bash #!/bin/bash # Process and disposition calendar reply emails (accepts, declines, tentatives) # Usage: process_calendar_replies.sh [--dry-run] [--provider <provider>] # # Finds unread calendar reply emails from [email protected] # and dispositions them based on config settings. # # Options: # --dry-run Show what would be processed without making changes # --provider <name> Provider to use (default: from config) # # Calendar reply patterns matched: # - "Accepted: ..." # - "Declined: ..." # - "Tentative: ..." # - "Updated invitation: ..." # - "Cancelled: ..." / "Canceled: ..." SCRIPT_DIR="$(dirname "$0")" UTILS_DIR="$SCRIPT_DIR/utils" DRY_RUN="" PROVIDER="" while [[ $# -gt 0 ]]; do case "$1" in --dry-run) DRY_RUN="true" shift ;; --provider) PROVIDER="$2" shift 2 ;; *) shift ;; esac done # Check if auto_dispose_calendar_replies is enabled SETTINGS=$(python3 "$UTILS_DIR/disposition_ops.py" settings 2>/dev/null) AUTO_DISPOSE=$(echo "$SETTINGS" | jq -r '.auto_dispose_calendar_replies // true') if [ "$AUTO_DISPOSE" != "true" ]; then echo '{"success": true, "message": "Calendar reply auto-disposition is disabled in config", "processed": 0}' exit 0 fi # Search for unread calendar notification emails SEARCH_QUERY="from:[email protected] is:unread" SEARCH_ARGS=(--query "$SEARCH_QUERY" --max 50) if [ -n "$PROVIDER" ]; then SEARCH_ARGS+=(--provider "$PROVIDER") fi SEARCH_RESULT=$("$SCRIPT_DIR/email_search.sh" "${SEARCH_ARGS[@]}" 2>/dev/null) if [ $? 
-ne 0 ] || [ -z "$SEARCH_RESULT" ]; then echo '{"success": false, "error": "Failed to search for calendar reply emails"}' exit 1 fi # Check if search was successful SUCCESS=$(echo "$SEARCH_RESULT" | jq -r '.success // false') if [ "$SUCCESS" != "true" ]; then echo "$SEARCH_RESULT" exit 1 fi # Extract emails from search result EMAILS=$(echo "$SEARCH_RESULT" | jq -r '.data // []') EMAIL_COUNT=$(echo "$EMAILS" | jq 'length') if [ "$EMAIL_COUNT" = "0" ] || [ "$EMAIL_COUNT" = "null" ]; then echo '{"success": true, "message": "No unread calendar reply emails found", "processed": 0}' exit 0 fi # Process each email PROCESSED=0 SKIPPED=0 ERRORS=0 PROCESSED_IDS=() echo "$EMAILS" | jq -c '.[]' | while read -r email; do EMAIL_ID=$(echo "$email" | jq -r '.id // empty') SUBJECT=$(echo "$email" | jq -r '.subject // ""') if [ -z "$EMAIL_ID" ]; then continue fi # Check if subject matches calendar reply patterns SUBJECT_LOWER=$(echo "$SUBJECT" | tr '[:upper:]' '[:lower:]') IS_CALENDAR_REPLY="" case "$SUBJECT_LOWER" in accepted:*|declined:*|tentative:*|"updated invitation:"*|cancelled:*|canceled:*) IS_CALENDAR_REPLY="true" ;; esac if [ "$IS_CALENDAR_REPLY" != "true" ]; then echo "Skipping non-calendar-reply: $SUBJECT" >&2 continue fi if [ "$DRY_RUN" = "true" ]; then echo "Would disposition: $EMAIL_ID - $SUBJECT" >&2 else # Disposition the email DISPOSITION_ARGS=(--email-id "$EMAIL_ID") if [ -n "$PROVIDER" ]; then DISPOSITION_ARGS+=(--provider "$PROVIDER") fi RESULT=$("$SCRIPT_DIR/disposition_email.sh" "${DISPOSITION_ARGS[@]}" 2>/dev/null) if echo "$RESULT" | jq -e '.success' > /dev/null 2>&1; then echo "Dispositioned: $EMAIL_ID - $SUBJECT" >&2 else echo "Failed to disposition: $EMAIL_ID - $SUBJECT" >&2 fi fi done # Count results (need to re-process since while loop runs in subshell) PROCESSED_COUNT=0 if [ "$DRY_RUN" != "true" ]; then echo "$EMAILS" | jq -c '.[]' | while read -r email; do SUBJECT=$(echo "$email" | jq -r '.subject // ""') SUBJECT_LOWER=$(echo "$SUBJECT" | tr 
'[:upper:]' '[:lower:]') case "$SUBJECT_LOWER" in accepted:*|declined:*|tentative:*|"updated invitation:"*|cancelled:*|canceled:*) PROCESSED_COUNT=$((PROCESSED_COUNT + 1)) ;; esac done fi # Output summary if [ "$DRY_RUN" = "true" ]; then echo "{\"success\": true, \"dry_run\": true, \"message\": \"Dry run complete\", \"emails_found\": $EMAIL_COUNT}" else echo "{\"success\": true, \"message\": \"Calendar replies processed\", \"emails_found\": $EMAIL_COUNT}" fi ``` ### scripts/run_tests.sh ```bash #!/bin/bash # Run all tests using Python's built-in unittest # Usage: ./run_tests.sh [test_module] [test_class] [test_method] # # Examples: # ./run_tests.sh # Run all tests # ./run_tests.sh test_common # Run tests in test_common.py # ./run_tests.sh test_common TestGetDayOfWeek # Run specific test class cd "$(dirname "$0")" if [ $# -eq 0 ]; then python3 -m unittest discover -s tests -v elif [ $# -eq 1 ]; then python3 -m unittest "tests.$1" -v elif [ $# -eq 2 ]; then python3 -m unittest "tests.$1.$2" -v else python3 -m unittest "tests.$1.$2.$3" -v fi ``` ### scripts/tests/__init__.py ```python # email-to-calendar tests package ``` ### scripts/tests/test_activity_ops.py ```python #!/usr/bin/env python3 """Tests for utils/activity_ops.py""" import unittest import sys import os import json import tempfile import shutil import io from unittest.mock import patch # Add parent directory to path for imports sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) from utils import activity_ops class TestStartSession(unittest.TestCase): """Tests for start_session function.""" def setUp(self): """Create temp directory and patch file paths.""" self.temp_dir = tempfile.mkdtemp() self.session_file = os.path.join(self.temp_dir, ".current_session.json") self.activity_file = os.path.join(self.temp_dir, "activity.json") self.session_patcher = patch.object(activity_ops, 'SESSION_FILE', self.session_file) self.activity_patcher = patch.object(activity_ops, 'ACTIVITY_FILE', 
self.activity_file) self.session_patcher.start() self.activity_patcher.start() def tearDown(self): """Clean up temp directory and stop patchers.""" self.session_patcher.stop() self.activity_patcher.stop() shutil.rmtree(self.temp_dir, ignore_errors=True) def test_creates_session_file(self): """Test that start_session creates a session file.""" captured = io.StringIO() with patch('sys.stdout', captured): activity_ops.start_session() self.assertTrue(os.path.exists(self.session_file)) with open(self.session_file, 'r') as f: session = json.load(f) self.assertIn("timestamp", session) self.assertEqual(session["emails_scanned"], 0) self.assertEqual(session["emails_with_events"], 0) self.assertEqual(session["skipped"], []) self.assertEqual(session["events_extracted"], []) def test_start_session_output(self): """Test that start_session prints confirmation.""" captured = io.StringIO() with patch('sys.stdout', captured): activity_ops.start_session() output = captured.getvalue() self.assertIn("Session started", output) class TestLogSkip(unittest.TestCase): """Tests for log_skip function.""" def setUp(self): """Create temp directory and patch file paths.""" self.temp_dir = tempfile.mkdtemp() self.session_file = os.path.join(self.temp_dir, ".current_session.json") self.session_patcher = patch.object(activity_ops, 'SESSION_FILE', self.session_file) self.session_patcher.start() def tearDown(self): """Clean up temp directory and stop patcher.""" self.session_patcher.stop() shutil.rmtree(self.temp_dir, ignore_errors=True) def test_log_skip_adds_to_list(self): """Test that log_skip adds entry to skipped list.""" # Start session first captured = io.StringIO() with patch('sys.stdout', captured): activity_ops.start_session() activity_ops.log_skip( email_id="email123", subject="Test Email", reason="No events found" ) with open(self.session_file, 'r') as f: session = json.load(f) self.assertEqual(len(session["skipped"]), 1) self.assertEqual(session["skipped"][0]["email_id"], "email123") 
self.assertEqual(session["skipped"][0]["subject"], "Test Email") self.assertEqual(session["skipped"][0]["reason"], "No events found") self.assertEqual(session["emails_scanned"], 1) def test_log_skip_without_session_raises_error(self): """Test that log_skip without active session raises an error. Note: The actual code raises KeyError when session file doesn't exist because load_json returns {} by default. This test verifies the error behavior when no session is started. """ # Ensure no session file exists if os.path.exists(self.session_file): os.remove(self.session_file) with self.assertRaises((SystemExit, KeyError)): activity_ops.log_skip( email_id="email123", subject="Test", reason="Test" ) class TestLogEvent(unittest.TestCase): """Tests for log_event function.""" def setUp(self): """Create temp directory and patch file paths.""" self.temp_dir = tempfile.mkdtemp() self.session_file = os.path.join(self.temp_dir, ".current_session.json") self.session_patcher = patch.object(activity_ops, 'SESSION_FILE', self.session_file) self.session_patcher.start() def tearDown(self): """Clean up temp directory and stop patcher.""" self.session_patcher.stop() shutil.rmtree(self.temp_dir, ignore_errors=True) def test_log_event_adds_to_list(self): """Test that log_event adds entry to events_extracted list.""" captured = io.StringIO() with patch('sys.stdout', captured): activity_ops.start_session() activity_ops.log_event( email_id="email123", title="Team Meeting", action="pending", reason="Extracted from invite" ) with open(self.session_file, 'r') as f: session = json.load(f) self.assertEqual(len(session["events_extracted"]), 1) self.assertEqual(session["events_extracted"][0]["email_id"], "email123") self.assertEqual(session["events_extracted"][0]["title"], "Team Meeting") self.assertEqual(session["events_extracted"][0]["action"], "pending") self.assertEqual(session["events_extracted"][0]["reason"], "Extracted from invite") def test_log_event_increments_emails_with_events(self): 
"""Test that log_event increments emails_with_events counter.""" captured = io.StringIO() with patch('sys.stdout', captured): activity_ops.start_session() activity_ops.log_event(email_id="email1", title="Event 1") with open(self.session_file, 'r') as f: session = json.load(f) self.assertEqual(session["emails_with_events"], 1) def test_log_event_same_email_counted_once(self): """Test that multiple events from same email only count once.""" captured = io.StringIO() with patch('sys.stdout', captured): activity_ops.start_session() activity_ops.log_event(email_id="email1", title="Event 1") activity_ops.log_event(email_id="email1", title="Event 2") with open(self.session_file, 'r') as f: session = json.load(f) self.assertEqual(len(session["events_extracted"]), 2) self.assertEqual(session["emails_with_events"], 1) # Only counted once def test_log_event_without_session_raises_error(self): """Test that log_event without active session raises an error. Note: The actual code raises KeyError when session file doesn't exist because load_json returns {} by default. This test verifies the error behavior when no session is started. 
""" # Ensure no session file exists if os.path.exists(self.session_file): os.remove(self.session_file) with self.assertRaises((SystemExit, KeyError)): activity_ops.log_event( email_id="email123", title="Test Event" ) class TestEndSession(unittest.TestCase): """Tests for end_session function.""" def setUp(self): """Create temp directory and patch file paths.""" self.temp_dir = tempfile.mkdtemp() self.session_file = os.path.join(self.temp_dir, ".current_session.json") self.activity_file = os.path.join(self.temp_dir, "activity.json") self.session_patcher = patch.object(activity_ops, 'SESSION_FILE', self.session_file) self.activity_patcher = patch.object(activity_ops, 'ACTIVITY_FILE', self.activity_file) self.session_patcher.start() self.activity_patcher.start() def tearDown(self): """Clean up temp directory and stop patchers.""" self.session_patcher.stop() self.activity_patcher.stop() shutil.rmtree(self.temp_dir, ignore_errors=True) def test_end_session_appends_to_activity_log(self): """Test that end_session appends session to activity log.""" captured = io.StringIO() with patch('sys.stdout', captured): activity_ops.start_session() activity_ops.log_skip(email_id="e1", subject="S1", reason="R1") activity_ops.log_event(email_id="e2", title="Event 1") with patch('sys.stdout', captured): activity_ops.end_session() with open(self.activity_file, 'r') as f: activity = json.load(f) self.assertEqual(len(activity["sessions"]), 1) session = activity["sessions"][0] self.assertEqual(session["emails_scanned"], 1) self.assertEqual(len(session["skipped"]), 1) self.assertEqual(len(session["events_extracted"]), 1) def test_end_session_removes_session_file(self): """Test that end_session removes the current session file.""" captured = io.StringIO() with patch('sys.stdout', captured): activity_ops.start_session() activity_ops.end_session() self.assertFalse(os.path.exists(self.session_file)) def test_end_session_without_session_handles_gracefully(self): """Test that end_session without 
active session doesn't crash. Note: When session file doesn't exist, load_json returns {} which is not None, so the function proceeds and creates an empty session entry. This tests that it at least doesn't crash. """ # Ensure no session file exists if os.path.exists(self.session_file): os.remove(self.session_file) captured = io.StringIO() with patch('sys.stdout', captured): activity_ops.end_session() output = captured.getvalue() # The function doesn't crash - it processes the empty dict # Output will show "Session ended: 0 scanned, 0 with events, 0 skipped" self.assertIn("Session ended", output) def test_end_session_limits_history(self): """Test that end_session keeps only MAX_SESSIONS sessions.""" # Create activity with many sessions sessions = [{"timestamp": f"2026-02-{i:02d}T10:00:00"} for i in range(1, 52)] with open(self.activity_file, 'w') as f: json.dump({"sessions": sessions}, f) captured = io.StringIO() with patch('sys.stdout', captured): activity_ops.start_session() activity_ops.end_session() with open(self.activity_file, 'r') as f: activity = json.load(f) self.assertEqual(len(activity["sessions"]), 50) # MAX_SESSIONS class TestShowActivity(unittest.TestCase): """Tests for show_activity function.""" def setUp(self): """Create temp directory and patch file paths.""" self.temp_dir = tempfile.mkdtemp() self.activity_file = os.path.join(self.temp_dir, "activity.json") self.activity_patcher = patch.object(activity_ops, 'ACTIVITY_FILE', self.activity_file) self.activity_patcher.start() def tearDown(self): """Clean up temp directory and stop patcher.""" self.activity_patcher.stop() shutil.rmtree(self.temp_dir, ignore_errors=True) def test_show_activity_empty(self): """Test show_activity when no activity recorded.""" with open(self.activity_file, 'w') as f: json.dump({"sessions": []}, f) captured = io.StringIO() with patch('sys.stdout', captured): activity_ops.show_activity() output = captured.getvalue() self.assertIn("No activity recorded", output) def 
test_show_activity_displays_session(self): """Test show_activity displays session details.""" test_data = { "sessions": [{ "timestamp": "2026-02-11T10:00:00", "emails_scanned": 5, "emails_with_events": 2, "skipped": [{"subject": "Newsletter", "reason": "No events"}], "events_extracted": [{"title": "Meeting", "action": "created"}] }] } with open(self.activity_file, 'w') as f: json.dump(test_data, f) captured = io.StringIO() with patch('sys.stdout', captured): activity_ops.show_activity() output = captured.getvalue() self.assertIn("Emails scanned: 5", output) self.assertIn("Emails with events: 2", output) if __name__ == '__main__': unittest.main() ``` ### scripts/tests/test_changelog_ops.py ```python #!/usr/bin/env python3 """Tests for utils/changelog_ops.py""" import unittest import sys import os import json import tempfile import shutil import io from datetime import datetime, timedelta from unittest.mock import patch # Add parent directory to path for imports sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) from utils import changelog_ops class TestLogCreate(unittest.TestCase): """Tests for log_create function.""" def setUp(self): """Create temp directory and patch file path.""" self.temp_dir = tempfile.mkdtemp() self.changelog_file = os.path.join(self.temp_dir, "changelog.json") self.patcher = patch.object(changelog_ops, 'CHANGELOG_FILE', self.changelog_file) self.patcher.start() def tearDown(self): """Clean up temp directory and stop patcher.""" self.patcher.stop() shutil.rmtree(self.temp_dir, ignore_errors=True) def test_log_create_adds_entry(self): """Test that log_create adds a create entry.""" change_id = changelog_ops.log_create( event_id="evt123", calendar_id="primary", summary="Team Meeting", start_time="2026-02-11T14:00:00", end_time="2026-02-11T15:00:00", email_id="email456" ) with open(self.changelog_file, 'r') as f: data = json.load(f) self.assertEqual(len(data["changes"]), 1) change = data["changes"][0] 
self.assertEqual(change["action"], "create") self.assertEqual(change["event_id"], "evt123") self.assertEqual(change["calendar_id"], "primary") self.assertIsNone(change["before"]) self.assertEqual(change["after"]["summary"], "Team Meeting") self.assertEqual(change["after"]["start"], "2026-02-11T14:00:00") self.assertTrue(change["can_undo"]) self.assertTrue(change_id.startswith("chg_")) def test_log_create_returns_change_id(self): """Test that log_create returns a valid change ID.""" change_id = changelog_ops.log_create( event_id="evt123", calendar_id="primary", summary="Test" ) self.assertIsNotNone(change_id) self.assertTrue(change_id.startswith("chg_")) class TestLogUpdate(unittest.TestCase): """Tests for log_update function.""" def setUp(self): """Create temp directory and patch file path.""" self.temp_dir = tempfile.mkdtemp() self.changelog_file = os.path.join(self.temp_dir, "changelog.json") self.patcher = patch.object(changelog_ops, 'CHANGELOG_FILE', self.changelog_file) self.patcher.start() def tearDown(self): """Clean up temp directory and stop patcher.""" self.patcher.stop() shutil.rmtree(self.temp_dir, ignore_errors=True) def test_log_update_stores_before_after(self): """Test that log_update stores before and after states.""" before = json.dumps({"summary": "Old Title", "start": "2026-02-11T10:00:00"}) after = json.dumps({"summary": "New Title", "start": "2026-02-11T14:00:00"}) change_id = changelog_ops.log_update( event_id="evt123", calendar_id="primary", before_json=before, after_json=after ) with open(self.changelog_file, 'r') as f: data = json.load(f) change = data["changes"][0] self.assertEqual(change["action"], "update") self.assertEqual(change["before"]["summary"], "Old Title") self.assertEqual(change["after"]["summary"], "New Title") self.assertTrue(change["can_undo"]) def test_log_update_handles_invalid_json(self): """Test that log_update handles invalid JSON gracefully.""" change_id = changelog_ops.log_update( event_id="evt123", 
calendar_id="primary", before_json="not json", after_json="also not json" ) with open(self.changelog_file, 'r') as f: data = json.load(f) change = data["changes"][0] self.assertIsNone(change["before"]) self.assertIsNone(change["after"]) class TestLogDelete(unittest.TestCase): """Tests for log_delete function.""" def setUp(self): """Create temp directory and patch file path.""" self.temp_dir = tempfile.mkdtemp() self.changelog_file = os.path.join(self.temp_dir, "changelog.json") self.patcher = patch.object(changelog_ops, 'CHANGELOG_FILE', self.changelog_file) self.patcher.start() def tearDown(self): """Clean up temp directory and stop patcher.""" self.patcher.stop() shutil.rmtree(self.temp_dir, ignore_errors=True) def test_log_delete_stores_before_state(self): """Test that log_delete stores the before state.""" before = json.dumps({"summary": "Deleted Event", "start": "2026-02-11T10:00:00"}) change_id = changelog_ops.log_delete( event_id="evt123", calendar_id="primary", before_json=before ) with open(self.changelog_file, 'r') as f: data = json.load(f) change = data["changes"][0] self.assertEqual(change["action"], "delete") self.assertEqual(change["before"]["summary"], "Deleted Event") self.assertIsNone(change["after"]) class TestListChanges(unittest.TestCase): """Tests for list_changes function.""" def setUp(self): """Create temp directory and patch file path.""" self.temp_dir = tempfile.mkdtemp() self.changelog_file = os.path.join(self.temp_dir, "changelog.json") self.patcher = patch.object(changelog_ops, 'CHANGELOG_FILE', self.changelog_file) self.patcher.start() def tearDown(self): """Clean up temp directory and stop patcher.""" self.patcher.stop() shutil.rmtree(self.temp_dir, ignore_errors=True) def test_list_changes_empty(self): """Test list_changes when no changes recorded.""" with open(self.changelog_file, 'w') as f: json.dump({"changes": []}, f) captured = io.StringIO() with patch('sys.stdout', captured): changelog_ops.list_changes() output = 
captured.getvalue() self.assertIn("No changes recorded", output) def test_list_changes_shows_entries(self): """Test list_changes shows recent entries.""" test_data = { "changes": [{ "id": "chg_001", "timestamp": datetime.now().isoformat(), "action": "create", "event_id": "evt123", "after": {"summary": "Test Meeting"}, "can_undo": True }] } with open(self.changelog_file, 'w') as f: json.dump(test_data, f) captured = io.StringIO() with patch('sys.stdout', captured): changelog_ops.list_changes() output = captured.getvalue() self.assertIn("CREATE", output) self.assertIn("Test Meeting", output) self.assertIn("can undo", output) class TestCanUndo(unittest.TestCase): """Tests for can_undo function.""" def setUp(self): """Create temp directory and patch file path.""" self.temp_dir = tempfile.mkdtemp() self.changelog_file = os.path.join(self.temp_dir, "changelog.json") self.patcher = patch.object(changelog_ops, 'CHANGELOG_FILE', self.changelog_file) self.patcher.start() def tearDown(self): """Clean up temp directory and stop patcher.""" self.patcher.stop() shutil.rmtree(self.temp_dir, ignore_errors=True) def test_can_undo_recent_change(self): """Test can_undo returns true for recent change.""" test_data = { "changes": [{ "id": "chg_001", "timestamp": datetime.now().isoformat(), "action": "create", "can_undo": True }] } with open(self.changelog_file, 'w') as f: json.dump(test_data, f) captured = io.StringIO() with patch('sys.stdout', captured): changelog_ops.can_undo("chg_001") output = captured.getvalue().strip() self.assertEqual(output, "true") def test_can_undo_old_change(self): """Test can_undo returns false for old change (outside 24h window).""" old_time = (datetime.now() - timedelta(hours=25)).isoformat() test_data = { "changes": [{ "id": "chg_001", "timestamp": old_time, "action": "create", "can_undo": True }] } with open(self.changelog_file, 'w') as f: json.dump(test_data, f) captured = io.StringIO() with patch('sys.stdout', captured): 
changelog_ops.can_undo("chg_001") output = captured.getvalue().strip() self.assertEqual(output, "false") def test_can_undo_already_undone(self): """Test can_undo returns false for already undone change.""" test_data = { "changes": [{ "id": "chg_001", "timestamp": datetime.now().isoformat(), "action": "create", "can_undo": False # Already undone }] } with open(self.changelog_file, 'w') as f: json.dump(test_data, f) captured = io.StringIO() with patch('sys.stdout', captured): changelog_ops.can_undo("chg_001") output = captured.getvalue().strip() self.assertEqual(output, "false") def test_can_undo_not_found(self): """Test can_undo exits with error for nonexistent change.""" with open(self.changelog_file, 'w') as f: json.dump({"changes": []}, f) captured = io.StringIO() with self.assertRaises(SystemExit) as cm: with patch('sys.stdout', captured): changelog_ops.can_undo("nonexistent") self.assertEqual(cm.exception.code, 1) class TestGetChange(unittest.TestCase): """Tests for get_change function.""" def setUp(self): """Create temp directory and patch file path.""" self.temp_dir = tempfile.mkdtemp() self.changelog_file = os.path.join(self.temp_dir, "changelog.json") self.patcher = patch.object(changelog_ops, 'CHANGELOG_FILE', self.changelog_file) self.patcher.start() def tearDown(self): """Clean up temp directory and stop patcher.""" self.patcher.stop() shutil.rmtree(self.temp_dir, ignore_errors=True) def test_get_change_returns_json(self): """Test get_change returns change as JSON.""" test_data = { "changes": [{ "id": "chg_001", "action": "create", "event_id": "evt123" }] } with open(self.changelog_file, 'w') as f: json.dump(test_data, f) captured = io.StringIO() with patch('sys.stdout', captured): changelog_ops.get_change("chg_001") output = captured.getvalue() result = json.loads(output) self.assertEqual(result["id"], "chg_001") self.assertEqual(result["action"], "create") def test_get_change_not_found(self): """Test get_change exits with error for nonexistent 
change.""" with open(self.changelog_file, 'w') as f: json.dump({"changes": []}, f) with self.assertRaises(SystemExit) as cm: changelog_ops.get_change("nonexistent") self.assertEqual(cm.exception.code, 1) class TestMaxChangesLimit(unittest.TestCase): """Tests for MAX_CHANGES limit enforcement.""" def setUp(self): """Create temp directory and patch file path.""" self.temp_dir = tempfile.mkdtemp() self.changelog_file = os.path.join(self.temp_dir, "changelog.json") self.patcher = patch.object(changelog_ops, 'CHANGELOG_FILE', self.changelog_file) self.patcher.start() def tearDown(self): """Clean up temp directory and stop patcher.""" self.patcher.stop() shutil.rmtree(self.temp_dir, ignore_errors=True) def test_limits_to_max_changes(self): """Test that changelog is limited to MAX_CHANGES entries.""" # Create changelog with MAX_CHANGES entries changes = [{"id": f"chg_{i:03d}"} for i in range(100)] with open(self.changelog_file, 'w') as f: json.dump({"changes": changes}, f) # Add one more changelog_ops.log_create( event_id="evt_new", calendar_id="primary", summary="New Event" ) with open(self.changelog_file, 'r') as f: data = json.load(f) self.assertEqual(len(data["changes"]), 100) # Oldest should be removed self.assertNotEqual(data["changes"][0]["id"], "chg_000") if __name__ == '__main__': unittest.main() ``` ### scripts/tests/test_common.py ```python #!/usr/bin/env python3 """Tests for utils/common.py""" import unittest import sys import os from datetime import datetime, timedelta from unittest.mock import patch # Add parent directory to path for imports sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) from utils.common import ( get_day_of_week, format_timestamp, generate_id, generate_indexed_id, time_ago ) class TestGetDayOfWeek(unittest.TestCase): """Tests for get_day_of_week function.""" def test_valid_date_wednesday(self): """Test that 2026-02-11 returns Wednesday.""" self.assertEqual(get_day_of_week("2026-02-11"), "Wednesday") def 
test_valid_date_monday(self): """Test that 2026-02-09 returns Monday.""" self.assertEqual(get_day_of_week("2026-02-09"), "Monday") def test_valid_date_sunday(self): """Test that 2026-02-15 returns Sunday.""" self.assertEqual(get_day_of_week("2026-02-15"), "Sunday") def test_invalid_format(self): """Test that invalid date format returns empty string.""" self.assertEqual(get_day_of_week("invalid"), "") def test_wrong_format(self): """Test that wrong date format (MM-DD-YYYY) returns empty string.""" self.assertEqual(get_day_of_week("02-11-2026"), "") def test_empty_string(self): """Test that empty string returns empty string.""" self.assertEqual(get_day_of_week(""), "") def test_none_input(self): """Test that None input returns empty string.""" self.assertEqual(get_day_of_week(None), "") class TestFormatTimestamp(unittest.TestCase): """Tests for format_timestamp function.""" def test_valid_iso_timestamp(self): """Test formatting a valid ISO timestamp.""" result = format_timestamp("2026-02-11T14:30:00") self.assertEqual(result, "2026-02-11 14:30") def test_custom_format(self): """Test formatting with a custom format.""" result = format_timestamp("2026-02-11T14:30:00", "%Y/%m/%d") self.assertEqual(result, "2026/02/11") def test_iso_with_microseconds(self): """Test formatting ISO timestamp with microseconds.""" result = format_timestamp("2026-02-11T14:30:00.123456") self.assertEqual(result, "2026-02-11 14:30") def test_invalid_timestamp(self): """Test that invalid timestamp returns original string.""" self.assertEqual(format_timestamp("invalid"), "invalid") def test_empty_string(self): """Test that empty string returns empty string.""" self.assertEqual(format_timestamp(""), "") def test_none_input(self): """Test that None input returns None (original).""" self.assertEqual(format_timestamp(None), None) class TestGenerateId(unittest.TestCase): """Tests for generate_id function.""" def test_prefix_format(self): """Test that generated ID starts with correct prefix.""" result 
= generate_id("chg") self.assertTrue(result.startswith("chg_")) def test_contains_date_parts(self): """Test that generated ID contains date parts.""" result = generate_id("inv") # Should have format: inv_YYYYMMDD_HHMMSS parts = result.split("_") self.assertEqual(len(parts), 3) self.assertEqual(parts[0], "inv") self.assertEqual(len(parts[1]), 8) # YYYYMMDD self.assertEqual(len(parts[2]), 6) # HHMMSS def test_different_prefixes(self): """Test various prefixes work correctly.""" for prefix in ["chg", "inv", "act", "evt"]: result = generate_id(prefix) self.assertTrue(result.startswith(f"{prefix}_")) class TestGenerateIndexedId(unittest.TestCase): """Tests for generate_indexed_id function.""" def test_index_format(self): """Test that generated ID includes zero-padded index.""" result = generate_indexed_id("chg", 1) self.assertTrue(result.endswith("_001")) def test_larger_index(self): """Test larger index values.""" result = generate_indexed_id("chg", 42) self.assertTrue(result.endswith("_042")) def test_three_digit_index(self): """Test three-digit index.""" result = generate_indexed_id("chg", 123) self.assertTrue(result.endswith("_123")) def test_full_format(self): """Test full ID format.""" result = generate_indexed_id("inv", 5) # Should have format: inv_YYYYMMDD_HHMMSS_005 parts = result.split("_") self.assertEqual(len(parts), 4) self.assertEqual(parts[0], "inv") self.assertEqual(parts[3], "005") class TestTimeAgo(unittest.TestCase): """Tests for time_ago function.""" def test_minutes_ago(self): """Test time_ago for recent timestamps (minutes).""" now = datetime.now() five_min_ago = (now - timedelta(minutes=5)).isoformat() result = time_ago(five_min_ago) self.assertIn("minutes ago", result) self.assertIn("5", result) def test_hours_ago(self): """Test time_ago for timestamps hours ago.""" now = datetime.now() two_hours_ago = (now - timedelta(hours=2)).isoformat() result = time_ago(two_hours_ago) self.assertIn("hours ago", result) self.assertIn("2", result) def 
test_fractional_hours(self): """Test time_ago shows fractional hours.""" now = datetime.now() ninety_min_ago = (now - timedelta(minutes=90)).isoformat() result = time_ago(ninety_min_ago) self.assertIn("hours ago", result) self.assertIn("1.5", result) def test_invalid_timestamp(self): """Test that invalid timestamp returns empty string.""" self.assertEqual(time_ago("invalid"), "") def test_none_input(self): """Test that None input returns empty string.""" self.assertEqual(time_ago(None), "") if __name__ == '__main__': unittest.main() ``` ### scripts/tests/test_date_parser.py ```python #!/usr/bin/env python3 """Tests for utils/date_parser.py""" import unittest import sys import os # Add parent directory to path for imports sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) from utils.date_parser import parse_date, parse_time class TestParseDate(unittest.TestCase): """Tests for parse_date function.""" def test_full_month_name(self): """Test parsing 'February 11, 2026' format.""" self.assertEqual(parse_date("February 11, 2026"), "2026-02-11") def test_short_month_name(self): """Test parsing 'Feb 11, 2026' format.""" self.assertEqual(parse_date("Feb 11, 2026"), "2026-02-11") def test_us_numeric_format(self): """Test parsing '02/11/2026' US format.""" self.assertEqual(parse_date("02/11/2026"), "2026-02-11") def test_iso_format(self): """Test parsing '2026-02-11' ISO format.""" self.assertEqual(parse_date("2026-02-11"), "2026-02-11") def test_day_first_format(self): """Test parsing '11 February 2026' format.""" self.assertEqual(parse_date("11 February 2026"), "2026-02-11") def test_short_day_first_format(self): """Test parsing '11 Feb 2026' format.""" self.assertEqual(parse_date("11 Feb 2026"), "2026-02-11") def test_month_without_comma(self): """Test parsing 'February 11 2026' without comma.""" self.assertEqual(parse_date("February 11 2026"), "2026-02-11") def test_single_digit_day(self): """Test parsing date with single digit day.""" 
        self.assertEqual(parse_date("February 1, 2026"), "2026-02-01")

    def test_september_short(self):
        """Test parsing 'Sept' abbreviation."""
        self.assertEqual(parse_date("Sept 15, 2026"), "2026-09-15")

    def test_september_very_short(self):
        """Test parsing 'Sep' abbreviation."""
        self.assertEqual(parse_date("Sep 15, 2026"), "2026-09-15")

    def test_invalid_date_string(self):
        """Test that invalid string returns empty."""
        self.assertEqual(parse_date("invalid"), "")

    def test_empty_string(self):
        """Test that empty string returns empty."""
        self.assertEqual(parse_date(""), "")

    def test_whitespace_handling(self):
        """Test that leading/trailing whitespace is handled."""
        self.assertEqual(parse_date(" February 11, 2026 "), "2026-02-11")


class TestParseTime(unittest.TestCase):
    """Tests for parse_time function."""

    def test_12h_pm(self):
        """Test parsing '2:30 PM' format."""
        self.assertEqual(parse_time("2:30 PM"), "14:30")

    def test_12h_am(self):
        """Test parsing '9:00 AM' format."""
        self.assertEqual(parse_time("9:00 AM"), "09:00")

    def test_24h_format(self):
        """Test parsing '14:30' 24-hour format."""
        self.assertEqual(parse_time("14:30"), "14:30")

    def test_noon(self):
        """Test parsing '12:00 PM' as noon."""
        self.assertEqual(parse_time("12:00 PM"), "12:00")

    def test_midnight_12am(self):
        """Test parsing '12:00 AM' as midnight."""
        self.assertEqual(parse_time("12:00 AM"), "00:00")

    def test_hour_only_pm(self):
        """Test parsing '2 PM' without minutes."""
        self.assertEqual(parse_time("2 PM"), "14:00")

    def test_hour_only_am(self):
        """Test parsing '9 AM' without minutes."""
        self.assertEqual(parse_time("9 AM"), "09:00")

    def test_lowercase_pm(self):
        """Test parsing '2:30 pm' lowercase."""
        self.assertEqual(parse_time("2:30 pm"), "14:30")

    def test_no_space_before_ampm(self):
        """Test parsing '2:30PM' without space."""
        self.assertEqual(parse_time("2:30PM"), "14:30")

    def test_double_digit_hour_12h(self):
        """Test parsing '10:30 AM' double digit hour."""
        self.assertEqual(parse_time("10:30 AM"), "10:30")

    def test_invalid_time_string(self):
        """Test that invalid string returns empty."""
        self.assertEqual(parse_time("invalid"), "")

    def test_empty_string(self):
        """Test that empty string returns empty."""
        self.assertEqual(parse_time(""), "")

    def test_none_input(self):
        """Test that None input returns empty."""
        self.assertEqual(parse_time(None), "")

    def test_whitespace_handling(self):
        """Test that leading/trailing whitespace is handled."""
        self.assertEqual(parse_time(" 2:30 PM "), "14:30")


if __name__ == '__main__':
    unittest.main()
```

### scripts/tests/test_disposition_ops.py

```python
#!/usr/bin/env python3
"""Tests for utils/disposition_ops.py"""
import unittest
import sys
import os
import tempfile
import json
from unittest.mock import patch, MagicMock

# Add parent directory to path for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from utils.disposition_ops import (
    get_disposition_settings,
    disposition_email,
    is_calendar_reply
)


class TestGetDispositionSettings(unittest.TestCase):
    """Tests for get_disposition_settings function."""

    def test_defaults_when_no_config(self):
        """Test that defaults are returned when config doesn't exist."""
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
            f.write('{}')
            f.flush()
        settings = get_disposition_settings(f.name)
        os.unlink(f.name)
        self.assertTrue(settings["mark_read"])
        self.assertTrue(settings["archive"])
        self.assertTrue(settings["auto_dispose_calendar_replies"])

    def test_reads_config_values(self):
        """Test that config values are read correctly."""
        config = {
            "email_handling": {
                "mark_read": False,
                "archive": True,
                "auto_dispose_calendar_replies": False
            }
        }
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
            json.dump(config, f)
            f.flush()
        settings = get_disposition_settings(f.name)
        os.unlink(f.name)
        self.assertFalse(settings["mark_read"])
        self.assertTrue(settings["archive"])
        self.assertFalse(settings["auto_dispose_calendar_replies"])

    def test_partial_config(self):
        """Test that missing keys use defaults."""
        config = {
            "email_handling": {
                "mark_read": False
            }
        }
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
            json.dump(config, f)
            f.flush()
        settings = get_disposition_settings(f.name)
        os.unlink(f.name)
        self.assertFalse(settings["mark_read"])
        self.assertTrue(settings["archive"])  # default
        self.assertTrue(settings["auto_dispose_calendar_replies"])  # default

    def test_missing_email_handling_section(self):
        """Test that missing email_handling section uses all defaults."""
        config = {"gmail_account": "[email protected]"}
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
            json.dump(config, f)
            f.flush()
        settings = get_disposition_settings(f.name)
        os.unlink(f.name)
        self.assertTrue(settings["mark_read"])
        self.assertTrue(settings["archive"])
        self.assertTrue(settings["auto_dispose_calendar_replies"])


class TestDispositionEmail(unittest.TestCase):
    """Tests for disposition_email function."""

    @patch('utils.disposition_ops.modify_email')
    @patch('utils.disposition_ops.get_disposition_settings')
    def test_mark_read_and_archive(self, mock_settings, mock_modify):
        """Test disposition with both mark_read and archive enabled."""
        mock_settings.return_value = {"mark_read": True, "archive": True}
        mock_modify.return_value = {"success": True}

        result = disposition_email("test123")

        self.assertTrue(result["success"])
        self.assertIn("mark_read", result["actions"])
        self.assertIn("archive", result["actions"])
        mock_modify.assert_called_once()
        call_args = mock_modify.call_args
        self.assertEqual(call_args[0][0], "test123")
        self.assertIn("UNREAD", call_args[1]["remove_labels"])
        self.assertIn("INBOX", call_args[1]["remove_labels"])

    @patch('utils.disposition_ops.modify_email')
    @patch('utils.disposition_ops.get_disposition_settings')
    def test_mark_read_only(self, mock_settings, mock_modify):
        """Test disposition with only mark_read enabled."""
        mock_settings.return_value = {"mark_read": True,
                                      "archive": False}
        mock_modify.return_value = {"success": True}

        result = disposition_email("test123")

        self.assertTrue(result["success"])
        self.assertIn("mark_read", result["actions"])
        self.assertNotIn("archive", result["actions"])
        call_args = mock_modify.call_args
        self.assertIn("UNREAD", call_args[1]["remove_labels"])
        self.assertNotIn("INBOX", call_args[1]["remove_labels"])

    @patch('utils.disposition_ops.modify_email')
    @patch('utils.disposition_ops.get_disposition_settings')
    def test_archive_only(self, mock_settings, mock_modify):
        """Test disposition with only archive enabled."""
        mock_settings.return_value = {"mark_read": False, "archive": True}
        mock_modify.return_value = {"success": True}

        result = disposition_email("test123")

        self.assertTrue(result["success"])
        self.assertIn("archive", result["actions"])
        self.assertNotIn("mark_read", result["actions"])
        call_args = mock_modify.call_args
        self.assertIn("INBOX", call_args[1]["remove_labels"])
        self.assertNotIn("UNREAD", call_args[1]["remove_labels"])

    @patch('utils.disposition_ops.get_disposition_settings')
    def test_no_actions_configured(self, mock_settings):
        """Test disposition when both options are disabled."""
        mock_settings.return_value = {"mark_read": False, "archive": False}

        result = disposition_email("test123")

        self.assertTrue(result["success"])
        self.assertEqual(result["actions"], [])
        self.assertIn("No disposition actions configured", result.get("message", ""))

    def test_empty_email_id(self):
        """Test that empty email_id returns error."""
        result = disposition_email("")
        self.assertFalse(result["success"])
        self.assertIn("email_id is required", result["error"])

    @patch('utils.disposition_ops.modify_email')
    @patch('utils.disposition_ops.get_disposition_settings')
    def test_override_mark_read(self, mock_settings, mock_modify):
        """Test explicit override of mark_read setting."""
        mock_settings.return_value = {"mark_read": False, "archive": False}
        mock_modify.return_value = {"success": True}

        result = disposition_email("test123",
                                   mark_read=True)

        self.assertTrue(result["success"])
        self.assertIn("mark_read", result["actions"])
        call_args = mock_modify.call_args
        self.assertIn("UNREAD", call_args[1]["remove_labels"])

    @patch('utils.disposition_ops.modify_email')
    @patch('utils.disposition_ops.get_disposition_settings')
    def test_override_archive(self, mock_settings, mock_modify):
        """Test explicit override of archive setting."""
        mock_settings.return_value = {"mark_read": False, "archive": False}
        mock_modify.return_value = {"success": True}

        result = disposition_email("test123", archive=True)

        self.assertTrue(result["success"])
        self.assertIn("archive", result["actions"])
        call_args = mock_modify.call_args
        self.assertIn("INBOX", call_args[1]["remove_labels"])

    @patch('utils.disposition_ops.modify_email')
    @patch('utils.disposition_ops.get_disposition_settings')
    def test_modify_failure(self, mock_settings, mock_modify):
        """Test handling of modify_email failure."""
        mock_settings.return_value = {"mark_read": True, "archive": True}
        mock_modify.return_value = {"success": False, "error": "API error"}

        result = disposition_email("test123")

        self.assertFalse(result["success"])
        self.assertEqual(result["error"], "API error")
        self.assertEqual(result["actions"], [])


class TestIsCalendarReply(unittest.TestCase):
    """Tests for is_calendar_reply function."""

    def test_accepted_reply(self):
        """Test accepted invitation detection."""
        self.assertTrue(is_calendar_reply("Accepted: Team Meeting"))
        self.assertTrue(is_calendar_reply("accepted: lowercase test"))

    def test_declined_reply(self):
        """Test declined invitation detection."""
        self.assertTrue(is_calendar_reply("Declined: Team Meeting"))

    def test_tentative_reply(self):
        """Test tentative invitation detection."""
        self.assertTrue(is_calendar_reply("Tentative: Team Meeting"))

    def test_updated_invitation(self):
        """Test updated invitation detection."""
        self.assertTrue(is_calendar_reply("Updated invitation: Team Meeting"))

    def test_cancelled_reply(self):
        """Test cancelled invitation
        detection (both spellings)."""
        self.assertTrue(is_calendar_reply("Cancelled: Team Meeting"))
        self.assertTrue(is_calendar_reply("Canceled: Team Meeting"))

    def test_non_calendar_email(self):
        """Test that regular emails are not detected as calendar replies."""
        self.assertFalse(is_calendar_reply("Meeting next week"))
        self.assertFalse(is_calendar_reply("Re: Team Meeting"))
        self.assertFalse(is_calendar_reply("Invitation to party"))

    def test_empty_subject(self):
        """Test that empty subject returns False."""
        self.assertFalse(is_calendar_reply(""))
        self.assertFalse(is_calendar_reply(None))


if __name__ == '__main__':
    unittest.main()
```

### scripts/tests/test_event_tracking.py

```python
#!/usr/bin/env python3
"""Tests for utils/event_tracking.py"""
import unittest
import sys
import os
import json
import tempfile
import shutil
from unittest.mock import patch

# Add parent directory to path for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from utils import event_tracking


class TestTrackEvent(unittest.TestCase):
    """Tests for track_event function."""

    def setUp(self):
        """Create temp directory and patch file path."""
        self.temp_dir = tempfile.mkdtemp()
        self.events_file = os.path.join(self.temp_dir, "events.json")
        self.patcher = patch.object(event_tracking, 'EVENTS_FILE', self.events_file)
        self.patcher.start()

    def tearDown(self):
        """Clean up temp directory and stop patcher."""
        self.patcher.stop()
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_track_new_event(self):
        """Test tracking a new event."""
        event_tracking.track_event(
            event_id="evt123",
            calendar_id="primary",
            email_id="email456",
            summary="Test Meeting",
            start="2026-02-11T14:00:00"
        )

        with open(self.events_file, 'r') as f:
            data = json.load(f)

        self.assertEqual(len(data["events"]), 1)
        event = data["events"][0]
        self.assertEqual(event["event_id"], "evt123")
        self.assertEqual(event["calendar_id"], "primary")
        self.assertEqual(event["email_id"], "email456")
        self.assertEqual(event["summary"], "Test Meeting")
        self.assertEqual(event["start"], "2026-02-11T14:00:00")
        self.assertIsNotNone(event["created_at"])
        self.assertIsNone(event["updated_at"])

    def test_track_updates_existing(self):
        """Test that tracking same event_id updates existing entry."""
        # First track
        event_tracking.track_event(
            event_id="evt123",
            summary="Original Title",
            start="2026-02-11T14:00:00"
        )
        # Track again with same event_id
        event_tracking.track_event(
            event_id="evt123",
            summary="Updated Title",
            start="2026-02-12T15:00:00"
        )

        with open(self.events_file, 'r') as f:
            data = json.load(f)

        self.assertEqual(len(data["events"]), 1)
        event = data["events"][0]
        self.assertEqual(event["summary"], "Updated Title")
        self.assertEqual(event["start"], "2026-02-12T15:00:00")
        self.assertIsNotNone(event["updated_at"])

    def test_track_multiple_events(self):
        """Test tracking multiple different events."""
        event_tracking.track_event(event_id="evt1", summary="Event 1")
        event_tracking.track_event(event_id="evt2", summary="Event 2")
        event_tracking.track_event(event_id="evt3", summary="Event 3")

        with open(self.events_file, 'r') as f:
            data = json.load(f)

        self.assertEqual(len(data["events"]), 3)
        event_ids = [e["event_id"] for e in data["events"]]
        self.assertEqual(event_ids, ["evt1", "evt2", "evt3"])


class TestUpdateTrackedEvent(unittest.TestCase):
    """Tests for update_tracked_event function."""

    def setUp(self):
        """Create temp directory and patch file path."""
        self.temp_dir = tempfile.mkdtemp()
        self.events_file = os.path.join(self.temp_dir, "events.json")
        self.patcher = patch.object(event_tracking, 'EVENTS_FILE', self.events_file)
        self.patcher.start()

    def tearDown(self):
        """Clean up temp directory and stop patcher."""
        self.patcher.stop()
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_update_existing_event(self):
        """Test updating an existing tracked event."""
        # Create initial event
        event_tracking.track_event(
            event_id="evt123",
            summary="Original",
            start="2026-02-11T14:00:00"
        )
        # Update it
        event_tracking.update_tracked_event(
            event_id="evt123",
            summary="Updated",
            start="2026-02-12T15:00:00"
        )

        with open(self.events_file, 'r') as f:
            data = json.load(f)

        event = data["events"][0]
        self.assertEqual(event["summary"], "Updated")
        self.assertEqual(event["start"], "2026-02-12T15:00:00")

    def test_update_nonexistent_event_exits(self):
        """Test that updating nonexistent event raises SystemExit."""
        # Create empty events file
        with open(self.events_file, 'w') as f:
            json.dump({"events": []}, f)

        with self.assertRaises(SystemExit) as cm:
            event_tracking.update_tracked_event(
                event_id="nonexistent",
                summary="Test"
            )
        self.assertEqual(cm.exception.code, 1)


class TestDeleteTrackedEvent(unittest.TestCase):
    """Tests for delete_tracked_event function."""

    def setUp(self):
        """Create temp directory and patch file path."""
        self.temp_dir = tempfile.mkdtemp()
        self.events_file = os.path.join(self.temp_dir, "events.json")
        self.patcher = patch.object(event_tracking, 'EVENTS_FILE', self.events_file)
        self.patcher.start()

    def tearDown(self):
        """Clean up temp directory and stop patcher."""
        self.patcher.stop()
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_delete_existing_event(self):
        """Test deleting an existing event."""
        # Create events
        event_tracking.track_event(event_id="evt1", summary="Event 1")
        event_tracking.track_event(event_id="evt2", summary="Event 2")
        # Delete one
        event_tracking.delete_tracked_event("evt1")

        with open(self.events_file, 'r') as f:
            data = json.load(f)

        self.assertEqual(len(data["events"]), 1)
        self.assertEqual(data["events"][0]["event_id"], "evt2")

    def test_delete_nonexistent_event_warns(self):
        """Test that deleting nonexistent event prints warning but doesn't crash."""
        with open(self.events_file, 'w') as f:
            json.dump({"events": []}, f)

        # Should not raise, just warn
        event_tracking.delete_tracked_event("nonexistent")


class TestLookupEvents(unittest.TestCase):
    """Tests for lookup_events function."""

    def setUp(self):
        """Create temp directory and patch file path."""
        self.temp_dir = tempfile.mkdtemp()
        self.events_file = os.path.join(self.temp_dir, "events.json")
        self.patcher = patch.object(event_tracking, 'EVENTS_FILE', self.events_file)
        self.patcher.start()
        # Create test events
        test_data = {
            "events": [
                {"event_id": "evt1", "email_id": "email1", "summary": "Team Meeting", "start": "2026-02-11T10:00:00"},
                {"event_id": "evt2", "email_id": "email2", "summary": "Project Review", "start": "2026-02-12T14:00:00"},
                {"event_id": "evt3", "email_id": "email1", "summary": "Follow-up Meeting", "start": "2026-02-13T09:00:00"}
            ]
        }
        with open(self.events_file, 'w') as f:
            json.dump(test_data, f)

    def tearDown(self):
        """Clean up temp directory and stop patcher."""
        self.patcher.stop()
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    @patch('sys.stdout')
    def test_lookup_by_email_id(self, mock_stdout):
        """Test looking up events by email_id."""
        import io
        captured = io.StringIO()
        with patch('sys.stdout', captured):
            event_tracking.lookup_events(search_type="email_id", search_value="email1")
        output = captured.getvalue()
        results = json.loads(output)
        self.assertEqual(len(results), 2)
        self.assertTrue(all(e["email_id"] == "email1" for e in results))

    @patch('sys.stdout')
    def test_lookup_by_event_id(self, mock_stdout):
        """Test looking up events by event_id."""
        import io
        captured = io.StringIO()
        with patch('sys.stdout', captured):
            event_tracking.lookup_events(search_type="event_id", search_value="evt2")
        output = captured.getvalue()
        results = json.loads(output)
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0]["event_id"], "evt2")

    @patch('sys.stdout')
    def test_lookup_by_summary_partial(self, mock_stdout):
        """Test looking up events by partial summary match."""
        import io
        captured = io.StringIO()
        with patch('sys.stdout', captured):
            event_tracking.lookup_events(search_type="summary", search_value="meeting")
        output = captured.getvalue()
        results = json.loads(output)
        self.assertEqual(len(results), 2)  # "Team Meeting" and "Follow-up Meeting"
    @patch('sys.stdout')
    def test_lookup_list_all(self, mock_stdout):
        """Test listing all events."""
        import io
        captured = io.StringIO()
        with patch('sys.stdout', captured):
            event_tracking.lookup_events(search_type="list")
        output = captured.getvalue()
        results = json.loads(output)
        self.assertEqual(len(results), 3)

    @patch('sys.stdout')
    def test_lookup_no_results(self, mock_stdout):
        """Test lookup with no matching results."""
        import io
        captured = io.StringIO()
        with patch('sys.stdout', captured):
            event_tracking.lookup_events(search_type="email_id", search_value="nonexistent")
        output = captured.getvalue()
        results = json.loads(output)
        self.assertEqual(len(results), 0)


if __name__ == '__main__':
    unittest.main()
```

### scripts/tests/test_invite_ops.py

```python
#!/usr/bin/env python3
"""Tests for utils/invite_ops.py"""
import unittest
import sys
import os
import json
import tempfile
import shutil
import io
from unittest.mock import patch

# Add parent directory to path for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from utils import invite_ops


class TestUpdateInviteStatus(unittest.TestCase):
    """Tests for update_invite_status function."""

    def setUp(self):
        """Create temp directory and patch file path."""
        self.temp_dir = tempfile.mkdtemp()
        self.pending_file = os.path.join(self.temp_dir, "pending_invites.json")
        self.patcher = patch.object(invite_ops, 'PENDING_FILE', self.pending_file)
        self.patcher.start()

    def tearDown(self):
        """Clean up temp directory and stop patcher."""
        self.patcher.stop()
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_update_status_by_email_id(self):
        """Test updating invite status by email_id."""
        test_data = {
            "invites": [{
                "id": "inv1",
                "email_id": "email123",
                "events": [
                    {"title": "Team Meeting", "status": "pending"}
                ]
            }]
        }
        with open(self.pending_file, 'w') as f:
            json.dump(test_data, f)

        captured = io.StringIO()
        with patch('sys.stdout', captured):
            invite_ops.update_invite_status(
                email_id="email123",
                event_title="Team Meeting",
                new_status="created",
                calendar_event_id="cal_evt_123"
            )

        with open(self.pending_file, 'r') as f:
            data = json.load(f)

        event = data["invites"][0]["events"][0]
        self.assertEqual(event["status"], "created")
        self.assertEqual(event["event_id"], "cal_evt_123")
        self.assertIn("updated_at", event)

    def test_update_status_by_invite_id(self):
        """Test updating invite status by invite_id."""
        test_data = {
            "invites": [{
                "id": "inv1",
                "email_id": "email123",
                "events": [
                    {"title": "Project Review", "status": "pending"}
                ]
            }]
        }
        with open(self.pending_file, 'w') as f:
            json.dump(test_data, f)

        captured = io.StringIO()
        with patch('sys.stdout', captured):
            invite_ops.update_invite_status(
                invite_id="inv1",
                event_title="Project Review",
                new_status="dismissed"
            )

        with open(self.pending_file, 'r') as f:
            data = json.load(f)

        event = data["invites"][0]["events"][0]
        self.assertEqual(event["status"], "dismissed")

    def test_update_status_partial_title_match(self):
        """Test updating invite status with partial title match."""
        test_data = {
            "invites": [{
                "id": "inv1",
                "email_id": "email123",
                "events": [
                    {"title": "Weekly Team Meeting", "status": "pending"}
                ]
            }]
        }
        with open(self.pending_file, 'w') as f:
            json.dump(test_data, f)

        captured = io.StringIO()
        with patch('sys.stdout', captured):
            invite_ops.update_invite_status(
                email_id="email123",
                event_title="team meeting",  # Partial, lowercase
                new_status="created"
            )

        with open(self.pending_file, 'r') as f:
            data = json.load(f)

        event = data["invites"][0]["events"][0]
        self.assertEqual(event["status"], "created")

    def test_update_status_not_found_exits(self):
        """Test that update_invite_status raises SystemExit when event not found."""
        test_data = {
            "invites": [{
                "id": "inv1",
                "email_id": "email123",
                "events": [
                    {"title": "Team Meeting", "status": "pending"}
                ]
            }]
        }
        with open(self.pending_file, 'w') as f:
            json.dump(test_data, f)

        with self.assertRaises(SystemExit) as cm:
            invite_ops.update_invite_status(
                email_id="email123",
                event_title="Nonexistent Event",
                new_status="created"
            )
        self.assertEqual(cm.exception.code, 1)

    def test_update_status_wrong_email_id_exits(self):
        """Test that wrong email_id causes SystemExit."""
        test_data = {
            "invites": [{
                "id": "inv1",
                "email_id": "email123",
                "events": [
                    {"title": "Team Meeting", "status": "pending"}
                ]
            }]
        }
        with open(self.pending_file, 'w') as f:
            json.dump(test_data, f)

        with self.assertRaises(SystemExit) as cm:
            invite_ops.update_invite_status(
                email_id="wrong_email",
                event_title="Team Meeting",
                new_status="created"
            )
        self.assertEqual(cm.exception.code, 1)

    def test_update_multiple_events_in_invite(self):
        """Test updating specific event when invite has multiple events."""
        test_data = {
            "invites": [{
                "id": "inv1",
                "email_id": "email123",
                "events": [
                    {"title": "Event A", "status": "pending"},
                    {"title": "Event B", "status": "pending"},
                    {"title": "Event C", "status": "pending"}
                ]
            }]
        }
        with open(self.pending_file, 'w') as f:
            json.dump(test_data, f)

        captured = io.StringIO()
        with patch('sys.stdout', captured):
            invite_ops.update_invite_status(
                email_id="email123",
                event_title="Event B",
                new_status="created"
            )

        with open(self.pending_file, 'r') as f:
            data = json.load(f)

        events = data["invites"][0]["events"]
        self.assertEqual(events[0]["status"], "pending")
        self.assertEqual(events[1]["status"], "created")
        self.assertEqual(events[2]["status"], "pending")

    def test_update_prints_confirmation(self):
        """Test that update prints confirmation message."""
        test_data = {
            "invites": [{
                "id": "inv1",
                "email_id": "email123",
                "events": [
                    {"title": "Team Meeting", "status": "pending"}
                ]
            }]
        }
        with open(self.pending_file, 'w') as f:
            json.dump(test_data, f)

        captured = io.StringIO()
        with patch('sys.stdout', captured):
            invite_ops.update_invite_status(
                email_id="email123",
                event_title="Team Meeting",
                new_status="created"
            )

        output = captured.getvalue()
        self.assertIn("Updated", output)
        self.assertIn("Team Meeting", output)
        self.assertIn("created", output)


if __name__ == '__main__':
    unittest.main()
```

### scripts/tests/test_json_store.py

```python
#!/usr/bin/env python3
"""Tests for utils/json_store.py"""
import unittest
import sys
import os
import json
import tempfile
import shutil

# Add parent directory to path for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from utils.json_store import load_json, save_json, ensure_dir


class TestEnsureDir(unittest.TestCase):
    """Tests for ensure_dir function."""

    def setUp(self):
        """Create temp directory for test files."""
        self.temp_dir = tempfile.mkdtemp()

    def tearDown(self):
        """Clean up temp directory."""
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_creates_parent_dirs(self):
        """Test that parent directories are created."""
        filepath = os.path.join(self.temp_dir, "a", "b", "c", "file.json")
        ensure_dir(filepath)
        parent_dir = os.path.dirname(filepath)
        self.assertTrue(os.path.isdir(parent_dir))

    def test_existing_dir_ok(self):
        """Test that existing directory doesn't cause error."""
        filepath = os.path.join(self.temp_dir, "file.json")
        ensure_dir(filepath)  # Should not raise


class TestLoadJson(unittest.TestCase):
    """Tests for load_json function."""

    def setUp(self):
        """Create temp directory for test files."""
        self.temp_dir = tempfile.mkdtemp()

    def tearDown(self):
        """Clean up temp directory."""
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_load_valid_json(self):
        """Test loading a valid JSON file."""
        filepath = os.path.join(self.temp_dir, "data.json")
        test_data = {"key": "value", "number": 42}
        with open(filepath, 'w') as f:
            json.dump(test_data, f)

        result = load_json(filepath)
        self.assertEqual(result, test_data)

    def test_load_missing_file_returns_default(self):
        """Test that missing file returns default value."""
        filepath = os.path.join(self.temp_dir, "nonexistent.json")
        result = load_json(filepath, default={"default": True})
        self.assertEqual(result, {"default": True})

    def test_load_missing_file_default_empty_dict(self):
        """Test that missing file with no default returns empty dict."""
        filepath = os.path.join(self.temp_dir, "nonexistent.json")
        result = load_json(filepath)
        self.assertEqual(result, {})

    def test_load_malformed_json_returns_default(self):
        """Test that malformed JSON returns default value."""
        filepath = os.path.join(self.temp_dir, "bad.json")
        with open(filepath, 'w') as f:
            f.write("{invalid json")

        result = load_json(filepath, default={"fallback": True})
        self.assertEqual(result, {"fallback": True})

    def test_load_empty_file_returns_default(self):
        """Test that empty file returns default value."""
        filepath = os.path.join(self.temp_dir, "empty.json")
        with open(filepath, 'w') as f:
            pass  # Create empty file

        result = load_json(filepath, default={"empty": True})
        self.assertEqual(result, {"empty": True})

    def test_load_with_list_default(self):
        """Test loading with a list as default."""
        filepath = os.path.join(self.temp_dir, "nonexistent.json")
        result = load_json(filepath, default=[])
        self.assertEqual(result, [])

    def test_load_complex_nested_json(self):
        """Test loading complex nested JSON structure."""
        filepath = os.path.join(self.temp_dir, "complex.json")
        test_data = {
            "events": [
                {"id": 1, "name": "Event 1"},
                {"id": 2, "name": "Event 2"}
            ],
            "metadata": {
                "version": "1.0",
                "nested": {"deep": True}
            }
        }
        with open(filepath, 'w') as f:
            json.dump(test_data, f)

        result = load_json(filepath)
        self.assertEqual(result, test_data)


class TestSaveJson(unittest.TestCase):
    """Tests for save_json function."""

    def setUp(self):
        """Create temp directory for test files."""
        self.temp_dir = tempfile.mkdtemp()

    def tearDown(self):
        """Clean up temp directory."""
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_save_creates_file(self):
        """Test that save_json creates a new file."""
        filepath = os.path.join(self.temp_dir, "new.json")
        test_data = {"created": True}
        save_json(filepath, test_data)

        self.assertTrue(os.path.exists(filepath))
        with open(filepath, 'r') as f:
            loaded = json.load(f)
        self.assertEqual(loaded, test_data)

    def test_save_creates_parent_dirs(self):
        """Test that save_json creates parent directories."""
        filepath = os.path.join(self.temp_dir, "deep", "nested", "file.json")
        test_data = {"nested": True}
        save_json(filepath, test_data)
        self.assertTrue(os.path.exists(filepath))

    def test_save_overwrites_existing(self):
        """Test that save_json overwrites existing file."""
        filepath = os.path.join(self.temp_dir, "existing.json")
        with open(filepath, 'w') as f:
            json.dump({"old": "data"}, f)

        new_data = {"new": "data"}
        save_json(filepath, new_data)

        with open(filepath, 'r') as f:
            loaded = json.load(f)
        self.assertEqual(loaded, new_data)

    def test_save_with_custom_indent(self):
        """Test that save_json uses custom indentation."""
        filepath = os.path.join(self.temp_dir, "indented.json")
        test_data = {"key": "value"}
        save_json(filepath, test_data, indent=4)

        with open(filepath, 'r') as f:
            content = f.read()
        # Check for 4-space indentation
        self.assertIn("    ", content)

    def test_save_list(self):
        """Test saving a list."""
        filepath = os.path.join(self.temp_dir, "list.json")
        test_data = [1, 2, 3, {"nested": True}]
        save_json(filepath, test_data)

        with open(filepath, 'r') as f:
            loaded = json.load(f)
        self.assertEqual(loaded, test_data)

    def test_roundtrip(self):
        """Test that save followed by load returns same data."""
        filepath = os.path.join(self.temp_dir, "roundtrip.json")
        test_data = {
            "string": "value",
            "number": 42,
            "float": 3.14,
            "boolean": True,
            "null": None,
            "array": [1, 2, 3],
            "object": {"nested": "value"}
        }
        save_json(filepath, test_data)
        loaded = load_json(filepath)
        self.assertEqual(loaded, test_data)


if __name__ == '__main__':
    unittest.main()
```

### scripts/tests/test_pending_ops.py

```python
#!/usr/bin/env python3
"""Tests for utils/pending_ops.py"""
import unittest
import sys
import os
import json
import tempfile
import shutil
import io
from unittest.mock import patch
from datetime import datetime

# Add parent directory to path for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from utils import pending_ops
class TestListPendingJson(unittest.TestCase):
    """Tests for list_pending_json function."""

    def setUp(self):
        """Create temp directory and patch file path."""
        self.temp_dir = tempfile.mkdtemp()
        self.pending_file = os.path.join(self.temp_dir, "pending_invites.json")
        self.patcher = patch.object(pending_ops, 'PENDING_FILE', self.pending_file)
        self.patcher.start()

    def tearDown(self):
        """Clean up temp directory and stop patcher."""
        self.patcher.stop()
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_empty_when_no_invites(self):
        """Test that empty list returned when no invites exist."""
        with open(self.pending_file, 'w') as f:
            json.dump({"invites": []}, f)

        captured = io.StringIO()
        with patch('sys.stdout', captured):
            pending_ops.list_pending_json(today="2026-02-11")

        output = captured.getvalue()
        results = json.loads(output)
        self.assertEqual(results, [])

    def test_filters_past_dates(self):
        """Test that past events are filtered out."""
        test_data = {
            "invites": [{
                "id": "inv1",
                "email_id": "email1",
                "email_subject": "Test",
                "events": [
                    {"title": "Past Event", "date": "2026-02-01", "status": "pending"},
                    {"title": "Future Event", "date": "2026-02-15", "status": "pending"}
                ]
            }]
        }
        with open(self.pending_file, 'w') as f:
            json.dump(test_data, f)

        captured = io.StringIO()
        with patch('sys.stdout', captured):
            pending_ops.list_pending_json(today="2026-02-11")

        output = captured.getvalue()
        results = json.loads(output)
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0]["title"], "Future Event")

    def test_filters_non_pending_status(self):
        """Test that non-pending events are filtered out."""
        test_data = {
            "invites": [{
                "id": "inv1",
                "email_id": "email1",
                "email_subject": "Test",
                "events": [
                    {"title": "Created Event", "date": "2026-02-15", "status": "created"},
                    {"title": "Pending Event", "date": "2026-02-16", "status": "pending"}
                ]
            }]
        }
        with open(self.pending_file, 'w') as f:
            json.dump(test_data, f)

        captured = io.StringIO()
        with patch('sys.stdout', captured):
            pending_ops.list_pending_json(today="2026-02-11")

        output = captured.getvalue()
        results = json.loads(output)
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0]["title"], "Pending Event")

    def test_includes_day_of_week(self):
        """Test that day_of_week is included in results."""
        test_data = {
            "invites": [{
                "id": "inv1",
                "email_id": "email1",
                "email_subject": "Test",
                "events": [
                    {"title": "Wednesday Event", "date": "2026-02-11", "status": "pending"}
                ]
            }]
        }
        with open(self.pending_file, 'w') as f:
            json.dump(test_data, f)

        captured = io.StringIO()
        with patch('sys.stdout', captured):
            pending_ops.list_pending_json(today="2026-02-11")

        output = captured.getvalue()
        results = json.loads(output)
        self.assertEqual(results[0]["day_of_week"], "Wednesday")


class TestListPendingSummary(unittest.TestCase):
    """Tests for list_pending_summary function."""

    def setUp(self):
        """Create temp directory and patch file path."""
        self.temp_dir = tempfile.mkdtemp()
        self.pending_file = os.path.join(self.temp_dir, "pending_invites.json")
        self.patcher = patch.object(pending_ops, 'PENDING_FILE', self.pending_file)
        self.patcher.start()

    def tearDown(self):
        """Clean up temp directory and stop patcher."""
        self.patcher.stop()
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_shows_no_pending_message(self):
        """Test that 'No pending invites' shown when empty."""
        with open(self.pending_file, 'w') as f:
            json.dump({"invites": []}, f)

        captured = io.StringIO()
        with patch('sys.stdout', captured):
            pending_ops.list_pending_summary(today="2026-02-11")

        output = captured.getvalue()
        self.assertIn("No pending invites", output)

    def test_shows_event_count(self):
        """Test that event count is shown in summary."""
        test_data = {
            "invites": [{
                "id": "inv1",
                "email_id": "email1",
                "email_subject": "Meeting Invite",
                "events": [
                    {"title": "Event 1", "date": "2026-02-15", "status": "pending"},
                    {"title": "Event 2", "date": "2026-02-16", "status": "pending"}
                ]
            }]
        }
        with open(self.pending_file, 'w') as f:
            json.dump(test_data, f)
        captured = io.StringIO()
        with patch('sys.stdout', captured):
            pending_ops.list_pending_summary(today="2026-02-11")
        output = captured.getvalue()
        self.assertIn("2 pending calendar invite", output)


class TestAutoDismiss(unittest.TestCase):
    """Tests for auto-dismiss functionality."""

    def setUp(self):
        """Create temp directory and patch file path."""
        self.temp_dir = tempfile.mkdtemp()
        self.pending_file = os.path.join(self.temp_dir, "pending_invites.json")
        self.patcher = patch.object(pending_ops, 'PENDING_FILE', self.pending_file)
        self.patcher.start()

    def tearDown(self):
        """Clean up temp directory and stop patcher."""
        self.patcher.stop()
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_auto_dismiss_after_max_reminders(self):
        """Test that events are auto-dismissed after MAX_REMINDERS."""
        test_data = {
            "invites": [{
                "id": "inv1",
                "email_id": "email1",
                "email_subject": "Test",
                "reminder_count": 3,  # MAX_REMINDERS
                "events": [
                    {"title": "Ignored Event", "date": "2026-02-15", "status": "pending"}
                ]
            }]
        }
        with open(self.pending_file, 'w') as f:
            json.dump(test_data, f)
        captured = io.StringIO()
        with patch('sys.stdout', captured):
            pending_ops.list_pending_json(today="2026-02-11", auto_dismiss=True)
        # Check that the file was updated with auto_dismissed status
        with open(self.pending_file, 'r') as f:
            data = json.load(f)
        event = data["invites"][0]["events"][0]
        self.assertEqual(event["status"], "auto_dismissed")
        self.assertIn("auto_dismissed_at", event)

    def test_no_auto_dismiss_below_max_reminders(self):
        """Test that events are not auto-dismissed below MAX_REMINDERS."""
        test_data = {
            "invites": [{
                "id": "inv1",
                "email_id": "email1",
                "email_subject": "Test",
                "reminder_count": 2,  # Below MAX_REMINDERS
                "events": [
                    {"title": "Still Pending", "date": "2026-02-15", "status": "pending"}
                ]
            }]
        }
        with open(self.pending_file, 'w') as f:
            json.dump(test_data, f)
        captured = io.StringIO()
        with patch('sys.stdout', captured):
            pending_ops.list_pending_json(today="2026-02-11", auto_dismiss=True)
        output = captured.getvalue()
        results = json.loads(output)
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0]["title"], "Still Pending")


class TestUpdateReminded(unittest.TestCase):
    """Tests for update-reminded functionality."""

    def setUp(self):
        """Create temp directory and patch file path."""
        self.temp_dir = tempfile.mkdtemp()
        self.pending_file = os.path.join(self.temp_dir, "pending_invites.json")
        self.patcher = patch.object(pending_ops, 'PENDING_FILE', self.pending_file)
        self.patcher.start()

    def tearDown(self):
        """Clean up temp directory and stop patcher."""
        self.patcher.stop()
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_increments_reminder_count(self):
        """Test that update_reminded increments reminder_count."""
        test_data = {
            "invites": [{
                "id": "inv1",
                "email_id": "email1",
                "email_subject": "Test",
                "reminder_count": 1,
                "events": [
                    {"title": "Event", "date": "2026-02-15", "status": "pending"}
                ]
            }]
        }
        with open(self.pending_file, 'w') as f:
            json.dump(test_data, f)
        captured = io.StringIO()
        with patch('sys.stdout', captured):
            pending_ops.list_pending_json(today="2026-02-11", update_reminded=True)
        with open(self.pending_file, 'r') as f:
            data = json.load(f)
        self.assertEqual(data["invites"][0]["reminder_count"], 2)
        self.assertIn("last_reminded", data["invites"][0])


class TestAddPendingInvite(unittest.TestCase):
    """Tests for add_pending_invite function."""

    def setUp(self):
        """Create temp directory and patch file path."""
        self.temp_dir = tempfile.mkdtemp()
        self.pending_file = os.path.join(self.temp_dir, "pending_invites.json")
        self.patcher = patch.object(pending_ops, 'PENDING_FILE', self.pending_file)
        self.patcher.start()

    def tearDown(self):
        """Clean up temp directory and stop patcher."""
        self.patcher.stop()
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_add_new_invite(self):
        """Test adding a new pending invite."""
        # Start with empty invites
        with open(self.pending_file, 'w') as f:
            json.dump({"invites": []}, f)
        events = [
            {"title": "Birthday Party", "date": "2026-02-15", "time": "14:00", "status": "pending"}
        ]
        invite_id = pending_ops.add_pending_invite(
            email_id="email123",
            email_subject="Party Invite",
            events=events
        )
        # Verify the invite was created
        self.assertTrue(invite_id.startswith("inv_"))
        with open(self.pending_file, 'r') as f:
            data = json.load(f)
        self.assertEqual(len(data["invites"]), 1)
        invite = data["invites"][0]
        self.assertEqual(invite["id"], invite_id)
        self.assertEqual(invite["email_id"], "email123")
        self.assertEqual(invite["email_subject"], "Party Invite")
        self.assertEqual(invite["events"], events)
        self.assertEqual(invite["reminder_count"], 0)
        self.assertIsNone(invite["last_reminded"])
        self.assertIn("created_at", invite)

    def test_add_updates_existing_invite(self):
        """Test that adding with same email_id updates existing invite."""
        # Create an existing invite
        initial_data = {
            "invites": [{
                "id": "inv_existing",
                "email_id": "email123",
                "email_subject": "Old Subject",
                "events": [{"title": "Old Event", "date": "2026-02-10", "status": "pending"}],
                "created_at": "2026-02-01T10:00:00",
                "reminder_count": 2,
                "last_reminded": "2026-02-03T10:00:00"
            }]
        }
        with open(self.pending_file, 'w') as f:
            json.dump(initial_data, f)
        # Add with same email_id but new events
        new_events = [
            {"title": "New Event", "date": "2026-02-20", "time": "15:00", "status": "pending"}
        ]
        invite_id = pending_ops.add_pending_invite(
            email_id="email123",
            email_subject="New Subject",
            events=new_events
        )
        # Verify the existing invite was updated
        self.assertEqual(invite_id, "inv_existing")
        with open(self.pending_file, 'r') as f:
            data = json.load(f)
        self.assertEqual(len(data["invites"]), 1)
        invite = data["invites"][0]
        self.assertEqual(invite["events"], new_events)
        self.assertIn("updated_at", invite)
        # Original fields should be preserved
        self.assertEqual(invite["reminder_count"], 2)
        self.assertEqual(invite["created_at"], "2026-02-01T10:00:00")

    def test_add_multiple_events(self):
        """Test adding an invite with multiple events."""
        with open(self.pending_file, 'w') as f:
            json.dump({"invites": []}, f)
        events = [
            {"title": "Workshop Day 1", "date": "2026-03-01", "time": "09:00", "status": "pending"},
            {"title": "Workshop Day 2", "date": "2026-03-02", "time": "09:00", "status": "pending"},
            {"title": "Workshop Day 3", "date": "2026-03-03", "time": "09:00", "status": "pending"}
        ]
        invite_id = pending_ops.add_pending_invite(
            email_id="workshop_email",
            email_subject="3-Day Workshop Registration",
            events=events
        )
        with open(self.pending_file, 'r') as f:
            data = json.load(f)
        self.assertEqual(len(data["invites"]), 1)
        invite = data["invites"][0]
        self.assertEqual(len(invite["events"]), 3)
        self.assertEqual(invite["events"][0]["title"], "Workshop Day 1")
        self.assertEqual(invite["events"][2]["title"], "Workshop Day 3")

    def test_add_creates_file_if_missing(self):
        """Test that add creates the file if it doesn't exist."""
        # Remove the file
        if os.path.exists(self.pending_file):
            os.remove(self.pending_file)
        events = [{"title": "Test Event", "date": "2026-02-15", "status": "pending"}]
        invite_id = pending_ops.add_pending_invite(
            email_id="test_email",
            email_subject="Test",
            events=events
        )
        self.assertTrue(os.path.exists(self.pending_file))
        with open(self.pending_file, 'r') as f:
            data = json.load(f)
        self.assertEqual(len(data["invites"]), 1)


if __name__ == '__main__':
    unittest.main()
```

### scripts/tests/test_undo_ops.py

```python
#!/usr/bin/env python3
"""Tests for utils/undo_ops.py"""

import unittest
import sys
import os
import json
import tempfile
import shutil
import io
from datetime import datetime, timedelta
from unittest.mock import patch

# Add parent directory to path for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from utils import undo_ops


class TestFindLastUndoable(unittest.TestCase):
    """Tests for find_last_undoable function."""

    def setUp(self):
        """Create temp directory and patch file path."""
        self.temp_dir = tempfile.mkdtemp()
        self.changelog_file = os.path.join(self.temp_dir, "changelog.json")
        self.patcher = patch.object(undo_ops, 'CHANGELOG_FILE', self.changelog_file)
        self.patcher.start()

    def tearDown(self):
        """Clean up temp directory and stop patcher."""
        self.patcher.stop()
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_finds_most_recent_undoable(self):
        """Test finding the most recent undoable change."""
        now = datetime.now()
        test_data = {
            "changes": [
                {"id": "chg_001", "timestamp": (now - timedelta(hours=2)).isoformat(), "can_undo": True},
                {"id": "chg_002", "timestamp": (now - timedelta(hours=1)).isoformat(), "can_undo": True},
                {"id": "chg_003", "timestamp": now.isoformat(), "can_undo": True}
            ]
        }
        with open(self.changelog_file, 'w') as f:
            json.dump(test_data, f)
        captured = io.StringIO()
        with patch('sys.stdout', captured):
            undo_ops.find_last_undoable()
        output = captured.getvalue().strip()
        self.assertEqual(output, "chg_003")

    def test_skips_non_undoable(self):
        """Test that non-undoable changes are skipped."""
        now = datetime.now()
        test_data = {
            "changes": [
                {"id": "chg_001", "timestamp": now.isoformat(), "can_undo": True},
                {"id": "chg_002", "timestamp": now.isoformat(), "can_undo": False}  # Already undone
            ]
        }
        with open(self.changelog_file, 'w') as f:
            json.dump(test_data, f)
        captured = io.StringIO()
        with patch('sys.stdout', captured):
            undo_ops.find_last_undoable()
        output = captured.getvalue().strip()
        self.assertEqual(output, "chg_001")

    def test_skips_old_changes(self):
        """Test that changes older than 24 hours are skipped."""
        now = datetime.now()
        test_data = {
            "changes": [
                {"id": "chg_001", "timestamp": (now - timedelta(hours=25)).isoformat(), "can_undo": True},
                {"id": "chg_002", "timestamp": (now - timedelta(hours=1)).isoformat(), "can_undo": True}
            ]
        }
        with open(self.changelog_file, 'w') as f:
            json.dump(test_data, f)
        captured = io.StringIO()
        with patch('sys.stdout', captured):
            undo_ops.find_last_undoable()
        output = captured.getvalue().strip()
        self.assertEqual(output, "chg_002")

    def test_exits_when_no_undoable(self):
        """Test that SystemExit raised when no undoable changes exist."""
        with open(self.changelog_file, 'w') as f:
            json.dump({"changes": []}, f)
        with self.assertRaises(SystemExit) as cm:
            undo_ops.find_last_undoable()
        self.assertEqual(cm.exception.code, 1)

    def test_exits_when_all_too_old(self):
        """Test that SystemExit raised when all changes are too old."""
        old_time = (datetime.now() - timedelta(hours=25)).isoformat()
        test_data = {
            "changes": [
                {"id": "chg_001", "timestamp": old_time, "can_undo": True}
            ]
        }
        with open(self.changelog_file, 'w') as f:
            json.dump(test_data, f)
        with self.assertRaises(SystemExit) as cm:
            undo_ops.find_last_undoable()
        self.assertEqual(cm.exception.code, 1)


class TestListUndoable(unittest.TestCase):
    """Tests for list_undoable function."""

    def setUp(self):
        """Create temp directory and patch file path."""
        self.temp_dir = tempfile.mkdtemp()
        self.changelog_file = os.path.join(self.temp_dir, "changelog.json")
        self.patcher = patch.object(undo_ops, 'CHANGELOG_FILE', self.changelog_file)
        self.patcher.start()

    def tearDown(self):
        """Clean up temp directory and stop patcher."""
        self.patcher.stop()
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_lists_all_undoable(self):
        """Test listing all undoable changes."""
        now = datetime.now()
        test_data = {
            "changes": [
                {"id": "chg_001", "timestamp": now.isoformat(), "action": "create",
                 "after": {"summary": "Event 1"}, "can_undo": True},
                {"id": "chg_002", "timestamp": now.isoformat(), "action": "update",
                 "before": {"summary": "Old"}, "after": {"summary": "New"}, "can_undo": True}
            ]
        }
        with open(self.changelog_file, 'w') as f:
            json.dump(test_data, f)
        captured = io.StringIO()
        with patch('sys.stdout', captured):
            undo_ops.list_undoable()
        output = captured.getvalue()
        self.assertIn("chg_001", output)
        self.assertIn("chg_002", output)
        self.assertIn("Created", output)
        self.assertIn("Updated", output)

    def test_empty_when_no_undoable(self):
        """Test message when no undoable changes."""
        with open(self.changelog_file, 'w') as f:
            json.dump({"changes": []}, f)
        captured = io.StringIO()
        with patch('sys.stdout', captured):
            undo_ops.list_undoable()
        output = captured.getvalue()
        self.assertIn("No undoable changes", output)

    def test_shows_delete_action(self):
        """Test that delete actions are shown correctly."""
        now = datetime.now()
        test_data = {
            "changes": [
                {"id": "chg_001", "timestamp": now.isoformat(), "action": "delete",
                 "before": {"summary": "Deleted Event"}, "can_undo": True}
            ]
        }
        with open(self.changelog_file, 'w') as f:
            json.dump(test_data, f)
        captured = io.StringIO()
        with patch('sys.stdout', captured):
            undo_ops.list_undoable()
        output = captured.getvalue()
        self.assertIn("Deleted", output)
        self.assertIn("Deleted Event", output)


class TestMarkUndone(unittest.TestCase):
    """Tests for mark_undone function."""

    def setUp(self):
        """Create temp directory and patch file path."""
        self.temp_dir = tempfile.mkdtemp()
        self.changelog_file = os.path.join(self.temp_dir, "changelog.json")
        self.patcher = patch.object(undo_ops, 'CHANGELOG_FILE', self.changelog_file)
        self.patcher.start()

    def tearDown(self):
        """Clean up temp directory and stop patcher."""
        self.patcher.stop()
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_sets_can_undo_false(self):
        """Test that mark_undone sets can_undo to False."""
        test_data = {
            "changes": [
                {"id": "chg_001", "can_undo": True}
            ]
        }
        with open(self.changelog_file, 'w') as f:
            json.dump(test_data, f)
        undo_ops.mark_undone("chg_001")
        with open(self.changelog_file, 'r') as f:
            data = json.load(f)
        self.assertFalse(data["changes"][0]["can_undo"])

    def test_sets_undone_at_timestamp(self):
        """Test that mark_undone sets undone_at timestamp."""
        test_data = {
            "changes": [
                {"id": "chg_001", "can_undo": True}
            ]
        }
        with open(self.changelog_file, 'w') as f:
            json.dump(test_data, f)
        undo_ops.mark_undone("chg_001")
        with open(self.changelog_file, 'r') as f:
            data = json.load(f)
        self.assertIn("undone_at", data["changes"][0])

    def test_nonexistent_change_no_error(self):
        """Test that marking nonexistent change doesn't crash."""
        test_data = {"changes": []}
        with open(self.changelog_file, 'w') as f:
            json.dump(test_data, f)
        # Should not raise
        undo_ops.mark_undone("nonexistent")

    def test_marks_only_specified_change(self):
        """Test that only the specified change is marked."""
        test_data = {
            "changes": [
                {"id": "chg_001", "can_undo": True},
                {"id": "chg_002", "can_undo": True},
                {"id": "chg_003", "can_undo": True}
            ]
        }
        with open(self.changelog_file, 'w') as f:
            json.dump(test_data, f)
        undo_ops.mark_undone("chg_002")
        with open(self.changelog_file, 'r') as f:
            data = json.load(f)
        self.assertTrue(data["changes"][0]["can_undo"])
        self.assertFalse(data["changes"][1]["can_undo"])
        self.assertTrue(data["changes"][2]["can_undo"])


if __name__ == '__main__':
    unittest.main()
```

### scripts/track_event.sh

```bash
#!/bin/bash
# Track a created calendar event for future updates/deletions
# Usage: track_event.sh --event-id <id> --calendar-id <cal_id> --email-id <email_id> --summary <title> --start <datetime>
#
# This stores event metadata in events.json so we can:
# - Find existing events by email_id (for duplicate detection)
# - Update or delete events without searching the calendar
# - Track event history

SCRIPT_DIR="$(dirname "$0")"
UTILS_DIR="$SCRIPT_DIR/utils"

# Parse arguments
EVENT_ID=""
CALENDAR_ID="primary"
EMAIL_ID=""
SUMMARY=""
START=""

while [[ $# -gt 0 ]]; do
    case $1 in
        --event-id)
            EVENT_ID="$2"
            shift 2
            ;;
        --calendar-id)
            CALENDAR_ID="$2"
            shift 2
            ;;
        --email-id)
            EMAIL_ID="$2"
            shift 2
            ;;
        --summary)
            SUMMARY="$2"
            shift 2
            ;;
        --start)
            START="$2"
            shift 2
            ;;
        *)
            echo "Unknown option: $1" >&2
            exit 1
            ;;
    esac
done

if [ -z "$EVENT_ID" ]; then
    echo "Error: --event-id is required" >&2
    exit 1
fi

# Delegate to Python implementation
python3 "$UTILS_DIR/event_tracking.py" track \
    --event-id "$EVENT_ID" \
    --calendar-id "$CALENDAR_ID" \
    --email-id "$EMAIL_ID" \
    --summary "$SUMMARY" \
    --start "$START"
```
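Every test class above uses the same isolation pattern: `patch.object` swaps a module-level path constant (`PENDING_FILE`, `CHANGELOG_FILE`) for a throwaway temp file in `setUp`, and `tearDown` restores it. A minimal self-contained sketch of that pattern follows; the `demo` module and `load_invites` function here are illustrative stand-ins, not part of the skill.

```python
"""Sketch of the path-patching test pattern used by the skill's test files."""
import json
import os
import shutil
import tempfile
import types
import unittest
from unittest.mock import patch

# Stand-in for a module like pending_ops: a path constant read at call time.
demo = types.ModuleType("demo")
demo.PENDING_FILE = "/nonexistent/pending_invites.json"  # never touched by tests


def load_invites():
    """Read invites from whatever path demo.PENDING_FILE currently holds."""
    with open(demo.PENDING_FILE) as f:
        return json.load(f)["invites"]


demo.load_invites = load_invites


class TestPatchedPath(unittest.TestCase):
    def setUp(self):
        # Redirect the module constant to a temp file, exactly as the
        # pending_ops/undo_ops tests do.
        self.temp_dir = tempfile.mkdtemp()
        self.pending_file = os.path.join(self.temp_dir, "pending_invites.json")
        self.patcher = patch.object(demo, "PENDING_FILE", self.pending_file)
        self.patcher.start()

    def tearDown(self):
        # stop() restores the original constant; the temp dir is discarded.
        self.patcher.stop()
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_reads_patched_path(self):
        with open(self.pending_file, "w") as f:
            json.dump({"invites": [{"id": "inv1"}]}, f)
        self.assertEqual(demo.load_invites(), [{"id": "inv1"}])
```

Because the constant is looked up through the module object on every call, patching the attribute is enough; no filesystem mocking or dependency injection is needed, which is why each test file can run against real JSON on disk without ever touching the user's data.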