This repository has been archived on 2025-11-05. You can view files and clone it, but cannot push or open issues or pull requests.
desktop_app/app/core/http_client.py
2025-09-30 02:09:37 +03:00

108 lines
3.4 KiB
Python

import httpx
from contextvars import ContextVar
from typing import Dict, Any, Optional
# Shared httpx settings: connection-pool sizing and per-phase timeouts (seconds).
limits = httpx.Limits(max_keepalive_connections=50, max_connections=200)
timeout = httpx.Timeout(pool=5.0, write=5.0, read=10.0, connect=5.0)
# Context-local slot for the AsyncClient (None until get_client() creates one),
# so a client is not shared across event loops/contexts.
_client_ctx: ContextVar[Optional[httpx.AsyncClient]] = ContextVar(
    "http_client_ctx", default=None
)
def get_client() -> httpx.AsyncClient:
    """Return the AsyncClient stored for the current context, creating it on demand.

    A new HTTP/2-enabled client is built (and stored in ``_client_ctx``) when
    no client exists yet or the stored one has been closed.

    Returns:
        An open ``httpx.AsyncClient`` configured with the module-level
        ``limits`` and ``timeout``.
    """
    client = _client_ctx.get()
    if client is None or client.is_closed:
        # Pool limits belong to the transport: httpx only applies the client's
        # ``limits`` argument to a transport it builds itself, so with an
        # explicit transport they must be passed here or they are ignored.
        transport = httpx.AsyncHTTPTransport(http2=True, limits=limits)
        client = httpx.AsyncClient(transport=transport, timeout=timeout)
        # NOTE(review): a ContextVar set inside an asyncio task is visible only
        # in that task's copy of the context -- confirm callers end up with one
        # client per loop rather than one per task.
        _client_ctx.set(client)
    return client
async def aclose_client() -> None:
    """Close the current context's client, if one is open, and clear the slot.

    The slot in ``_client_ctx`` is reset to ``None`` unconditionally, so a
    closed (or failed-to-close) client is never left behind in the context.
    """
    client = _client_ctx.get()
    try:
        if client is not None and not client.is_closed:
            await client.aclose()
    finally:
        # Reset even when aclose() raises or the client was already closed.
        _client_ctx.set(None)
# --- Authorized helpers with auto-refresh on 401 ---
async def authorized_get(
    url: str,
    *,
    login: Optional[str] = None,
    access_token: Optional[str] = None,
    headers: Optional[Dict[str, str]] = None,
    params: Optional[Dict[str, Any]] = None,
) -> httpx.Response:
    """Send a GET request, retrying once with a refreshed token after a 401.

    Args:
        url: Target URL.
        login: Login used to look up the stored session; without it no
            refresh is attempted.
        access_token: Bearer token placed in the ``Authorization`` header;
            without it no refresh is attempted.
        headers: Extra request headers (copied, never mutated).
        params: Query parameters.

    Returns:
        The first response, unless it was a 401 and the token refresh
        succeeded, in which case the response of the retried request.
    """
    hdrs = dict(headers or {})
    if access_token:
        hdrs["Authorization"] = f"Bearer {access_token}"
    resp = await get_client().get(url, headers=hdrs, params=params)
    if resp.status_code != 401 or not login or not access_token:
        return resp
    print("authorized_get 401")

    # Imported lazily to avoid an import cycle at module import time.
    from app.core.database import get_session
    from app.core.services.auth_service import refresh_token as do_refresh

    session = get_session(login)
    if not session:
        return resp
    try:
        refresh = session["refresh_token"]
    except (LookupError, TypeError):
        # Key/column missing, or the session object is not subscriptable.
        refresh = None
    if not refresh:
        return resp

    ok, refreshed = await do_refresh(access_token, refresh)
    if not ok:
        return resp
    new_access = refreshed["access_token"]
    hdrs["Authorization"] = f"Bearer {new_access}"
    return await get_client().get(url, headers=hdrs, params=params)
async def authorized_post(
    url: str,
    *,
    login: Optional[str] = None,
    access_token: Optional[str] = None,
    headers: Optional[Dict[str, str]] = None,
    params: Optional[Dict[str, Any]] = None,
    json: Any = None,
    data: Any = None,
) -> httpx.Response:
    """Send a POST request, retrying once with a refreshed token after a 401.

    Args:
        url: Target URL.
        login: Login used to look up the stored session; without it no
            refresh is attempted.
        access_token: Bearer token placed in the ``Authorization`` header;
            without it no refresh is attempted.
        headers: Extra request headers (copied, never mutated).
        params: Query parameters.
        json: JSON body forwarded to ``AsyncClient.post``.
        data: Form/raw body forwarded to ``AsyncClient.post``.

    Returns:
        The first response, unless it was a 401 and the token refresh
        succeeded, in which case the response of the retried request.
    """
    hdrs = dict(headers or {})
    if access_token:
        hdrs["Authorization"] = f"Bearer {access_token}"
    resp = await get_client().post(url, headers=hdrs, params=params, json=json, data=data)
    if resp.status_code != 401 or not login or not access_token:
        return resp
    print("authorized_post 401")

    # Imported lazily to avoid an import cycle at module import time.
    from app.core.database import get_session
    from app.core.services.auth_service import refresh_token as do_refresh

    session = get_session(login)
    if not session:
        return resp
    try:
        refresh = session["refresh_token"]
    except (LookupError, TypeError):
        # Key/column missing, or the session object is not subscriptable.
        refresh = None
    if not refresh:
        return resp

    # Bind the refresh result to its own name: reusing ``data`` here would
    # overwrite the caller's request body and make the retry post the refresh
    # payload (tokens included) to ``url``.
    ok, refreshed = await do_refresh(access_token, refresh)
    if not ok:
        return resp
    new_access = refreshed["access_token"]
    hdrs["Authorization"] = f"Bearer {new_access}"
    return await get_client().post(url, headers=hdrs, params=params, json=json, data=data)