car_ui/services/ir_remote_service.py
2026-04-01 05:28:03 +03:00

298 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import os
import subprocess
import re
import threading
import logging
import time
from pathlib import Path
from typing import Callable
from PySide6.QtCore import QObject, Signal
import build_info
logger = logging.getLogger(__name__)
# Типы событий из Linux input.h
EV_MSC = 0x04
EV_KEY = 0x01
EV_SYN = 0x00
MSC_SCAN = 0x04
KEY_UP = 0
KEY_DOWN = 1
class IrRemoteEvent:
"""Событие ИК-пульта."""
def __init__(self, scancode: str, is_repeat: bool, is_hold: bool):
self.scancode = scancode
self.is_repeat = is_repeat
self.is_hold = is_hold
class IrRemoteService(QObject):
"""
Сервис для обработки событий ИК-пульта.
Поддерживает:
- Одиночные нажатия (клики)
- Зажатие кнопок (после N повторений)
"""
# Сигналы для кнопок
volume_up_clicked = Signal()
volume_down_clicked = Signal()
play_pause_clicked = Signal()
next_track_clicked = Signal()
prev_track_clicked = Signal()
# Сигналы для зажатий
volume_up_hold = Signal()
volume_down_hold = Signal()
next_track_hold = Signal() # Перемотка вперед
prev_track_hold = Signal() # Перемотка назад
def __init__(self, parent: QObject = None):
super().__init__(parent)
self._device_path = self._find_ir_device()
self._running = False
self._thread: threading.Thread | None = None
self._repeat_counts: dict[str, int] = {}
self._hold_triggered: dict[str, bool] = {}
# Маппинг скан-кодов на действия
self._scancode_to_action: dict[str, str] = {
v: k for k, v in build_info.IR_REMOTE_SCANCODES.items()
}
def _find_ir_device(self) -> str | None:
"""Найти устройство ИК-пульта в /dev/input."""
# Сначала пробуем lirc устройства
for path in Path("/dev").glob("lirc*"):
logger.info(f"Found lirc device: {path}")
return str(path)
# Затем ищем event устройства с ir/rc в имени
for path in Path("/dev/input").glob("event*"):
try:
# Проверяем, поддерживает ли устройство MSC_SCAN
with open(path, "rb") as f:
# Читаем имя устройства
EVIOCGNAME = 0x80000000 | (16 << 8) | ord('E') << 24 | 6
import fcntl
name = fcntl.ioctl(f.fileno(), EVIOCGNAME, b'\x00' * 256)
name_str = name.decode('utf-8', errors='ignore').lower()
if "ir" in name_str or "rc" in name_str:
logger.info(f"Found IR device by name: {path} ({name_str})")
return str(path)
except (OSError, IOError):
continue
# Если не нашли по имени, пробуем все event устройства
for path in sorted(Path("/dev/input").glob("event*")):
logger.info(f"Trying event device: {path}")
return str(path)
logger.warning("No IR device found")
return None
def start(self) -> bool:
"""Запустить обработку событий ИК-пульта."""
if not self._device_path:
logger.warning("IR remote device not found")
return False
if self._running:
return True
logger.info(f"Starting IR remote service, device: {self._device_path}")
self._running = True
self._thread = threading.Thread(target=self._event_loop, daemon=True)
self._thread.start()
return True
def stop(self) -> None:
"""Остановить обработку событий."""
self._running = False
if self._thread:
self._thread.join(timeout=1.0)
self._thread = None
def _event_loop(self) -> None:
"""Цикл обработки событий через ir-keytable."""
logger.info(f"Starting ir-keytable event loop")
last_scancode = None
last_time = 0
min_delay = 0.3 # Минимальная задержка между нажатиями (сек)
key_held = False # Флаг: кнопка зажата
while self._running:
try:
# Запускаем ir-keytable -t для чтения событий
# Пробуем без sudo, если не работает - добавь правило в sudoers
proc = subprocess.Popen(
["ir-keytable", "-t"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1
)
logger.info("ir-keytable started")
while self._running and proc.poll() is None:
line = proc.stdout.readline()
if not line:
break
line = line.strip()
# Игнорируем EV_SYN события (это просто разделители)
if "EV_SYN" in line:
continue
logger.debug(f"ir-keytable output: {line}")
# Проверяем отпускание кнопки (EV_KEY с value=0)
if "EV_KEY" in line and "value=0" in line:
logger.debug("Key release detected")
key_held = False
last_scancode = None
self.reset_button_state()
continue
# Парсим строку вида:
# 23567.604034: lirc protocol(nec): scancode = 0x19
# 23567.604034: event type EV_MSC(0x04): scancode = 0x19
match = re.search(r'scancode\s*=\s*(0x[0-9a-fA-F]+)', line)
if match:
current_time = time.time()
scancode = match.group(1).lower()
# Проверяем есть ли "repeat" в строке
is_repeat = "repeat" in line.lower()
# Если пришла другая кнопка — сбрасываем состояние предыдущей
if last_scancode and scancode != last_scancode and not is_repeat:
logger.debug(f"New button pressed, resetting previous: {last_scancode} -> {scancode}")
key_held = False
self._repeat_counts.pop(last_scancode, None)
self._hold_triggered.pop(last_scancode, None)
# Если кнопка была отпущена и пришло новое нажатие
if not key_held and not is_repeat:
key_held = True
self._repeat_counts[scancode] = 1
logger.debug(f"New key press: {scancode}")
# Если кнопка зажата и пришло repeat
elif key_held and is_repeat:
self._repeat_counts[scancode] = self._repeat_counts.get(scancode, 1) + 1
logger.debug(f"Key repeat: {scancode}, count={self._repeat_counts[scancode]}")
# Игнорируем дубли
else:
logger.debug(f"Skipping: key_held={key_held}, is_repeat={is_repeat}")
continue
# Проверяем задержку между разными нажатиями
if scancode != last_scancode and (current_time - last_time) < min_delay:
logger.debug(f"Skipping too fast: {current_time - last_time:.3f}s < {min_delay}s")
continue
logger.debug(f"Processing scancode: {scancode}, is_repeat: {is_repeat}")
self._handle_scancode(scancode, is_repeat)
last_scancode = scancode
last_time = current_time
proc.terminate()
proc.wait(timeout=1)
except Exception as e:
logger.warning(f"ir-keytable error: {e}, retrying in 1s...")
time.sleep(1.0)
def _handle_scancode(self, scancode: str, is_repeat: bool = False) -> None:
"""Обработать скан-код кнопки."""
action = self._scancode_to_action.get(scancode)
if not action:
logger.debug(f"Unknown scancode: {scancode}")
return
repeat_count = self._repeat_counts.get(scancode, 1)
hold_triggered = self._hold_triggered.get(scancode, False)
is_hold = repeat_count >= build_info.IR_REPEAT_COUNT_FOR_HOLD
logger.debug(f"Scancode: {scancode}, action: {action}, repeat_count={repeat_count}, is_hold={is_hold}")
# Отправляем сигналы
if action == "play_pause":
# Play/Pause - только клик (первое нажатие)
if not is_repeat:
logger.info("Emitting play_pause_clicked")
self.play_pause_clicked.emit()
elif action in ("volume_up", "volume_down"):
# Громкость - клик + зажатие
if not is_repeat:
if action == "volume_up":
logger.info("Emitting volume_up_clicked")
self.volume_up_clicked.emit()
else:
logger.info("Emitting volume_down_clicked")
self.volume_down_clicked.emit()
if is_hold and not hold_triggered:
self._hold_triggered[scancode] = True
if action == "volume_up":
logger.info("Emitting volume_up_hold")
self.volume_up_hold.emit()
else:
logger.info("Emitting volume_down_hold")
self.volume_down_hold.emit()
elif action in ("next_track", "prev_track"):
# Треки - клик + зажатие (перемотка)
if not is_repeat:
if action == "next_track":
logger.info("Emitting next_track_clicked")
self.next_track_clicked.emit()
else:
logger.info("Emitting prev_track_clicked")
self.prev_track_clicked.emit()
if is_hold and not hold_triggered:
self._hold_triggered[scancode] = True
if action == "next_track":
logger.info("Emitting next_track_hold")
self.next_track_hold.emit()
else:
logger.info("Emitting prev_track_hold")
self.prev_track_hold.emit()
def reset_button_state(self, scancode: str | None = None) -> None:
"""
Сбросить состояние кнопки (вызывать когда кнопка отпущена).
Если scancode не указан, сбрасывает все кнопки.
"""
if scancode:
self._repeat_counts.pop(scancode, None)
self._hold_triggered.pop(scancode, None)
else:
self._repeat_counts.clear()
self._hold_triggered.clear()
@property
def device_path(self) -> str | None:
"""Путь к устройству ИК-пульта."""
return self._device_path
@property
def is_running(self) -> bool:
"""Сервис запущен."""
return self._running