from __future__ import annotations import os import subprocess import re import threading import logging import time from pathlib import Path from typing import Callable from PySide6.QtCore import QObject, Signal import build_info logger = logging.getLogger(__name__) # Типы событий из Linux input.h EV_MSC = 0x04 EV_KEY = 0x01 EV_SYN = 0x00 MSC_SCAN = 0x04 KEY_UP = 0 KEY_DOWN = 1 class IrRemoteEvent: """Событие ИК-пульта.""" def __init__(self, scancode: str, is_repeat: bool, is_hold: bool): self.scancode = scancode self.is_repeat = is_repeat self.is_hold = is_hold class IrRemoteService(QObject): """ Сервис для обработки событий ИК-пульта. Поддерживает: - Одиночные нажатия (клики) - Зажатие кнопок (после N повторений) """ # Сигналы для кнопок volume_up_clicked = Signal() volume_down_clicked = Signal() play_pause_clicked = Signal() next_track_clicked = Signal() prev_track_clicked = Signal() # Сигналы для зажатий volume_up_hold = Signal() volume_down_hold = Signal() next_track_hold = Signal() # Перемотка вперед prev_track_hold = Signal() # Перемотка назад def __init__(self, parent: QObject = None): super().__init__(parent) self._device_path = self._find_ir_device() self._running = False self._thread: threading.Thread | None = None self._repeat_counts: dict[str, int] = {} self._hold_triggered: dict[str, bool] = {} # Маппинг скан-кодов на действия self._scancode_to_action: dict[str, str] = { v: k for k, v in build_info.IR_REMOTE_SCANCODES.items() } def _find_ir_device(self) -> str | None: """Найти устройство ИК-пульта в /dev/input.""" # Сначала пробуем lirc устройства for path in Path("/dev").glob("lirc*"): logger.info(f"Found lirc device: {path}") return str(path) # Затем ищем event устройства с ir/rc в имени for path in Path("/dev/input").glob("event*"): try: # Проверяем, поддерживает ли устройство MSC_SCAN with open(path, "rb") as f: # Читаем имя устройства EVIOCGNAME = 0x80000000 | (16 << 8) | ord('E') << 24 | 6 import fcntl name = fcntl.ioctl(f.fileno(), EVIOCGNAME, b'\x00' * 256) name_str = name.decode('utf-8', errors='ignore').lower() if "ir" in name_str or "rc" in name_str: logger.info(f"Found IR device by name: {path} ({name_str})") return str(path) except (OSError, IOError): continue # Если не нашли по имени, пробуем все event устройства for path in sorted(Path("/dev/input").glob("event*")): logger.info(f"Trying event device: {path}") return str(path) logger.warning("No IR device found") return None def start(self) -> bool: """Запустить обработку событий ИК-пульта.""" if not self._device_path: logger.warning("IR remote device not found") return False if self._running: return True logger.info(f"Starting IR remote service, device: {self._device_path}") self._running = True self._thread = threading.Thread(target=self._event_loop, daemon=True) self._thread.start() return True def stop(self) -> None: """Остановить обработку событий.""" self._running = False if self._thread: self._thread.join(timeout=1.0) self._thread = None def _event_loop(self) -> None: """Цикл обработки событий через ir-keytable.""" logger.info(f"Starting ir-keytable event loop") last_scancode = None last_time = 0 min_delay = 0.3 # Минимальная задержка между нажатиями (сек) key_held = False # Флаг: кнопка зажата while self._running: try: # Запускаем ir-keytable -t для чтения событий # Пробуем без sudo, если не работает - добавь правило в sudoers proc = subprocess.Popen( ["ir-keytable", "-t"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1 ) logger.info("ir-keytable started") while self._running and proc.poll() is None: line = proc.stdout.readline() if not line: break line = line.strip() # Игнорируем EV_SYN события (это просто разделители) if "EV_SYN" in line: continue logger.debug(f"ir-keytable output: {line}") # Проверяем отпускание кнопки (EV_KEY с value=0) if "EV_KEY" in line and "value=0" in line: logger.debug("Key release detected") key_held = False last_scancode = None self.reset_button_state() continue # Парсим строку вида: # 23567.604034: lirc protocol(nec): scancode = 0x19 # 23567.604034: event type EV_MSC(0x04): scancode = 0x19 match = re.search(r'scancode\s*=\s*(0x[0-9a-fA-F]+)', line) if match: current_time = time.time() scancode = match.group(1).lower() # Проверяем есть ли "repeat" в строке is_repeat = "repeat" in line.lower() # Если пришла другая кнопка — сбрасываем состояние предыдущей if last_scancode and scancode != last_scancode and not is_repeat: logger.debug(f"New button pressed, resetting previous: {last_scancode} -> {scancode}") key_held = False self._repeat_counts.pop(last_scancode, None) self._hold_triggered.pop(last_scancode, None) # Если кнопка была отпущена и пришло новое нажатие if not key_held and not is_repeat: key_held = True self._repeat_counts[scancode] = 1 logger.debug(f"New key press: {scancode}") # Если кнопка зажата и пришло repeat elif key_held and is_repeat: self._repeat_counts[scancode] = self._repeat_counts.get(scancode, 1) + 1 logger.debug(f"Key repeat: {scancode}, count={self._repeat_counts[scancode]}") # Игнорируем дубли else: logger.debug(f"Skipping: key_held={key_held}, is_repeat={is_repeat}") continue # Проверяем задержку между разными нажатиями if scancode != last_scancode and (current_time - last_time) < min_delay: logger.debug(f"Skipping too fast: {current_time - last_time:.3f}s < {min_delay}s") continue logger.debug(f"Processing scancode: {scancode}, is_repeat: {is_repeat}") self._handle_scancode(scancode, is_repeat) last_scancode = scancode last_time = current_time proc.terminate() proc.wait(timeout=1) except Exception as e: logger.warning(f"ir-keytable error: {e}, retrying in 1s...") time.sleep(1.0) def _handle_scancode(self, scancode: str, is_repeat: bool = False) -> None: """Обработать скан-код кнопки.""" action = self._scancode_to_action.get(scancode) if not action: logger.debug(f"Unknown scancode: {scancode}") return repeat_count = self._repeat_counts.get(scancode, 1) hold_triggered = self._hold_triggered.get(scancode, False) is_hold = repeat_count >= build_info.IR_REPEAT_COUNT_FOR_HOLD logger.debug(f"Scancode: {scancode}, action: {action}, repeat_count={repeat_count}, is_hold={is_hold}") # Отправляем сигналы if action == "play_pause": # Play/Pause - только клик (первое нажатие) if not is_repeat: logger.info("Emitting play_pause_clicked") self.play_pause_clicked.emit() elif action in ("volume_up", "volume_down"): # Громкость - клик + зажатие if not is_repeat: if action == "volume_up": logger.info("Emitting volume_up_clicked") self.volume_up_clicked.emit() else: logger.info("Emitting volume_down_clicked") self.volume_down_clicked.emit() if is_hold and not hold_triggered: self._hold_triggered[scancode] = True if action == "volume_up": logger.info("Emitting volume_up_hold") self.volume_up_hold.emit() else: logger.info("Emitting volume_down_hold") self.volume_down_hold.emit() elif action in ("next_track", "prev_track"): # Треки - клик + зажатие (перемотка) if not is_repeat: if action == "next_track": logger.info("Emitting next_track_clicked") self.next_track_clicked.emit() else: logger.info("Emitting prev_track_clicked") self.prev_track_clicked.emit() if is_hold and not hold_triggered: self._hold_triggered[scancode] = True if action == "next_track": logger.info("Emitting next_track_hold") self.next_track_hold.emit() else: logger.info("Emitting prev_track_hold") self.prev_track_hold.emit() def reset_button_state(self, scancode: str | None = None) -> None: """ Сбросить состояние кнопки (вызывать когда кнопка отпущена). Если scancode не указан, сбрасывает все кнопки. """ if scancode: self._repeat_counts.pop(scancode, None) self._hold_triggered.pop(scancode, None) else: self._repeat_counts.clear() self._hold_triggered.clear() @property def device_path(self) -> str | None: """Путь к устройству ИК-пульта.""" return self._device_path @property def is_running(self) -> bool: """Сервис запущен.""" return self._running