108 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			108 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import httpx
 | 
						|
from contextvars import ContextVar
 | 
						|
from typing import Dict, Any, Optional
 | 
						|
 | 
						|
# Connection pool & timeouts
 | 
						|
limits = httpx.Limits(max_connections=200, max_keepalive_connections=50)
 | 
						|
timeout = httpx.Timeout(connect=5.0, read=10.0, write=5.0, pool=5.0)
 | 
						|
 | 
						|
# Keep a client per event loop/context to avoid cross-loop issues
 | 
						|
_client_ctx: ContextVar[httpx.AsyncClient | None] = ContextVar("http_client_ctx", default=None)
 | 
						|
 | 
						|
def get_client() -> httpx.AsyncClient:
 | 
						|
    client = _client_ctx.get()
 | 
						|
    if client is None or client.is_closed:
 | 
						|
        transport = httpx.AsyncHTTPTransport(http2=True)
 | 
						|
        client = httpx.AsyncClient(transport=transport, limits=limits, timeout=timeout)
 | 
						|
        _client_ctx.set(client)
 | 
						|
    return client
 | 
						|
 | 
						|
async def aclose_client():
 | 
						|
    client = _client_ctx.get()
 | 
						|
    if client is not None and not client.is_closed:
 | 
						|
        await client.aclose()
 | 
						|
        _client_ctx.set(None)
 | 
						|
 | 
						|
# --- Authorized helpers with auto-refresh on 401 ---
 | 
						|
async def authorized_get(
 | 
						|
    url: str,
 | 
						|
    *,
 | 
						|
    login: Optional[str] = None,
 | 
						|
    access_token: Optional[str] = None,
 | 
						|
    headers: Optional[Dict[str, str]] = None,
 | 
						|
    params: Optional[Dict[str, Any]] = None,
 | 
						|
) -> httpx.Response:
 | 
						|
    hdrs = dict(headers or {})
 | 
						|
    if access_token:
 | 
						|
        hdrs["Authorization"] = f"Bearer {access_token}"
 | 
						|
 | 
						|
    resp = await get_client().get(url, headers=hdrs, params=params)
 | 
						|
    if resp.status_code != 401 or not login or not access_token:
 | 
						|
        return resp
 | 
						|
 | 
						|
    print("authorized_get 401")
 | 
						|
    # Try refresh flow lazily to avoid import cycle at import time
 | 
						|
    from app.core.database import get_session, logout
 | 
						|
    from app.core.services.auth_service import refresh_token as do_refresh
 | 
						|
 | 
						|
    session = get_session(login)
 | 
						|
    if not session:
 | 
						|
        return resp
 | 
						|
    try:
 | 
						|
        refresh = session["refresh_token"]
 | 
						|
    except Exception:
 | 
						|
        refresh = session.get("refresh_token") if isinstance(session, dict) else None
 | 
						|
    if not refresh:
 | 
						|
        return resp
 | 
						|
 | 
						|
    ok, data = await do_refresh(access_token, refresh)
 | 
						|
    if ok:
 | 
						|
        new_access = data["access_token"]
 | 
						|
        hdrs["Authorization"] = f"Bearer {new_access}"
 | 
						|
        return await get_client().get(url, headers=hdrs, params=params)
 | 
						|
    return resp
 | 
						|
 | 
						|
async def authorized_post(
 | 
						|
    url: str,
 | 
						|
    *,
 | 
						|
    login: Optional[str] = None,
 | 
						|
    access_token: Optional[str] = None,
 | 
						|
    headers: Optional[Dict[str, str]] = None,
 | 
						|
    params: Optional[Dict[str, Any]] = None,
 | 
						|
    json: Any = None,
 | 
						|
    data: Any = None,
 | 
						|
) -> httpx.Response:
 | 
						|
    hdrs = dict(headers or {})
 | 
						|
    if access_token:
 | 
						|
        hdrs["Authorization"] = f"Bearer {access_token}"
 | 
						|
 | 
						|
    resp = await get_client().post(url, headers=hdrs, params=params, json=json, data=data)
 | 
						|
    if resp.status_code != 401 or not login or not access_token:
 | 
						|
        return resp
 | 
						|
 | 
						|
    print("authorized_post 401")
 | 
						|
    from app.core.database import get_session, logout
 | 
						|
    from app.core.services.auth_service import refresh_token as do_refresh
 | 
						|
 | 
						|
    session = get_session(login)
 | 
						|
    if not session:
 | 
						|
        return resp
 | 
						|
    try:
 | 
						|
        refresh = session["refresh_token"]
 | 
						|
    except Exception:
 | 
						|
        refresh = session.get("refresh_token") if isinstance(session, dict) else None
 | 
						|
    if not refresh:
 | 
						|
        return resp
 | 
						|
 | 
						|
    ok, data = await do_refresh(access_token, refresh)
 | 
						|
    if ok:
 | 
						|
        new_access = data["access_token"]
 | 
						|
        hdrs["Authorization"] = f"Bearer {new_access}"
 | 
						|
        return await get_client().post(url, headers=hdrs, params=params, json=json, data=data)
 | 
						|
    return resp
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 |