from __future__ import annotations import os import re import subprocess from dataclasses import dataclass from datetime import datetime from pathlib import Path from PySide6.QtCore import QObject, Signal, QSettings import build_info def _is_mock_devices_enabled() -> bool: """Проверить, включён ли режим мок-устройств.""" settings = QSettings("car_ui", "ui") return _read_bool_setting(settings, "debug/mock_devices", False) def _read_bool_setting(settings: QSettings, key: str, default: bool) -> bool: raw = settings.value(key, default) if isinstance(raw, bool): return raw if isinstance(raw, str): return raw.strip().lower() in ("1", "true", "yes", "on") try: return bool(int(raw)) except (TypeError, ValueError): return default @dataclass class BluetoothDevice: mac: str name: str class BluetoothService(QObject): """Сервис для управления Bluetooth на Raspberry Pi.""" status_changed = Signal(str) devices_changed = Signal(list) connected_changed = Signal(str, bool) # mac, connected def __init__(self, parent=None): super().__init__(parent) self._log_path = Path("~/.cache/car_ui/bluetooth.log").expanduser() self._last_error = "" # === Public API === def get_paired_devices(self) -> list[BluetoothDevice]: """Получить список сопряженных устройств.""" out = self._run_cmd(["bluetoothctl", "devices", "Paired"]) if "Invalid command" in out or not out: out = self._run_cmd(["bluetoothctl", "paired-devices"]) if "Invalid command" in out or not out: out = self._run_cmd(["bluetoothctl", "devices"]) devices: list[BluetoothDevice] = [] for line in self._strip_ansi(out).splitlines(): parts = line.split() if len(parts) >= 2 and parts[0] == "Device": mac = parts[1] name = " ".join(parts[2:]) if len(parts) > 2 else "" devices.append(BluetoothDevice(mac=mac, name=name)) # DEBUG: мок устройств для тестирования лимита if _is_mock_devices_enabled(): real_count = len(devices) missing = build_info.BLUETOOTH_MAX_PAIRED_DEVICES - real_count for i in range(missing): idx = real_count + i + 1 devices.append(BluetoothDevice( mac=f"00:11:22:33:44:{idx:02X}", name=f"Test Device {idx}" )) return devices def get_device_info(self, mac: str) -> dict[str, str]: """Получить информацию об устройстве.""" out = self._run_cmd(["bluetoothctl", "info", mac]) info: dict[str, str] = {} for line in out.splitlines(): if ":" not in line: continue key, value = line.split(":", 1) info[key.strip()] = value.strip() return info def is_connected(self, mac: str) -> bool: """Проверить, подключено ли устройство.""" info = self.get_device_info(mac) return info.get("Connected", "no") == "yes" def connect_device(self, mac: str) -> bool: """Подключиться к устройству.""" self._last_error = "" self._run_cmd(["bluetoothctl", "trust", mac]) result = self._run_cmd(["bluetoothctl", "connect", mac]) success = not self._last_error self.connected_changed.emit(mac, success) return success def disconnect_device(self, mac: str) -> bool: """Отключить устройство.""" self._last_error = "" result = self._run_cmd(["bluetoothctl", "disconnect", mac]) success = not self._last_error self.connected_changed.emit(mac, False) return success def remove_device(self, mac: str) -> bool: """Удалить устройство из списка сопряженных.""" self._last_error = "" # Сначала отключаем, если подключено if self.is_connected(mac): self._run_cmd(["bluetoothctl", "disconnect", mac]) result = self._run_cmd(["bluetoothctl", "remove", mac]) success = not self._last_error return success def make_discoverable(self, timeout_sec: int = 10) -> bool: """Сделать устройство видимым для сопряжения. Args: timeout_sec: Время видимости в секундах (по умолчанию 10). Возвращает False, если достигнуто максимальное количество сопряженных устройств. """ # Проверка лимита сопряженных устройств paired_devices = self.get_paired_devices() if len(paired_devices) >= build_info.BLUETOOTH_MAX_PAIRED_DEVICES: self._last_error = "max_devices" return False self._last_error = "" script_out = self._run_btctl_script( [ "power on", f"discoverable-timeout {timeout_sec}", "discoverable on", "pairable on", "agent NoInputNoOutput", "default-agent", ] ) if "Failed to set power on" in script_out or "org.bluez.Error" in script_out: self._last_error = "power_off" return False return not bool(self._last_error) def get_status_text(self, mac: str | None = None) -> str: """Получить текстовый статус Bluetooth.""" if not mac: return "Статус: выберите устройство" info = self.get_device_info(mac) connected = info.get("Connected", "no") == "yes" name = info.get("Name", mac) state = "подключено" if connected else "не подключено" return f"Статус: {name} — {state}" def get_player_status(self) -> str | None: """Получить статус медиаплеера (playing/paused/stopped).""" out = self._run_btctl(["menu player", "show"]) for line in out.splitlines(): if "Status:" in line: return line.split("Status:", 1)[1].strip() return None def get_player_metadata(self) -> dict: """Получить метаданные текущего трека.""" out = self._run_btctl(["menu player", "show"]) metadata = { "source": None, "title": None, "artist": None, "album": None, "position": None, "duration": None, "status": None, } for line in out.splitlines(): if "Name:" in line: metadata["source"] = line.split("Name:", 1)[1].strip() elif "Track.Title:" in line: metadata["title"] = line.split("Track.Title:", 1)[1].strip() elif "Track.Artist:" in line: metadata["artist"] = line.split("Track.Artist:", 1)[1].strip() elif "Track.Album:" in line: metadata["album"] = line.split("Track.Album:", 1)[1].strip() elif "Position:" in line: metadata["position"] = self._parse_hex_value(line) elif "Track.Duration:" in line: metadata["duration"] = self._parse_hex_value(line) elif "Status:" in line: metadata["status"] = line.split("Status:", 1)[1].strip() return metadata def player_play(self) -> None: """Воспроизведение.""" self._run_btctl(["menu player", "play"]) def player_pause(self) -> None: """Пауза.""" self._run_btctl(["menu player", "pause"]) def player_toggle(self) -> None: """Переключить воспроизведение/пауза.""" status = self.get_player_status() if status == "playing": self.player_pause() else: self.player_play() def player_next(self) -> None: """Следующий трек.""" self._run_btctl(["menu player", "next"]) def player_previous(self) -> None: """Предыдущий трек.""" self._run_btctl(["menu player", "previous"]) # === Private methods === def _run_cmd(self, args: list[str]) -> str: """Выполнить команду bluetoothctl.""" try: result = subprocess.run( args, capture_output=True, text=True, timeout=2, check=False, ) except (subprocess.SubprocessError, OSError): self._last_error = "run-failed" self._log(args, "", "run-failed", 1) return "" stdout = result.stdout.strip() stderr = result.stderr.strip() if result.returncode != 0 or stderr: self._last_error = stderr or f"exit={result.returncode}" self._log(args, stdout, stderr, result.returncode) return stdout def _run_btctl(self, commands: list[str]) -> str: """Выполнить скрипт в bluetoothctl интерактивном режиме.""" script = "\n".join(commands + ["back", "quit"]) + "\n" try: result = subprocess.run( ["bluetoothctl"], input=script, capture_output=True, text=True, timeout=3, check=False, ) except (subprocess.SubprocessError, OSError): self._last_error = "run-failed" self._log(["bluetoothctl"], "", "run-failed", 1) return "" stdout = result.stdout.strip() stderr = result.stderr.strip() if result.returncode != 0 or stderr: self._last_error = stderr or f"exit={result.returncode}" self._log(["bluetoothctl"], stdout, stderr, result.returncode) return stdout def _run_btctl_script(self, commands: list[str]) -> str: """Выполнить скрипт в bluetoothctl (без back в конце).""" script = "\n".join(commands + ["quit"]) + "\n" try: result = subprocess.run( ["bluetoothctl"], input=script, capture_output=True, text=True, timeout=4, check=False, ) except (subprocess.SubprocessError, OSError): self._last_error = "run-failed" self._log(["bluetoothctl", "(script)"], "", "run-failed", 1) return "" stdout = result.stdout.strip() stderr = result.stderr.strip() if result.returncode != 0 or stderr: self._last_error = stderr or f"exit={result.returncode}" self._log(["bluetoothctl", "(script)"], stdout, stderr, result.returncode) return stdout def _parse_hex_value(self, line: str) -> int | None: """Распарсить hex значение из строки bluetoothctl.""" start = line.find("0x") if start == -1: return None hex_part = line[start:].split()[0] try: return int(hex_part, 16) except ValueError: return None def _strip_ansi(self, text: str) -> str: """Удалить ANSI escape-последовательности.""" return re.sub(r"\x1b\[[0-9;]*m", "", text) def _log(self, args: list[str], stdout: str, stderr: str, code: int): """Записать лог операции.""" try: self._log_path.parent.mkdir(parents=True, exist_ok=True) ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") line = ( f"[{ts}] cmd={' '.join(args)} code={code} " f"stdout={stdout!r} stderr={stderr!r}\n" ) with open(self._log_path, "a", encoding="utf-8") as f: f.write(line) except OSError: pass @property def last_error(self) -> str: """Получить последнюю ошибку.""" return self._last_error