114 lines
3.6 KiB
Python
114 lines
3.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
admin_recovery.py
|
|
|
|
Emergency TOTP reset tool. Run ONLY via SSH on the EC2 instance.
|
|
This script is NOT exposed via any HTTP endpoint.
|
|
It resets the TOTP secret for the admin account and prints a new QR code
(to be scanned in an authenticator app such as Google Authenticator)
together with a fresh set of one-time backup codes.
|
|
|
|
Usage (on EC2):
|
|
docker compose exec backend python /app/scripts/admin_recovery.py
|
|
|
|
Or directly:
|
|
ssh ubuntu@your-ec2-ip
|
|
cd ~/soma
|
|
docker compose exec backend python /app/scripts/admin_recovery.py
|
|
"""
|
|
|
|
import asyncio
|
|
import sys
|
|
import os
|
|
import secrets
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
|
|
|
|
|
def _prompt(message):
    """Read one stripped line from stdin; exit with status 1 if stdin is closed."""
    try:
        return input(message).strip()
    except EOFError:
        # Raised when no interactive stdin is attached (e.g. the command was
        # run without a TTY). Both prompts happen before the commit, so
        # nothing has been written to the database at this point.
        print("\nERROR: No interactive input available. TOTP NOT reset.")
        sys.exit(1)


def _print_qr_or_uri(provisioning_uri):
    """Print the provisioning URI as an ASCII QR code, or as plain text on failure."""
    try:
        # Imported inside the try so that a missing 'qrcode' package falls
        # through to the plain-URI fallback instead of crashing the tool.
        import qrcode

        qr = qrcode.QRCode(version=1, box_size=1, border=1)
        qr.add_data(provisioning_uri)
        qr.make(fit=True)
        print("\nScan this QR code with Google Authenticator / Authy:")
        print("=" * 60)
        qr.print_ascii(invert=True)
        print("=" * 60)
    except Exception:
        # Deliberate best-effort: any rendering problem still leaves the
        # operator with a URI that can be pasted into an authenticator app.
        print("\nProvisioning URI (paste into authenticator app):")
        print(provisioning_uri)


async def reset_totp():
    """Interactively reset the admin user's TOTP secret and backup codes.

    Steps:
      1. Load the first ``User`` row from the database.
      2. Require the operator to type ``RESET`` to confirm.
      3. Generate a new TOTP secret and 8 backup codes, then print the QR
         code (or provisioning URI), the manual-entry secret and the codes.
      4. Require one valid code from the authenticator app before saving.
      5. Store the new secret and backup codes on the user, set
         ``is_active=False`` on that user's ``AuthSession`` rows, and commit.

    Nothing is written to the database unless step 4 succeeds.

    Raises:
        SystemExit: status 1 if no user exists, stdin is closed, or the
            verification code is wrong; status 0 if the operator declines
            the confirmation prompt.
    """
    # Everything the flow needs is imported up front, so a broken import
    # fails before any new secret has been generated or displayed.
    from app.database import AsyncSessionLocal
    from app.models.user import AuthSession, User
    from sqlalchemy import select, update
    import pyotp

    async with AsyncSessionLocal() as db:
        # Find the admin user (single-tenant: only one user is expected).
        result = await db.execute(select(User).limit(1))
        user = result.scalar_one_or_none()

        if not user:
            print("ERROR: No user found in database. Run the setup flow first.")
            sys.exit(1)

        print(f"\nResetting TOTP for: {user.email}")
        print("=" * 60)

        # Confirm action
        if _prompt("Type 'RESET' to confirm TOTP reset: ") != "RESET":
            print("Aborted.")
            sys.exit(0)

        # Generate new TOTP secret
        new_secret = pyotp.random_base32()

        # Generate new backup codes (8 codes, 8 hex chars each)
        backup_codes = [secrets.token_hex(4).upper() for _ in range(8)]

        # Build the otpauth:// URI that authenticator apps understand
        totp = pyotp.TOTP(new_secret)
        provisioning_uri = totp.provisioning_uri(
            name=user.email,
            issuer_name="SOMA",
        )

        _print_qr_or_uri(provisioning_uri)

        print(f"\nManual entry secret: {new_secret}")
        print("\nBACKUP CODES (save these offline — one-time use):")
        print("-" * 40)
        for i, code in enumerate(backup_codes, 1):
            print(f" {i:2}. {code}")
        print("-" * 40)

        # Verify the new TOTP code before saving, so a failed scan cannot
        # lock the admin out with a secret no device knows.
        print("\nEnter a TOTP code from your authenticator app to verify setup:")
        # Authenticator apps often display codes as "123 456"; drop any
        # whitespace so such input still verifies.
        test_code = "".join(_prompt("6-digit code: ").split())
        # valid_window=1 also accepts the adjacent time steps (clock drift).
        if not totp.verify(test_code, valid_window=1):
            print("ERROR: TOTP code verification failed. TOTP NOT reset.")
            sys.exit(1)

        # Save to database
        user.totp_secret = new_secret
        # NOTE(review): backup codes are stored in plain text. Hashing them
        # would also require changing whatever code checks them at login,
        # which is not visible from this script.
        user.totp_backup_codes = backup_codes

        # Invalidate all existing sessions
        await db.execute(
            update(AuthSession)
            .where(AuthSession.user_id == user.id)
            .values(is_active=False)
        )

        await db.commit()

        print("\nTOTP reset successful. All existing sessions have been invalidated.")
        print("Log in at your SOMA URL to start a new session.")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the interactive reset coroutine and run it
    # to completion on a fresh event loop.
    recovery_flow = reset_totp()
    asyncio.run(recovery_flow)