Files
faction_war_dispatch_bot/routers/auth.py
2026-01-27 09:48:58 -05:00

165 lines
4.8 KiB
Python

"""Authentication endpoints for login/logout."""
import hmac
from datetime import datetime, timedelta, timezone
from typing import Dict

import jwt
from fastapi import APIRouter, HTTPException, Request, Response
from fastapi.responses import JSONResponse
from pydantic import BaseModel

import config as config_module
# Router for the authentication endpoints; routes registered on it are
# served under the /auth prefix and tagged "auth".
router = APIRouter(prefix="/auth", tags=["auth"])
# In-memory storage for failed login attempts, keyed by client IP address.
# Format: {ip_address: {"count": int, "locked_until": datetime | None}}
# ("locked_until" stays None until the failure threshold is reached).
# NOTE(review): this is a plain module-level dict, so its contents live only
# in this process's memory -- presumably lost on restart and not shared
# between multiple worker processes; confirm this is acceptable.
failed_attempts: Dict[str, Dict] = {}
class LoginRequest(BaseModel):
    # Request body accepted by the login endpoint.

    # Name the caller wants to log in as.
    name: str
    # Password submitted by the caller.
    password: str
def get_client_ip(request: Request) -> str:
    """Return the address used to identify the caller for rate limiting.

    The first non-blank entry of the ``X-Forwarded-For`` header wins (set by
    a proxy/load balancer). If the header is absent or contains only blank
    entries, the socket peer address is used, or ``"unknown"`` when the
    request carries no client information.

    NOTE(review): ``X-Forwarded-For`` is supplied by the client unless a
    trusted proxy overwrites it, so a caller can choose the value returned
    here. Confirm the service is only reachable through such a proxy.
    """
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        # Skip blank entries so a malformed header such as " , 1.2.3.4" or
        # " " never yields an empty-string identity shared by every caller.
        for entry in forwarded.split(","):
            candidate = entry.strip()
            if candidate:
                return candidate
    return request.client.host if request.client else "unknown"
def is_locked_out(ip: str) -> bool:
    """Tell whether ``ip`` is still inside an active lockout window.

    Side effect: when a lockout is found to have expired, the whole entry
    for ``ip`` is removed from ``failed_attempts``, which also resets its
    failure count.
    """
    entry = failed_attempts.get(ip)
    if entry is None:
        return False

    deadline = entry.get("locked_until")
    if not deadline:
        # Failures were recorded but the threshold was never reached.
        return False

    if datetime.now() < deadline:
        return True

    # The lockout window has passed: forget this IP entirely.
    del failed_attempts[ip]
    return False
def record_failed_attempt(ip: str):
    """Count one more failed login for ``ip`` and lock it out when due.

    On reaching 5 recorded failures the entry's ``locked_until`` is set to
    5 minutes from now and the lockout is printed to stdout.
    """
    timestamp = datetime.now()

    # Create the entry on first failure, then bump the counter either way.
    entry = failed_attempts.setdefault(ip, {"count": 0, "locked_until": None})
    entry["count"] += 1

    if entry["count"] < 5:
        return

    entry["locked_until"] = timestamp + timedelta(minutes=5)
    print(f"IP {ip} locked out until {failed_attempts[ip]['locked_until']}")
def clear_failed_attempts(ip: str):
    """Forget any recorded failures for ``ip``; a no-op when none exist."""
    failed_attempts.pop(ip, None)
def create_jwt_token(username: str) -> str:
    """Create a signed JWT identifying ``username``.

    The token carries a ``username`` claim and an ``exp`` claim set 7 days
    from now, and is signed with ``config_module.JWT_SECRET`` using HS256.

    Returns:
        The encoded token as produced by ``jwt.encode``.
    """
    # Timezone-aware UTC "now": datetime.utcnow() is deprecated since
    # Python 3.12 and returns a naive value that is easy to misinterpret.
    expiration = datetime.now(timezone.utc) + timedelta(days=7)
    payload = {
        "username": username,
        "exp": expiration,
    }
    return jwt.encode(payload, config_module.JWT_SECRET, algorithm="HS256")
