成品下载地址:https://www.pan38.com/yun/share.php?code=JCnzE 提取密码:8819
包含视频上传、内容管理、自动发布等功能模块。软件界面使用易语言开发,但易语言只负责调用 Python 的浏览器自动化模块;自动上传、Cookie 多账号管理等核心功能均由 Python 实现。易语言部分只做调用,功能非常简单,但是很实用。
源码部分:【模块~仅供学习参考】
import os
import time
import random
import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from datetime import datetime
import json
import cv2
import numpy as np
from PIL import Image
import moviepy.editor as mp
import logging
from logging.handlers import RotatingFileHandler
import schedule
import threading
import queue
class DouyinUploader:
    """Selenium-driven helper that logs in to Douyin and publishes videos.

    Constructing an instance configures logging, loads the JSON config,
    starts a Chrome WebDriver and runs the login flow. Config keys read by
    this class: ``headless``, ``chrome_driver_path`` and ``schedule`` (a list
    of ``{"time": ..., "video_dir": ...}`` items).

    NOTE(review): every XPath / placeholder text below targets the Douyin
    web UI as it looked when this was written; they will need adjusting
    whenever the site markup changes.
    """

    def __init__(self, config_file='config.json'):
        """Set up logging, config, WebDriver and log in.

        Args:
            config_file: Path to a UTF-8 JSON configuration file.

        Raises:
            Exception: whatever ``load_config`` or ``setup_driver`` raises.
        """
        # Assigned first so that __del__ is safe even if a later step raises.
        self.driver = None
        self.is_running = False
        self.setup_logging()
        self.logger.info("Initializing Douyin Uploader...")
        self.config = self.load_config(config_file)
        self.video_queue = queue.Queue()
        self.setup_driver()
        self.login()

    def setup_logging(self):
        """Attach a rotating file handler to the 'DouyinUploader' logger."""
        self.logger = logging.getLogger('DouyinUploader')
        self.logger.setLevel(logging.INFO)
        # logging.getLogger returns the same object for the same name, so a
        # second instance must not add a second handler (duplicate lines).
        if self.logger.handlers:
            return
        handler = RotatingFileHandler(
            'douyin_uploader.log',
            maxBytes=1024 * 1024 * 5,
            backupCount=5,
            # Log messages contain Chinese titles/tags; do not depend on the
            # platform default encoding.
            encoding='utf-8'
        )
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

    def load_config(self, config_file):
        """Read and return the JSON config.

        Args:
            config_file: Path to the JSON file.

        Returns:
            The parsed JSON document.

        Raises:
            Exception: any error from opening or parsing the file is logged
                and re-raised unchanged.
        """
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)
            self.logger.info("Config loaded successfully")
            return config
        except Exception as e:
            self.logger.error(f"Error loading config: {str(e)}")
            raise

    def setup_driver(self):
        """Create the Chrome WebDriver according to the loaded config."""
        chrome_options = Options()
        if self.config.get('headless', False):
            chrome_options.add_argument('--headless')
        for argument in (
            '--disable-gpu',
            '--no-sandbox',
            '--disable-dev-shm-usage',
            '--window-size=1920,1080',
            '--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        ):
            chrome_options.add_argument(argument)
        self.driver = self._create_chrome_driver(chrome_options)
        self.driver.implicitly_wait(10)
        self.logger.info("WebDriver initialized successfully")

    def _create_chrome_driver(self, chrome_options):
        """Instantiate webdriver.Chrome across Selenium API generations."""
        driver_path = self.config.get('chrome_driver_path')
        try:
            # Newer Selenium releases dropped the ``executable_path`` keyword
            # of webdriver.Chrome; the driver location goes through Service.
            from selenium.webdriver.chrome.service import Service
            if driver_path:
                service = Service(executable_path=driver_path)
            else:
                service = Service()
            return webdriver.Chrome(service=service, options=chrome_options)
        except TypeError:
            # Older Selenium: no ``service`` keyword (or Service requires a
            # path) -- fall back to the original call.
            return webdriver.Chrome(
                executable_path=driver_path or 'chromedriver',
                options=chrome_options
            )

    def login(self):
        """Open douyin.com and wait for the user to log in via QR code.

        Returns:
            True when already logged in or when login completes within the
            120 second wait; False if any step raises.
        """
        self.logger.info("Attempting to login...")
        try:
            self.driver.get('https://www.douyin.com')
            time.sleep(5)
            # Check if already logged in
            if self.check_login_status():
                self.logger.info("Already logged in")
                return True
            # Perform login
            login_btn = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, '//*[@id="login-pannel"]/div[2]/div[1]/div[1]'))
            )
            login_btn.click()
            time.sleep(2)
            # Switch to QR code login
            qr_login = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, '//*[@id="login-pannel"]/div[2]/div[2]/div[2]/div[1]/div[1]/div[2]'))
            )
            qr_login.click()
            time.sleep(5)
            # Wait for QR code scan
            self.logger.info("Please scan the QR code to login")
            WebDriverWait(self.driver, 120).until(
                lambda d: self.check_login_status()
            )
            self.logger.info("Login successful")
            return True
        except Exception as e:
            self.logger.error(f"Login failed: {str(e)}")
            return False

    def check_login_status(self):
        """Return True when the login panel is absent from the current page.

        ``find_elements`` yields an empty list instead of raising when
        nothing matches, so a genuine WebDriver failure is no longer
        mistaken for "logged in".
        """
        login_panels = self.driver.find_elements(By.XPATH, '//*[@id="login-pannel"]')
        return not login_panels

    def upload_video(self, video_path, title='', tags=None, schedule_time=None):
        """Upload one video through the creator page and publish it.

        Args:
            video_path: Path of the video file; converted to an absolute path.
            title: Optional title text; skipped when empty.
            tags: Optional iterable of topic strings (default: no tags).
            schedule_time: Optional ``datetime``; when given, the scheduled
                publishing option is filled in with its date and time.

        Returns:
            True after the success message appears, False if any step raises.
        """
        # ``None`` instead of a shared mutable ``[]`` default.
        tags = list(tags) if tags else []
        try:
            self.logger.info(f"Preparing to upload video: {video_path}")
            # Open upload page
            self.driver.get('https://creator.douyin.com/creator-micro/content/upload')
            time.sleep(5)
            self._send_video_file(video_path)
            if title:
                self._fill_title(title)
            if tags:
                self._add_tags(tags)
            if schedule_time:
                self._fill_schedule(schedule_time)
            self._publish()
            return True
        except Exception as e:
            self.logger.error(f"Video upload failed: {str(e)}")
            return False

    def _send_video_file(self, video_path):
        """Hand the file to the page's file input and wait for the upload."""
        upload_input = WebDriverWait(self.driver, 30).until(
            EC.presence_of_element_located((By.XPATH, '//input[@type="file"]'))
        )
        upload_input.send_keys(os.path.abspath(video_path))
        self.logger.info("Video file selected")
        # Wait for upload to complete.
        # NOTE(review): this condition is also satisfied when the "上传中"
        # element has not appeared yet -- confirm against the live page.
        WebDriverWait(self.driver, 300).until(
            EC.invisibility_of_element_located((By.XPATH, '//div[contains(text(),"上传中")]'))
        )
        self.logger.info("Video upload completed")

    def _fill_title(self, title):
        """Replace the content of the title textarea with ``title``."""
        title_input = WebDriverWait(self.driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, '//textarea[@placeholder="填写更丰富的标题,让更多人看到你"]'))
        )
        title_input.clear()
        title_input.send_keys(title)
        self.logger.info(f"Video title set: {title}")

    def _add_tags(self, tags):
        """Type each tag into the topic input, confirming with ENTER."""
        for tag in tags:
            tag_input = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, '//input[@placeholder="#添加话题,让更多人看到"]'))
            )
            tag_input.send_keys(tag)
            tag_input.send_keys(Keys.ENTER)
            time.sleep(1)
        self.logger.info(f"Tags added: {', '.join(tags)}")

    def _fill_schedule(self, schedule_time):
        """Enable scheduled publishing and enter the date and time."""
        schedule_btn = WebDriverWait(self.driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, '//span[contains(text(),"定时发布")]'))
        )
        schedule_btn.click()
        time.sleep(1)
        # Set date and time
        # Note: This part may need adjustment based on actual UI elements
        date_input = self.driver.find_element(By.XPATH, '//input[@placeholder="选择日期"]')
        time_input = self.driver.find_element(By.XPATH, '//input[@placeholder="选择时间"]')
        date_input.clear()
        date_input.send_keys(schedule_time.strftime('%Y-%m-%d'))
        time_input.clear()
        time_input.send_keys(schedule_time.strftime('%H:%M'))
        self.logger.info(f"Scheduled for: {schedule_time}")

    def _publish(self):
        """Click the publish button and wait for the success message."""
        publish_btn = WebDriverWait(self.driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, '//button[contains(text(),"发布")]'))
        )
        publish_btn.click()
        self.logger.info("Publish button clicked")
        # Wait for confirmation
        WebDriverWait(self.driver, 30).until(
            EC.presence_of_element_located((By.XPATH, '//div[contains(text(),"发布成功")]'))
        )
        self.logger.info("Video published successfully")

    def batch_upload(self, video_dir):
        """Upload every .mp4/.mov/.avi file found directly in ``video_dir``.

        The file name without extension is used as the title and tags come
        from ``generate_tags``. A random 30-120 second pause separates
        successful uploads.

        Returns:
            False when the directory is missing or holds no video files,
            True otherwise (individual upload failures are only logged).
        """
        self.logger.info(f"Starting batch upload from directory: {video_dir}")
        if not os.path.isdir(video_dir):
            self.logger.error(f"Directory not found: {video_dir}")
            return False
        # Sorted so the upload order is deterministic (os.listdir is not).
        video_files = sorted(
            f for f in os.listdir(video_dir)
            if f.lower().endswith(('.mp4', '.mov', '.avi'))
        )
        if not video_files:
            self.logger.warning(f"No video files found in directory: {video_dir}")
            return False
        last_index = len(video_files) - 1
        for index, video_file in enumerate(video_files):
            video_path = os.path.join(video_dir, video_file)
            title = os.path.splitext(video_file)[0]
            tags = self.generate_tags(title)
            if not self.upload_video(video_path, title, tags):
                self.logger.warning(f"Failed to upload video: {video_file}")
                continue
            # Random delay between uploads; pointless after the final one.
            if index < last_index:
                delay = random.randint(30, 120)
                self.logger.info(f"Waiting {delay} seconds before next upload...")
                time.sleep(delay)
        self.logger.info("Batch upload completed")
        return True

    def generate_tags(self, title):
        """Return the 8 fixed tags followed by 3 randomly sampled extra tags.

        ``title`` is accepted for interface compatibility but is not used.
        """
        common_tags = [
            "抖音", "短视频", "原创", "热门",
            "推荐", "爆款", "上热门", "流量"
        ]
        # Add some random tags
        random_tags = random.sample([
            "生活", "日常", "搞笑", "美食",
            "旅行", "摄影", "音乐", "舞蹈",
            "宠物", "科技", "教育", "时尚"
        ], 3)
        return common_tags + random_tags

    def start_scheduler(self):
        """Register the configured daily uploads and poll them in a thread.

        Each ``schedule`` config item must provide ``time`` and
        ``video_dir``. Jobs are registered on the ``schedule`` module's
        default scheduler and polled once per second by a daemon thread
        until ``stop_scheduler`` is called.
        """
        self.logger.info("Starting scheduler...")
        self.is_running = True
        # Schedule uploads based on config
        for schedule_item in self.config.get('schedule', []):
            time_str = schedule_item['time']
            video_dir = schedule_item['video_dir']
            schedule.every().day.at(time_str).do(
                self.batch_upload,
                video_dir=video_dir
            )
            self.logger.info(f"Scheduled upload at {time_str} for directory: {video_dir}")

        # Run scheduler in a separate thread
        def run_scheduler():
            while self.is_running:
                schedule.run_pending()
                time.sleep(1)

        scheduler_thread = threading.Thread(target=run_scheduler)
        scheduler_thread.daemon = True
        scheduler_thread.start()

    def stop_scheduler(self):
        """Ask the scheduler thread to exit after its current iteration."""
        self.logger.info("Stopping scheduler...")
        self.is_running = False

    def __del__(self):
        """Quit the WebDriver, if one was created (best effort)."""
        # getattr: __init__ may have failed before the attribute existed.
        driver = getattr(self, 'driver', None)
        if not driver:
            return
        try:
            driver.quit()
            self.logger.info("WebDriver closed")
        except Exception:
            # Finalizers must not raise; the browser or the logging module
            # may already be gone during interpreter shutdown.
            pass
