#!/usr/bin/env python3
"""
admin_recovery.py

Emergency TOTP reset tool. Run ONLY via SSH on the EC2 instance.
This script is NOT exposed via any HTTP endpoint.

It resets the TOTP secret for the admin account and prints a new QR code
(to be scanned in Google Authenticator) together with new backup codes.

Usage (on EC2):
    docker compose exec backend python /app/scripts/admin_recovery.py

Or directly:
    ssh ubuntu@your-ec2-ip
    cd ~/soma
    docker compose exec backend python /app/scripts/admin_recovery.py
"""

import asyncio
import sys
import os
import secrets

# Make the backend package root (the parent of this scripts directory)
# importable so the `app.*` imports inside reset_totp() resolve.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

# Number of one-time backup codes issued per reset.
_BACKUP_CODE_COUNT = 8
# Random bytes per backup code; token_hex(4) yields 8 hex characters.
_BACKUP_CODE_BYTES = 4


def _generate_backup_codes(count=_BACKUP_CODE_COUNT):
    """Return ``count`` random backup codes, each 8 uppercase hex characters."""
    return [secrets.token_hex(_BACKUP_CODE_BYTES).upper() for _ in range(count)]


def _normalize_code(raw):
    """Remove all whitespace from a typed TOTP code ("123 456" -> "123456")."""
    return "".join(raw.split())


def _show_enrollment(provisioning_uri, secret, backup_codes):
    """Print the QR code (or the URI as a fallback), the secret and backup codes."""
    try:
        # Imported here so that a missing 'qrcode' package falls through to
        # the provisioning-URI fallback instead of aborting the whole tool.
        import qrcode

        qr = qrcode.QRCode(version=1, box_size=1, border=1)
        qr.add_data(provisioning_uri)
        qr.make(fit=True)
        print("\nScan this QR code with Google Authenticator / Authy:")
        print("=" * 60)
        qr.print_ascii(invert=True)
        print("=" * 60)
    except Exception as exc:
        # Best-effort rendering: report why it failed, then fall back to the
        # raw URI so the operator can still enrol the authenticator.
        print(f"\nCould not render QR code ({exc!r}).")
        print("\nProvisioning URI (paste into authenticator app):")
        print(provisioning_uri)

    print(f"\nManual entry secret: {secret}")

    print("\nBACKUP CODES (save these offline — one-time use):")
    print("-" * 40)
    for i, code in enumerate(backup_codes, 1):
        print(f" {i:2}. {code}")
    print("-" * 40)


async def reset_totp():
    """Interactively reset the admin user's TOTP secret and backup codes.

    Flow: load the first user row, ask the operator to type ``RESET``,
    generate a new secret and backup codes, display them, and require one
    valid TOTP code from the new secret. Only after that check passes are
    the new secret and codes stored and all of the user's ``AuthSession``
    rows marked inactive, in a single commit.

    Raises:
        SystemExit: exit code 1 if no user exists or the TOTP code does not
            verify; exit code 0 if the operator does not confirm.
    """
    from app.database import AsyncSessionLocal
    from app.models.user import User, AuthSession
    # Not referenced below; kept in case importing app.config performs
    # required setup. TODO confirm and remove if it does not.
    from app.config import settings  # noqa: F401
    from sqlalchemy import select, update
    import pyotp

    async with AsyncSessionLocal() as db:
        # Find the admin user (single-tenant: only one user exists)
        result = await db.execute(select(User).limit(1))
        user = result.scalar_one_or_none()

        if not user:
            print("ERROR: No user found in database. Run the setup flow first.")
            sys.exit(1)

        print(f"\nResetting TOTP for: {user.email}")
        print("=" * 60)

        # Confirm action
        confirm = input("Type 'RESET' to confirm TOTP reset: ").strip()
        if confirm != "RESET":
            print("Aborted.")
            sys.exit(0)

        # Generate the new credentials; nothing is persisted yet.
        new_secret = pyotp.random_base32()
        backup_codes = _generate_backup_codes()

        totp = pyotp.TOTP(new_secret)
        provisioning_uri = totp.provisioning_uri(
            name=user.email,
            issuer_name="SOMA",
        )

        _show_enrollment(provisioning_uri, new_secret, backup_codes)

        # Verify the new TOTP code before saving, so a failed enrolment
        # leaves the stored secret untouched.
        print("\nEnter a TOTP code from your authenticator app to verify setup:")
        # Authenticator apps often display codes as "123 456"; accept both.
        test_code = _normalize_code(input("6-digit code: "))

        if not totp.verify(test_code, valid_window=1):
            print("ERROR: TOTP code verification failed. TOTP NOT reset.")
            sys.exit(1)

        # Save to database
        user.totp_secret = new_secret
        user.totp_backup_codes = backup_codes  # stored as plain text; hash in production if desired

        # Invalidate all existing sessions
        await db.execute(
            update(AuthSession)
            .where(AuthSession.user_id == user.id)
            .values(is_active=False)
        )

        await db.commit()

        print("\nTOTP reset successful. All existing sessions have been invalidated.")
        print("Log in at your SOMA URL to start a new session.")


if __name__ == '__main__':
    asyncio.run(reset_totp())