2025-07-10 19:42:23 +03:00

176 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
from dataclasses import dataclass
from typing import List, Tuple, Union

import httpx
from fastapi import Depends, Request, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from config import settings
from .utils.ssl_transport import ssl_transport
# Shared HTTP Bearer scheme; injected with Depends() into the current-user dependencies in this module.
auth_scheme = HTTPBearer()
@dataclass
class CurrentUser:
    """Authenticated caller, as assembled from the bearer token and the token service reply."""

    # Raw bearer token exactly as received in the request credentials.
    token: str
    # The remaining fields are copied from the token service's decode response.
    user_id: str
    session_id: str
    permissions: List[str]
async def _fetch_current_user(
    request: Request,
    credentials: HTTPAuthorizationCredentials,
    require_permissions: bool
) -> CurrentUser:
    """Resolve a bearer token into a ``CurrentUser`` via the token service.

    POSTs the token together with the client IP, the ``User-Agent`` header and
    the ``require_permissions`` flag to ``{settings.TOKEN_SERVICE}/decode`` and
    builds the result from the ``user_id``, ``session_id`` and ``permissions``
    keys of the JSON reply.

    Args:
        request: Incoming request; supplies the client address and headers.
        credentials: Parsed bearer credentials holding the token.
        require_permissions: Forwarded unchanged to the token service.

    Returns:
        A ``CurrentUser`` populated from the token and the service reply.

    Raises:
        HTTPException: 503 when the request to the token service fails;
            the service's own status code (with its body in ``detail``) when
            it answers with anything other than 200; 502 when a 200 reply is
            not JSON or lacks one of the expected keys.
    """
    token = credentials.credentials

    # Starlette sets request.client to None when the server provides no peer
    # address, so guard the attribute access before falling back.
    peer = request.client
    ip = (peer.host if peer else None) or "(unknown)"
    user_agent = request.headers.get("User-Agent", "(unknown)")

    try:
        async with httpx.AsyncClient(transport=ssl_transport, timeout=5.0) as client:
            response = await client.post(
                f"{settings.TOKEN_SERVICE}/decode",
                json={
                    "token": token,
                    "ip": ip,
                    "user_agent": user_agent,
                    "require_permissions": require_permissions
                },
            )
    except httpx.RequestError as exc:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Token service unavailable",
        ) from exc

    if response.status_code != 200:
        # Propagate the token service's verdict (e.g. 401/403) to the caller.
        raise HTTPException(
            status_code=response.status_code,
            detail=f"token_service: {response.text}"
        )

    # A 200 with an unparsable or incomplete body is an upstream fault, not a
    # bug in this service: report it as 502 instead of an unhandled exception.
    try:
        data = response.json()
        return CurrentUser(
            token=token,
            user_id=data["user_id"],
            session_id=data["session_id"],
            permissions=data["permissions"]
        )
    except (ValueError, KeyError, TypeError) as exc:
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Invalid response from token service",
        ) from exc
async def get_current_user(
    request: Request,
    credentials: HTTPAuthorizationCredentials = Depends(auth_scheme)
) -> CurrentUser:
    """Dependency: resolve the caller's bearer token with ``require_permissions=False``."""
    user = await _fetch_current_user(
        request=request,
        credentials=credentials,
        require_permissions=False,
    )
    return user
async def get_current_user_with_permissions(
    request: Request,
    credentials: HTTPAuthorizationCredentials = Depends(auth_scheme)
) -> CurrentUser:
    """Dependency: resolve the caller's bearer token with ``require_permissions=True``."""
    user = await _fetch_current_user(
        request=request,
        credentials=credentials,
        require_permissions=True,
    )
    return user
def _username_violation(value, min_length, max_length):
    """Return the message tail for the first username rule *value* breaks, or None if it is valid."""
    # Type and length
    if not isinstance(value, str) or not (min_length <= len(value) <= max_length):
        return f"must be between {min_length} and {max_length} characters long"
    # Whitespace (checked before the charset rule so the message is specific)
    if any(ch.isspace() for ch in value):
        return "must not contain whitespace characters"
    # Leading underscore
    if value.startswith('_'):
        return "must not start with an underscore"
    # Consecutive underscores
    if '__' in value:
        return "must not contain consecutive underscores"
    # Allowed characters only
    if not re.fullmatch(r'[A-Za-z0-9_]+', value):
        return "must contain only English letters, digits, and underscores"
    return None


