From 36fbd1034f281e247cf803bbc1430e3256990f1c Mon Sep 17 00:00:00 2001 From: cheykrym Date: Tue, 31 Mar 2026 23:08:45 +0300 Subject: [PATCH] bluetooth service --- screens/media.py | 110 +++-------- screens/setting/bluetooth_screen.py | 169 +++-------------- services/__init__.py | 3 + services/bluetooth_service.py | 272 ++++++++++++++++++++++++++++ 4 files changed, 328 insertions(+), 226 deletions(-) create mode 100644 services/__init__.py create mode 100644 services/bluetooth_service.py diff --git a/screens/media.py b/screens/media.py index 028e641..e4594ef 100644 --- a/screens/media.py +++ b/screens/media.py @@ -1,5 +1,3 @@ -import subprocess - from PySide6.QtWidgets import ( QWidget, QLabel, @@ -12,12 +10,16 @@ from PySide6.QtWidgets import ( from PySide6.QtCore import Qt, QSize, QTimer, Signal from PySide6.QtGui import QFont +from services.bluetooth_service import BluetoothService + class MediaScreen(QWidget): source_changed = Signal(str) def __init__(self): super().__init__() + self._bt_service = BluetoothService(self) + root = QVBoxLayout(self) root.setContentsMargins(18, 16, 18, 16) root.setSpacing(14) @@ -154,97 +156,45 @@ class MediaScreen(QWidget): self._refresh_metadata() def _toggle_play(self): - status = self._player_status() - if status == "playing": - self._run_btctl(["menu player", "pause"]) - else: - self._run_btctl(["menu player", "play"]) + """Переключить воспроизведение/пауза.""" + self._bt_service.player_toggle() QTimer.singleShot(300, self._refresh_metadata) def _prev(self): - self._run_btctl(["menu player", "previous"]) + """Предыдущий трек.""" + self._bt_service.player_previous() QTimer.singleShot(300, self._refresh_metadata) def _next(self): - self._run_btctl(["menu player", "next"]) + """Следующий трек.""" + self._bt_service.player_next() QTimer.singleShot(300, self._refresh_metadata) - def _player_status(self) -> str | None: - out = self._run_btctl(["menu player", "show"]) - for line in out.splitlines(): - if "Status:" in line: - return line.split("Status:", 1)[1].strip() - return None - def _refresh_metadata(self): - out = self._run_btctl(["menu player", "show"]) - title = None - artist = None - album = None - source = None - position = None - duration = None - status = None - for line in out.splitlines(): - if "Name:" in line: - source = line.split("Name:", 1)[1].strip() - if "Track.Title:" in line: - title = line.split("Track.Title:", 1)[1].strip() - if "Track.Artist:" in line: - artist = line.split("Track.Artist:", 1)[1].strip() - if "Track.Album:" in line: - album = line.split("Track.Album:", 1)[1].strip() - if "Position:" in line: - position = self._parse_hex_value(line) - if "Track.Duration:" in line: - duration = self._parse_hex_value(line) - if "Status:" in line: - status = line.split("Status:", 1)[1].strip() - if title: - self.title.setText(title) - if artist: - self.artist.setText(artist) - if album: - self.album.setText(album) - if source: - text = f"Источник: {source}" + """Обновить метаданные трека.""" + metadata = self._bt_service.get_player_metadata() + + if metadata["title"]: + self.title.setText(metadata["title"]) + if metadata["artist"]: + self.artist.setText(metadata["artist"]) + if metadata["album"]: + self.album.setText(metadata["album"]) + if metadata["source"]: + text = f"Источник: {metadata["source"]}" self.source.setText(text) self.source_changed.emit(text) - if duration is not None and duration > 0: - self.progress.setRange(0, duration) - self.time_total.setText(self._format_time(duration)) - if position is not None: - self.progress.setValue(position) - self.time_pos.setText(self._format_time(position)) - if status: - self.btn_play.setText("⏸" if status == "playing" else "▶") - - def _run_btctl(self, commands: list[str]) -> str: - script = "\n".join(commands + ["back", "quit"]) + "\n" - try: - result = subprocess.run( - ["bluetoothctl"], - input=script, - capture_output=True, - text=True, - timeout=3, - check=False, - ) - except (subprocess.SubprocessError, OSError): - return "" - return result.stdout.strip() - - def _parse_hex_value(self, line: str) -> int | None: - start = line.find("0x") - if start == -1: - return None - hex_part = line[start:].split()[0] - try: - return int(hex_part, 16) - except ValueError: - return None + if metadata["duration"] is not None and metadata["duration"] > 0: + self.progress.setRange(0, metadata["duration"]) + self.time_total.setText(self._format_time(metadata["duration"])) + if metadata["position"] is not None: + self.progress.setValue(metadata["position"]) + self.time_pos.setText(self._format_time(metadata["position"])) + if metadata["status"]: + self.btn_play.setText("⏸" if metadata["status"] == "playing" else "▶") def _format_time(self, ms: int) -> str: + """Форматировать время из миллисекунд.""" total_seconds = max(ms, 0) // 1000 minutes = total_seconds // 60 seconds = total_seconds % 60 diff --git a/screens/setting/bluetooth_screen.py b/screens/setting/bluetooth_screen.py index 94ea993..58081f2 100644 --- a/screens/setting/bluetooth_screen.py +++ b/screens/setting/bluetooth_screen.py @@ -1,11 +1,5 @@ from __future__ import annotations -import os -import subprocess -from dataclasses import dataclass -from datetime import datetime -import re - from PySide6.QtCore import Qt, QTimer, QSettings from PySide6.QtGui import QFont from PySide6.QtWidgets import ( @@ -19,11 +13,7 @@ from PySide6.QtWidgets import ( QScroller, ) - -@dataclass -class BluetoothDevice: - mac: str - name: str +from services.bluetooth_service import BluetoothService, BluetoothDevice class BluetoothScreen(QWidget): @@ -31,29 +21,12 @@ class BluetoothScreen(QWidget): super().__init__() self._on_back = on_back self._settings = QSettings("car_ui", "ui") - self._log_path = os.path.expanduser("~/.cache/car_ui/bluetooth.log") - self._last_error = "" + self._bt_service = BluetoothService(self) root = QVBoxLayout(self) root.setContentsMargins(0, 0, 0, 0) root.setSpacing(12) - hdr = QHBoxLayout() - hdr.setContentsMargins(0, 0, 0, 0) - hdr.setSpacing(12) - - # back_btn = QPushButton("Назад") - # back_btn.setObjectName("SettingsBackBtn") - # back_btn.setMinimumSize(120, 44) - # back_btn.clicked.connect(self._on_back) - - # title = QLabel("Bluetooth") - # title.setFont(QFont("", 22, 700)) - - # hdr.addWidget(back_btn) - # hdr.addWidget(title) - # hdr.addStretch(1) - self.status = QLabel("Статус: —") self.status.setObjectName("BluetoothStatus") self.status.setFont(QFont("", 12)) @@ -100,7 +73,6 @@ class BluetoothScreen(QWidget): actions.addWidget(self.btn_connect, 1) actions.addWidget(self.btn_disconnect, 1) - root.addLayout(hdr) root.addWidget(self.status) root.addWidget(self.list, 1) root.addLayout(actions) @@ -109,7 +81,8 @@ class BluetoothScreen(QWidget): QTimer.singleShot(200, self._auto_connect_last) def refresh_list(self): - devices = self._paired_devices() + """Обновить список сопряженных устройств.""" + devices = self._bt_service.get_paired_devices() self.list.clear() for dev in devices: @@ -121,163 +94,67 @@ class BluetoothScreen(QWidget): self._update_status() def _on_select(self): + """Обработчик выбора устройства.""" has_selection = self._selected_mac() is not None self.btn_connect.setEnabled(has_selection) self.btn_disconnect.setEnabled(has_selection) self._update_status() def _selected_mac(self) -> str | None: + """Получить MAC-адрес выбранного устройства.""" items = self.list.selectedItems() if not items: return None return items[0].data(Qt.UserRole) def _auto_connect_last(self): + """Автоподключение к последнему устройству.""" last_mac = self._settings.value("bluetooth/last_mac", "") if not last_mac: return self._connect_device(last_mac, silent=True) def _make_visible(self): - self._last_error = "" - script_out = self._run_btctl_script( - [ - "power on", - "discoverable-timeout 0", - "discoverable on", - "pairable on", - "agent NoInputNoOutput", - "default-agent", - ] - ) - if "Failed to set power on" in script_out or "org.bluez.Error" in script_out: + """Сделать устройство видимым для сопряжения.""" + success = self._bt_service.make_discoverable() + if self._bt_service.last_error == "power_off": self.status.setText("Статус: питание BT выключено (проверьте rfkill)") - return - if self._last_error: - self.status.setText(f"Статус: ошибка ({self._last_error})") + elif not success: + self.status.setText(f"Статус: ошибка ({self._bt_service.last_error})") else: self.status.setText("Статус: видим для сопряжения") def _connect_selected(self): + """Подключить выбранное устройство.""" mac = self._selected_mac() if not mac: return self._connect_device(mac, silent=False) def _connect_device(self, mac: str, silent: bool): - self._last_error = "" - self._run_cmd(["bluetoothctl", "trust", mac]) - self._run_cmd(["bluetoothctl", "connect", mac]) + """Подключиться к устройству по MAC.""" + success = self._bt_service.connect(mac) if not silent: - if self._last_error: - self.status.setText(f"Статус: ошибка ({self._last_error})") + if not success: + self.status.setText(f"Статус: ошибка ({self._bt_service.last_error})") else: self.status.setText(f"Статус: подключаемся к {mac}") self._settings.setValue("bluetooth/last_mac", mac) QTimer.singleShot(300, self._update_status) def _disconnect_selected(self): + """Отключить выбранное устройство.""" mac = self._selected_mac() if not mac: return - self._last_error = "" - self._run_cmd(["bluetoothctl", "disconnect", mac]) - if self._last_error: - self.status.setText(f"Статус: ошибка ({self._last_error})") + success = self._bt_service.disconnect(mac) + if not success: + self.status.setText(f"Статус: ошибка ({self._bt_service.last_error})") else: self.status.setText(f"Статус: отключено от {mac}") QTimer.singleShot(300, self._update_status) def _update_status(self): + """Обновить текстовый статус.""" mac = self._selected_mac() - if not mac: - self.status.setText("Статус: выберите устройство") - return - info = self._device_info(mac) - connected = info.get("Connected", "no") == "yes" - name = info.get("Name", mac) - state = "подключено" if connected else "не подключено" - self.status.setText(f"Статус: {name} — {state}") - - def _device_info(self, mac: str) -> dict[str, str]: - out = self._run_cmd(["bluetoothctl", "info", mac]) - info: dict[str, str] = {} - for line in out.splitlines(): - if ":" not in line: - continue - key, value = line.split(":", 1) - info[key.strip()] = value.strip() - return info - - def _paired_devices(self) -> list[BluetoothDevice]: - out = self._run_cmd(["bluetoothctl", "devices", "Paired"]) - if "Invalid command" in out or not out: - out = self._run_cmd(["bluetoothctl", "paired-devices"]) - if "Invalid command" in out or not out: - out = self._run_cmd(["bluetoothctl", "devices"]) - devices: list[BluetoothDevice] = [] - for line in self._strip_ansi(out).splitlines(): - parts = line.split() - if len(parts) >= 2 and parts[0] == "Device": - mac = parts[1] - name = " ".join(parts[2:]) if len(parts) > 2 else "" - devices.append(BluetoothDevice(mac=mac, name=name)) - return devices - - def _run_cmd(self, args: list[str]) -> str: - try: - result = subprocess.run( - args, - capture_output=True, - text=True, - timeout=2, - check=False, - ) - except (subprocess.SubprocessError, OSError): - self._last_error = "run-failed" - self._log(args, "", "run-failed", 1) - return "" - stdout = result.stdout.strip() - stderr = result.stderr.strip() - if result.returncode != 0 or stderr: - self._last_error = stderr or f"exit={result.returncode}" - self._log(args, stdout, stderr, result.returncode) - return stdout - - def _run_btctl_script(self, commands: list[str]) -> str: - script = "\n".join(commands + ["quit"]) + "\n" - try: - result = subprocess.run( - ["bluetoothctl"], - input=script, - capture_output=True, - text=True, - timeout=4, - check=False, - ) - except (subprocess.SubprocessError, OSError): - self._last_error = "run-failed" - self._log(["bluetoothctl", "(script)"], "", "run-failed", 1) - return "" - stdout = result.stdout.strip() - stderr = result.stderr.strip() - if result.returncode != 0 or stderr: - self._last_error = stderr or f"exit={result.returncode}" - self._log(["bluetoothctl", "(script)"], stdout, stderr, result.returncode) - return stdout - - def _log(self, args: list[str], stdout: str, stderr: str, code: int): - try: - os.makedirs(os.path.dirname(self._log_path), exist_ok=True) - ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - line = ( - f"[{ts}] cmd={' '.join(args)} code={code} " - f"stdout={stdout!r} stderr={stderr!r}\n" - ) - with open(self._log_path, "a", encoding="utf-8") as f: - f.write(line) - except OSError: - pass - - def _strip_ansi(self, text: str) -> str: - return re.sub(r"\x1b\[[0-9;]*m", "", text) + self.status.setText(self._bt_service.get_status_text(mac)) diff --git a/services/__init__.py b/services/__init__.py new file mode 100644 index 0000000..d04f61a --- /dev/null +++ b/services/__init__.py @@ -0,0 +1,3 @@ +from .bluetooth_service import BluetoothService, BluetoothDevice + +__all__ = ["BluetoothService", "BluetoothDevice"] diff --git a/services/bluetooth_service.py b/services/bluetooth_service.py new file mode 100644 index 0000000..2e90e6c --- /dev/null +++ b/services/bluetooth_service.py @@ -0,0 +1,272 @@ +from __future__ import annotations + +import os +import re +import subprocess +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path + +from PySide6.QtCore import QObject, Signal + + +@dataclass +class BluetoothDevice: + mac: str + name: str + + +class BluetoothService(QObject): + """Сервис для управления Bluetooth на Raspberry Pi.""" + + status_changed = Signal(str) + devices_changed = Signal(list) + connected_changed = Signal(str, bool) # mac, connected + + def __init__(self, parent=None): + super().__init__(parent) + self._log_path = Path("~/.cache/car_ui/bluetooth.log").expanduser() + self._last_error = "" + + # === Public API === + + def get_paired_devices(self) -> list[BluetoothDevice]: + """Получить список сопряженных устройств.""" + out = self._run_cmd(["bluetoothctl", "devices", "Paired"]) + if "Invalid command" in out or not out: + out = self._run_cmd(["bluetoothctl", "paired-devices"]) + if "Invalid command" in out or not out: + out = self._run_cmd(["bluetoothctl", "devices"]) + devices: list[BluetoothDevice] = [] + for line in self._strip_ansi(out).splitlines(): + parts = line.split() + if len(parts) >= 2 and parts[0] == "Device": + mac = parts[1] + name = " ".join(parts[2:]) if len(parts) > 2 else "" + devices.append(BluetoothDevice(mac=mac, name=name)) + return devices + + def get_device_info(self, mac: str) -> dict[str, str]: + """Получить информацию об устройстве.""" + out = self._run_cmd(["bluetoothctl", "info", mac]) + info: dict[str, str] = {} + for line in out.splitlines(): + if ":" not in line: + continue + key, value = line.split(":", 1) + info[key.strip()] = value.strip() + return info + + def is_connected(self, mac: str) -> bool: + """Проверить, подключено ли устройство.""" + info = self.get_device_info(mac) + return info.get("Connected", "no") == "yes" + + def connect(self, mac: str) -> bool: + """Подключиться к устройству.""" + self._last_error = "" + self._run_cmd(["bluetoothctl", "trust", mac]) + result = self._run_cmd(["bluetoothctl", "connect", mac]) + success = not self._last_error + self.connected_changed.emit(mac, success) + return success + + def disconnect(self, mac: str) -> bool: + """Отключить устройство.""" + self._last_error = "" + result = self._run_cmd(["bluetoothctl", "disconnect", mac]) + success = not self._last_error + self.connected_changed.emit(mac, False) + return success + + def make_discoverable(self) -> bool: + """Сделать устройство видимым для сопряжения.""" + self._last_error = "" + script_out = self._run_btctl_script( + [ + "power on", + "discoverable-timeout 0", + "discoverable on", + "pairable on", + "agent NoInputNoOutput", + "default-agent", + ] + ) + if "Failed to set power on" in script_out or "org.bluez.Error" in script_out: + self._last_error = "power_off" + return False + return not bool(self._last_error) + + def get_status_text(self, mac: str | None = None) -> str: + """Получить текстовый статус Bluetooth.""" + if not mac: + return "Статус: выберите устройство" + info = self.get_device_info(mac) + connected = info.get("Connected", "no") == "yes" + name = info.get("Name", mac) + state = "подключено" if connected else "не подключено" + return f"Статус: {name} — {state}" + + def get_player_status(self) -> str | None: + """Получить статус медиаплеера (playing/paused/stopped).""" + out = self._run_btctl(["menu player", "show"]) + for line in out.splitlines(): + if "Status:" in line: + return line.split("Status:", 1)[1].strip() + return None + + def get_player_metadata(self) -> dict: + """Получить метаданные текущего трека.""" + out = self._run_btctl(["menu player", "show"]) + metadata = { + "source": None, + "title": None, + "artist": None, + "album": None, + "position": None, + "duration": None, + "status": None, + } + for line in out.splitlines(): + if "Name:" in line: + metadata["source"] = line.split("Name:", 1)[1].strip() + elif "Track.Title:" in line: + metadata["title"] = line.split("Track.Title:", 1)[1].strip() + elif "Track.Artist:" in line: + metadata["artist"] = line.split("Track.Artist:", 1)[1].strip() + elif "Track.Album:" in line: + metadata["album"] = line.split("Track.Album:", 1)[1].strip() + elif "Position:" in line: + metadata["position"] = self._parse_hex_value(line) + elif "Track.Duration:" in line: + metadata["duration"] = self._parse_hex_value(line) + elif "Status:" in line: + metadata["status"] = line.split("Status:", 1)[1].strip() + return metadata + + def player_play(self) -> None: + """Воспроизведение.""" + self._run_btctl(["menu player", "play"]) + + def player_pause(self) -> None: + """Пауза.""" + self._run_btctl(["menu player", "pause"]) + + def player_toggle(self) -> None: + """Переключить воспроизведение/пауза.""" + status = self.get_player_status() + if status == "playing": + self.player_pause() + else: + self.player_play() + + def player_next(self) -> None: + """Следующий трек.""" + self._run_btctl(["menu player", "next"]) + + def player_previous(self) -> None: + """Предыдущий трек.""" + self._run_btctl(["menu player", "previous"]) + + # === Private methods === + + def _run_cmd(self, args: list[str]) -> str: + """Выполнить команду bluetoothctl.""" + try: + result = subprocess.run( + args, + capture_output=True, + text=True, + timeout=2, + check=False, + ) + except (subprocess.SubprocessError, OSError): + self._last_error = "run-failed" + self._log(args, "", "run-failed", 1) + return "" + stdout = result.stdout.strip() + stderr = result.stderr.strip() + if result.returncode != 0 or stderr: + self._last_error = stderr or f"exit={result.returncode}" + self._log(args, stdout, stderr, result.returncode) + return stdout + + def _run_btctl(self, commands: list[str]) -> str: + """Выполнить скрипт в bluetoothctl интерактивном режиме.""" + script = "\n".join(commands + ["back", "quit"]) + "\n" + try: + result = subprocess.run( + ["bluetoothctl"], + input=script, + capture_output=True, + text=True, + timeout=3, + check=False, + ) + except (subprocess.SubprocessError, OSError): + self._last_error = "run-failed" + self._log(["bluetoothctl"], "", "run-failed", 1) + return "" + stdout = result.stdout.strip() + stderr = result.stderr.strip() + if result.returncode != 0 or stderr: + self._last_error = stderr or f"exit={result.returncode}" + self._log(["bluetoothctl"], stdout, stderr, result.returncode) + return stdout + + def _run_btctl_script(self, commands: list[str]) -> str: + """Выполнить скрипт в bluetoothctl (без back в конце).""" + script = "\n".join(commands + ["quit"]) + "\n" + try: + result = subprocess.run( + ["bluetoothctl"], + input=script, + capture_output=True, + text=True, + timeout=4, + check=False, + ) + except (subprocess.SubprocessError, OSError): + self._last_error = "run-failed" + self._log(["bluetoothctl", "(script)"], "", "run-failed", 1) + return "" + stdout = result.stdout.strip() + stderr = result.stderr.strip() + if result.returncode != 0 or stderr: + self._last_error = stderr or f"exit={result.returncode}" + self._log(["bluetoothctl", "(script)"], stdout, stderr, result.returncode) + return stdout + + def _parse_hex_value(self, line: str) -> int | None: + """Распарсить hex значение из строки bluetoothctl.""" + start = line.find("0x") + if start == -1: + return None + hex_part = line[start:].split()[0] + try: + return int(hex_part, 16) + except ValueError: + return None + + def _strip_ansi(self, text: str) -> str: + """Удалить ANSI escape-последовательности.""" + return re.sub(r"\x1b\[[0-9;]*m", "", text) + + def _log(self, args: list[str], stdout: str, stderr: str, code: int): + """Записать лог операции.""" + try: + self._log_path.parent.mkdir(parents=True, exist_ok=True) + ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + line = ( + f"[{ts}] cmd={' '.join(args)} code={code} " + f"stdout={stdout!r} stderr={stderr!r}\n" + ) + with open(self._log_path, "a", encoding="utf-8") as f: + f.write(line) + except OSError: + pass + + @property + def last_error(self) -> str: + """Получить последнюю ошибку.""" + return self._last_error