bluetooth service

This commit is contained in:
cheykrym 2026-03-31 23:08:45 +03:00
parent 299314369f
commit 36fbd1034f
4 changed files with 328 additions and 226 deletions

View File

@ -1,5 +1,3 @@
import subprocess
from PySide6.QtWidgets import (
QWidget,
QLabel,
@ -12,12 +10,16 @@ from PySide6.QtWidgets import (
from PySide6.QtCore import Qt, QSize, QTimer, Signal
from PySide6.QtGui import QFont
from services.bluetooth_service import BluetoothService
class MediaScreen(QWidget):
source_changed = Signal(str)
def __init__(self):
super().__init__()
self._bt_service = BluetoothService(self)
root = QVBoxLayout(self)
root.setContentsMargins(18, 16, 18, 16)
root.setSpacing(14)
@ -154,97 +156,45 @@ class MediaScreen(QWidget):
self._refresh_metadata()
def _toggle_play(self):
status = self._player_status()
if status == "playing":
self._run_btctl(["menu player", "pause"])
else:
self._run_btctl(["menu player", "play"])
"""Переключить воспроизведение/пауза."""
self._bt_service.player_toggle()
QTimer.singleShot(300, self._refresh_metadata)
def _prev(self):
self._run_btctl(["menu player", "previous"])
"""Предыдущий трек."""
self._bt_service.player_previous()
QTimer.singleShot(300, self._refresh_metadata)
def _next(self):
self._run_btctl(["menu player", "next"])
"""Следующий трек."""
self._bt_service.player_next()
QTimer.singleShot(300, self._refresh_metadata)
def _player_status(self) -> str | None:
out = self._run_btctl(["menu player", "show"])
for line in out.splitlines():
if "Status:" in line:
return line.split("Status:", 1)[1].strip()
return None
def _refresh_metadata(self):
out = self._run_btctl(["menu player", "show"])
title = None
artist = None
album = None
source = None
position = None
duration = None
status = None
for line in out.splitlines():
if "Name:" in line:
source = line.split("Name:", 1)[1].strip()
if "Track.Title:" in line:
title = line.split("Track.Title:", 1)[1].strip()
if "Track.Artist:" in line:
artist = line.split("Track.Artist:", 1)[1].strip()
if "Track.Album:" in line:
album = line.split("Track.Album:", 1)[1].strip()
if "Position:" in line:
position = self._parse_hex_value(line)
if "Track.Duration:" in line:
duration = self._parse_hex_value(line)
if "Status:" in line:
status = line.split("Status:", 1)[1].strip()
if title:
self.title.setText(title)
if artist:
self.artist.setText(artist)
if album:
self.album.setText(album)
if source:
text = f"Источник: {source}"
"""Обновить метаданные трека."""
metadata = self._bt_service.get_player_metadata()
if metadata["title"]:
self.title.setText(metadata["title"])
if metadata["artist"]:
self.artist.setText(metadata["artist"])
if metadata["album"]:
self.album.setText(metadata["album"])
if metadata["source"]:
text = f"Источник: {metadata["source"]}"
self.source.setText(text)
self.source_changed.emit(text)
if duration is not None and duration > 0:
self.progress.setRange(0, duration)
self.time_total.setText(self._format_time(duration))
if position is not None:
self.progress.setValue(position)
self.time_pos.setText(self._format_time(position))
if status:
self.btn_play.setText("" if status == "playing" else "")
def _run_btctl(self, commands: list[str]) -> str:
script = "\n".join(commands + ["back", "quit"]) + "\n"
try:
result = subprocess.run(
["bluetoothctl"],
input=script,
capture_output=True,
text=True,
timeout=3,
check=False,
)
except (subprocess.SubprocessError, OSError):
return ""
return result.stdout.strip()
def _parse_hex_value(self, line: str) -> int | None:
start = line.find("0x")
if start == -1:
return None
hex_part = line[start:].split()[0]
try:
return int(hex_part, 16)
except ValueError:
return None
if metadata["duration"] is not None and metadata["duration"] > 0:
self.progress.setRange(0, metadata["duration"])
self.time_total.setText(self._format_time(metadata["duration"]))
if metadata["position"] is not None:
self.progress.setValue(metadata["position"])
self.time_pos.setText(self._format_time(metadata["position"]))
if metadata["status"]:
self.btn_play.setText("" if metadata["status"] == "playing" else "")
def _format_time(self, ms: int) -> str:
"""Форматировать время из миллисекунд."""
total_seconds = max(ms, 0) // 1000
minutes = total_seconds // 60
seconds = total_seconds % 60

View File

@ -1,11 +1,5 @@
from __future__ import annotations
import os
import subprocess
from dataclasses import dataclass
from datetime import datetime
import re
from PySide6.QtCore import Qt, QTimer, QSettings
from PySide6.QtGui import QFont
from PySide6.QtWidgets import (
@ -19,11 +13,7 @@ from PySide6.QtWidgets import (
QScroller,
)
@dataclass
class BluetoothDevice:
mac: str
name: str
from services.bluetooth_service import BluetoothService, BluetoothDevice
class BluetoothScreen(QWidget):
@ -31,29 +21,12 @@ class BluetoothScreen(QWidget):
super().__init__()
self._on_back = on_back
self._settings = QSettings("car_ui", "ui")
self._log_path = os.path.expanduser("~/.cache/car_ui/bluetooth.log")
self._last_error = ""
self._bt_service = BluetoothService(self)
root = QVBoxLayout(self)
root.setContentsMargins(0, 0, 0, 0)
root.setSpacing(12)
hdr = QHBoxLayout()
hdr.setContentsMargins(0, 0, 0, 0)
hdr.setSpacing(12)
# back_btn = QPushButton("Назад")
# back_btn.setObjectName("SettingsBackBtn")
# back_btn.setMinimumSize(120, 44)
# back_btn.clicked.connect(self._on_back)
# title = QLabel("Bluetooth")
# title.setFont(QFont("", 22, 700))
# hdr.addWidget(back_btn)
# hdr.addWidget(title)
# hdr.addStretch(1)
self.status = QLabel("Статус: —")
self.status.setObjectName("BluetoothStatus")
self.status.setFont(QFont("", 12))
@ -100,7 +73,6 @@ class BluetoothScreen(QWidget):
actions.addWidget(self.btn_connect, 1)
actions.addWidget(self.btn_disconnect, 1)
root.addLayout(hdr)
root.addWidget(self.status)
root.addWidget(self.list, 1)
root.addLayout(actions)
@ -109,7 +81,8 @@ class BluetoothScreen(QWidget):
QTimer.singleShot(200, self._auto_connect_last)
def refresh_list(self):
devices = self._paired_devices()
"""Обновить список сопряженных устройств."""
devices = self._bt_service.get_paired_devices()
self.list.clear()
for dev in devices:
@ -121,163 +94,67 @@ class BluetoothScreen(QWidget):
self._update_status()
def _on_select(self):
"""Обработчик выбора устройства."""
has_selection = self._selected_mac() is not None
self.btn_connect.setEnabled(has_selection)
self.btn_disconnect.setEnabled(has_selection)
self._update_status()
def _selected_mac(self) -> str | None:
"""Получить MAC-адрес выбранного устройства."""
items = self.list.selectedItems()
if not items:
return None
return items[0].data(Qt.UserRole)
def _auto_connect_last(self):
"""Автоподключение к последнему устройству."""
last_mac = self._settings.value("bluetooth/last_mac", "")
if not last_mac:
return
self._connect_device(last_mac, silent=True)
def _make_visible(self):
self._last_error = ""
script_out = self._run_btctl_script(
[
"power on",
"discoverable-timeout 0",
"discoverable on",
"pairable on",
"agent NoInputNoOutput",
"default-agent",
]
)
if "Failed to set power on" in script_out or "org.bluez.Error" in script_out:
"""Сделать устройство видимым для сопряжения."""
success = self._bt_service.make_discoverable()
if self._bt_service.last_error == "power_off":
self.status.setText("Статус: питание BT выключено (проверьте rfkill)")
return
if self._last_error:
self.status.setText(f"Статус: ошибка ({self._last_error})")
elif not success:
self.status.setText(f"Статус: ошибка ({self._bt_service.last_error})")
else:
self.status.setText("Статус: видим для сопряжения")
def _connect_selected(self):
"""Подключить выбранное устройство."""
mac = self._selected_mac()
if not mac:
return
self._connect_device(mac, silent=False)
def _connect_device(self, mac: str, silent: bool):
self._last_error = ""
self._run_cmd(["bluetoothctl", "trust", mac])
self._run_cmd(["bluetoothctl", "connect", mac])
"""Подключиться к устройству по MAC."""
success = self._bt_service.connect(mac)
if not silent:
if self._last_error:
self.status.setText(f"Статус: ошибка ({self._last_error})")
if not success:
self.status.setText(f"Статус: ошибка ({self._bt_service.last_error})")
else:
self.status.setText(f"Статус: подключаемся к {mac}")
self._settings.setValue("bluetooth/last_mac", mac)
QTimer.singleShot(300, self._update_status)
def _disconnect_selected(self):
"""Отключить выбранное устройство."""
mac = self._selected_mac()
if not mac:
return
self._last_error = ""
self._run_cmd(["bluetoothctl", "disconnect", mac])
if self._last_error:
self.status.setText(f"Статус: ошибка ({self._last_error})")
success = self._bt_service.disconnect(mac)
if not success:
self.status.setText(f"Статус: ошибка ({self._bt_service.last_error})")
else:
self.status.setText(f"Статус: отключено от {mac}")
QTimer.singleShot(300, self._update_status)
def _update_status(self):
"""Обновить текстовый статус."""
mac = self._selected_mac()
if not mac:
self.status.setText("Статус: выберите устройство")
return
info = self._device_info(mac)
connected = info.get("Connected", "no") == "yes"
name = info.get("Name", mac)
state = "подключено" if connected else "не подключено"
self.status.setText(f"Статус: {name}{state}")
def _device_info(self, mac: str) -> dict[str, str]:
out = self._run_cmd(["bluetoothctl", "info", mac])
info: dict[str, str] = {}
for line in out.splitlines():
if ":" not in line:
continue
key, value = line.split(":", 1)
info[key.strip()] = value.strip()
return info
def _paired_devices(self) -> list[BluetoothDevice]:
out = self._run_cmd(["bluetoothctl", "devices", "Paired"])
if "Invalid command" in out or not out:
out = self._run_cmd(["bluetoothctl", "paired-devices"])
if "Invalid command" in out or not out:
out = self._run_cmd(["bluetoothctl", "devices"])
devices: list[BluetoothDevice] = []
for line in self._strip_ansi(out).splitlines():
parts = line.split()
if len(parts) >= 2 and parts[0] == "Device":
mac = parts[1]
name = " ".join(parts[2:]) if len(parts) > 2 else ""
devices.append(BluetoothDevice(mac=mac, name=name))
return devices
def _run_cmd(self, args: list[str]) -> str:
try:
result = subprocess.run(
args,
capture_output=True,
text=True,
timeout=2,
check=False,
)
except (subprocess.SubprocessError, OSError):
self._last_error = "run-failed"
self._log(args, "", "run-failed", 1)
return ""
stdout = result.stdout.strip()
stderr = result.stderr.strip()
if result.returncode != 0 or stderr:
self._last_error = stderr or f"exit={result.returncode}"
self._log(args, stdout, stderr, result.returncode)
return stdout
def _run_btctl_script(self, commands: list[str]) -> str:
script = "\n".join(commands + ["quit"]) + "\n"
try:
result = subprocess.run(
["bluetoothctl"],
input=script,
capture_output=True,
text=True,
timeout=4,
check=False,
)
except (subprocess.SubprocessError, OSError):
self._last_error = "run-failed"
self._log(["bluetoothctl", "(script)"], "", "run-failed", 1)
return ""
stdout = result.stdout.strip()
stderr = result.stderr.strip()
if result.returncode != 0 or stderr:
self._last_error = stderr or f"exit={result.returncode}"
self._log(["bluetoothctl", "(script)"], stdout, stderr, result.returncode)
return stdout
def _log(self, args: list[str], stdout: str, stderr: str, code: int):
try:
os.makedirs(os.path.dirname(self._log_path), exist_ok=True)
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
line = (
f"[{ts}] cmd={' '.join(args)} code={code} "
f"stdout={stdout!r} stderr={stderr!r}\n"
)
with open(self._log_path, "a", encoding="utf-8") as f:
f.write(line)
except OSError:
pass
def _strip_ansi(self, text: str) -> str:
return re.sub(r"\x1b\[[0-9;]*m", "", text)
self.status.setText(self._bt_service.get_status_text(mac))

3
services/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from .bluetooth_service import BluetoothService, BluetoothDevice
__all__ = ["BluetoothService", "BluetoothDevice"]

View File

@ -0,0 +1,272 @@
from __future__ import annotations
import os
import re
import subprocess
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from PySide6.QtCore import QObject, Signal
@dataclass
class BluetoothDevice:
mac: str
name: str
class BluetoothService(QObject):
"""Сервис для управления Bluetooth на Raspberry Pi."""
status_changed = Signal(str)
devices_changed = Signal(list)
connected_changed = Signal(str, bool) # mac, connected
def __init__(self, parent=None):
super().__init__(parent)
self._log_path = Path("~/.cache/car_ui/bluetooth.log").expanduser()
self._last_error = ""
# === Public API ===
def get_paired_devices(self) -> list[BluetoothDevice]:
"""Получить список сопряженных устройств."""
out = self._run_cmd(["bluetoothctl", "devices", "Paired"])
if "Invalid command" in out or not out:
out = self._run_cmd(["bluetoothctl", "paired-devices"])
if "Invalid command" in out or not out:
out = self._run_cmd(["bluetoothctl", "devices"])
devices: list[BluetoothDevice] = []
for line in self._strip_ansi(out).splitlines():
parts = line.split()
if len(parts) >= 2 and parts[0] == "Device":
mac = parts[1]
name = " ".join(parts[2:]) if len(parts) > 2 else ""
devices.append(BluetoothDevice(mac=mac, name=name))
return devices
def get_device_info(self, mac: str) -> dict[str, str]:
"""Получить информацию об устройстве."""
out = self._run_cmd(["bluetoothctl", "info", mac])
info: dict[str, str] = {}
for line in out.splitlines():
if ":" not in line:
continue
key, value = line.split(":", 1)
info[key.strip()] = value.strip()
return info
def is_connected(self, mac: str) -> bool:
"""Проверить, подключено ли устройство."""
info = self.get_device_info(mac)
return info.get("Connected", "no") == "yes"
def connect(self, mac: str) -> bool:
"""Подключиться к устройству."""
self._last_error = ""
self._run_cmd(["bluetoothctl", "trust", mac])
result = self._run_cmd(["bluetoothctl", "connect", mac])
success = not self._last_error
self.connected_changed.emit(mac, success)
return success
def disconnect(self, mac: str) -> bool:
"""Отключить устройство."""
self._last_error = ""
result = self._run_cmd(["bluetoothctl", "disconnect", mac])
success = not self._last_error
self.connected_changed.emit(mac, False)
return success
def make_discoverable(self) -> bool:
"""Сделать устройство видимым для сопряжения."""
self._last_error = ""
script_out = self._run_btctl_script(
[
"power on",
"discoverable-timeout 0",
"discoverable on",
"pairable on",
"agent NoInputNoOutput",
"default-agent",
]
)
if "Failed to set power on" in script_out or "org.bluez.Error" in script_out:
self._last_error = "power_off"
return False
return not bool(self._last_error)
def get_status_text(self, mac: str | None = None) -> str:
"""Получить текстовый статус Bluetooth."""
if not mac:
return "Статус: выберите устройство"
info = self.get_device_info(mac)
connected = info.get("Connected", "no") == "yes"
name = info.get("Name", mac)
state = "подключено" if connected else "не подключено"
return f"Статус: {name}{state}"
def get_player_status(self) -> str | None:
"""Получить статус медиаплеера (playing/paused/stopped)."""
out = self._run_btctl(["menu player", "show"])
for line in out.splitlines():
if "Status:" in line:
return line.split("Status:", 1)[1].strip()
return None
def get_player_metadata(self) -> dict:
"""Получить метаданные текущего трека."""
out = self._run_btctl(["menu player", "show"])
metadata = {
"source": None,
"title": None,
"artist": None,
"album": None,
"position": None,
"duration": None,
"status": None,
}
for line in out.splitlines():
if "Name:" in line:
metadata["source"] = line.split("Name:", 1)[1].strip()
elif "Track.Title:" in line:
metadata["title"] = line.split("Track.Title:", 1)[1].strip()
elif "Track.Artist:" in line:
metadata["artist"] = line.split("Track.Artist:", 1)[1].strip()
elif "Track.Album:" in line:
metadata["album"] = line.split("Track.Album:", 1)[1].strip()
elif "Position:" in line:
metadata["position"] = self._parse_hex_value(line)
elif "Track.Duration:" in line:
metadata["duration"] = self._parse_hex_value(line)
elif "Status:" in line:
metadata["status"] = line.split("Status:", 1)[1].strip()
return metadata
def player_play(self) -> None:
"""Воспроизведение."""
self._run_btctl(["menu player", "play"])
def player_pause(self) -> None:
"""Пауза."""
self._run_btctl(["menu player", "pause"])
def player_toggle(self) -> None:
"""Переключить воспроизведение/пауза."""
status = self.get_player_status()
if status == "playing":
self.player_pause()
else:
self.player_play()
def player_next(self) -> None:
"""Следующий трек."""
self._run_btctl(["menu player", "next"])
def player_previous(self) -> None:
"""Предыдущий трек."""
self._run_btctl(["menu player", "previous"])
# === Private methods ===
def _run_cmd(self, args: list[str]) -> str:
"""Выполнить команду bluetoothctl."""
try:
result = subprocess.run(
args,
capture_output=True,
text=True,
timeout=2,
check=False,
)
except (subprocess.SubprocessError, OSError):
self._last_error = "run-failed"
self._log(args, "", "run-failed", 1)
return ""
stdout = result.stdout.strip()
stderr = result.stderr.strip()
if result.returncode != 0 or stderr:
self._last_error = stderr or f"exit={result.returncode}"
self._log(args, stdout, stderr, result.returncode)
return stdout
def _run_btctl(self, commands: list[str]) -> str:
"""Выполнить скрипт в bluetoothctl интерактивном режиме."""
script = "\n".join(commands + ["back", "quit"]) + "\n"
try:
result = subprocess.run(
["bluetoothctl"],
input=script,
capture_output=True,
text=True,
timeout=3,
check=False,
)
except (subprocess.SubprocessError, OSError):
self._last_error = "run-failed"
self._log(["bluetoothctl"], "", "run-failed", 1)
return ""
stdout = result.stdout.strip()
stderr = result.stderr.strip()
if result.returncode != 0 or stderr:
self._last_error = stderr or f"exit={result.returncode}"
self._log(["bluetoothctl"], stdout, stderr, result.returncode)
return stdout
def _run_btctl_script(self, commands: list[str]) -> str:
"""Выполнить скрипт в bluetoothctl (без back в конце)."""
script = "\n".join(commands + ["quit"]) + "\n"
try:
result = subprocess.run(
["bluetoothctl"],
input=script,
capture_output=True,
text=True,
timeout=4,
check=False,
)
except (subprocess.SubprocessError, OSError):
self._last_error = "run-failed"
self._log(["bluetoothctl", "(script)"], "", "run-failed", 1)
return ""
stdout = result.stdout.strip()
stderr = result.stderr.strip()
if result.returncode != 0 or stderr:
self._last_error = stderr or f"exit={result.returncode}"
self._log(["bluetoothctl", "(script)"], stdout, stderr, result.returncode)
return stdout
def _parse_hex_value(self, line: str) -> int | None:
"""Распарсить hex значение из строки bluetoothctl."""
start = line.find("0x")
if start == -1:
return None
hex_part = line[start:].split()[0]
try:
return int(hex_part, 16)
except ValueError:
return None
def _strip_ansi(self, text: str) -> str:
"""Удалить ANSI escape-последовательности."""
return re.sub(r"\x1b\[[0-9;]*m", "", text)
def _log(self, args: list[str], stdout: str, stderr: str, code: int):
"""Записать лог операции."""
try:
self._log_path.parent.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
line = (
f"[{ts}] cmd={' '.join(args)} code={code} "
f"stdout={stdout!r} stderr={stderr!r}\n"
)
with open(self._log_path, "a", encoding="utf-8") as f:
f.write(line)
except OSError:
pass
@property
def last_error(self) -> str:
"""Получить последнюю ошибку."""
return self._last_error