|
|
@ -1,15 +1,40 @@ |
|
|
|
from telegram import ForceReply, Update |
|
|
|
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters, ConversationHandler |
|
|
|
import os, pathlib, asyncio |
|
|
|
from time import time |
|
|
|
|
|
|
|
async def add_watermark(input_file, watermark, transparency, out_ext): |
|
|
|
#print(f'ffmpeg -i {input_file} -i {watermark} -filter_complex "[1]lut=a=val*0.08[a];[0][a]overlay=0:0" -c:v libx264 {input_file}.mp4') |
|
|
|
resize_filter = "[logo][0]scale2ref=w=oh*mdar:h=ih*1.0[logo][0];" |
|
|
|
insert_filter = "[0][logo]overlay=(W-w)/2:(H-h)/2:format=auto,format=yuv420p" |
|
|
|
#insert_filter = "[1][logo]overlay=5:H-h-5" |
|
|
|
filter = f"[1]format=rgba,colorchannelmixer=aa={transparency}[logo];{resize_filter}{insert_filter}" |
|
|
|
cmd = f'ffmpeg -i {input_file} -i {watermark} -filter_complex "{filter}" {"-c:v libx264 -c:a copy" if out_ext == "mp4" else ""} {input_file}.{out_ext}' |
|
|
|
try: |
|
|
|
from telegram import ForceReply, Update |
|
|
|
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters, ConversationHandler |
|
|
|
except ModuleNotFoundError: |
|
|
|
Update = None |
|
|
|
class ct: |
|
|
|
DEFAULT_TYPE = None |
|
|
|
ContextTypes = ct() |
|
|
|
print("Cannot import telegram futures") |
|
|
|
|
|
|
|
filter_tables = { |
|
|
|
"v1":{#default |
|
|
|
"resize_filter":"[logo][0]scale2ref=w=oh*mdar:h=ih*1.0[logo][0];", |
|
|
|
"insert_filter":"[0][logo]overlay=(W-w)/2:(H-h)/2:format=auto,format=yuv420p", |
|
|
|
"filter":"[1]format=rgba,colorchannelmixer=aa={transparency}[logo];{resize_filter}{insert_filter}" |
|
|
|
}, |
|
|
|
"v2":{#photo |
|
|
|
"filter":"[1]format=rgba,colorchannelmixer=aa={transparency}[logo];[logo][0]scale2ref=oh*mdar:ih*0.2[logo][video];[video][logo]overlay=(main_w-overlay_w):(main_h-overlay_h)" |
|
|
|
}, |
|
|
|
"v2.move":{#good |
|
|
|
"insert_filter":"[video][logo]overlay=x='if(lt(mod(t\,16)\,8)\,W-w-W*10/100\,W*10/100)':y='if(lt(mod(t+4\,16)\,8)\,H-h-H*5/100\,H*5/100)'", |
|
|
|
"filter":"[1]colorchannelmixer=aa={transparency}[logo];[logo][0]scale2ref=oh*mdar:ih*0.2[logo][video];{insert_filter}" |
|
|
|
}, |
|
|
|
"v2.dvd":{ |
|
|
|
"insert_filter":"[video][logo]overlay=x='if(eq(mod(floor(st(8,t*{speed_x})/(W-w)),2),0),mod(ld(8),W-w),W-w-mod(ld(8),W-w))':y='if(eq(mod(floor(st(9,t*{speed_y})/(H-h)),2),0),mod(ld(9),H-h),H-h-mod(ld(9),H-h))'", |
|
|
|
"filter":"[1]colorchannelmixer=aa={transparency}[logo];[logo][0]scale2ref=oh*mdar:ih*0.2[logo][video];{insert_filter}" |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
async def add_watermark(input_file, watermark, transparency, out_ext, filter_version = "v1", speed=(160, 70)): |
|
|
|
resize_filter = filter_tables[filter_version].get("resize_filter", "") |
|
|
|
insert_filter = filter_tables[filter_version].get("insert_filter", "").format(speed_x = speed[0], speed_y = speed[1]) |
|
|
|
filter = filter_tables[filter_version]["filter"].format(transparency = transparency, resize_filter = resize_filter, insert_filter = insert_filter) |
|
|
|
cmd = f'ffmpeg -y -i {input_file} -i {watermark} -filter_complex "{filter}" {"-c:v libx264 -c:a copy" if out_ext == "mp4" else ""} {input_file}.{out_ext}' |
|
|
|
print(cmd) |
|
|
|
process = await asyncio.create_subprocess_shell(cmd) |
|
|
|
await process.wait() |
|
|
@ -68,7 +93,7 @@ class Extension: |
|
|
|
_id = update.message.photo[-1].file_id |
|
|
|
photo = await context.bot.get_file(_id) |
|
|
|
downloaded_photo = await photo.download_to_drive(f"{self.content_directory}/{time()}") |
|
|
|
res = await add_watermark(downloaded_photo.absolute(), self.png_path, transparency, "jpg") |
|
|
|
res = await add_watermark(downloaded_photo.absolute(), self.png_path, transparency, "jpg", "v2") |
|
|
|
if res: |
|
|
|
await update.message.reply_photo(photo=res) |
|
|
|
os.remove(res) |
|
|
@ -87,7 +112,7 @@ class Extension: |
|
|
|
|
|
|
|
video = await update.message.video.get_file() |
|
|
|
downloaded_video = await video.download_to_drive(f"{self.content_directory}/{time()}") |
|
|
|
res = await add_watermark(downloaded_video.absolute(), self.png_path, transparency, "mp4") |
|
|
|
res = await add_watermark(downloaded_video.absolute(), self.png_path, transparency, "mp4", "v2.dvd") |
|
|
|
if res: |
|
|
|
await update.message.reply_video(video=res) |
|
|
|
os.remove(res) |
|
|
@ -95,4 +120,20 @@ class Extension: |
|
|
|
await update.message.reply_text("Ошибка при создании видео") |
|
|
|
if os.path.exists(downloaded_video.absolute()): |
|
|
|
os.remove(downloaded_video.absolute()) |
|
|
|
return |
|
|
|
return |
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
import asyncio |
|
|
|
import argparse |
|
|
|
parser = argparse.ArgumentParser() |
|
|
|
parser.add_argument("input", type=str) |
|
|
|
parser.add_argument("watermark", type=str) |
|
|
|
parser.add_argument("--transparency", type=float, default=100.0) |
|
|
|
parser.add_argument("--insert-version", type=str, default="v1") |
|
|
|
parser.add_argument("--out-ext", type=str, default="mp4") |
|
|
|
args = parser.parse_args() |
|
|
|
|
|
|
|
async def run(): |
|
|
|
await add_watermark(args.input, args.watermark, args.transparency / 100, args.out_ext, args.insert_version) |
|
|
|
|
|
|
|
asyncio.get_event_loop().run_until_complete(run()) |