def verify_jwt_token(token: str) -> dict:
    """Verify and decode a JWT signed with ``config_module.JWT_SECRET``.

    Only the HS256 algorithm is accepted.

    Returns:
        The decoded payload.

    Raises:
        HTTPException: 401 with detail "Token expired" when the token's
            expiry has passed, or 401 with detail "Invalid token" for any
            other ``jwt.InvalidTokenError``.
    """
    try:
        return jwt.decode(token, config_module.JWT_SECRET, algorithms=["HS256"])
    except jwt.ExpiredSignatureError as exc:
        # Must be handled before InvalidTokenError, which is the broader
        # of the two handlers; chain explicitly so the cause stays visible.
        raise HTTPException(status_code=401, detail="Token expired") from exc
    except jwt.InvalidTokenError as exc:
        raise HTTPException(status_code=401, detail="Invalid token") from exc
@router.post("/login")
async def login(request: Request, response: Response, req: LoginRequest):
    """Log in with the shared password; failed attempts are rate limited per IP.

    On success an ``auth_token`` cookie holding a 7-day JWT is set and
    ``{"status": "success", "username": <name>}`` is returned.

    Responds with 401 for a wrong password (including the number of
    attempts remaining) and with 429 while the caller's IP is locked out
    or when this attempt triggers the lockout.
    """
    client_ip = get_client_ip(request)

    # Refuse immediately while this IP is inside a lockout window.
    if is_locked_out(client_ip):
        locked_until = failed_attempts[client_ip]["locked_until"]
        remaining = (locked_until - datetime.now()).total_seconds()
        raise HTTPException(
            status_code=429,
            detail=f"Too many failed attempts. Try again in {int(remaining)} seconds.",
        )

    # Verify password (universal password for all users).
    # compare_digest keeps the comparison time independent of how many
    # leading characters match, unlike `!=` on attacker-supplied input.
    # Both sides are encoded to bytes because compare_digest rejects
    # non-ASCII str; "surrogatepass" keeps odd input from raising here.
    # A configured password that is not a str never matches, as before.
    expected = config_module.AUTH_PASSWORD
    password_ok = isinstance(expected, str) and hmac.compare_digest(
        req.password.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )

    if not password_ok:
        record_failed_attempt(client_ip)
        # 5 mirrors the lockout threshold applied by record_failed_attempt.
        attempts_left = 5 - failed_attempts[client_ip]["count"]

        if attempts_left <= 0:
            raise HTTPException(
                status_code=429,
                detail="Too many failed attempts. Locked out for 5 minutes.",
            )

        raise HTTPException(
            status_code=401,
            detail=f"Invalid password. {attempts_left} attempts remaining.",
        )

    # Successful login: forget earlier failures for this IP.
    clear_failed_attempts(client_ip)

    # Create JWT token
    token = create_jwt_token(req.name)

    # Set token as HTTP-only cookie so page scripts cannot read it.
    # NOTE(review): the cookie is not marked `secure`; confirm whether the
    # service is always served over HTTPS before adding that flag.
    response.set_cookie(
        key="auth_token",
        value=token,
        httponly=True,
        max_age=7 * 24 * 60 * 60,  # 7 days in seconds
        samesite="lax",
    )

    print(f"User '{req.name}' logged in from IP {client_ip}")

    return {"status": "success", "username": req.name}
@router.post("/logout")
async def logout(response: Response):
    """Log out by asking the client to drop the auth cookie."""
    cookie_name = "auth_token"
    response.delete_cookie(cookie_name)

    result = {"status": "success"}
    return result
@router.get("/status")
async def auth_status(request: Request):
    """Report whether the request carries a valid auth cookie.

    Never fails for a missing, expired or invalid token; those cases are
    reported as ``{"authenticated": False}``.
    """
    anonymous = {"authenticated": False}

    cookie = request.cookies.get("auth_token")
    if not cookie:
        return anonymous

    try:
        claims = verify_jwt_token(cookie)
    except HTTPException:
        # verify_jwt_token signals expired/invalid tokens this way.
        return anonymous

    return {"authenticated": True, "username": claims.get("username")}