crk_telegram_bot/server.py

225 lines
8.7 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import os
from datetime import datetime
import time
from aiogram import Bot, Dispatcher, F, Router, types
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.filters import Command, StateFilter
from aiogram.types import Message
from aiogram.utils.keyboard import InlineKeyboardBuilder
import app.keyboards as kb
from app.messages import MESSAGES
from app.settings.config import DEBUG, TOKEN, CHANNEL_WHITELIST
from app.settings.info import bot_link
from app.database.loader_db import check_bd, check_config
#запуск
#----------------------------------------
check_bd()
check_config()
dp = Dispatcher(storage=MemoryStorage())
bot = Bot(token=TOKEN)
router = Router()
from app.database.db_connector import ALLusers
checkuser = ALLusers('app/database/data/database.db')
available_da = ["Да"]
async def main():
if DEBUG: print('DEBUG mode')
print(f'Bot start @{bot_link}')
await bot.delete_webhook(drop_pending_updates=True)
await dp.start_polling(bot)
#----------------------------------------
# access
#----------------------------------------
async def check_admin(message_chat_id):
try:
channel_id = CHANNEL_WHITELIST
user_channel_status = await bot.get_chat_member(chat_id=channel_id, user_id=message_chat_id)
if DEBUG:
print('---------------------------------------')
print('user_channel_status', user_channel_status.status)
print('---------------------------------------')
if ((user_channel_status.status == 'creator') or (user_channel_status.status == 'administrator')):
if DEBUG:
print('Admin access')
return True
return False
except:
return False
async def check_subscribe(message_from_user_id, message_chat_id):
try:
channel_id = CHANNEL_WHITELIST
user_channel_status = await bot.get_chat_member(chat_id=channel_id, user_id=message_chat_id)
if DEBUG:
print('---------------------------------------')
print('user_channel_status:',user_channel_status)
print('user_channel_status', user_channel_status.status)
print('---------------------------------------')
if ((user_channel_status.status != 'left') and (user_channel_status.status != 'kicked')):
return True
else:
await bot.send_message(message_from_user_id, MESSAGES['no_sub'], parse_mode="HTML")
return False
except:
await bot.send_message(message_from_user_id, MESSAGES['error'])
return False
async def check_access(message):
try:
if (not checkuser.search_user(message.from_user.id)):
# если юзера нет в базе, добавляем
try:
checkuser.add_user(message.from_user.id)
except:
if DEBUG:
print("Critical error check_access 1")
return False
# checkSubscribe = await check_subscribe(message.from_user.id, message.chat.id)
# if (checkSubscribe != True):
# return False
print("message_from_user", message.from_user.id)
return True
except:
if DEBUG:
print("Critical error check_access 2")
async def check_subscribe_spam(message_chat_id):
try:
channel_id = CHANNEL_WHITELIST
user_channel_status = await bot.get_chat_member(chat_id=channel_id, user_id=message_chat_id)
if DEBUG:
print('---------------------------------------')
print('user_channel_status:',user_channel_status)
print('user_channel_status', user_channel_status.status)
print('---------------------------------------')
if ((user_channel_status.status != 'left') and (user_channel_status.status != 'kicked')):
return True
else:
return False
except:
return False
#----------------------------------------
# form
#----------------------------------------
class Form(StatesGroup):
search_0 = State()
waiting_for_file = State()
waiting_for_file_pause = State()
register = State()
register_pause = State()
prespam = State()
spam = State()
spam_pause = State()
@dp.message(Command("cancel"))
@dp.message(F.text.casefold() == "Отмена" or F.text.casefold() == "старт" or F.text.casefold() == "/старт" or F.text.casefold() == "🏠 Главная")
@dp.message(lambda message: ((message.text == "Отмена") or (message.text == "старт") or (message.text == "/старт")))
async def cancel_handler(message: Message, state: FSMContext) -> None:
current_state = await state.get_state()
checkAccess = await check_access(message)
if current_state is None:
checkAccess = await check_access(message)
if checkAccess:
await message.answer(MESSAGES['start'], reply_markup=kb.main_menu)
return
await state.clear()
if checkAccess:
await message.answer(MESSAGES['byebye'], reply_markup=kb.main_menu)
return
#----------------------------------------
@dp.message(Command("start"))
async def process_start_command(message: types.Message):
checkAccess = await check_access(message)
if checkAccess:
await message.answer(MESSAGES['start'], reply_markup=kb.main_menu)
return
@dp.message(StateFilter(None), F.text)
async def do_main_menu(message: Message, state: FSMContext):
txt = message.text
try:
checkAccess = await check_access(message)
if not checkAccess:
return await message.answer(MESSAGES['usestart'])
if txt == '/admin':
checkAdmin = await check_admin(message.chat.id)
if checkAdmin:
how_users = checkuser.log_info()
answer = (f"Пользователей зарегистрировано: {how_users[0]}\n\n"\
f"Рассылка - /spam")
return await message.reply(answer)
else:
return await message.answer(MESSAGES['usestart'])
# elif txt == '/spam':
# checkAdmin = await check_admin(message.chat.id)
# if checkAdmin:
# answer = (f"Для кого рассылка?")
# await message.reply(answer, reply_markup=kb.spam)
# return await state.set_state(Form.prespam)
# else:
# return await message.answer(MESSAGES['usestart'])
# главное меню
if txt == '🏠 Главная':
return await message.answer(MESSAGES['start'], reply_markup=kb.main_menu)
elif txt == '📚 Студентам':
return await message.answer(MESSAGES['menu_1'], reply_markup=kb.menu_1)
elif txt == '🤝 Работодателям':
return await message.answer(MESSAGES['menu_2'], reply_markup=kb.menu_2_inline)
elif txt == ' О центре':
return await message.answer(MESSAGES['menu_3'], reply_markup=kb.menu_3_inline)
# elif txt == '📰 Подписка на новости':
# return await message.answer(MESSAGES['menu_4'], reply_markup=kb.menu_4_inline)
# студентам (1)
elif txt == '🔙 Назад':
return await message.answer(MESSAGES['menu_1'], reply_markup=kb.menu_1)
elif txt == '📖 Стажировки, практики и вакансии':
return await message.answer(MESSAGES['menu_1_1'], reply_markup=kb.menu_1_1)
elif txt == '✍️ Советы по резюме':
return await message.answer(MESSAGES['menu_1_2'], reply_markup=kb.menu_1_2_inline)
elif txt == '📄 Оформление договора':
return await message.answer(MESSAGES['menu_1_3'], reply_markup=kb.menu_1_3_inline)
# студентам (1_1)
elif txt == '📚 Стажировки':
return await message.answer(MESSAGES['menu_1_1_1'], reply_markup=kb.menu_1_1_1_inline)
elif txt == '🛠 Практики':
return await message.answer(MESSAGES['menu_1_1_2'], reply_markup=kb.menu_1_1_2_inline)
elif txt == '💼 Вакансии':
return await message.answer(MESSAGES['menu_1_1_3'], reply_markup=kb.menu_1_1_3_inline)
else:
return await message.answer(MESSAGES['usestart'])
# Для выведения ошибок
except Exception as e:
await message.answer(MESSAGES['error'])
print('----------------------------------------')
print(f'Critical error in: {e}')
print('----------------------------------------')
#----------------------------------------
if __name__ == '__main__':
asyncio.run(main())