This commit is contained in:
cheykrym 2026-04-01 04:29:47 +03:00
parent de876990a7
commit 7a74991910

View File

@ -23,35 +23,35 @@ KEY_DOWN = 1
class IrRemoteEvent: class IrRemoteEvent:
"""Событие ИК-пульта.""" """Событие ИК-пульта."""
def __init__(self, scancode: str, is_repeat: bool, is_hold: bool): def __init__(self, scancode: str, is_repeat: bool, is_hold: bool):
self.scancode = scancode self.scancode = scancode
self.is_repeat = repeat self.is_repeat = is_repeat
self.is_hold = hold self.is_hold = is_hold
class IrRemoteService(QObject): class IrRemoteService(QObject):
""" """
Сервис для обработки событий ИК-пульта. Сервис для обработки событий ИК-пульта.
Поддерживает: Поддерживает:
- Одиночные нажатия (клики) - Одиночные нажатия (клики)
- Зажатие кнопок (после N повторений) - Зажатие кнопок (после N повторений)
""" """
# Сигналы для кнопок # Сигналы для кнопок
volume_up_clicked = Signal() volume_up_clicked = Signal()
volume_down_clicked = Signal() volume_down_clicked = Signal()
play_pause_clicked = Signal() play_pause_clicked = Signal()
next_track_clicked = Signal() next_track_clicked = Signal()
prev_track_clicked = Signal() prev_track_clicked = Signal()
# Сигналы для зажатий # Сигналы для зажатий
volume_up_hold = Signal() volume_up_hold = Signal()
volume_down_hold = Signal() volume_down_hold = Signal()
next_track_hold = Signal() # Перемотка вперед next_track_hold = Signal() # Перемотка вперед
prev_track_hold = Signal() # Перемотка назад prev_track_hold = Signal() # Перемотка назад
def __init__(self, parent: QObject = None): def __init__(self, parent: QObject = None):
super().__init__(parent) super().__init__(parent)
self._device_path = self._find_ir_device() self._device_path = self._find_ir_device()
@ -59,12 +59,12 @@ class IrRemoteService(QObject):
self._thread: threading.Thread | None = None self._thread: threading.Thread | None = None
self._repeat_counts: dict[str, int] = {} self._repeat_counts: dict[str, int] = {}
self._hold_triggered: dict[str, bool] = {} self._hold_triggered: dict[str, bool] = {}
# Маппинг скан-кодов на действия # Маппинг скан-кодов на действия
self._scancode_to_action: dict[str, str] = { self._scancode_to_action: dict[str, str] = {
v: k for k, v in build_info.IR_REMOTE_SCANCODES.items() v: k for k, v in build_info.IR_REMOTE_SCANCODES.items()
} }
def _find_ir_device(self) -> str | None: def _find_ir_device(self) -> str | None:
"""Найти устройство ИК-пульта в /dev/input.""" """Найти устройство ИК-пульта в /dev/input."""
for path in Path("/dev/input").glob("event*"): for path in Path("/dev/input").glob("event*"):
@ -83,27 +83,27 @@ class IrRemoteService(QObject):
for path in Path("/dev/input").glob("event*"): for path in Path("/dev/input").glob("event*"):
return str(path) return str(path)
return None return None
def start(self) -> bool: def start(self) -> bool:
"""Запустить обработку событий ИК-пульта.""" """Запустить обработку событий ИК-пульта."""
if not self._device_path: if not self._device_path:
return False return False
if self._running: if self._running:
return True return True
self._running = True self._running = True
self._thread = threading.Thread(target=self._event_loop, daemon=True) self._thread = threading.Thread(target=self._event_loop, daemon=True)
self._thread.start() self._thread.start()
return True return True
def stop(self) -> None: def stop(self) -> None:
"""Остановить обработку событий.""" """Остановить обработку событий."""
self._running = False self._running = False
if self._thread: if self._thread:
self._thread.join(timeout=1.0) self._thread.join(timeout=1.0)
self._thread = None self._thread = None
def _event_loop(self) -> None: def _event_loop(self) -> None:
"""Цикл обработки событий.""" """Цикл обработки событий."""
while self._running: while self._running:
@ -114,47 +114,47 @@ class IrRemoteService(QObject):
ready, _, _ = select.select([f], [], [], 0.1) ready, _, _ = select.select([f], [], [], 0.1)
if not ready: if not ready:
continue continue
# Читаем событие (16 байт: tv_sec, tv_usec, type, code, value) # Читаем событие (16 байт: tv_sec, tv_usec, type, code, value)
data = f.read(24) data = f.read(24)
if len(data) < 24: if len(data) < 24:
continue continue
tv_sec, tv_usec, ev_type, ev_code, ev_value = struct.unpack("llHHI", data) tv_sec, tv_usec, ev_type, ev_code, ev_value = struct.unpack("llHHI", data)
# Обрабатываем только MSC_SCAN события # Обрабатываем только MSC_SCAN события
if ev_type == EV_MSC and ev_code == MSC_SCAN: if ev_type == EV_MSC and ev_code == MSC_SCAN:
scancode = f"0x{ev_value:02x}" scancode = f"0x{ev_value:02x}"
self._handle_scancode(scancode) self._handle_scancode(scancode)
except (OSError, IOError): except (OSError, IOError):
# Устройство недоступно, ждём и пробуем снова # Устройство недоступно, ждём и пробуем снова
import time import time
time.sleep(1.0) time.sleep(1.0)
def _handle_scancode(self, scancode: str) -> None: def _handle_scancode(self, scancode: str) -> None:
"""Обработать скан-код кнопки.""" """Обработать скан-код кнопки."""
action = self._scancode_to_action.get(scancode) action = self._scancode_to_action.get(scancode)
if not action: if not action:
return return
# Увеличиваем счётчик повторений # Увеличиваем счётчик повторений
self._repeat_counts[scancode] = self._repeat_counts.get(scancode, 0) + 1 self._repeat_counts[scancode] = self._repeat_counts.get(scancode, 0) + 1
repeat_count = self._repeat_counts[scancode] repeat_count = self._repeat_counts[scancode]
# Проверяем, было ли зажатие уже обработано # Проверяем, было ли зажатие уже обработано
hold_triggered = self._hold_triggered.get(scancode, False) hold_triggered = self._hold_triggered.get(scancode, False)
# Определяем тип события # Определяем тип события
is_repeat = repeat_count > 1 is_repeat = repeat_count > 1
is_hold = repeat_count >= build_info.IR_REPEAT_COUNT_FOR_HOLD is_hold = repeat_count >= build_info.IR_REPEAT_COUNT_FOR_HOLD
# Отправляем сигналы # Отправляем сигналы
if action == "play_pause": if action == "play_pause":
# Play/Pause - только клик (первое нажатие) # Play/Pause - только клик (первое нажатие)
if not is_repeat: if not is_repeat:
self.play_pause_clicked.emit() self.play_pause_clicked.emit()
elif action in ("volume_up", "volume_down"): elif action in ("volume_up", "volume_down"):
# Громкость - клик + зажатие # Громкость - клик + зажатие
if not is_repeat: if not is_repeat:
@ -162,14 +162,14 @@ class IrRemoteService(QObject):
self.volume_up_clicked.emit() self.volume_up_clicked.emit()
else: else:
self.volume_down_clicked.emit() self.volume_down_clicked.emit()
if is_hold and not hold_triggered: if is_hold and not hold_triggered:
self._hold_triggered[scancode] = True self._hold_triggered[scancode] = True
if action == "volume_up": if action == "volume_up":
self.volume_up_hold.emit() self.volume_up_hold.emit()
else: else:
self.volume_down_hold.emit() self.volume_down_hold.emit()
elif action in ("next_track", "prev_track"): elif action in ("next_track", "prev_track"):
# Треки - клик + зажатие (перемотка) # Треки - клик + зажатие (перемотка)
if not is_repeat: if not is_repeat:
@ -177,18 +177,18 @@ class IrRemoteService(QObject):
self.next_track_clicked.emit() self.next_track_clicked.emit()
else: else:
self.prev_track_clicked.emit() self.prev_track_clicked.emit()
if is_hold and not hold_triggered: if is_hold and not hold_triggered:
self._hold_triggered[scancode] = True self._hold_triggered[scancode] = True
if action == "next_track": if action == "next_track":
self.next_track_hold.emit() self.next_track_hold.emit()
else: else:
self.prev_track_hold.emit() self.prev_track_hold.emit()
def reset_button_state(self, scancode: str | None = None) -> None: def reset_button_state(self, scancode: str | None = None) -> None:
""" """
Сбросить состояние кнопки (вызывать когда кнопка отпущена). Сбросить состояние кнопки (вызывать когда кнопка отпущена).
Если scancode не указан, сбрасывает все кнопки. Если scancode не указан, сбрасывает все кнопки.
""" """
if scancode: if scancode:
@ -197,12 +197,12 @@ class IrRemoteService(QObject):
else: else:
self._repeat_counts.clear() self._repeat_counts.clear()
self._hold_triggered.clear() self._hold_triggered.clear()
@property @property
def device_path(self) -> str | None: def device_path(self) -> str | None:
"""Путь к устройству ИК-пульта.""" """Путь к устройству ИК-пульта."""
return self._device_path return self._device_path
@property @property
def is_running(self) -> bool: def is_running(self) -> bool:
"""Сервис запущен.""" """Сервис запущен."""