首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >抖音批量发布工具,抖音上传图文视频批量发布软件,python版开发

抖音批量发布工具,抖音上传图文视频批量发布软件,python版开发

原创
作者头像
用户11744395
发布2025-07-16 10:09:02
发布2025-07-16 10:09:02
4190
举报

下载地址:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:1133

这个抖音批量发布工具包含以下功能:

  1. 多账号管理
  2. 视频和图文批量上传
  3. 定时发布功能
  4. 内容验证(格式、大小等)
  5. 上传日志记录
  6. 定时任务调度

使用说明:

  1. 安装依赖:pip install -r requirements.txt
  2. 配置config.json文件
  3. 运行主程序:python dy_uploader.py
  4. 对于定时任务,调用run_scheduled_uploads()方法

注意:使用前需要安装Chrome浏览器和对应版本的ChromeDriver。

代码语言:txt
复制

import os
import time
import random
import json
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from PIL import Image
import cv2
import numpy as np
import pandas as pd
from moviepy.editor import VideoFileClip
import schedule
import threading

class DouyinUploader:
    def __init__(self, config_file='config.json'):
        self.config = self.load_config(config_file)
        self.driver = None
        self.current_account = None
        self.log_file = 'upload_log.csv'
        self.init_log_file()
        
    def load_config(self, config_file):
        with open(config_file, 'r', encoding='utf-8') as f:
            return json.load(f)
            
    def init_log_file(self):
        if not os.path.exists(self.log_file):
            columns = ['timestamp', 'account', 'file', 'type', 'status', 'message']
            pd.DataFrame(columns=columns).to_csv(self.log_file, index=False)
            
    def log_upload(self, account, file, file_type, status, message):
        log_data = {
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'account': account,
            'file': file,
            'type': file_type,
            'status': status,
            'message': message
        }
        pd.DataFrame([log_data]).to_csv(self.log_file, mode='a', header=False, index=False)
        
    def init_driver(self):
        chrome_options = Options()
        if self.config['headless']:
            chrome_options.add_argument('--headless')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--window-size=1920,1080')
        self.driver = webdriver.Chrome(options=chrome_options)
        self.driver.implicitly_wait(10)
        
    def login(self, account):
        try:
            self.driver.get('https://www.douyin.com')
            time.sleep(random.uniform(2, 4))
            
            login_btn = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, '//*[contains(text(),"登录")]'))
            )
            login_btn.click()
            time.sleep(random.uniform(1, 3))
            
            # 切换登录方式
            switch_btn = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, '//*[contains(text(),"密码登录")]'))
            )
            switch_btn.click()
            time.sleep(1)
            
            # 输入账号密码
            username = self.driver.find_element(By.NAME, 'username')
            username.clear()
            username.send_keys(account['username'])
            time.sleep(random.uniform(0.5, 1.5))
            
            password = self.driver.find_element(By.NAME, 'password')
            password.clear()
            password.send_keys(account['password'])
            time.sleep(random.uniform(0.5, 1.5))
            
            # 点击登录
            submit_btn = self.driver.find_element(By.XPATH, '//button[@type="submit"]')
            submit_btn.click()
            time.sleep(random.uniform(5, 8))
            
            # 检查是否登录成功
            if "验证码" in self.driver.page_source:
                self.handle_captcha()
                
            self.current_account = account
            return True
        except Exception as e:
            print(f"登录失败: {str(e)}")
            return False
            
    def handle_captcha(self):
        # 这里可以添加验证码处理逻辑
        print("检测到验证码,请手动处理...")
        time.sleep(30)  # 等待用户手动处理
        
    def upload_video(self, video_path, caption='', schedule_time=None):
        try:
            if not os.path.exists(video_path):
                raise FileNotFoundError(f"视频文件不存在: {video_path}")
                
            # 检查视频格式和大小
            self.validate_video(video_path)
            
            # 打开发布页面
            self.driver.get('https://creator.douyin.com/creator-micro/content/upload')
            time.sleep(random.uniform(3, 5))
            
            # 上传视频
            upload_input = WebDriverWait(self.driver, 20).until(
                EC.presence_of_element_located((By.XPATH, '//input[@type="file"]'))
            )
            upload_input.send_keys(os.path.abspath(video_path))
            time.sleep(random.uniform(5, 10))
            
            # 等待上传完成
            WebDriverWait(self.driver, 120).until(
                EC.invisibility_of_element_located((By.XPATH, '//*[contains(text(),"上传中")]'))
            )
            time.sleep(3)
            
            # 输入描述
            caption_area = self.driver.find_element(By.XPATH, '//textarea[@placeholder="添加描述..."]')
            caption_area.clear()
            caption_area.send_keys(caption)
            time.sleep(random.uniform(1, 2))
            
            # 设置定时发布
            if schedule_time:
                self.set_schedule_time(schedule_time)
                
            # 点击发布
            publish_btn = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, '//button[contains(text(),"发布")]'))
            )
            publish_btn.click()
            time.sleep(random.uniform(5, 8))
            
            # 检查发布结果
            if "发布成功" in self.driver.page_source:
                self.log_upload(self.current_account['username'], video_path, 'video', 'success', '发布成功')
                return True
            else:
                raise Exception("发布失败,未知错误")
                
        except Exception as e:
            self.log_upload(self.current_account['username'], video_path, 'video', 'failed', str(e))
            print(f"视频上传失败: {str(e)}")
            return False
            
    def upload_image(self, image_paths, caption='', schedule_time=None):
        try:
            if not all(os.path.exists(p) for p in image_paths):
                raise FileNotFoundError("部分图片文件不存在")
                
            # 检查图片格式和大小
            for img_path in image_paths:
                self.validate_image(img_path)
                
            # 打开发布页面
            self.driver.get('https://creator.douyin.com/creator-micro/content/publish')
            time.sleep(random.uniform(3, 5))
            
            # 切换到图文模式
            image_tab = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, '//*[contains(text(),"图文")]'))
            )
            image_tab.click()
            time.sleep(2)
            
            # 上传图片
            upload_input = WebDriverWait(self.driver, 20).until(
                EC.presence_of_element_located((By.XPATH, '//input[@type="file"]'))
            )
            for img_path in image_paths:
                upload_input.send_keys(os.path.abspath(img_path))
                time.sleep(random.uniform(1, 2))
            time.sleep(random.uniform(5, 10))
            
            # 输入描述
            caption_area = self.driver.find_element(By.XPATH, '//textarea[@placeholder="添加描述..."]')
            caption_area.clear()
            caption_area.send_keys(caption)
            time.sleep(random.uniform(1, 2))
            
            # 设置定时发布
            if schedule_time:
                self.set_schedule_time(schedule_time)
                
            # 点击发布
            publish_btn = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, '//button[contains(text(),"发布")]'))
            )
            publish_btn.click()
            time.sleep(random.uniform(5, 8))
            
            # 检查发布结果
            if "发布成功" in self.driver.page_source:
                self.log_upload(self.current_account['username'], ','.join(image_paths), 'image', 'success', '发布成功')
                return True
            else:
                raise Exception("发布失败,未知错误")
                
        except Exception as e:
            self.log_upload(self.current_account['username'], ','.join(image_paths), 'image', 'failed', str(e))
            print(f"图文上传失败: {str(e)}")
            return False
            
    def set_schedule_time(self, schedule_time):
        try:
            # 点击定时发布按钮
            schedule_btn = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, '//*[contains(text(),"定时发布")]'))
            )
            schedule_btn.click()
            time.sleep(1)
            
            # 设置时间
            time_input = self.driver.find_element(By.XPATH, '//input[@placeholder="选择日期时间"]')
            time_input.clear()
            time_input.send_keys(schedule_time.strftime('%Y-%m-%d %H:%M'))
            time.sleep(1)
            
            # 确认
            confirm_btn = self.driver.find_element(By.XPATH, '//button[contains(text(),"确定")]')
            confirm_btn.click()
            time.sleep(1)
            
        except Exception as e:
            print(f"设置定时发布失败: {str(e)}")
            raise
            
    def validate_video(self, video_path):
        try:
            # 检查视频时长
            clip = VideoFileClip(video_path)
            duration = clip.duration
            if duration > 300:  # 5分钟限制
                raise ValueError("视频时长超过5分钟限制")
                
            # 检查视频大小
            file_size = os.path.getsize(video_path) / (1024 * 1024)  # MB
            if file_size > 500:  # 500MB限制
                raise ValueError("视频文件大小超过500MB限制")
                
        except Exception as e:
            raise ValueError(f"视频验证失败: {str(e)}")
            
    def validate_image(self, image_path):
        try:
            # 检查图片格式
            img = Image.open(image_path)
            if img.format.lower() not in ['jpeg', 'png', 'webp']:
                raise ValueError("不支持的图片格式")
                
            # 检查图片大小
            file_size = os.path.getsize(image_path) / (1024 * 1024)  # MB
            if file_size > 20:  # 20MB限制
                raise ValueError("图片文件大小超过20MB限制")
                
        except Exception as e:
            raise ValueError(f"图片验证失败: {str(e)}")
            
    def batch_upload(self, content_list):
        try:
            self.init_driver()
            
            for account in self.config['accounts']:
                if not self.login(account):
                    continue
                    
                for content in content_list:
                    if content['type'] == 'video':
                        self.upload_video(
                            content['path'],
                            content.get('caption', ''),
                            content.get('schedule_time')
                        )
                    elif content['type'] == 'image':
                        self.upload_image(
                            content['paths'],
                            content.get('caption', ''),
                            content.get('schedule_time')
                        )
                    time.sleep(random.uniform(10, 20))
                    
        finally:
            if self.driver:
                self.driver.quit()
                
    def run_scheduled_uploads(self):
        def job():
            print(f"执行定时上传任务: {datetime.now()}")
            self.batch_upload(self.config['scheduled_content'])
            
        # 设置定时任务
        for schedule_time in self.config['schedule_times']:
            schedule.every().day.at(schedule_time).do(job)
            
        # 启动定时任务线程
        def schedule_thread():
            while True:
                schedule.run_pending()
                time.sleep(60)
                
        threading.Thread(target=schedule_thread, daemon=True).start()
        
        print("定时上传服务已启动,按Ctrl+C退出...")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("定时上传服务已停止")

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
作者已关闭评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档