desktop_app/app/core/http_client.py
2025-09-30 01:07:43 +03:00

93 lines
3.1 KiB
Python

import httpx
from contextvars import ContextVar
from typing import Dict, Any, Optional
# Connection pool & timeouts
limits = httpx.Limits(max_connections=200, max_keepalive_connections=50)
timeout = httpx.Timeout(connect=5.0, read=10.0, write=5.0, pool=5.0)
# Keep a client per event loop/context to avoid cross-loop issues
_client_ctx: ContextVar[httpx.AsyncClient | None] = ContextVar("http_client_ctx", default=None)
def get_client() -> httpx.AsyncClient:
client = _client_ctx.get()
if client is None or client.is_closed:
transport = httpx.AsyncHTTPTransport(http2=True)
client = httpx.AsyncClient(transport=transport, limits=limits, timeout=timeout)
_client_ctx.set(client)
return client
async def aclose_client():
client = _client_ctx.get()
if client is not None and not client.is_closed:
await client.aclose()
_client_ctx.set(None)
# --- Authorized helpers with auto-refresh on 401 ---
async def authorized_get(
url: str,
*,
login: Optional[str] = None,
access_token: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
params: Optional[Dict[str, Any]] = None,
) -> httpx.Response:
hdrs = dict(headers or {})
if access_token:
hdrs["Authorization"] = f"Bearer {access_token}"
resp = await get_client().get(url, headers=hdrs, params=params)
if resp.status_code != 401 or not login or not access_token:
return resp
# Try refresh flow lazily to avoid import cycle at import time
from app.core.database import get_session, logout
from app.core.services.auth_service import refresh_token as do_refresh
session = get_session(login)
if not session or not session.get("refresh_token"):
return resp
ok, data = await do_refresh(access_token, session["refresh_token"])
if ok:
new_access = data["access_token"]
hdrs["Authorization"] = f"Bearer {new_access}"
return await get_client().get(url, headers=hdrs, params=params)
logout(access_token)
return resp
async def authorized_post(
url: str,
*,
login: Optional[str] = None,
access_token: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
params: Optional[Dict[str, Any]] = None,
json: Any = None,
data: Any = None,
) -> httpx.Response:
hdrs = dict(headers or {})
if access_token:
hdrs["Authorization"] = f"Bearer {access_token}"
resp = await get_client().post(url, headers=hdrs, params=params, json=json, data=data)
if resp.status_code != 401 or not login or not access_token:
return resp
from app.core.database import get_session, logout
from app.core.services.auth_service import refresh_token as do_refresh
session = get_session(login)
if not session or not session.get("refresh_token"):
return resp
ok, data = await do_refresh(access_token, session["refresh_token"])
if ok:
new_access = data["access_token"]
hdrs["Authorization"] = f"Bearer {new_access}"
return await get_client().post(url, headers=hdrs, params=params, json=json, data=data)
logout(access_token)
return resp