car_ui/services/bluetooth_service.py
2026-04-01 02:06:09 +03:00

315 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import os
import re
import subprocess
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from PySide6.QtCore import QObject, Signal
import build_info
def _is_mock_devices_enabled() -> bool:
"""Проверить, включён ли режим мок-устройств."""
flag_path = Path(build_info.__file__).resolve().parent / "mock_devices_enable"
return flag_path.exists()
@dataclass
class BluetoothDevice:
mac: str
name: str
class BluetoothService(QObject):
"""Сервис для управления Bluetooth на Raspberry Pi."""
status_changed = Signal(str)
devices_changed = Signal(list)
connected_changed = Signal(str, bool) # mac, connected
def __init__(self, parent=None):
super().__init__(parent)
self._log_path = Path("~/.cache/car_ui/bluetooth.log").expanduser()
self._last_error = ""
# === Public API ===
def get_paired_devices(self) -> list[BluetoothDevice]:
"""Получить список сопряженных устройств."""
out = self._run_cmd(["bluetoothctl", "devices", "Paired"])
if "Invalid command" in out or not out:
out = self._run_cmd(["bluetoothctl", "paired-devices"])
if "Invalid command" in out or not out:
out = self._run_cmd(["bluetoothctl", "devices"])
devices: list[BluetoothDevice] = []
for line in self._strip_ansi(out).splitlines():
parts = line.split()
if len(parts) >= 2 and parts[0] == "Device":
mac = parts[1]
name = " ".join(parts[2:]) if len(parts) > 2 else ""
devices.append(BluetoothDevice(mac=mac, name=name))
# DEBUG: мок устройств для тестирования лимита
if _is_mock_devices_enabled():
real_count = len(devices)
missing = build_info.BLUETOOTH_MAX_PAIRED_DEVICES - real_count
for i in range(missing):
idx = real_count + i + 1
devices.append(BluetoothDevice(
mac=f"00:11:22:33:44:{idx:02X}",
name=f"Test Device {idx}"
))
return devices
def get_device_info(self, mac: str) -> dict[str, str]:
"""Получить информацию об устройстве."""
out = self._run_cmd(["bluetoothctl", "info", mac])
info: dict[str, str] = {}
for line in out.splitlines():
if ":" not in line:
continue
key, value = line.split(":", 1)
info[key.strip()] = value.strip()
return info
def is_connected(self, mac: str) -> bool:
"""Проверить, подключено ли устройство."""
info = self.get_device_info(mac)
return info.get("Connected", "no") == "yes"
def connect_device(self, mac: str) -> bool:
"""Подключиться к устройству."""
self._last_error = ""
self._run_cmd(["bluetoothctl", "trust", mac])
result = self._run_cmd(["bluetoothctl", "connect", mac])
success = not self._last_error
self.connected_changed.emit(mac, success)
return success
def disconnect_device(self, mac: str) -> bool:
"""Отключить устройство."""
self._last_error = ""
result = self._run_cmd(["bluetoothctl", "disconnect", mac])
success = not self._last_error
self.connected_changed.emit(mac, False)
return success
def remove_device(self, mac: str) -> bool:
"""Удалить устройство из списка сопряженных."""
self._last_error = ""
# Сначала отключаем, если подключено
if self.is_connected(mac):
self._run_cmd(["bluetoothctl", "disconnect", mac])
result = self._run_cmd(["bluetoothctl", "remove", mac])
success = not self._last_error
return success
def make_discoverable(self, timeout_sec: int = 10) -> bool:
"""Сделать устройство видимым для сопряжения.
Args:
timeout_sec: Время видимости в секундах (по умолчанию 10).
Возвращает False, если достигнуто максимальное количество сопряженных устройств.
"""
# Проверка лимита сопряженных устройств
paired_devices = self.get_paired_devices()
if len(paired_devices) >= build_info.BLUETOOTH_MAX_PAIRED_DEVICES:
self._last_error = "max_devices"
return False
self._last_error = ""
script_out = self._run_btctl_script(
[
"power on",
f"discoverable-timeout {timeout_sec}",
"discoverable on",
"pairable on",
"agent NoInputNoOutput",
"default-agent",
]
)
if "Failed to set power on" in script_out or "org.bluez.Error" in script_out:
self._last_error = "power_off"
return False
return not bool(self._last_error)
def get_status_text(self, mac: str | None = None) -> str:
"""Получить текстовый статус Bluetooth."""
if not mac:
return "Статус: выберите устройство"
info = self.get_device_info(mac)
connected = info.get("Connected", "no") == "yes"
name = info.get("Name", mac)
state = "подключено" if connected else "не подключено"
return f"Статус: {name}{state}"
def get_player_status(self) -> str | None:
"""Получить статус медиаплеера (playing/paused/stopped)."""
out = self._run_btctl(["menu player", "show"])
for line in out.splitlines():
if "Status:" in line:
return line.split("Status:", 1)[1].strip()
return None
def get_player_metadata(self) -> dict:
"""Получить метаданные текущего трека."""
out = self._run_btctl(["menu player", "show"])
metadata = {
"source": None,
"title": None,
"artist": None,
"album": None,
"position": None,
"duration": None,
"status": None,
}
for line in out.splitlines():
if "Name:" in line:
metadata["source"] = line.split("Name:", 1)[1].strip()
elif "Track.Title:" in line:
metadata["title"] = line.split("Track.Title:", 1)[1].strip()
elif "Track.Artist:" in line:
metadata["artist"] = line.split("Track.Artist:", 1)[1].strip()
elif "Track.Album:" in line:
metadata["album"] = line.split("Track.Album:", 1)[1].strip()
elif "Position:" in line:
metadata["position"] = self._parse_hex_value(line)
elif "Track.Duration:" in line:
metadata["duration"] = self._parse_hex_value(line)
elif "Status:" in line:
metadata["status"] = line.split("Status:", 1)[1].strip()
return metadata
def player_play(self) -> None:
"""Воспроизведение."""
self._run_btctl(["menu player", "play"])
def player_pause(self) -> None:
"""Пауза."""
self._run_btctl(["menu player", "pause"])
def player_toggle(self) -> None:
"""Переключить воспроизведение/пауза."""
status = self.get_player_status()
if status == "playing":
self.player_pause()
else:
self.player_play()
def player_next(self) -> None:
"""Следующий трек."""
self._run_btctl(["menu player", "next"])
def player_previous(self) -> None:
"""Предыдущий трек."""
self._run_btctl(["menu player", "previous"])
# === Private methods ===
def _run_cmd(self, args: list[str]) -> str:
"""Выполнить команду bluetoothctl."""
try:
result = subprocess.run(
args,
capture_output=True,
text=True,
timeout=2,
check=False,
)
except (subprocess.SubprocessError, OSError):
self._last_error = "run-failed"
self._log(args, "", "run-failed", 1)
return ""
stdout = result.stdout.strip()
stderr = result.stderr.strip()
if result.returncode != 0 or stderr:
self._last_error = stderr or f"exit={result.returncode}"
self._log(args, stdout, stderr, result.returncode)
return stdout
def _run_btctl(self, commands: list[str]) -> str:
"""Выполнить скрипт в bluetoothctl интерактивном режиме."""
script = "\n".join(commands + ["back", "quit"]) + "\n"
try:
result = subprocess.run(
["bluetoothctl"],
input=script,
capture_output=True,
text=True,
timeout=3,
check=False,
)
except (subprocess.SubprocessError, OSError):
self._last_error = "run-failed"
self._log(["bluetoothctl"], "", "run-failed", 1)
return ""
stdout = result.stdout.strip()
stderr = result.stderr.strip()
if result.returncode != 0 or stderr:
self._last_error = stderr or f"exit={result.returncode}"
self._log(["bluetoothctl"], stdout, stderr, result.returncode)
return stdout
def _run_btctl_script(self, commands: list[str]) -> str:
"""Выполнить скрипт в bluetoothctl (без back в конце)."""
script = "\n".join(commands + ["quit"]) + "\n"
try:
result = subprocess.run(
["bluetoothctl"],
input=script,
capture_output=True,
text=True,
timeout=4,
check=False,
)
except (subprocess.SubprocessError, OSError):
self._last_error = "run-failed"
self._log(["bluetoothctl", "(script)"], "", "run-failed", 1)
return ""
stdout = result.stdout.strip()
stderr = result.stderr.strip()
if result.returncode != 0 or stderr:
self._last_error = stderr or f"exit={result.returncode}"
self._log(["bluetoothctl", "(script)"], stdout, stderr, result.returncode)
return stdout
def _parse_hex_value(self, line: str) -> int | None:
"""Распарсить hex значение из строки bluetoothctl."""
start = line.find("0x")
if start == -1:
return None
hex_part = line[start:].split()[0]
try:
return int(hex_part, 16)
except ValueError:
return None
def _strip_ansi(self, text: str) -> str:
"""Удалить ANSI escape-последовательности."""
return re.sub(r"\x1b\[[0-9;]*m", "", text)
def _log(self, args: list[str], stdout: str, stderr: str, code: int):
"""Записать лог операции."""
try:
self._log_path.parent.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
line = (
f"[{ts}] cmd={' '.join(args)} code={code} "
f"stdout={stdout!r} stderr={stderr!r}\n"
)
with open(self._log_path, "a", encoding="utf-8") as f:
f.write(line)
except OSError:
pass
@property
def last_error(self) -> str:
"""Получить последнюю ошибку."""
return self._last_error