import cv2
import numpy as np
import os
import moviepy.editor as mp
from PIL import Image
import random
import time
from datetime import datetime
import logging
class VideoProcessor:
    """Apply the configured edits to video files using moviepy.

    Config keys read by this class: ``resize`` (``width``/``height``),
    ``watermark``, ``intro``, ``outro`` and ``speed``. The calls used here
    (``moviepy.editor``, ``resize``, ``set_opacity``, ``fx``) are the ones
    this file imports as ``mp``.
    """

    def __init__(self, config):
        """Store ``config`` and obtain the 'VideoProcessor' logger."""
        self.config = config
        self.logger = logging.getLogger('VideoProcessor')

    def process_video(self, input_path, output_dir):
        """Process one video and write ``<name>_processed.mp4``.

        Steps, each applied only when its config key is present: resize,
        watermark, intro/outro, speed change.

        Args:
            input_path: Path of the source video.
            output_dir: Directory for the result; created if missing.

        Returns:
            The output file path, or None if any step raised (the error is
            logged).
        """
        video = None
        try:
            self.logger.info(f"Processing video: {input_path}")
            # Create output directory if not exists
            os.makedirs(output_dir, exist_ok=True)
            # Generate output filename
            base_name = os.path.splitext(os.path.basename(input_path))[0]
            output_path = os.path.join(output_dir, f"{base_name}_processed.mp4")
            # Load video
            video = mp.VideoFileClip(input_path)
            # Apply processing steps
            processed = video
            # Resize if needed
            if 'resize' in self.config:
                width = self.config['resize']['width']
                height = self.config['resize']['height']
                processed = processed.resize((width, height))
                self.logger.info(f"Resized video to {width}x{height}")
            # Add watermark if configured
            if 'watermark' in self.config:
                processed = self.add_watermark(processed)
                self.logger.info("Watermark added")
            # Add intro/outro if configured. An outro without an intro used
            # to be silently ignored; either key now triggers this step.
            if 'intro' in self.config or 'outro' in self.config:
                processed = self.add_intro_outro(processed)
                self.logger.info("Intro/outro added")
            # Adjust speed if configured
            if 'speed' in self.config:
                speed_factor = self.config['speed']
                processed = processed.fx(mp.vfx.speedx, speed_factor)
                self.logger.info(f"Speed adjusted by factor {speed_factor}")
            # Write processed video
            processed.write_videofile(
                output_path,
                codec='libx264',
                audio_codec='aac',
                threads=4,
                fps=video.fps
            )
            self.logger.info(f"Video processing completed: {output_path}")
            return output_path
        except Exception as e:
            self.logger.error(f"Video processing failed: {str(e)}")
            return None
        finally:
            # Release the source clip's reader/file handle; without this
            # every processed file leaks its resources.
            if video is not None:
                try:
                    video.close()
                except Exception as e:
                    self.logger.warning(f"Failed to close video clip: {str(e)}")

    def add_watermark(self, video_clip):
        """Overlay the configured text or image watermark on ``video_clip``.

        Reads ``type``, ``opacity`` and ``position`` from the watermark
        config, plus ``text``/``size``/``color``/``font`` for text
        watermarks or ``image_path`` otherwise.

        Returns:
            A CompositeVideoClip of the video with the watermark on top.
        """
        watermark_config = self.config['watermark']
        # Create watermark image
        if watermark_config['type'] == 'text':
            from moviepy.editor import TextClip
            watermark = TextClip(
                watermark_config['text'],
                fontsize=watermark_config['size'],
                color=watermark_config['color'],
                font=watermark_config['font']
            )
        else:
            # Image watermark
            watermark = mp.ImageClip(watermark_config['image_path'])
        watermark = watermark.set_opacity(watermark_config['opacity'])
        watermark = watermark.set_position(watermark_config['position'])
        # The overlay lasts for the whole video.
        watermark = watermark.set_duration(video_clip.duration)
        return mp.CompositeVideoClip([video_clip, watermark])

    def add_intro_outro(self, video_clip):
        """Return ``video_clip`` with the configured intro/outro attached.

        Each of ``intro``/``outro`` is optional. An entry with
        ``type == 'video'`` is loaded from ``path`` as a video; any other
        type is loaded as an image shown for ``duration``.
        """
        # .get(): either section may be absent from the config.
        intro_config = self.config.get('intro', {})
        outro_config = self.config.get('outro', {})
        clips = []
        # Add intro
        if intro_config:
            clips.append(self._load_bumper(intro_config))
        # Add main video
        clips.append(video_clip)
        # Add outro
        if outro_config:
            clips.append(self._load_bumper(outro_config))
        return mp.concatenate_videoclips(clips)

    def _load_bumper(self, bumper_config):
        """Build the clip described by one intro/outro config section."""
        if bumper_config['type'] == 'video':
            return mp.VideoFileClip(bumper_config['path'])
        # Image bumper: a still image needs an explicit duration.
        bumper = mp.ImageClip(bumper_config['path'])
        return bumper.set_duration(bumper_config['duration'])

    def batch_process(self, input_dir, output_dir):
        """Process every .mp4/.mov/.avi file found directly in ``input_dir``.

        Returns:
            True if at least one video was processed successfully; False if
            the directory is missing, has no videos, or every video failed.
        """
        self.logger.info(f"Starting batch processing from {input_dir} to {output_dir}")
        if not os.path.isdir(input_dir):
            self.logger.error(f"Input directory not found: {input_dir}")
            return False
        os.makedirs(output_dir, exist_ok=True)
        # Sorted so the processing order is deterministic.
        video_files = sorted(
            f for f in os.listdir(input_dir)
            if f.lower().endswith(('.mp4', '.mov', '.avi'))
        )
        if not video_files:
            self.logger.warning(f"No video files found in directory: {input_dir}")
            return False
        processed_count = 0
        for video_file in video_files:
            input_path = os.path.join(input_dir, video_file)
            if self.process_video(input_path, output_dir):
                processed_count += 1
        self.logger.info(f"Batch processing completed. Processed {processed_count}/{len(video_files)} videos")
        return processed_count > 0
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。