177 lines
5.7 KiB
Python
177 lines
5.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
seed_from_markdown.py
|
|
|
|
Reads a Markdown file, extracts all unchecked task items (- [ ] lines),
|
|
parses optional metadata from the same line or the preceding heading,
|
|
and inserts them as tasks into the SOMA database.
|
|
|
|
Usage:
|
|
python seed_from_markdown.py --file /path/to/life_plan.md [--dry-run]
|
|
|
|
Format recognized:
|
|
# Project Name
|
|
## Sub-section
|
|
- [ ] Task title @tag #priority due:2026-04-01
|
|
- [x] Already done task (skipped)
|
|
|
|
Metadata extraction:
|
|
@tag → maps to project_tag (e.g., @openclaw, @mak)
|
|
#priority → maps to priority (low|medium|high|critical)
|
|
due:YYYY-MM-DD → maps to due_date
|
|
Default tag: derived from nearest parent heading (slugified, max 50 chars)
|
|
Default priority: medium
|
|
"""
|
|
|
|
import argparse
|
|
import re
|
|
import sys
|
|
import os
|
|
import asyncio
|
|
from datetime import date
|
|
|
|
# Add parent directory to path so we can import app modules
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
|
|
|
|
|
def parse_markdown_tasks(filepath: str) -> list[dict]:
    """Parse all unchecked ``- [ ]`` tasks from a markdown file.

    Walks the file line by line, tracking the nearest heading (``#`` to
    ``###``) to derive a default project tag, then extracts inline
    metadata from each unchecked task line:

    * ``@tag``            -> project_tag
    * ``#priority``       -> priority (low|medium|high|critical)
    * ``due:YYYY-MM-DD``  -> due_date (dropped if not a real date)

    Args:
        filepath: Path to the markdown file to read (UTF-8).

    Returns:
        A list of task dicts ready for DB insertion, each with keys:
        title, description, status, priority, project_tag, due_date,
        xp_reward, position, checklist. Checked (``[x]``) items and
        tasks whose title is empty after metadata removal are skipped.
    """
    # Compiled once up front — these patterns run on every line of the file.
    heading_re = re.compile(r'^#{1,3}\s+(.+)$')
    task_re = re.compile(r'^\s*[-*]\s+\[ \]\s+(.+)$')
    tag_re = re.compile(r'@(\S+)')
    # \b keeps e.g. '#highlights' from being misread as priority 'high'.
    priority_re = re.compile(r'#(low|medium|high|critical)\b', re.IGNORECASE)
    due_re = re.compile(r'due:(\d{4}-\d{2}-\d{2})')

    # Heading keywords that map to well-known project tags.
    tag_map = {
        'openclaw': 'openclaw', 'mak': 'mak', 'rarity': 'rarity-media',
        'japan': 'japan', 'germany': 'germany', 'personal': 'personal',
        'habit': 'habits', 'retreat': 'retreat'
    }
    # XP reward granted per priority level.
    xp_map = {'low': 25, 'medium': 50, 'high': 100, 'critical': 200}

    tasks: list[dict] = []
    current_tag = "general"
    current_section = ""

    with open(filepath, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    for line in lines:
        line = line.rstrip()

        # Track headings: they set the default tag and the section name
        # recorded in each task's description.
        heading_match = heading_re.match(line)
        if heading_match:
            heading_text = heading_match.group(1).strip()
            lowered = heading_text.lower()
            # Slugify heading: lowercase, collapse non-alphanumerics to '-'.
            slug = re.sub(r'[^a-z0-9]+', '-', lowered).strip('-')[:50]
            for keyword, tag in tag_map.items():
                if keyword in lowered:
                    current_tag = tag
                    break
            else:
                # No known keyword: fall back to the slugified heading.
                current_tag = slug if slug else "general"
            current_section = heading_text
            continue

        # Match unchecked task: "- [ ]" or "* [ ]". Checked items fall through.
        task_match = task_re.match(line)
        if not task_match:
            continue

        raw_title = task_match.group(1).strip()

        # Defaults, overridden by inline metadata below.
        tag = current_tag
        priority = "medium"
        due_date = None

        # @tag -> project_tag
        tag_match = tag_re.search(raw_title)
        if tag_match:
            tag = tag_match.group(1)
            raw_title = raw_title.replace(tag_match.group(0), '').strip()

        # #priority -> priority (case-insensitive, stored lowercase)
        priority_match = priority_re.search(raw_title)
        if priority_match:
            priority = priority_match.group(1).lower()
            raw_title = raw_title.replace(priority_match.group(0), '').strip()

        # due:YYYY-MM-DD -> due_date; invalid calendar dates are dropped
        # but the marker is still removed from the title either way.
        due_match = due_re.search(raw_title)
        if due_match:
            try:
                due_date = due_match.group(1)
                date.fromisoformat(due_date)  # validate
            except ValueError:
                due_date = None
            raw_title = raw_title.replace(due_match.group(0), '').strip()

        # Collapse leftover whitespace; skip tasks that were metadata-only.
        title = re.sub(r'\s+', ' ', raw_title).strip()
        if not title:
            continue

        tasks.append({
            'title': title,
            'description': f"Imported from: {current_section}" if current_section else None,
            'status': 'backlog',
            'priority': priority,
            'project_tag': tag,
            'due_date': due_date,
            'xp_reward': xp_map[priority],
            'position': 0,
            'checklist': [],
        })

    return tasks
|
|
|
|
|
|
async def insert_tasks(tasks: list[dict], dry_run: bool = False):
    """Insert parsed tasks into the database.

    In dry-run mode the tasks are only printed; the database layer is
    never imported, so ``--dry-run`` works without any DB configuration.

    Args:
        tasks: Task dicts as produced by ``parse_markdown_tasks``.
        dry_run: When True, print what would be inserted and return
            without touching the database.
    """
    if dry_run:
        print(f"DRY RUN: Would insert {len(tasks)} tasks:")
        for i, t in enumerate(tasks, 1):
            print(f" {i:3}. [{t['priority'].upper():8}] [{t['project_tag']:20}] {t['title']}")
        return

    # Imported lazily so a dry run does not require the app/DB stack.
    from app.database import AsyncSessionLocal
    from app.models.task import Task
    import uuid

    async with AsyncSessionLocal() as db:
        inserted = 0
        for i, task_data in enumerate(tasks):
            # Position reflects insertion order from the markdown file.
            task_data['position'] = i
            db.add(Task(id=uuid.uuid4(), **task_data))
            inserted += 1

        await db.commit()
        print(f"Inserted {inserted} tasks.")
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, read the markdown file, seed tasks.

    Exits with status 1 when the input file is missing and status 0 when
    the file contains no unchecked task lines.
    """
    parser = argparse.ArgumentParser(description='Seed SOMA tasks from a Markdown file')
    parser.add_argument('--file', required=True, help='Path to the Markdown file')
    parser.add_argument('--dry-run', action='store_true', help='Parse only, do not insert')
    args = parser.parse_args()

    source = args.file

    # Guard: bail out early if the file does not exist.
    if not os.path.exists(source):
        print(f"Error: File not found: {source}", file=sys.stderr)
        sys.exit(1)

    print(f"Parsing: {source}")
    parsed = parse_markdown_tasks(source)
    print(f"Found {len(parsed)} unchecked tasks")

    # Guard: nothing to insert means a clean no-op exit.
    if not parsed:
        print("No tasks found. Check that the file contains '- [ ] ' lines.")
        sys.exit(0)

    asyncio.run(insert_tasks(parsed, dry_run=args.dry_run))
|
|
|
|
|
|
# Run only when executed as a script, not when imported as a module.
if __name__ == '__main__':
    main()