from __future__ import annotations import os import select import struct import threading import logging from pathlib import Path from typing import Callable from PySide6.QtCore import QObject, Signal import build_info logger = logging.getLogger(__name__) # Типы событий из Linux input.h EV_MSC = 0x04 EV_KEY = 0x01 EV_SYN = 0x00 MSC_SCAN = 0x04 KEY_UP = 0 KEY_DOWN = 1 class IrRemoteEvent: """Событие ИК-пульта.""" def __init__(self, scancode: str, is_repeat: bool, is_hold: bool): self.scancode = scancode self.is_repeat = is_repeat self.is_hold = is_hold class IrRemoteService(QObject): """ Сервис для обработки событий ИК-пульта. Поддерживает: - Одиночные нажатия (клики) - Зажатие кнопок (после N повторений) """ # Сигналы для кнопок volume_up_clicked = Signal() volume_down_clicked = Signal() play_pause_clicked = Signal() next_track_clicked = Signal() prev_track_clicked = Signal() # Сигналы для зажатий volume_up_hold = Signal() volume_down_hold = Signal() next_track_hold = Signal() # Перемотка вперед prev_track_hold = Signal() # Перемотка назад def __init__(self, parent: QObject = None): super().__init__(parent) self._device_path = self._find_ir_device() self._running = False self._thread: threading.Thread | None = None self._repeat_counts: dict[str, int] = {} self._hold_triggered: dict[str, bool] = {} # Маппинг скан-кодов на действия self._scancode_to_action: dict[str, str] = { v: k for k, v in build_info.IR_REMOTE_SCANCODES.items() } def _find_ir_device(self) -> str | None: """Найти устройство ИК-пульта в /dev/input.""" for path in Path("/dev/input").glob("event*"): try: # Проверяем, поддерживает ли устройство MSC_SCAN with open(path, "rb") as f: # Читаем имя устройства EVIOCGNAME = 0x80000000 | (16 << 8) | ord('E') << 24 | 6 import fcntl name = fcntl.ioctl(f.fileno(), EVIOCGNAME, b'\x00' * 256) if b"ir" in name.lower() or b"rc" in name.lower(): return str(path) except (OSError, IOError): continue # Если не нашли по имени, пробуем первое event for path in Path("/dev/input").glob("event*"): return str(path) return None def start(self) -> bool: """Запустить обработку событий ИК-пульта.""" if not self._device_path: logger.warning("IR remote device not found") return False if self._running: return True logger.info(f"Starting IR remote service, device: {self._device_path}") self._running = True self._thread = threading.Thread(target=self._event_loop, daemon=True) self._thread.start() return True def stop(self) -> None: """Остановить обработку событий.""" self._running = False if self._thread: self._thread.join(timeout=1.0) self._thread = None def _event_loop(self) -> None: """Цикл обработки событий.""" while self._running: try: with open(self._device_path, "rb") as f: while self._running: # Используем select для неблокирующего чтения ready, _, _ = select.select([f], [], [], 0.1) if not ready: continue # Читаем событие (24 байта: tv_sec, tv_usec, type, code, value) data = f.read(24) if len(data) < 24: continue tv_sec, tv_usec, ev_type, ev_code, ev_value = struct.unpack("llHHI", data) # Обрабатываем MSC_SCAN события (скан-код) if ev_type == EV_MSC and ev_code == MSC_SCAN: scancode = f"0x{ev_value:02x}" self._handle_scancode(scancode) # Обрабатываем EV_KEY события (отпускание кнопки) elif ev_type == EV_KEY and ev_value == KEY_UP: # Кнопка отпущена - сбрасываем состояние self.reset_button_state() except (OSError, IOError): # Устройство недоступно, ждём и пробуем снова import time time.sleep(1.0) def _handle_scancode(self, scancode: str) -> None: """Обработать скан-код кнопки.""" action = self._scancode_to_action.get(scancode) if not action: logger.debug(f"Unknown scancode: {scancode}") return logger.debug(f"Scancode: {scancode}, action: {action}") # Увеличиваем счётчик повторений self._repeat_counts[scancode] = self._repeat_counts.get(scancode, 0) + 1 repeat_count = self._repeat_counts[scancode] # Проверяем, было ли зажатие уже обработано hold_triggered = self._hold_triggered.get(scancode, False) # Определяем тип события is_repeat = repeat_count > 1 is_hold = repeat_count >= build_info.IR_REPEAT_COUNT_FOR_HOLD logger.debug(f" repeat_count={repeat_count}, is_repeat={is_repeat}, is_hold={is_hold}") # Отправляем сигналы if action == "play_pause": # Play/Pause - только клик (первое нажатие) if not is_repeat: logger.info("Emitting play_pause_clicked") self.play_pause_clicked.emit() elif action in ("volume_up", "volume_down"): # Громкость - клик + зажатие if not is_repeat: if action == "volume_up": logger.info("Emitting volume_up_clicked") self.volume_up_clicked.emit() else: logger.info("Emitting volume_down_clicked") self.volume_down_clicked.emit() if is_hold and not hold_triggered: self._hold_triggered[scancode] = True if action == "volume_up": logger.info("Emitting volume_up_hold") self.volume_up_hold.emit() else: logger.info("Emitting volume_down_hold") self.volume_down_hold.emit() elif action in ("next_track", "prev_track"): # Треки - клик + зажатие (перемотка) if not is_repeat: if action == "next_track": logger.info("Emitting next_track_clicked") self.next_track_clicked.emit() else: logger.info("Emitting prev_track_clicked") self.prev_track_clicked.emit() if is_hold and not hold_triggered: self._hold_triggered[scancode] = True if action == "next_track": logger.info("Emitting next_track_hold") self.next_track_hold.emit() else: logger.info("Emitting prev_track_hold") self.prev_track_hold.emit() def reset_button_state(self, scancode: str | None = None) -> None: """ Сбросить состояние кнопки (вызывать когда кнопка отпущена). Если scancode не указан, сбрасывает все кнопки. """ if scancode: self._repeat_counts.pop(scancode, None) self._hold_triggered.pop(scancode, None) else: self._repeat_counts.clear() self._hold_triggered.clear() @property def device_path(self) -> str | None: """Путь к устройству ИК-пульта.""" return self._device_path @property def is_running(self) -> bool: """Сервис запущен.""" return self._running