add ir
This commit is contained in:
parent
34cb65bb8f
commit
de876990a7
@ -15,6 +15,22 @@ DEFAULT_DUCKING_VOLUME = 35
|
||||
DEV_MODE_ENABLE = (Path(__file__).resolve().parent / "dev_mode_enable").exists()
|
||||
BLUETOOTH_MAX_PAIRED_DEVICES = 4
|
||||
|
||||
# Конфигурация ИК-пульта
|
||||
# Скан-коды кнопок (получить через ir-keytable -t)
|
||||
IR_REMOTE_SCANCODES = {
|
||||
"volume_up": "0x15", # Громкость +
|
||||
"volume_down": "0x07", # Громкость -
|
||||
"play_pause": "0x43", # Play/Pause
|
||||
"next_track": "0x40", # Следующий трек
|
||||
"prev_track": "0x44", # Предыдущий трек
|
||||
}
|
||||
|
||||
# Количество повторений для считания зажатием
|
||||
IR_REPEAT_COUNT_FOR_HOLD = 4
|
||||
|
||||
# Задержка между повторениями (мс)
|
||||
IR_REPEAT_DELAY_MS = 100
|
||||
|
||||
|
||||
def get_device_model() -> str:
|
||||
for path in ("/proc/device-tree/model", "/sys/firmware/devicetree/base/model"):
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
from .bluetooth_service import BluetoothService, BluetoothDevice
|
||||
from .ir_remote_service import IrRemoteService
|
||||
|
||||
__all__ = ["BluetoothService", "BluetoothDevice"]
|
||||
__all__ = ["BluetoothService", "BluetoothDevice", "IrRemoteService"]
|
||||
|
||||
209
services/ir_remote_service.py
Normal file
209
services/ir_remote_service.py
Normal file
@ -0,0 +1,209 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import select
|
||||
import struct
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from typing import Callable
|
||||
|
||||
from PySide6.QtCore import QObject, Signal
|
||||
|
||||
import build_info
|
||||
|
||||
|
||||
# Типы событий из Linux input.h
|
||||
EV_MSC = 0x04
|
||||
EV_KEY = 0x01
|
||||
EV_SYN = 0x00
|
||||
MSC_SCAN = 0x04
|
||||
KEY_UP = 0
|
||||
KEY_DOWN = 1
|
||||
|
||||
|
||||
class IrRemoteEvent:
|
||||
"""Событие ИК-пульта."""
|
||||
|
||||
def __init__(self, scancode: str, is_repeat: bool, is_hold: bool):
|
||||
self.scancode = scancode
|
||||
self.is_repeat = repeat
|
||||
self.is_hold = hold
|
||||
|
||||
|
||||
class IrRemoteService(QObject):
|
||||
"""
|
||||
Сервис для обработки событий ИК-пульта.
|
||||
|
||||
Поддерживает:
|
||||
- Одиночные нажатия (клики)
|
||||
- Зажатие кнопок (после N повторений)
|
||||
"""
|
||||
|
||||
# Сигналы для кнопок
|
||||
volume_up_clicked = Signal()
|
||||
volume_down_clicked = Signal()
|
||||
play_pause_clicked = Signal()
|
||||
next_track_clicked = Signal()
|
||||
prev_track_clicked = Signal()
|
||||
|
||||
# Сигналы для зажатий
|
||||
volume_up_hold = Signal()
|
||||
volume_down_hold = Signal()
|
||||
next_track_hold = Signal() # Перемотка вперед
|
||||
prev_track_hold = Signal() # Перемотка назад
|
||||
|
||||
def __init__(self, parent: QObject = None):
|
||||
super().__init__(parent)
|
||||
self._device_path = self._find_ir_device()
|
||||
self._running = False
|
||||
self._thread: threading.Thread | None = None
|
||||
self._repeat_counts: dict[str, int] = {}
|
||||
self._hold_triggered: dict[str, bool] = {}
|
||||
|
||||
# Маппинг скан-кодов на действия
|
||||
self._scancode_to_action: dict[str, str] = {
|
||||
v: k for k, v in build_info.IR_REMOTE_SCANCODES.items()
|
||||
}
|
||||
|
||||
def _find_ir_device(self) -> str | None:
|
||||
"""Найти устройство ИК-пульта в /dev/input."""
|
||||
for path in Path("/dev/input").glob("event*"):
|
||||
try:
|
||||
# Проверяем, поддерживает ли устройство MSC_SCAN
|
||||
with open(path, "rb") as f:
|
||||
# Читаем имя устройства
|
||||
EVIOCGNAME = 0x80000000 | (16 << 8) | ord('E') << 24 | 6
|
||||
import fcntl
|
||||
name = fcntl.ioctl(f.fileno(), EVIOCGNAME, b'\x00' * 256)
|
||||
if b"ir" in name.lower() or b"rc" in name.lower():
|
||||
return str(path)
|
||||
except (OSError, IOError):
|
||||
continue
|
||||
# Если не нашли по имени, пробуем первое event
|
||||
for path in Path("/dev/input").glob("event*"):
|
||||
return str(path)
|
||||
return None
|
||||
|
||||
def start(self) -> bool:
|
||||
"""Запустить обработку событий ИК-пульта."""
|
||||
if not self._device_path:
|
||||
return False
|
||||
|
||||
if self._running:
|
||||
return True
|
||||
|
||||
self._running = True
|
||||
self._thread = threading.Thread(target=self._event_loop, daemon=True)
|
||||
self._thread.start()
|
||||
return True
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Остановить обработку событий."""
|
||||
self._running = False
|
||||
if self._thread:
|
||||
self._thread.join(timeout=1.0)
|
||||
self._thread = None
|
||||
|
||||
def _event_loop(self) -> None:
|
||||
"""Цикл обработки событий."""
|
||||
while self._running:
|
||||
try:
|
||||
with open(self._device_path, "rb") as f:
|
||||
while self._running:
|
||||
# Используем select для неблокирующего чтения
|
||||
ready, _, _ = select.select([f], [], [], 0.1)
|
||||
if not ready:
|
||||
continue
|
||||
|
||||
# Читаем событие (16 байт: tv_sec, tv_usec, type, code, value)
|
||||
data = f.read(24)
|
||||
if len(data) < 24:
|
||||
continue
|
||||
|
||||
tv_sec, tv_usec, ev_type, ev_code, ev_value = struct.unpack("llHHI", data)
|
||||
|
||||
# Обрабатываем только MSC_SCAN события
|
||||
if ev_type == EV_MSC and ev_code == MSC_SCAN:
|
||||
scancode = f"0x{ev_value:02x}"
|
||||
self._handle_scancode(scancode)
|
||||
|
||||
except (OSError, IOError):
|
||||
# Устройство недоступно, ждём и пробуем снова
|
||||
import time
|
||||
time.sleep(1.0)
|
||||
|
||||
def _handle_scancode(self, scancode: str) -> None:
|
||||
"""Обработать скан-код кнопки."""
|
||||
action = self._scancode_to_action.get(scancode)
|
||||
if not action:
|
||||
return
|
||||
|
||||
# Увеличиваем счётчик повторений
|
||||
self._repeat_counts[scancode] = self._repeat_counts.get(scancode, 0) + 1
|
||||
repeat_count = self._repeat_counts[scancode]
|
||||
|
||||
# Проверяем, было ли зажатие уже обработано
|
||||
hold_triggered = self._hold_triggered.get(scancode, False)
|
||||
|
||||
# Определяем тип события
|
||||
is_repeat = repeat_count > 1
|
||||
is_hold = repeat_count >= build_info.IR_REPEAT_COUNT_FOR_HOLD
|
||||
|
||||
# Отправляем сигналы
|
||||
if action == "play_pause":
|
||||
# Play/Pause - только клик (первое нажатие)
|
||||
if not is_repeat:
|
||||
self.play_pause_clicked.emit()
|
||||
|
||||
elif action in ("volume_up", "volume_down"):
|
||||
# Громкость - клик + зажатие
|
||||
if not is_repeat:
|
||||
if action == "volume_up":
|
||||
self.volume_up_clicked.emit()
|
||||
else:
|
||||
self.volume_down_clicked.emit()
|
||||
|
||||
if is_hold and not hold_triggered:
|
||||
self._hold_triggered[scancode] = True
|
||||
if action == "volume_up":
|
||||
self.volume_up_hold.emit()
|
||||
else:
|
||||
self.volume_down_hold.emit()
|
||||
|
||||
elif action in ("next_track", "prev_track"):
|
||||
# Треки - клик + зажатие (перемотка)
|
||||
if not is_repeat:
|
||||
if action == "next_track":
|
||||
self.next_track_clicked.emit()
|
||||
else:
|
||||
self.prev_track_clicked.emit()
|
||||
|
||||
if is_hold and not hold_triggered:
|
||||
self._hold_triggered[scancode] = True
|
||||
if action == "next_track":
|
||||
self.next_track_hold.emit()
|
||||
else:
|
||||
self.prev_track_hold.emit()
|
||||
|
||||
def reset_button_state(self, scancode: str | None = None) -> None:
|
||||
"""
|
||||
Сбросить состояние кнопки (вызывать когда кнопка отпущена).
|
||||
|
||||
Если scancode не указан, сбрасывает все кнопки.
|
||||
"""
|
||||
if scancode:
|
||||
self._repeat_counts.pop(scancode, None)
|
||||
self._hold_triggered.pop(scancode, None)
|
||||
else:
|
||||
self._repeat_counts.clear()
|
||||
self._hold_triggered.clear()
|
||||
|
||||
@property
|
||||
def device_path(self) -> str | None:
|
||||
"""Путь к устройству ИК-пульта."""
|
||||
return self._device_path
|
||||
|
||||
@property
|
||||
def is_running(self) -> bool:
|
||||
"""Сервис запущен."""
|
||||
return self._running
|
||||
Loading…
x
Reference in New Issue
Block a user