def validate_username(value: str,
                      field_name: str = "login",
                      with_httpexception=False,
                      need_back=False,
                      min_length=3,
                      max_length=32,
                      error_status_code: int = status.HTTP_401_UNAUTHORIZED,
                      error_detail: str = "Invalid login or password") -> Union[str, Tuple[bool, str]]:
    """
    Validate a username and return it lower-cased.

    Rules, checked in this order (the first broken rule is reported):
    - is a string whose length is within [min_length, max_length]
    - no whitespace characters
    - no leading underscore
    - no consecutive underscores
    - only [A-Za-z0-9_]

    Args:
        value: Candidate username.
        field_name: Name used (capitalized) at the start of the error message.
        with_httpexception: On failure, raise HTTPException instead of
            ValueError; takes precedence over ``need_back``.
        need_back: Return a ``(ok, payload)`` tuple instead of a bare string /
            ValueError.
        min_length: Minimum allowed length, inclusive.
        max_length: Maximum allowed length, inclusive.
        error_status_code: Status code of the HTTPException.
        error_detail: Detail of the HTTPException; the rule-specific message
            is not included in it.

    Returns:
        ``value.lower()`` when valid, or ``(True, value.lower())`` if
        ``need_back`` is set. On failure with ``need_back`` set (and
        ``with_httpexception`` unset): ``(False, message)``.

    Raises:
        HTTPException: On failure when ``with_httpexception`` is set.
        ValueError: On failure when neither flag is set.
    """
    problem = _username_violation(value, min_length, max_length)

    if problem is not None:
        msg = f"{field_name.capitalize()} {problem}"
        if with_httpexception:
            raise HTTPException(status_code=error_status_code, detail=error_detail)
        if need_back:
            return False, msg
        raise ValueError(msg)

    normalized = value.lower()
    if need_back:
        return True, normalized
    return normalized
def validate_password(value: str,
                      field_name: str = "password",
                      with_httpexception=False,
                      need_back=False,
                      min_length=8,
                      max_length=128,
                      error_status_code: int = status.HTTP_401_UNAUTHORIZED,
                      error_detail: str = "Invalid login or password") -> Union[str, Tuple[bool, str]]:
    """
    Validate that a password is a string of acceptable length.

    Only the type/length rule is enforced; the password is returned unchanged.

    Args:
        value: Candidate password.
        field_name: Name used (capitalized) at the start of the error message.
        with_httpexception: On failure, raise HTTPException instead of
            ValueError; takes precedence over ``need_back``.
        need_back: Return a ``(ok, payload)`` tuple instead of a bare string /
            ValueError.
        min_length: Minimum allowed length, inclusive.
        max_length: Maximum allowed length, inclusive.
        error_status_code: Status code of the HTTPException.
        error_detail: Detail of the HTTPException; the rule-specific message
            is not included in it.

    Returns:
        ``value`` when valid, or ``(True, value)`` if ``need_back`` is set.
        On failure with ``need_back`` set (and ``with_httpexception`` unset):
        ``(False, message)``.

    Raises:
        HTTPException: On failure when ``with_httpexception`` is set.
        ValueError: On failure when neither flag is set.
    """
    # Type and length
    length_ok = isinstance(value, str) and min_length <= len(value) <= max_length
    if not length_ok:
        msg = f"{field_name.capitalize()} must be between {min_length} and {max_length} characters long"
        if with_httpexception:
            raise HTTPException(status_code=error_status_code, detail=error_detail)
        if need_back:
            return False, msg
        raise ValueError(msg)

    # Disabled complexity rules, kept for reference (not enforced):
    # if any(c.isspace() for c in value):
    #     raise ValueError(f"{field_name.capitalize()} must not contain whitespace characters")
    # if not re.search(r'[A-Z]', value):
    #     raise ValueError(f"{field_name.capitalize()} must contain at least one uppercase letter")
    # if not re.search(r'\d', value):
    #     raise ValueError(f"{field_name.capitalize()} must contain at least one digit")
    # if not re.search(r'[!@#$%^&*()\-_=+\[\]{};:\'",.<>?/|\\]', value):
    #     raise ValueError(f"{field_name.capitalize()} must contain at least one special character")

    if need_back:
        return True, value
    return value