add refresh

This commit is contained in:
unknown 2025-09-29 01:00:17 +03:00
parent ddb95ae852
commit e8427ceb5d
3 changed files with 96 additions and 44 deletions

View File

@ -50,18 +50,37 @@ def init_db():
conn.commit()
conn.close()
def add_session(login, access_token, refresh_token):
def add_session(login, access_token, refresh_token, update_existing=False):
"""Добавляет новую сессию или обновляет существующую."""
conn = get_connection()
cursor = conn.cursor()
# REPLACE INTO - удобный способ для вставки или обновления
if update_existing:
# Обновляем существующую сессию по access_token
cursor.execute('''
UPDATE sessions
SET access_token = ?, refresh_token = ?, created_at = ?
WHERE access_token = ?
''', (access_token, refresh_token, datetime.now(), access_token))
else:
# Вставляем новую или заменяем существующую по логину
cursor.execute('''
INSERT OR REPLACE INTO sessions (login, access_token, refresh_token, created_at)
VALUES (?, ?, ?, ?)
''', (login, access_token, refresh_token, datetime.now()))
conn.commit()
conn.close()
def logout(access_token: str):
"""Удаляет сессию по токену доступа."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute('DELETE FROM sessions WHERE access_token = ?', (access_token,))
conn.commit()
conn.close()
def get_session(login: str):
"""Получает сессию по логину."""
conn = get_connection()
@ -80,13 +99,6 @@ def get_all_sessions():
conn.close()
return sessions
def delete_session(login: str):
"""Удаляет сессию по логину."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute('DELETE FROM sessions WHERE login = ?', (login,))
conn.commit()
conn.close()
def get_last_login():
"""Получает логин последнего вошедшего пользователя."""

View File

@ -1,7 +1,7 @@
import httpx
import asyncio
from app.core import config
from app.core.database import add_session
from app.core.database import add_session, logout, get_session
from app.core.localizer import localizer
async def login(login, password):
@ -91,50 +91,90 @@ async def register(login, password, invite=None):
return False, f"{localizer.translate('Произошла ошибка')}: {e}"
async def get_user_role(access_token: str):
async def refresh_token(access_token: str, refresh_token: str):
"""
Получает роль и права пользователя по токену доступа.
Обновляет токен доступа, используя токен обновления.
:param access_token: Токен доступа пользователя
:return: Кортеж (успех: bool, данные: UserRoleData | str)
:param access_token: Истекший токен доступа
:param refresh_token: Токен обновления
:return: Кортеж (успех: bool, данные: dict | str)
"""
url = f"{config.BASE_URL}/v1/user/role"
headers = {"Authorization": f"Bearer {access_token}"}
url = f"{config.BASE_URL}/v1/auth/token/refresh"
payload = {"access_token": access_token, "refresh_token": refresh_token}
timeout = httpx.Timeout(connect=5.0, read=10.0, write=5.0, pool=5.0)
try:
async with httpx.AsyncClient(http2=True, timeout=timeout) as client:
response = await client.get(url, headers=headers)
print("headers", headers,"response", response, "status", response.json().get("status"))
async with httpx.AsyncClient(http2=True) as client:
response = await client.post(url, json=payload)
if response.status_code == 200:
data = response.json()
if data.get("status") == "fine":
# Здесь можно добавить валидацию через Pydantic, если необходимо
# from app.core.models.user_models import UserRoleData
# user_role_data = UserRoleData(**data['data'])
return True, data['data']
token_data = data["data"]
# Обновляем сессию с новыми токенами
add_session(
login=None, # Логин не требуется для обновления
access_token=token_data["access_token"],
refresh_token=token_data["refresh_token"],
update_existing=True
)
return True, token_data
else:
return False, data.get("detail", localizer.translate("Неизвестная ошибка ответа"))
return False, data.get("detail", "Unknown error")
elif response.status_code == 401:
return False, localizer.translate("Токен недействителен или истек")
elif response.status_code == 404:
return False, localizer.translate("Пользователь не найден")
return False, "Refresh token is invalid or expired"
else:
return False, f"{localizer.translate('Ошибка сервера')}: {response.status_code}"
return False, f"Server error: {response.status_code}"
except httpx.RequestError as e:
return False, f"{localizer.translate('Ошибка сети')}: {e}"
return False, f"Network error: {e}"
except Exception as e:
return False, f"{localizer.translate('Произошла ошибка')}: {e}"
return False, f"An error occurred: {e}"
# Пример использования (для тестирования)
async def main():
# Замените на реальные данные для теста
success, message = await login("testuser", "testpassword")
print(f"Результат входа: {success}, Сообщение: {message}")
async def get_user_role(access_token: str, login: str):
"""
Получает роль и права пользователя по токену доступа.
В случае истечения срока действия токена, пытается его обновить.
"""
url = f"{config.BASE_URL}/v1/user/role"
headers = {"Authorization": f"Bearer {access_token}"}
if __name__ == "__main__":
asyncio.run(main())
try:
async with httpx.AsyncClient(http2=True) as client:
response = await client.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
return (True, data['data']) if data.get("status") == "fine" else (False, data.get("detail", "Unknown error"))
elif response.status_code == 401:
# Токен истек, пытаемся обновить
session = get_session(login)
if not session or not session['refresh_token']:
return False, "No refresh token found"
refresh_success, refresh_data = await refresh_token(access_token, session['refresh_token'])
if refresh_success:
# Повторяем запрос с новым токеном
new_access_token = refresh_data['access_token']
headers["Authorization"] = f"Bearer {new_access_token}"
response = await client.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
return (True, data['data']) if data.get("status") == "fine" else (False, "Failed to get role after refresh")
# Если обновление не удалось, выходим из системы
logout(access_token)
return False, "Session expired, please log in again"
else:
return False, f"Server error: {response.status_code}"
except httpx.RequestError as e:
return False, f"Network error: {e}"
except Exception as e:
return False, f"An error occurred: {e}"

View File

@ -305,7 +305,7 @@ class YobbleHomeView(QWidget):
print("[Permissions] Preload failed: No access token.")
return
success, data = await get_user_role(access_token)
success, data = await get_user_role(access_token, self.username)
if success:
user_permissions = set(data.get("user_permissions", []))
@ -358,7 +358,7 @@ class YobbleHomeView(QWidget):
self.show_error_message(localizer.translate("Сессия не найдена. Пожалуйста, войдите снова."))
return
success, data = await get_user_role(access_token)
success, data = await get_user_role(access_token, self.username)
if success and permission_code in data.get("user_permissions", []):
# self.permission_cache.add(permission_code)
user_permissions = set(data.get("user_permissions", []))