2025-10-23 00:54:59 +03:00

208 lines
6.9 KiB
Python

import httpx
from fastapi import Depends, Request, HTTPException, status, Header
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import List
from dataclasses import dataclass
from config import settings
# from .ssl_transport import ssl_transport
from .validators import validate_username as core_validate_username, validate_password as core_validate_password
from common_lib.utils.http_client import client
auth_scheme = HTTPBearer()
@dataclass
class CurrentUser:
token: str
is_bot: str
user_id: str
session_id: str
permissions: List[str]
async def _fetch_current_user(
request: Request,
credentials: HTTPAuthorizationCredentials,
require_permissions: bool,
is_bot=False
) -> CurrentUser:
token = credentials.credentials
ip = request.client.host or "(unknown)"
user_agent = request.headers.get("User-Agent", "(unknown)")
try:
# async with httpx.AsyncClient(transport=ssl_transport, timeout=5.0) as client:
response = await client.post(
f"{settings.TOKEN_SERVICE}/decode",
json={
"token": token,
"ip": ip,
"user_agent": user_agent,
"require_permissions": require_permissions,
"is_bot": is_bot
},
)
except httpx.RequestError as e:
print("_fetch_current_user error", e)
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Token service unavailable")
if response.status_code != 200:
raise HTTPException(
status_code=response.status_code,
detail=f"token_service: {response.text}"
)
wrapped = response.json()
data = wrapped["data"]
return CurrentUser(
token=token,
is_bot=data["is_bot"],
user_id=data["user_id"],
session_id=data["session_id"],
permissions=data["permissions"]
)
class AuthInvalidToken(Exception): pass
class AuthSessionNotFound(Exception): pass
class AuthPermissionDenied(Exception): pass
class AuthConflictError(Exception): pass
class AuthServiceError(Exception): pass
async def fetch_user_for_sio(
token: str,
ip: str,
user_agent: str,
require_permissions: bool = False,
is_bot: bool = False
) -> CurrentUser:
"""
Fetches user data for Socket.IO, decoupled from FastAPI's Request object.
"""
try:
response = await client.post(
f"{settings.TOKEN_SERVICE}/decode",
json={
"token": token,
"ip": ip,
"user_agent": user_agent,
"require_permissions": require_permissions,
"is_bot": is_bot,
},
)
except httpx.RequestError as e:
# Сервис недоступен (network error / timeout)
raise AuthServiceError("Token service unavailable") from e
# Если не 200, разбираем ошибку
if response.status_code != 200:
try:
error_json = response.json()
message = error_json.get("errors", [{}])[0].get("message", "Unknown error")
except ValueError:
message = response.text or "Unknown error"
print("response.status_code", response.status_code)
# Разные статусы → разные исключения
if response.status_code in (400, 401):
raise AuthInvalidToken(message)
elif response.status_code == 403:
raise AuthPermissionDenied(message)
elif response.status_code == 404:
raise AuthSessionNotFound(message)
elif response.status_code == 409:
raise AuthConflictError(message)
elif response.status_code >= 500:
raise AuthServiceError("Authentication service error: " + message)
else:
# fallback (нестандартный статус)
raise AuthServiceError(message)
# Успех
wrapped = response.json()
data = wrapped["data"]
return CurrentUser(
token=token,
is_bot=data["is_bot"],
user_id=data["user_id"],
session_id=data["session_id"],
permissions=data["permissions"]
)
async def get_current_user(
request: Request,
credentials: HTTPAuthorizationCredentials = Depends(auth_scheme)
) -> CurrentUser:
return await _fetch_current_user(request, credentials, require_permissions=False)
async def get_current_user_with_permissions(
request: Request,
credentials: HTTPAuthorizationCredentials = Depends(auth_scheme)
) -> CurrentUser:
return await _fetch_current_user(request, credentials, require_permissions=True)
async def get_current_user_or_bot(
request: Request,
is_bot: str | None = Header(default=None),
credentials: HTTPAuthorizationCredentials = Depends(auth_scheme)
) -> CurrentUser:
is_bot_flag = str(is_bot).lower() in ("true", "1", "yes") if is_bot else False
return await _fetch_current_user(request, credentials, require_permissions=False, is_bot=is_bot_flag)
def validate_username(value: str,
field_name: str = "login",
with_httpexception=False,
need_back=False,
min_length=3,
max_length=32,
error_status_code: int = status.HTTP_401_UNAUTHORIZED,
error_detail: str = "Invalid login or password") -> str:
"""
Username validator:
- checks for length
- no spaces
- no leading underscore
- no consecutive underscores
- only [A-Za-z0-9_]
"""
valid, result = core_validate_username(value, field_name, need_back=True, min_length=min_length, max_length=max_length)
if not valid:
if with_httpexception:
raise HTTPException(status_code=error_status_code, detail=error_detail)
if need_back:
return False, result
raise ValueError(result)
return result if need_back else result
def validate_password(value: str,
field_name: str = "password",
with_httpexception=False,
need_back=False,
min_length=8,
max_length=128,
error_status_code: int = status.HTTP_401_UNAUTHORIZED,
error_detail: str = "Invalid login or password") -> str:
"""
Validates password length and (optionally) other rules.
Supports HTTPException raising if `with_httpexception=True`.
Returns (True, value) or raises ValueError.
"""
valid, result = core_validate_password(value, field_name, need_back=True, min_length=min_length, max_length=max_length)
if not valid:
if with_httpexception:
raise HTTPException(status_code=error_status_code, detail=error_detail)
if need_back:
return False, result
raise ValueError(result)
return result if need_back else result