From de876990a7745464836be6fa39c2e0f1427807b2 Mon Sep 17 00:00:00 2001 From: cheykrym Date: Wed, 1 Apr 2026 04:28:16 +0300 Subject: [PATCH] add ir --- build_info.py | 16 +++ services/__init__.py | 3 +- services/ir_remote_service.py | 209 ++++++++++++++++++++++++++++++++++ 3 files changed, 227 insertions(+), 1 deletion(-) create mode 100644 services/ir_remote_service.py diff --git a/build_info.py b/build_info.py index b4e870e..0809b3a 100644 --- a/build_info.py +++ b/build_info.py @@ -15,6 +15,22 @@ DEFAULT_DUCKING_VOLUME = 35 DEV_MODE_ENABLE = (Path(__file__).resolve().parent / "dev_mode_enable").exists() BLUETOOTH_MAX_PAIRED_DEVICES = 4 +# Конфигурация ИК-пульта +# Скан-коды кнопок (получить через ir-keytable -t) +IR_REMOTE_SCANCODES = { + "volume_up": "0x15", # Громкость + + "volume_down": "0x07", # Громкость - + "play_pause": "0x43", # Play/Pause + "next_track": "0x40", # Следующий трек + "prev_track": "0x44", # Предыдущий трек +} + +# Количество повторений для считания зажатием +IR_REPEAT_COUNT_FOR_HOLD = 4 + +# Задержка между повторениями (мс) +IR_REPEAT_DELAY_MS = 100 + def get_device_model() -> str: for path in ("/proc/device-tree/model", "/sys/firmware/devicetree/base/model"): diff --git a/services/__init__.py b/services/__init__.py index d04f61a..d911c4d 100644 --- a/services/__init__.py +++ b/services/__init__.py @@ -1,3 +1,4 @@ from .bluetooth_service import BluetoothService, BluetoothDevice +from .ir_remote_service import IrRemoteService -__all__ = ["BluetoothService", "BluetoothDevice"] +__all__ = ["BluetoothService", "BluetoothDevice", "IrRemoteService"] diff --git a/services/ir_remote_service.py b/services/ir_remote_service.py new file mode 100644 index 0000000..0869ba6 --- /dev/null +++ b/services/ir_remote_service.py @@ -0,0 +1,209 @@ +from __future__ import annotations + +import os +import select +import struct +import threading +from pathlib import Path +from typing import Callable + +from PySide6.QtCore import QObject, Signal + +import build_info + + +# Типы событий из Linux input.h +EV_MSC = 0x04 +EV_KEY = 0x01 +EV_SYN = 0x00 +MSC_SCAN = 0x04 +KEY_UP = 0 +KEY_DOWN = 1 + + +class IrRemoteEvent: + """Событие ИК-пульта.""" + + def __init__(self, scancode: str, is_repeat: bool, is_hold: bool): + self.scancode = scancode + self.is_repeat = repeat + self.is_hold = hold + + +class IrRemoteService(QObject): + """ + Сервис для обработки событий ИК-пульта. + + Поддерживает: + - Одиночные нажатия (клики) + - Зажатие кнопок (после N повторений) + """ + + # Сигналы для кнопок + volume_up_clicked = Signal() + volume_down_clicked = Signal() + play_pause_clicked = Signal() + next_track_clicked = Signal() + prev_track_clicked = Signal() + + # Сигналы для зажатий + volume_up_hold = Signal() + volume_down_hold = Signal() + next_track_hold = Signal() # Перемотка вперед + prev_track_hold = Signal() # Перемотка назад + + def __init__(self, parent: QObject = None): + super().__init__(parent) + self._device_path = self._find_ir_device() + self._running = False + self._thread: threading.Thread | None = None + self._repeat_counts: dict[str, int] = {} + self._hold_triggered: dict[str, bool] = {} + + # Маппинг скан-кодов на действия + self._scancode_to_action: dict[str, str] = { + v: k for k, v in build_info.IR_REMOTE_SCANCODES.items() + } + + def _find_ir_device(self) -> str | None: + """Найти устройство ИК-пульта в /dev/input.""" + for path in Path("/dev/input").glob("event*"): + try: + # Проверяем, поддерживает ли устройство MSC_SCAN + with open(path, "rb") as f: + # Читаем имя устройства + EVIOCGNAME = 0x80000000 | (16 << 8) | ord('E') << 24 | 6 + import fcntl + name = fcntl.ioctl(f.fileno(), EVIOCGNAME, b'\x00' * 256) + if b"ir" in name.lower() or b"rc" in name.lower(): + return str(path) + except (OSError, IOError): + continue + # Если не нашли по имени, пробуем первое event + for path in Path("/dev/input").glob("event*"): + return str(path) + return None + + def start(self) -> bool: + """Запустить обработку событий ИК-пульта.""" + if not self._device_path: + return False + + if self._running: + return True + + self._running = True + self._thread = threading.Thread(target=self._event_loop, daemon=True) + self._thread.start() + return True + + def stop(self) -> None: + """Остановить обработку событий.""" + self._running = False + if self._thread: + self._thread.join(timeout=1.0) + self._thread = None + + def _event_loop(self) -> None: + """Цикл обработки событий.""" + while self._running: + try: + with open(self._device_path, "rb") as f: + while self._running: + # Используем select для неблокирующего чтения + ready, _, _ = select.select([f], [], [], 0.1) + if not ready: + continue + + # Читаем событие (16 байт: tv_sec, tv_usec, type, code, value) + data = f.read(24) + if len(data) < 24: + continue + + tv_sec, tv_usec, ev_type, ev_code, ev_value = struct.unpack("llHHI", data) + + # Обрабатываем только MSC_SCAN события + if ev_type == EV_MSC and ev_code == MSC_SCAN: + scancode = f"0x{ev_value:02x}" + self._handle_scancode(scancode) + + except (OSError, IOError): + # Устройство недоступно, ждём и пробуем снова + import time + time.sleep(1.0) + + def _handle_scancode(self, scancode: str) -> None: + """Обработать скан-код кнопки.""" + action = self._scancode_to_action.get(scancode) + if not action: + return + + # Увеличиваем счётчик повторений + self._repeat_counts[scancode] = self._repeat_counts.get(scancode, 0) + 1 + repeat_count = self._repeat_counts[scancode] + + # Проверяем, было ли зажатие уже обработано + hold_triggered = self._hold_triggered.get(scancode, False) + + # Определяем тип события + is_repeat = repeat_count > 1 + is_hold = repeat_count >= build_info.IR_REPEAT_COUNT_FOR_HOLD + + # Отправляем сигналы + if action == "play_pause": + # Play/Pause - только клик (первое нажатие) + if not is_repeat: + self.play_pause_clicked.emit() + + elif action in ("volume_up", "volume_down"): + # Громкость - клик + зажатие + if not is_repeat: + if action == "volume_up": + self.volume_up_clicked.emit() + else: + self.volume_down_clicked.emit() + + if is_hold and not hold_triggered: + self._hold_triggered[scancode] = True + if action == "volume_up": + self.volume_up_hold.emit() + else: + self.volume_down_hold.emit() + + elif action in ("next_track", "prev_track"): + # Треки - клик + зажатие (перемотка) + if not is_repeat: + if action == "next_track": + self.next_track_clicked.emit() + else: + self.prev_track_clicked.emit() + + if is_hold and not hold_triggered: + self._hold_triggered[scancode] = True + if action == "next_track": + self.next_track_hold.emit() + else: + self.prev_track_hold.emit() + + def reset_button_state(self, scancode: str | None = None) -> None: + """ + Сбросить состояние кнопки (вызывать когда кнопка отпущена). + + Если scancode не указан, сбрасывает все кнопки. + """ + if scancode: + self._repeat_counts.pop(scancode, None) + self._hold_triggered.pop(scancode, None) + else: + self._repeat_counts.clear() + self._hold_triggered.clear() + + @property + def device_path(self) -> str | None: + """Путь к устройству ИК-пульта.""" + return self._device_path + + @property + def is_running(self) -> bool: + """Сервис запущен.""" + return self._running