[SOMA][BOT-3][INFRA] skeleton: project folder structure and all package manifests
This commit is contained in:
114
soma/scripts/admin_recovery.py
Normal file
114
soma/scripts/admin_recovery.py
Normal file
@@ -0,0 +1,114 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
admin_recovery.py
|
||||
|
||||
Emergency TOTP reset tool. Run ONLY via SSH on the EC2 instance.
|
||||
This script is NOT exposed via any HTTP endpoint.
|
||||
It resets the TOTP secret for the admin account, prints a new QR code
|
||||
and backup codes, which must be scanned in Google Authenticator.
|
||||
|
||||
Usage (on EC2):
|
||||
docker compose exec backend python /app/scripts/admin_recovery.py
|
||||
|
||||
Or directly:
|
||||
ssh ubuntu@your-ec2-ip
|
||||
cd ~/soma
|
||||
docker compose exec backend python /app/scripts/admin_recovery.py
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
import os
|
||||
import secrets
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||
|
||||
|
||||
async def reset_totp():
|
||||
from app.database import AsyncSessionLocal
|
||||
from app.models.user import User
|
||||
from app.config import settings
|
||||
from sqlalchemy import select
|
||||
import pyotp
|
||||
import qrcode
|
||||
import io
|
||||
|
||||
async with AsyncSessionLocal() as db:
|
||||
# Find the admin user (single-tenant: only one user exists)
|
||||
result = await db.execute(select(User).limit(1))
|
||||
user = result.scalar_one_or_none()
|
||||
|
||||
if not user:
|
||||
print("ERROR: No user found in database. Run the setup flow first.")
|
||||
sys.exit(1)
|
||||
|
||||
print(f"\nResetting TOTP for: {user.email}")
|
||||
print("=" * 60)
|
||||
|
||||
# Confirm action
|
||||
confirm = input("Type 'RESET' to confirm TOTP reset: ").strip()
|
||||
if confirm != "RESET":
|
||||
print("Aborted.")
|
||||
sys.exit(0)
|
||||
|
||||
# Generate new TOTP secret
|
||||
new_secret = pyotp.random_base32()
|
||||
|
||||
# Generate new backup codes (8 codes, 8 chars each)
|
||||
backup_codes = [secrets.token_hex(4).upper() for _ in range(8)]
|
||||
|
||||
# Generate QR code URL
|
||||
totp = pyotp.TOTP(new_secret)
|
||||
provisioning_uri = totp.provisioning_uri(
|
||||
name=user.email,
|
||||
issuer_name="SOMA"
|
||||
)
|
||||
|
||||
# Print QR code as ASCII art (requires 'qrcode' library)
|
||||
try:
|
||||
qr = qrcode.QRCode(version=1, box_size=1, border=1)
|
||||
qr.add_data(provisioning_uri)
|
||||
qr.make(fit=True)
|
||||
print("\nScan this QR code with Google Authenticator / Authy:")
|
||||
print("=" * 60)
|
||||
qr.print_ascii(invert=True)
|
||||
print("=" * 60)
|
||||
except Exception:
|
||||
print(f"\nProvisioning URI (paste into authenticator app):")
|
||||
print(provisioning_uri)
|
||||
|
||||
print(f"\nManual entry secret: {new_secret}")
|
||||
print("\nBACKUP CODES (save these offline — one-time use):")
|
||||
print("-" * 40)
|
||||
for i, code in enumerate(backup_codes, 1):
|
||||
print(f" {i:2}. {code}")
|
||||
print("-" * 40)
|
||||
|
||||
# Verify the new TOTP code before saving
|
||||
print("\nEnter a TOTP code from your authenticator app to verify setup:")
|
||||
test_code = input("6-digit code: ").strip()
|
||||
if not totp.verify(test_code, valid_window=1):
|
||||
print("ERROR: TOTP code verification failed. TOTP NOT reset.")
|
||||
sys.exit(1)
|
||||
|
||||
# Save to database
|
||||
user.totp_secret = new_secret
|
||||
user.totp_backup_codes = backup_codes # stored as plain text; hash in production if desired
|
||||
|
||||
# Invalidate all existing sessions
|
||||
from app.models.user import AuthSession
|
||||
from sqlalchemy import update
|
||||
await db.execute(
|
||||
update(AuthSession)
|
||||
.where(AuthSession.user_id == user.id)
|
||||
.values(is_active=False)
|
||||
)
|
||||
|
||||
await db.commit()
|
||||
|
||||
print("\nTOTP reset successful. All existing sessions have been invalidated.")
|
||||
print("Log in at your SOMA URL to start a new session.")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(reset_totp())
|
||||
Reference in New Issue
Block a user