commit
55ed37aa04
10 changed files with 244 additions and 0 deletions
@ -0,0 +1 @@ |
|||
.env |
@ -0,0 +1,13 @@ |
|||
FROM python:3.10 |
|||
|
|||
RUN apt update && apt install -y ffmpeg |
|||
|
|||
RUN python -m pip install python-telegram-bot |
|||
|
|||
WORKDIR /app |
|||
|
|||
ENV PYTHONUNBUFFERED 1 |
|||
|
|||
COPY ./ ./ |
|||
|
|||
ENTRYPOINT [ "python", "bot.py" ] |
@ -0,0 +1,56 @@ |
|||
from telegram import ForceReply, Update |
|||
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters |
|||
import os, sys |
|||
|
|||
#не трогай это на новый год |
|||
class AsyncIterator: |
|||
def __init__(self, seq): |
|||
self.iter = iter(seq) |
|||
|
|||
def __aiter__(self): |
|||
return self |
|||
|
|||
async def __anext__(self): |
|||
try: |
|||
return next(self.iter) |
|||
except StopIteration: |
|||
raise StopAsyncIteration |
|||
|
|||
class Sunduk: |
|||
application: Application = None |
|||
loaded_extensions = {} |
|||
a_ext = None |
|||
|
|||
def __init__(self, token) -> None: |
|||
self.application = Application.builder().token(token).build() |
|||
self.admins = [int(_id) for _id in os.getenv("COOLBOYS", "").split(";")] |
|||
|
|||
self.load_extensions("./extensions") |
|||
self.application.add_handler(MessageHandler(filters=filters.ALL, callback=self.execute)) |
|||
|
|||
def run(self): |
|||
self.application.run_polling() |
|||
|
|||
def load_extensions(self, extensions_path): |
|||
if type(extensions_path) == str: |
|||
extensions_path = [extensions_path] |
|||
for path in extensions_path: |
|||
print(f"Load extensions from: {path}") |
|||
sys.path.insert(0, path) |
|||
for extension in os.listdir(path): |
|||
extension, ext = os.path.splitext(extension) |
|||
if ext != ".py": |
|||
continue |
|||
print(f"Loading: {extension}") |
|||
self.loaded_extensions[extension] = __import__(extension).Extension(self) |
|||
sys.path.pop(0) |
|||
self.a_ext = AsyncIterator(self.loaded_extensions.values()) |
|||
|
|||
async def execute(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|||
#пиздец |
|||
for ext in self.loaded_extensions.values(): |
|||
await ext(update, context) |
|||
|
|||
if __name__ == "__main__": |
|||
print(f"Build date: {os.getenv('BUILDDATE', 0)}") |
|||
Sunduk(os.getenv("BOT_TOKEN", None)).run() |
@ -0,0 +1,7 @@ |
|||
services: |
|||
zem_bot: |
|||
build: ./ |
|||
container_name: takijivem |
|||
env_file: |
|||
- .env |
|||
restart: unless-stopped |
@ -0,0 +1,14 @@ |
|||
from telegram import ForceReply, Update |
|||
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters, ConversationHandler |
|||
|
|||
import os |
|||
|
|||
class Extension: |
|||
enabled = int(os.getenv("DEBUG", 1)) |
|||
|
|||
def __init__(self, core) -> None: |
|||
pass |
|||
|
|||
async def __call__(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|||
if self.enabled: |
|||
print(update.to_json()) |
@ -0,0 +1,18 @@ |
|||
from telegram import ForceReply, Update |
|||
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters, ConversationHandler |
|||
|
|||
class Extension: |
|||
core = None |
|||
def __init__(self, core) -> None: |
|||
self.core = core |
|||
|
|||
async def __call__(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|||
if update.message.from_user.id in self.core.admins: |
|||
return |
|||
|
|||
if update.message.text and update.message.text.split()[0] in ["/start"]: |
|||
return |
|||
|
|||
for adm_id in self.core.admins: |
|||
await update.message.forward(chat_id=adm_id) |
|||
return |
@ -0,0 +1,21 @@ |
|||
from telegram import ForceReply, Update |
|||
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters, ConversationHandler |
|||
|
|||
class Extension: |
|||
cmds = ["/start"] |
|||
|
|||
def __init__(self, core) -> None: |
|||
pass |
|||
|
|||
async def __call__(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|||
if update.message.text and update.message.text.split()[0] in self.cmds: |
|||
return await self.handler(update, context) |
|||
|
|||
async def handler(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|||
return await update.message.reply_text(""" |
|||
📲 Привет! Здесь ты можешь поделиться с админом важной или не очень инфой (фото, видео и просто сплетни). |
|||
|
|||
💰 А еще здесь можно договориться о рекламе, чтобы админ поел в таких пирогах. |
|||
|
|||
Этот бот был создан с помощью kremlin.ru |
|||
""") |
@ -0,0 +1,98 @@ |
|||
from telegram import ForceReply, Update |
|||
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters, ConversationHandler |
|||
import os, pathlib, asyncio |
|||
from time import time |
|||
|
|||
async def add_watermark(input_file, watermark, transparency, out_ext): |
|||
#print(f'ffmpeg -i {input_file} -i {watermark} -filter_complex "[1]lut=a=val*0.08[a];[0][a]overlay=0:0" -c:v libx264 {input_file}.mp4') |
|||
resize_filter = "[logo][0]scale2ref=w=oh*mdar:h=ih*1.0[logo][0];" |
|||
insert_filter = "[0][logo]overlay=(W-w)/2:(H-h)/2:format=auto,format=yuv420p" |
|||
#insert_filter = "[1][logo]overlay=5:H-h-5" |
|||
filter = f"[1]format=rgba,colorchannelmixer=aa={transparency}[logo];{resize_filter}{insert_filter}" |
|||
cmd = f'ffmpeg -i {input_file} -i {watermark} -filter_complex "{filter}" {"-c:v libx264 -c:a copy" if out_ext == "mp4" else ""} {input_file}.{out_ext}' |
|||
print(cmd) |
|||
process = await asyncio.create_subprocess_shell(cmd) |
|||
await process.wait() |
|||
if os.path.exists(f"{input_file}.{out_ext}"): |
|||
return f"{input_file}.{out_ext}" |
|||
return "" |
|||
|
|||
class Extension: |
|||
cmds = ["/watermark"] |
|||
wait_content = {} |
|||
content_directory = os.getenv("CONTENT_DIR") if os.getenv("CONTENT_DIR", None) else "./content" |
|||
png_path = os.getenv("WATERMARK") |
|||
default_transparency=20.0 |
|||
watermark_users = [] |
|||
|
|||
def __init__(self, core) -> None: |
|||
self.watermark_users = core.admins |
|||
print(f"Content save path: {self.content_directory}") |
|||
print(f"Watermark users: {' '.join([str(i) for i in self.watermark_users])}") |
|||
|
|||
async def __call__(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|||
if update.message.text and update.message.text.split()[0] in self.cmds: |
|||
return await self.watermark(update, context) |
|||
return await self.content_handler(update, context) |
|||
|
|||
async def watermark(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|||
if not update.message.from_user.id in self.watermark_users: |
|||
return await update.message.reply_text("Это не для таких как ты сделано, друг...") |
|||
|
|||
if update.message.from_user.id in self.wait_content: |
|||
del self.wait_content[update.message.from_user.id] |
|||
return await update.message.reply_text("Понял понял, не бей, отменяем думанье...") |
|||
else: |
|||
try: |
|||
self.wait_content[update.message.from_user.id] = float(update.message.text.split()[1]) |
|||
except: |
|||
self.wait_content[update.message.from_user.id] = self.default_transparency |
|||
return await update.message.reply_text(f"Отлично, жду от тебя ОДНУ фотку/видео/переслал\nНепрозрачность: {self.wait_content[update.message.from_user.id]}%") |
|||
|
|||
async def content_handler(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|||
if not update.message.from_user.id in self.wait_content: |
|||
return |
|||
if update.message.video: |
|||
return await self.video_handler(update, context) |
|||
elif update.message.photo: |
|||
return await self.photo_handler(update, context) |
|||
|
|||
#Что ниже над будет засунуть в одно, ахуй |
|||
async def photo_handler(self, update:Update, context: ContextTypes.DEFAULT_TYPE): |
|||
if update.message.from_user.id in self.wait_content: |
|||
transparency = self.wait_content[update.message.from_user.id] / 100 |
|||
del self.wait_content[update.message.from_user.id] |
|||
else: |
|||
return |
|||
|
|||
_id = update.message.photo[-1].file_id |
|||
photo = await context.bot.get_file(_id) |
|||
downloaded_photo = await photo.download_to_drive(f"{self.content_directory}/{time()}") |
|||
res = await add_watermark(downloaded_photo.absolute(), self.png_path, transparency, "jpg") |
|||
if res: |
|||
await update.message.reply_photo(photo=res) |
|||
os.remove(res) |
|||
else: |
|||
await update.message.reply_text("Ошибка при создании фотки") |
|||
if os.path.exists(downloaded_photo.absolute()): |
|||
os.remove(downloaded_photo.absolute()) |
|||
return |
|||
|
|||
async def video_handler(self, update: Update, context: ContextTypes.DEFAULT_TYPE): |
|||
if update.message.from_user.id in self.wait_content: |
|||
transparency = self.wait_content[update.message.from_user.id] / 100 |
|||
del self.wait_content[update.message.from_user.id] |
|||
else: |
|||
return |
|||
|
|||
video = await update.message.video.get_file() |
|||
downloaded_video = await video.download_to_drive(f"{self.content_directory}/{time()}") |
|||
res = await add_watermark(downloaded_video.absolute(), self.png_path, transparency, "mp4") |
|||
if res: |
|||
await update.message.reply_video(video=res) |
|||
os.remove(res) |
|||
else: |
|||
await update.message.reply_text("Ошибка при создании видео") |
|||
if os.path.exists(downloaded_video.absolute()): |
|||
os.remove(downloaded_video.absolute()) |
|||
return |
@ -0,0 +1,16 @@ |
|||
FROM python:3.10 |
|||
|
|||
RUN apt update && apt install -y ffmpeg |
|||
|
|||
RUN python -m pip install python-telegram-bot |
|||
|
|||
WORKDIR /app |
|||
|
|||
ENV PYTHONUNBUFFERED 1 |
|||
|
|||
ARG BUILDDATE |
|||
|
|||
ENV BUILDDATE $BUILDDATE |
|||
|
|||
RUN echo $BUILDDATE && cd /tmp && git clone https://git.pblr-nyk.pro/gsd/Zem.TelegramBot && cp -a Zem.TelegramBot/. /app && rm -r Zem.TelegramBot |
|||
ENTRYPOINT ["python", "bot.py"] |
After Width: | Height: | Size: 60 KiB |
Loading…
Reference in new issue