我有一个内联按钮与他们的回应,一些人垃圾邮件他们,我想使命令延迟。
如何实现这个目标?
如果可能,编写另一种方式来回答用户,命令可以在一段时间后写入
@dp.callback_query_handler(id, text='Правила ⚔️')
@dp.callback_query_handler(id, text='Мануалы ')
async def inline_kb_answer_callback_handler(query: types.CallbackQuery):
answer_data = query.data
# always answer callback queries, even if you have nothing to say
await query.answer(f'Вы выбрали пункт {answer_data!r}')
if answer_data == 'Помощь ':
text = "Есть какие-то вопросы? \n \nПо вопросам внутри чата:\n \n Сотрудничество - @reimannlive \n \nПо всем вопросам: @FollHash ☯️\nПо всем вопросам, заявкам в тиму: @t3sse ☯️\n \nВот полезные команды, для развлекухи:\n \n/cat - киски \n/dog - собачки \n/music - музычка \n/robot - бредовый видосик "
elif answer_data == 'Правила ⚔️':
text = """Пᴩᴀʙиᴧᴀ чᴀᴛᴀ
=======================
Зᴀᴨᴩᴇщᴇнᴏ:
- ᴧюбᴀя ᴋᴏʍʍᴇᴩция ʙ чᴀᴛᴇ (ᴨᴏᴋуᴨᴋᴀ/ᴨᴩᴏдᴀжᴀ)
- уᴋᴀɜыʙᴀᴛь иᴧи ᴨᴏʍᴇчᴀᴛь дᴩуᴦиᴇ ᴋᴀнᴀᴧы иᴧи бᴏᴛы
- ᴩᴇᴋᴧᴀʍᴀ иᴧи уᴨᴏʍинᴀниᴇ ᴨᴏхᴏжих ᴩᴇᴄуᴩᴄᴏʙ/ɯᴏᴨᴏʙ/нᴇйʍᴏʙ ʙ ᴧюбᴏʍ ᴋᴏнᴛᴇᴋᴄᴛᴇ
- ᴨᴏᴨᴩᴏɯᴀйничᴇᴄᴛʙᴏ
- ɜᴧᴏуᴨᴏᴛᴩᴇбᴧᴇниᴇ "CAPS LOCK"
- ʙᴇᴄᴛи ᴄᴇбя нᴇᴀдᴇᴋʙᴀᴛнᴏ ʙ чᴀᴛᴇ и ᴩᴀɜʙᴏдиᴛь "ᴄᴩᴀч"
- ᴏᴄᴋᴏᴩбᴧᴇниᴇ "мᴏдᴇᴩᴀции/ᴨᴩᴏᴇᴋᴛᴀ/ɯᴏᴨᴀ" - бᴀн ❗️
- ᴏᴛᴨᴩᴀʙᴧяᴛь ᴄᴋᴩиʍᴇᴩы, ᴩᴀᴄчᴧᴇнᴇнᴋу, ᴄʙᴀᴄᴛиᴋу, нᴀциɜʍ, ᴋᴏнᴛᴇнᴛ
- ᴏɸᴏᴩʍᴧяᴛь ᴩᴀɜᴧичныᴇ ᴋᴀᴩᴛы, ᴀбуɜиᴛь ᴩᴇɸᴇᴩᴀᴧьную ᴄиᴄᴛᴇʍу, ᴄᴋᴀʍ и ᴏбʍᴀн ᴨᴏᴧьɜᴏʙᴀᴛᴇᴧᴇй
- ᴨᴩᴏᴨᴀᴦᴀндᴀ ᴨᴏᴧиᴛиᴋи
- ɸᴧуд\ᴄᴨᴀʍ ᴏдинᴀᴋᴏʙыʍи ɜᴀ ᴋᴏнᴛᴇᴋᴄᴛᴏʍ ᴄᴧᴏʙᴀʍи иᴧи ᴨᴩᴇдᴧᴏжᴇнияʍи (1 ᴨᴩᴇдуᴨᴩᴇждᴇниᴇ, ᴨᴏᴄᴧᴇ - ɯᴛᴩᴀɸ) """
elif answer_data == 'Мануалы ':
text = "Краткий мануал о том как обрабатывать логи - https://telegra.ph/Kak-obrabatyvat-logi-05-30\nЗа привлечение новой аудитории, выдаю логи"
else:
text = f'Unexpected callback data {answer_data!r}!'
await bot.send_message(ID, text)
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)希望您能帮助我,提前谢谢!
发布于 2021-07-30 04:41:17
aiogram有一个洪水避免(反洪水)示例,它基本上可以防止用户滥用该命令。记得在你的服务器上安装(redis aiohttp,aioredis和)。
import asyncio
from aiogram import Bot, Dispatcher, executor, types
from aiogram.contrib.fsm_storage.redis import RedisStorage2
from aiogram.dispatcher import DEFAULT_RATE_LIMIT
from aiogram.dispatcher.handler import CancelHandler, current_handler
from aiogram.dispatcher.middlewares import BaseMiddleware
from aiogram.utils.exceptions import Throttled
TOKEN = 'BOT_TOKEN_HERE'
# In this example Redis storage is used
storage = RedisStorage2(db=5)
bot = Bot(token=TOKEN)
dp = Dispatcher(bot, storage=storage)
def rate_limit(limit: int, key=None):
"""
Decorator for configuring rate limit and key in different functions.
:param limit:
:param key:
:return:
"""
def decorator(func):
setattr(func, 'throttling_rate_limit', limit)
if key:
setattr(func, 'throttling_key', key)
return func
return decorator
class ThrottlingMiddleware(BaseMiddleware):
"""
Simple middleware
"""
def __init__(self, limit=DEFAULT_RATE_LIMIT, key_prefix='antiflood_'):
self.rate_limit = limit
self.prefix = key_prefix
super(ThrottlingMiddleware, self).__init__()
async def on_process_message(self, message: types.Message, data: dict):
"""
This handler is called when dispatcher receives a message
:param message:
"""
# Get current handler
handler = current_handler.get()
# Get dispatcher from context
dispatcher = Dispatcher.get_current()
# If handler was configured, get rate limit and key from handler
if handler:
limit = getattr(handler, 'throttling_rate_limit', self.rate_limit)
key = getattr(handler, 'throttling_key', f"{self.prefix}_{handler.__name__}")
else:
limit = self.rate_limit
key = f"{self.prefix}_message"
# Use Dispatcher.throttle method.
try:
await dispatcher.throttle(key, rate=limit)
except Throttled as t:
# Execute action
await self.message_throttled(message, t)
# Cancel current handler
raise CancelHandler()
async def message_throttled(self, message: types.Message, throttled: Throttled):
"""
Notify user only on first exceed and notify about unlocking only on last exceed
:param message:
:param throttled:
"""
handler = current_handler.get()
dispatcher = Dispatcher.get_current()
if handler:
key = getattr(handler, 'throttling_key', f"{self.prefix}_{handler.__name__}")
else:
key = f"{self.prefix}_message"
# Calculate how many time is left till the block ends
delta = throttled.rate - throttled.delta
# Prevent flooding
if throttled.exceeded_count <= 2:
await message.reply('Too many requests! ')
# Sleep.
await asyncio.sleep(delta)
# Check lock status
thr = await dispatcher.check_key(key)
# If current message is not last with current key - do not send message
if thr.exceeded_count == throttled.exceeded_count:
await message.reply('Unlocked.')
@dp.message_handler(commands=['start'])
@rate_limit(5, 'start') # this is not required but you can configure throttling manager for current handler using it
async def cmd_test(message: types.Message):
# You can use this command every 5 seconds
await message.reply('Test passed! You can use this command every 5 seconds.')
if __name__ == '__main__':
# Setup middleware
dp.middleware.setup(ThrottlingMiddleware())
# Start long-polling
executor.start_polling(dp)https://stackoverflow.com/questions/68099134
复制相似问题