crk_telegram_bot/server.py

325 lines
13 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import os
from datetime import datetime
import time
from aiogram import Bot, Dispatcher, F, Router, types
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.filters import Command, StateFilter
from aiogram.types import Message, FSInputFile
from aiogram.utils.keyboard import InlineKeyboardBuilder
import app.keyboards as kb
from app.messages import MESSAGES
from app.settings.config import DEBUG, TOKEN, CHANNEL_WHITELIST
from app.settings.info import bot_link
from app.database.loader_db import check_bd, check_config
#запуск
#----------------------------------------
check_bd()
check_config()
dp = Dispatcher(storage=MemoryStorage())
bot = Bot(token=TOKEN)
router = Router()
from app.database.db_connector import ALLusers
checkuser = ALLusers('app/database/data/database.db')
available_da = ["Да"]
async def main():
if DEBUG: print('DEBUG mode')
print(f'Bot start @{bot_link}')
await bot.delete_webhook(drop_pending_updates=True)
await dp.start_polling(bot)
#----------------------------------------
# access
#----------------------------------------
async def check_admin(message_chat_id):
try:
channel_id = CHANNEL_WHITELIST
user_channel_status = await bot.get_chat_member(chat_id=channel_id, user_id=message_chat_id)
if DEBUG:
print('---------------------------------------')
print('user_channel_status', user_channel_status.status)
print('---------------------------------------')
if ((user_channel_status.status == 'creator') or (user_channel_status.status == 'administrator')):
if DEBUG:
print('Admin access')
return True
return False
except:
return False
async def check_subscribe(message_from_user_id, message_chat_id):
try:
channel_id = CHANNEL_WHITELIST
user_channel_status = await bot.get_chat_member(chat_id=channel_id, user_id=message_chat_id)
if DEBUG:
print('---------------------------------------')
print('user_channel_status:',user_channel_status)
print('user_channel_status', user_channel_status.status)
print('---------------------------------------')
if ((user_channel_status.status != 'left') and (user_channel_status.status != 'kicked')):
return True
else:
await bot.send_message(message_from_user_id, MESSAGES['no_sub'], parse_mode="HTML")
return False
except:
await bot.send_message(message_from_user_id, MESSAGES['error'])
return False
async def check_access(message):
try:
if (not checkuser.search_user(message.from_user.id)):
# если юзера нет в базе, добавляем
try:
checkuser.add_user(message.from_user.id)
except:
if DEBUG:
print("Critical error check_access 1")
return False
# checkSubscribe = await check_subscribe(message.from_user.id, message.chat.id)
# if (checkSubscribe != True):
# return False
print("message_from_user", message.from_user.id)
return True
except:
if DEBUG:
print("Critical error check_access 2")
async def check_subscribe_spam(message_chat_id):
try:
channel_id = CHANNEL_WHITELIST
user_channel_status = await bot.get_chat_member(chat_id=channel_id, user_id=message_chat_id)
if DEBUG:
print('---------------------------------------')
print('user_channel_status:',user_channel_status)
print('user_channel_status', user_channel_status.status)
print('---------------------------------------')
if ((user_channel_status.status != 'left') and (user_channel_status.status != 'kicked')):
return True
else:
return False
except:
return False
#----------------------------------------
# form
#----------------------------------------
class Form(StatesGroup):
# search_0 = State()
# waiting_for_file = State()
# waiting_for_file_pause = State()
# register = State()
# register_pause = State()
# prespam = State()
spam = State()
spam_pause = State()
@dp.message(Command("cancel"))
@dp.message(F.text.casefold() == "Отмена" or F.text.casefold() == "старт" or F.text.casefold() == "/старт" or F.text.casefold() == "🏠 Главная")
@dp.message(lambda message: ((message.text == "Отмена") or (message.text == "старт") or (message.text == "/старт")))
async def cancel_handler(message: Message, state: FSMContext) -> None:
current_state = await state.get_state()
checkAccess = await check_access(message)
if current_state is None:
checkAccess = await check_access(message)
if checkAccess:
await message.answer(MESSAGES['start'], reply_markup=kb.main_menu)
return
await state.clear()
if checkAccess:
await message.answer(MESSAGES['byebye'], reply_markup=kb.main_menu)
return
#----------------------------------------
@dp.message(Command("start"))
async def process_start_command(message: types.Message):
# debug = True
# if debug:
# from debug import get_debug_info
# debug_info = get_debug_info()
# answer = "\n".join([f"{key}: {value}" for key, value in debug_info.items()])
# await message.answer(answer)
checkAccess = await check_access(message)
if checkAccess:
await message.answer(MESSAGES['start'], reply_markup=kb.main_menu)
return
@dp.message(StateFilter(None), F.text)
async def do_main_menu(message: Message, state: FSMContext):
txt = message.text
try:
checkAccess = await check_access(message)
if not checkAccess:
return await message.answer(MESSAGES['usestart'])
checkAdmin = await check_admin(message.chat.id)
if checkAdmin:
if txt == '/admin':
how_users = checkuser.log_info()
answer = (f"Пользователей зарегистрировано: {how_users[0]}\n\n"\
f"Рассылка - /spam")
return await message.reply(answer)
elif txt == '/spam':
answer = (f"Какой текст для рассылки?")
await message.reply(answer, reply_markup=kb.cancel)
return await state.set_state(Form.spam)
# главное меню
if txt == '🏠 Главная':
return await message.answer(MESSAGES['start'], reply_markup=kb.main_menu)
elif txt == '📚 Студентам':
return await message.answer(MESSAGES['menu_1'], reply_markup=kb.menu_1)
elif txt == '🤝 Работодателям':
return await message.answer(MESSAGES['menu_2'], reply_markup=kb.menu_2)
elif txt == ' О центре':
return await message.answer(MESSAGES['menu_3'], reply_markup=kb.menu_3_inline)
# elif txt == '📰 Подписка на новости':
# return await message.answer(MESSAGES['menu_4'], reply_markup=kb.menu_4_inline)
# студентам (1)
elif txt == '🔙 Назад':
return await message.answer(MESSAGES['menu_1'], reply_markup=kb.menu_1)
elif txt == '📖 Стажировки, практики и вакансии':
return await message.answer(MESSAGES['menu_1_1'], reply_markup=kb.menu_1_1)
elif txt == '✍️ Советы по резюме':
return await message.answer(MESSAGES['menu_1_2'], reply_markup=kb.menu_1_2_inline)
elif txt == '📄 Оформление договора':
return await message.answer(MESSAGES['menu_1_3'], reply_markup=kb.menu_1_3_inline)
# студентам (1_1)
elif txt == '📚 Стажировки':
return await message.answer(MESSAGES['menu_1_1_1'], reply_markup=kb.menu_1_1_1_inline)
elif txt == '🛠 Практики':
return await message.answer(MESSAGES['menu_1_1_2'], reply_markup=kb.menu_1_1_2_inline)
elif txt == '💼 Вакансии':
return await message.answer(MESSAGES['menu_1_1_3'], reply_markup=kb.menu_1_1_3_inline)
# Работодателям (2)
elif txt == '🏢 Стать партнёром':
photo = FSInputFile('app/photos/2_1/1.jpg') # Используем FSInputFile для локального файла
return await bot.send_photo(message.from_user.id, photo, caption=MESSAGES['menu_2_1'])
# photo = InputFile('app/photos/2_1/1.jpg')
# print("photo", photo)
# return await bot.send_photo(message.from_user.id, photo, MESSAGES['menu_2_1'])
#return await message.answer(MESSAGES['menu_2_1'])
elif txt == '📄 Виды сотрудничества':
photo = FSInputFile('app/photos/2_2/1.jpg') # Используем FSInputFile для локального файла
return await bot.send_photo(message.from_user.id, photo, caption=MESSAGES['menu_2_2'], reply_markup=kb.menu_2_2_inline)
# return await message.answer(MESSAGES['menu_2_2'], reply_markup=kb.menu_2_2_inline)
else:
return await message.answer(MESSAGES['usestart'])
# Для выведения ошибок
except Exception as e:
await message.answer(MESSAGES['error'])
print('----------------------------------------')
print(f'Critical error in: {e}')
print('----------------------------------------')
#----------------------------------------
# spam
#----------------------------------------
# available_in_spam = ["Всем", "С доступом"]
# @dp.message(lambda message: message.text not in available_in_spam, Form.prespam)
# async def process_invalid_prespam(message: types.Message, state: FSMContext):
# return await message.answer("Неверный аргумент. /cancel?", reply_markup=kb.spam)
# @dp.message(Form.prespam, F.text)
# @router.message(Form.prespam)
# async def process_form_prespam(message: types.Message, state: FSMContext):
# if message.text == "Всем":
# await state.update_data(prespam=1)
# elif message.text == "С доступом":
# await state.update_data(prespam=2)
# else:
# await state.clear()
# return
# await state.set_state(Form.spam)
# await message.answer("Какой текст для рассылки?", reply_markup=kb.cancel)
@dp.message(Form.spam, F.text)
@router.message(Form.spam)
async def process_form_spam(message: types.Message, state: FSMContext):
await state.update_data(spam=message.text)
data = await state.get_data()
# if data['prespam'] == 2:
# answer = f'Отправить сообщение (только с доступом):\n\n'
# else:
answer = f'Отправить сообщение (всем):\n\n'
answer += data['spam']
await bot.send_message(message.from_user.id, answer, reply_markup=kb.da)
await state.set_state(Form.spam_pause)
await message.answer("Отправить?", reply_markup=kb.da)
@dp.message(lambda message: message.text not in available_da, Form.spam_pause)
async def process_invalid_spam(message: types.Message, state: FSMContext):
return await message.answer("Отправить?\n\n Или /cancel ?", reply_markup=kb.da)
@dp.message(Form.spam_pause, F.text)
@router.message(Form.spam_pause, F.text.in_(available_da))
async def process_spam_pause(message: types.Message, state: FSMContext):
data = await state.get_data()
chetchik_spam = 0
checkAdmin = await check_admin(message.chat.id)
if checkAdmin:
# if data['prespam'] == 2:
# if DEBUG:
# print('Отправляем только с доступом:', data['spam'])
# user_for_spam = checkuser.load_all_users()
# for s in user_for_spam:
# access = await check_subscribe_spam(s[1])
# if access: # Проверяем наличие подписки
# try:
# text_spam = data['spam']
# # Рассылка только пользователям с подпиской
# await bot.send_message(s[1], text_spam, parse_mode="HTML")
# chetchik_spam += 1
# except Exception as e:
# if DEBUG:
# print(f"Ошибка отправки пользователю {s[1]}: {e}")
# pass
# else:
if DEBUG:
print('Отправляем всем:', data['spam'])
user_for_spam = (checkuser.load_all_users())
for s in user_for_spam:
try:
text_spam = data['spam']
await bot.send_message(s[1], text_spam, parse_mode="HTML")
chetchik_spam = chetchik_spam + 1
except:
if DEBUG:
print(f"Ошибка отправки пользователю {s[1]}: {e}")
pass
else:
pass
await state.clear()
user_info = checkuser.log_info()
await message.reply(f"Отправлено {chetchik_spam} из {user_info[0]}", reply_markup=kb.main_menu)
return
#----------------------------------------
if __name__ == '__main__':
asyncio.run(main())