# 下载地址:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:1133
# 这个抖音批量发布工具包含以下功能:
# 使用说明:
# 注意:使用前需要安装Chrome浏览器和对应版本的ChromeDriver。
import os
import time
import random
import json
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from PIL import Image
import cv2
import numpy as np
import pandas as pd
from moviepy.editor import VideoFileClip
import schedule
import threading
class DouyinUploader:
    """Batch-publish videos and image posts to Douyin through Selenium.

    The configuration is a JSON file. The keys read by this class are:

    * ``headless`` (optional, default ``False``) - run Chrome headless.
    * ``accounts`` - list of dicts with ``username`` and ``password``.
    * ``scheduled_content`` - content list used by the scheduled job.
    * ``schedule_times`` - list of ``"HH:MM"`` strings for the daily job.

    NOTE(review): account passwords are read in plain text from the config
    file; keep that file out of version control.
    """

    # Column order of the CSV upload log.
    LOG_COLUMNS = ['timestamp', 'account', 'file', 'type', 'status', 'message']

    def __init__(self, config_file='config.json', log_file='upload_log.csv'):
        """Load the configuration and make sure the CSV log exists.

        Args:
            config_file: Path to the JSON configuration file.
            log_file: Path of the CSV file upload results are appended to.
        """
        self.config = self.load_config(config_file)
        self.driver = None
        self.current_account = None
        self.log_file = log_file
        self.init_log_file()

    def load_config(self, config_file):
        """Return the parsed JSON content of ``config_file``."""
        with open(config_file, 'r', encoding='utf-8') as f:
            return json.load(f)

    def init_log_file(self):
        """Create the CSV log with only a header row if it does not exist."""
        if not os.path.exists(self.log_file):
            pd.DataFrame(columns=self.LOG_COLUMNS).to_csv(self.log_file, index=False)

    def log_upload(self, account, file, file_type, status, message):
        """Append one upload result row to the CSV log.

        Args:
            account: Account name the upload was attempted with.
            file: Path (or comma-joined paths) of the uploaded file(s).
            file_type: ``'video'`` or ``'image'``.
            status: ``'success'`` or ``'failed'``.
            message: Free-form detail, e.g. the error text.
        """
        log_data = {
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'account': account,
            'file': file,
            'type': file_type,
            'status': status,
            'message': message,
        }
        # Write the header again when the log was removed after start-up,
        # otherwise the recreated file would be a headerless CSV.
        write_header = not os.path.exists(self.log_file)
        pd.DataFrame([log_data], columns=self.LOG_COLUMNS).to_csv(
            self.log_file, mode='a', header=write_header, index=False
        )

    def _account_name(self):
        """Return the logged-in account's username, or 'unknown' if none."""
        if not self.current_account:
            return 'unknown'
        return self.current_account.get('username', 'unknown')

    def _quit_driver(self):
        """Close the browser (if any) and forget the current session."""
        if self.driver is not None:
            try:
                self.driver.quit()
            finally:
                self.driver = None
                self.current_account = None

    def init_driver(self):
        """Start a fresh Chrome session, replacing any existing one."""
        # Avoid leaking a browser process when called more than once.
        self._quit_driver()
        chrome_options = Options()
        if self.config.get('headless', False):
            chrome_options.add_argument('--headless')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--window-size=1920,1080')
        self.driver = webdriver.Chrome(options=chrome_options)
        self.driver.implicitly_wait(10)

    def login(self, account):
        """Log in with username/password through the Douyin web UI.

        Args:
            account: Dict with ``username`` and ``password`` keys.

        Returns:
            True when the login flow ran without raising, False otherwise.
            NOTE(review): success is not verified on the page; only the
            absence of an exception is checked.
        """
        try:
            self.driver.get('https://www.douyin.com')
            time.sleep(random.uniform(2, 4))
            login_btn = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, '//*[contains(text(),"登录")]'))
            )
            login_btn.click()
            time.sleep(random.uniform(1, 3))
            # Switch to password login.
            switch_btn = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, '//*[contains(text(),"密码登录")]'))
            )
            switch_btn.click()
            time.sleep(1)
            # Fill in the credentials.
            username = self.driver.find_element(By.NAME, 'username')
            username.clear()
            username.send_keys(account['username'])
            time.sleep(random.uniform(0.5, 1.5))
            password = self.driver.find_element(By.NAME, 'password')
            password.clear()
            password.send_keys(account['password'])
            time.sleep(random.uniform(0.5, 1.5))
            # Submit the form.
            submit_btn = self.driver.find_element(By.XPATH, '//button[@type="submit"]')
            submit_btn.click()
            time.sleep(random.uniform(5, 8))
            # A captcha prompt needs manual intervention.
            if "验证码" in self.driver.page_source:
                self.handle_captcha()
            self.current_account = account
            return True
        except Exception as e:
            print(f"登录失败: {str(e)}")
            return False

    def handle_captcha(self):
        """Pause for 30 seconds so a human can solve the captcha."""
        print("检测到验证码,请手动处理...")
        time.sleep(30)

    def _publish(self, caption, schedule_time):
        """Fill in the caption, optionally schedule, publish and verify.

        Shared tail of the video and image upload flows.

        Raises:
            RuntimeError: If the success marker is not on the page afterwards.
        """
        caption_area = self.driver.find_element(
            By.XPATH, '//textarea[@placeholder="添加描述..."]'
        )
        caption_area.clear()
        caption_area.send_keys(caption)
        time.sleep(random.uniform(1, 2))
        if schedule_time:
            self.set_schedule_time(schedule_time)
        publish_btn = WebDriverWait(self.driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, '//button[contains(text(),"发布")]'))
        )
        publish_btn.click()
        time.sleep(random.uniform(5, 8))
        if "发布成功" not in self.driver.page_source:
            raise RuntimeError("发布失败,未知错误")

    def upload_video(self, video_path, caption='', schedule_time=None):
        """Upload and publish one video; the outcome is written to the log.

        Args:
            video_path: Path of the video file.
            caption: Description text for the post.
            schedule_time: Optional ``datetime`` or ISO-format string; when
                given the post is scheduled instead of published immediately.

        Returns:
            True on success, False on any failure (the error is logged).
        """
        try:
            if not os.path.exists(video_path):
                raise FileNotFoundError(f"视频文件不存在: {video_path}")
            self.validate_video(video_path)
            # Open the upload page.
            self.driver.get('https://creator.douyin.com/creator-micro/content/upload')
            time.sleep(random.uniform(3, 5))
            upload_input = WebDriverWait(self.driver, 20).until(
                EC.presence_of_element_located((By.XPATH, '//input[@type="file"]'))
            )
            upload_input.send_keys(os.path.abspath(video_path))
            time.sleep(random.uniform(5, 10))
            # Wait until the "uploading" indicator disappears.
            WebDriverWait(self.driver, 120).until(
                EC.invisibility_of_element_located((By.XPATH, '//*[contains(text(),"上传中")]'))
            )
            time.sleep(3)
            self._publish(caption, schedule_time)
            self.log_upload(self._account_name(), video_path, 'video', 'success', '发布成功')
            return True
        except Exception as e:
            # _account_name() tolerates a missing login, so the original
            # error is never masked by a TypeError inside this handler.
            self.log_upload(self._account_name(), video_path, 'video', 'failed', str(e))
            print(f"视频上传失败: {str(e)}")
            return False

    def upload_image(self, image_paths, caption='', schedule_time=None):
        """Upload and publish an image post; the outcome is written to the log.

        Args:
            image_paths: Iterable of image file paths (a single path string
                is also accepted).
            caption: Description text for the post.
            schedule_time: Optional ``datetime`` or ISO-format string; when
                given the post is scheduled instead of published immediately.

        Returns:
            True on success, False on any failure (the error is logged).
        """
        # Normalise first so the except handler can always join the paths.
        if isinstance(image_paths, str):
            image_paths = [image_paths]
        else:
            image_paths = list(image_paths or [])
        try:
            if not image_paths:
                raise ValueError("未提供图片文件")
            if not all(os.path.exists(p) for p in image_paths):
                raise FileNotFoundError("部分图片文件不存在")
            for img_path in image_paths:
                self.validate_image(img_path)
            # Open the publish page.
            self.driver.get('https://creator.douyin.com/creator-micro/content/publish')
            time.sleep(random.uniform(3, 5))
            # Switch to the image-post tab.
            image_tab = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, '//*[contains(text(),"图文")]'))
            )
            image_tab.click()
            time.sleep(2)
            upload_input = WebDriverWait(self.driver, 20).until(
                EC.presence_of_element_located((By.XPATH, '//input[@type="file"]'))
            )
            for img_path in image_paths:
                upload_input.send_keys(os.path.abspath(img_path))
                time.sleep(random.uniform(1, 2))
            time.sleep(random.uniform(5, 10))
            self._publish(caption, schedule_time)
            self.log_upload(
                self._account_name(), ','.join(image_paths), 'image', 'success', '发布成功'
            )
            return True
        except Exception as e:
            self.log_upload(
                self._account_name(), ','.join(image_paths), 'image', 'failed', str(e)
            )
            print(f"图文上传失败: {str(e)}")
            return False

    @staticmethod
    def _format_schedule_time(schedule_time):
        """Return ``schedule_time`` as ``'YYYY-MM-DD HH:MM'``.

        Accepts a ``datetime`` or an ISO-format string. JSON configuration
        can only carry strings, so both forms must be supported.

        Raises:
            ValueError: If a string cannot be parsed as an ISO date-time.
        """
        if isinstance(schedule_time, str):
            schedule_time = datetime.fromisoformat(schedule_time.strip())
        return schedule_time.strftime('%Y-%m-%d %H:%M')

    def set_schedule_time(self, schedule_time):
        """Enable scheduled publishing on the current page and set the time.

        Args:
            schedule_time: ``datetime`` or ISO-format string.

        Raises:
            Exception: Any failure is printed and re-raised to the caller.
        """
        try:
            formatted_time = self._format_schedule_time(schedule_time)
            # Open the scheduling control.
            schedule_btn = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, '//*[contains(text(),"定时发布")]'))
            )
            schedule_btn.click()
            time.sleep(1)
            # Enter the date and time.
            time_input = self.driver.find_element(By.XPATH, '//input[@placeholder="选择日期时间"]')
            time_input.clear()
            time_input.send_keys(formatted_time)
            time.sleep(1)
            # Confirm the selection.
            confirm_btn = self.driver.find_element(By.XPATH, '//button[contains(text(),"确定")]')
            confirm_btn.click()
            time.sleep(1)
        except Exception as e:
            print(f"设置定时发布失败: {str(e)}")
            raise

    def validate_video(self, video_path):
        """Check the video is at most 5 minutes long and at most 500 MB.

        Raises:
            ValueError: If a limit is exceeded or the file cannot be read.
        """
        try:
            clip = VideoFileClip(video_path)
            try:
                duration = clip.duration
            finally:
                # Release the underlying reader even when a check fails.
                clip.close()
            if duration > 300:  # 5-minute limit
                raise ValueError("视频时长超过5分钟限制")
            file_size = os.path.getsize(video_path) / (1024 * 1024)  # MB
            if file_size > 500:  # 500 MB limit
                raise ValueError("视频文件大小超过500MB限制")
        except Exception as e:
            raise ValueError(f"视频验证失败: {str(e)}") from e

    def validate_image(self, image_path):
        """Check the image is JPEG/PNG/WebP and at most 20 MB.

        Raises:
            ValueError: If the format is unsupported, the size limit is
                exceeded, or the file cannot be read.
        """
        try:
            with Image.open(image_path) as img:
                # ``format`` is None when Pillow cannot tell the file type.
                image_format = (img.format or '').lower()
            if image_format not in ('jpeg', 'png', 'webp'):
                raise ValueError("不支持的图片格式")
            file_size = os.path.getsize(image_path) / (1024 * 1024)  # MB
            if file_size > 20:  # 20 MB limit
                raise ValueError("图片文件大小超过20MB限制")
        except Exception as e:
            raise ValueError(f"图片验证失败: {str(e)}") from e

    def batch_upload(self, content_list):
        """Publish every item of ``content_list`` with every configured account.

        Each account gets its own browser session so that the previous
        account's login state cannot leak into the next login attempt.

        Args:
            content_list: List of dicts. ``type`` is ``'video'`` (with
                ``path``) or ``'image'`` (with ``paths``); ``caption`` and
                ``schedule_time`` are optional.
        """
        for account in self.config['accounts']:
            try:
                self.init_driver()
                if not self.login(account):
                    continue
                for content in content_list:
                    content_type = content.get('type')
                    if content_type == 'video':
                        self.upload_video(
                            content['path'],
                            content.get('caption', ''),
                            content.get('schedule_time'),
                        )
                    elif content_type == 'image':
                        self.upload_image(
                            content['paths'],
                            content.get('caption', ''),
                            content.get('schedule_time'),
                        )
                    else:
                        # A malformed entry must not abort the whole batch.
                        print(f"跳过未知的内容类型: {content_type}")
                        continue
                    # Random pause between posts.
                    time.sleep(random.uniform(10, 20))
            finally:
                self._quit_driver()

    def run_scheduled_uploads(self):
        """Run ``batch_upload`` daily at the configured times until Ctrl+C.

        Blocks the calling thread. On exit the scheduler thread is told to
        stop and the jobs registered by this call are cancelled.
        """
        def job():
            print(f"执行定时上传任务: {datetime.now()}")
            try:
                self.batch_upload(self.config['scheduled_content'])
            except Exception as e:
                # An exception escaping a job would propagate out of
                # schedule.run_pending() and end the scheduler thread.
                print(f"定时上传任务执行失败: {str(e)}")

        jobs = [
            schedule.every().day.at(schedule_time).do(job)
            for schedule_time in self.config['schedule_times']
        ]
        stop_event = threading.Event()

        def schedule_thread():
            while not stop_event.is_set():
                schedule.run_pending()
                # Event.wait returns early once stop is requested.
                stop_event.wait(60)

        threading.Thread(target=schedule_thread, daemon=True).start()
        print("定时上传服务已启动,按Ctrl+C退出...")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("定时上传服务已停止")
        finally:
            stop_event.set()
            for registered_job in jobs:
                schedule.cancel_job(registered_job)
# 原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
# 如有侵权,请联系 cloudcommunity@tencent.com 删除。