2025-09-27 01:31:44 +03:00

134 lines
4.3 KiB
Python

import logging
from dataclasses import dataclass
from typing import List, Tuple, Union

import httpx
from fastapi import Depends, Request, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from config import settings
from .ssl_transport import ssl_transport
from .validators import validate_username as core_validate_username, validate_password as core_validate_password
# Shared outbound HTTP client. A single module-level AsyncClient is created
# once at import time and reused by every call in this module.
timeout = httpx.Timeout(connect=5.0, read=10.0, write=5.0, pool=5.0)
limits = httpx.Limits(max_connections=1000, max_keepalive_connections=200)
client = httpx.AsyncClient(transport=ssl_transport, limits=limits, timeout=timeout)

# Extracts "Authorization: Bearer <token>" credentials for FastAPI dependencies.
auth_scheme = HTTPBearer()
@dataclass
class CurrentUser:
    """Authenticated caller as resolved from a bearer token."""

    # Raw bearer token exactly as it was presented with the request.
    token: str
    # Identifiers and permission names copied from the decoded token payload.
    user_id: str
    session_id: str
    permissions: List[str]
async def _fetch_current_user(
    request: Request,
    credentials: HTTPAuthorizationCredentials,
    require_permissions: bool,
) -> CurrentUser:
    """Resolve the caller by asking the token service to decode the bearer token.

    Args:
        request: Incoming request; its peer address and ``User-Agent`` header
            are forwarded to the token service.
        credentials: Bearer credentials taken from the Authorization header.
        require_permissions: Forwarded verbatim to the token service in the
            ``require_permissions`` field of the request body.

    Returns:
        A ``CurrentUser`` built from the ``data`` object of the service's
        JSON response.

    Raises:
        HTTPException: 503 if the token service cannot be reached; the
            service's own status code if it answers with anything other
            than 200; 502 if a 200 response does not have the expected
            JSON shape.
    """
    token = credentials.credentials

    # ``request.client`` may be None when the server provides no peer address,
    # so guard before reading ``.host`` instead of raising AttributeError.
    peer = request.client
    ip = (peer.host if peer is not None else None) or "(unknown)"
    user_agent = request.headers.get("User-Agent", "(unknown)")

    try:
        response = await client.post(
            f"{settings.TOKEN_SERVICE}/decode",
            json={
                "token": token,
                "ip": ip,
                "user_agent": user_agent,
                "require_permissions": require_permissions,
            },
        )
    except httpx.RequestError as exc:
        logging.getLogger(__name__).warning("_fetch_current_user error: %s", exc)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Token service unavailable",
        ) from exc

    if response.status_code != 200:
        # Propagate the token service's verdict (status and body) to the caller.
        raise HTTPException(
            status_code=response.status_code,
            detail=f"token_service: {response.text}",
        )

    try:
        data = response.json()["data"]
        return CurrentUser(
            token=token,
            user_id=data["user_id"],
            session_id=data["session_id"],
            permissions=data["permissions"],
        )
    except (ValueError, KeyError, TypeError) as exc:
        # Invalid JSON or a missing/mis-typed field is an upstream fault;
        # report it as a gateway error rather than an unhandled exception.
        logging.getLogger(__name__).error(
            "_fetch_current_user: malformed token service response: %r", exc
        )
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Invalid response from token service",
        ) from exc
async def get_current_user(
    request: Request,
    credentials: HTTPAuthorizationCredentials = Depends(auth_scheme),
) -> CurrentUser:
    """FastAPI dependency returning the caller's ``CurrentUser``.

    Delegates to ``_fetch_current_user`` with ``require_permissions=False``.
    """
    current_user = await _fetch_current_user(
        request,
        credentials,
        require_permissions=False,
    )
    return current_user
async def get_current_user_with_permissions(
    request: Request,
    credentials: HTTPAuthorizationCredentials = Depends(auth_scheme),
) -> CurrentUser:
    """FastAPI dependency returning the caller's ``CurrentUser``.

    Delegates to ``_fetch_current_user`` with ``require_permissions=True``.
    """
    current_user = await _fetch_current_user(
        request,
        credentials,
        require_permissions=True,
    )
    return current_user
def validate_username(value: str,
                      field_name: str = "login",
                      with_httpexception: bool = False,
                      need_back: bool = False,
                      min_length: int = 3,
                      max_length: int = 32,
                      error_status_code: int = status.HTTP_401_UNAUTHORIZED,
                      error_detail: str = "Invalid login or password") -> Union[str, Tuple[bool, str]]:
    """Validate a username via ``core_validate_username`` and adapt the outcome.

    The actual rules live in ``core_validate_username``. According to the
    notes previously kept here they cover length, no spaces, no leading
    underscore, no consecutive underscores and only ``[A-Za-z0-9_]`` --
    verify against that module.

    Args:
        value: Username to check.
        field_name: Field label passed through to the core validator.
        with_httpexception: If true, a failed validation raises
            ``HTTPException(error_status_code, error_detail)``; this takes
            precedence over ``need_back``.
        need_back: If true, return a ``(valid, result)`` tuple instead of
            raising ``ValueError`` on failure.
        min_length: Minimum length passed to the core validator.
        max_length: Maximum length passed to the core validator.
        error_status_code: Status code for the ``HTTPException``.
        error_detail: Detail text for the ``HTTPException``.

    Returns:
        With ``need_back=False``: the core validator's result on success.
        With ``need_back=True``: ``(True, result)`` on success or
        ``(False, result)`` on failure, where ``result`` is whatever the
        core validator returned (presumably the value / an error message).

    Raises:
        HTTPException: Validation failed and ``with_httpexception`` is true.
        ValueError: Validation failed and neither flag is set.
    """
    valid, result = core_validate_username(
        value, field_name, need_back=True, min_length=min_length, max_length=max_length
    )
    if valid:
        # Mirror the failure path's tuple shape so ``ok, val = ...`` unpacking
        # works for both outcomes when the caller asked for need_back.
        return (True, result) if need_back else result
    if with_httpexception:
        raise HTTPException(status_code=error_status_code, detail=error_detail)
    if need_back:
        return False, result
    raise ValueError(result)
def validate_password(value: str,
                      field_name: str = "password",
                      with_httpexception: bool = False,
                      need_back: bool = False,
                      min_length: int = 8,
                      max_length: int = 128,
                      error_status_code: int = status.HTTP_401_UNAUTHORIZED,
                      error_detail: str = "Invalid login or password") -> Union[str, Tuple[bool, str]]:
    """Validate a password via ``core_validate_password`` and adapt the outcome.

    The actual rules (length and any others) live in
    ``core_validate_password``; this wrapper only decides how the outcome is
    reported.

    Args:
        value: Password to check.
        field_name: Field label passed through to the core validator.
        with_httpexception: If true, a failed validation raises
            ``HTTPException(error_status_code, error_detail)``; this takes
            precedence over ``need_back``.
        need_back: If true, return a ``(valid, result)`` tuple instead of
            raising ``ValueError`` on failure.
        min_length: Minimum length passed to the core validator.
        max_length: Maximum length passed to the core validator.
        error_status_code: Status code for the ``HTTPException``.
        error_detail: Detail text for the ``HTTPException``.

    Returns:
        With ``need_back=False``: the core validator's result on success.
        With ``need_back=True``: ``(True, result)`` on success or
        ``(False, result)`` on failure, where ``result`` is whatever the
        core validator returned (presumably the value / an error message).

    Raises:
        HTTPException: Validation failed and ``with_httpexception`` is true.
        ValueError: Validation failed and neither flag is set.
    """
    valid, result = core_validate_password(
        value, field_name, need_back=True, min_length=min_length, max_length=max_length
    )
    if valid:
        # Mirror the failure path's tuple shape so ``ok, val = ...`` unpacking
        # works for both outcomes when the caller asked for need_back.
        return (True, result) if need_back else result
    if with_httpexception:
        raise HTTPException(status_code=error_status_code, detail=error_detail)
    if need_back:
        return False, result
    raise ValueError(result)