首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >自媒体批量发布工具,抖音批量发布作品工具,上传视频用python模块~

自媒体批量发布工具,抖音批量发布作品工具,上传视频用python模块~

原创
作者头像
用户11719788
发布2025-07-13 19:22:30
发布2025-07-13 19:22:30
3450
举报

成品下载地址:https://www.pan38.com/yun/share.php?code=JCnzE 提取密码:8819

本工具包含视频上传、内容管理、自动发布等功能模块。软件界面使用易语言开发,但易语言只负责调用 Python 的浏览器自动化模块;自动上传、Cookie 多账号管理等核心功能都由 Python 实现。易语言部分的调用逻辑非常简单,但很实用。

源码部分:【模块~仅供学习参考】

代码语言:txt
复制

import os
import time
import random
import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from datetime import datetime
import json
import cv2
import numpy as np
from PIL import Image
import moviepy.editor as mp
import logging
from logging.handlers import RotatingFileHandler
import schedule
import threading
import queue

class DouyinUploader:
    """Drive a Chrome browser through Selenium to log in to Douyin and publish videos.

    Constructing an instance configures logging, loads the JSON config,
    starts Chrome and attempts a login (QR-code scan when no session exists).

    Config keys read by this class:
        headless (bool): run Chrome with ``--headless`` when true.
        chrome_driver_path (str): optional path to the chromedriver binary.
        schedule (list): items with ``time`` and ``video_dir`` keys, used by
            :meth:`start_scheduler`.
    """

    LOG_FILE = 'douyin_uploader.log'
    VIDEO_EXTENSIONS = ('.mp4', '.mov', '.avi')

    def __init__(self, config_file='config.json'):
        """Initialise logging, config and the browser, then try to log in.

        Args:
            config_file: Path to the JSON configuration file.

        Raises:
            Exception: whatever :meth:`load_config` or :meth:`setup_driver`
                raises; a failed login is only logged.
        """
        # Assign plain attributes first so __del__ is safe even when one of
        # the later steps raises half-way through construction.
        self.driver = None
        self.is_running = False
        # NOTE(review): the queue is created but no method in this class uses it.
        self.video_queue = queue.Queue()
        self.setup_logging()
        self.logger.info("Initializing Douyin Uploader...")
        self.config = self.load_config(config_file)
        self.setup_driver()
        self.login()

    def setup_logging(self):
        """Attach a rotating file handler (5 MiB x 5 backups) to the class logger.

        The logger is a process-wide singleton looked up by name, so the
        handler is only added once; otherwise every additional instance would
        duplicate each log line.
        """
        self.logger = logging.getLogger('DouyinUploader')
        self.logger.setLevel(logging.INFO)
        already_configured = any(
            isinstance(existing, RotatingFileHandler)
            for existing in self.logger.handlers
        )
        if already_configured:
            return
        handler = RotatingFileHandler(
            self.LOG_FILE,
            maxBytes=1024 * 1024 * 5,
            backupCount=5,
            # Titles and tags are Chinese; do not depend on the locale encoding.
            encoding='utf-8',
        )
        handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        ))
        self.logger.addHandler(handler)

    def load_config(self, config_file):
        """Read and return the JSON configuration stored at ``config_file``.

        Raises:
            Exception: any error from opening or parsing the file is logged
                and re-raised unchanged.
        """
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)
        except Exception as e:
            self.logger.error(f"Error loading config: {str(e)}")
            raise
        self.logger.info("Config loaded successfully")
        return config

    def setup_driver(self):
        """Create the Chrome WebDriver and store it on ``self.driver``."""
        chrome_options = Options()
        if self.config.get('headless', False):
            chrome_options.add_argument('--headless')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--window-size=1920,1080')
        chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')

        driver_path = self.config.get('chrome_driver_path')
        try:
            # Selenium 4 takes the driver location through a Service object;
            # the executable_path keyword was removed from webdriver.Chrome.
            from selenium.webdriver.chrome.service import Service
            service = Service(driver_path) if driver_path else Service()
            self.driver = webdriver.Chrome(service=service, options=chrome_options)
        except (ImportError, TypeError):
            # Selenium 3 does not know the ``service`` keyword.
            self.driver = webdriver.Chrome(
                executable_path=driver_path or 'chromedriver',
                options=chrome_options
            )
        self.driver.implicitly_wait(10)
        self.logger.info("WebDriver initialized successfully")

    def login(self):
        """Open douyin.com and wait for a logged-in session.

        If the login panel is present, the QR-code tab is opened and the
        method waits up to 120 seconds for the panel to disappear.

        Returns:
            bool: True when a session is detected, False on any failure.
        """
        self.logger.info("Attempting to login...")
        try:
            self.driver.get('https://www.douyin.com')
            time.sleep(5)

            # Check if already logged in
            if self.check_login_status():
                self.logger.info("Already logged in")
                return True

            # Perform login
            login_btn = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, '//*[@id="login-pannel"]/div[2]/div[1]/div[1]'))
            )
            login_btn.click()
            time.sleep(2)

            # Switch to QR code login
            qr_login = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, '//*[@id="login-pannel"]/div[2]/div[2]/div[2]/div[1]/div[1]/div[2]'))
            )
            qr_login.click()
            time.sleep(5)

            # Wait for QR code scan
            self.logger.info("Please scan the QR code to login")
            WebDriverWait(self.driver, 120).until(
                lambda d: self.check_login_status()
            )
            self.logger.info("Login successful")
            return True
        except Exception as e:
            self.logger.error(f"Login failed: {str(e)}")
            return False

    def check_login_status(self):
        """Return True when the login panel is absent from the current page.

        ``find_elements`` returns an empty list instead of raising when
        nothing matches, so genuine driver errors (dead session, closed
        window) propagate instead of being misread as "logged in".
        """
        login_panels = self.driver.find_elements(By.XPATH, '//*[@id="login-pannel"]')
        return not login_panels

    def upload_video(self, video_path, title='', tags=None, schedule_time=None):
        """Upload one video through the creator upload page and publish it.

        Args:
            video_path: Path of the video file to upload.
            title: Optional title typed into the title box.
            tags: Optional iterable of topic strings; each is typed followed
                by ENTER. ``None`` (the default) means no tags.
            schedule_time: Optional object with ``strftime`` (e.g. a
                ``datetime``); when given, scheduled publishing is selected
                and the date/time fields are filled.

        Returns:
            bool: True once the "发布成功" confirmation appears, else False.
        """
        tags = list(tags) if tags else []
        # Fail fast before navigating the browser to the upload page.
        if not os.path.isfile(video_path):
            self.logger.error(f"Video upload failed: file not found: {video_path}")
            return False
        try:
            self.logger.info(f"Preparing to upload video: {video_path}")

            # Open upload page
            self.driver.get('https://creator.douyin.com/creator-micro/content/upload')
            time.sleep(5)

            # Upload video file
            upload_input = WebDriverWait(self.driver, 30).until(
                EC.presence_of_element_located((By.XPATH, '//input[@type="file"]'))
            )
            upload_input.send_keys(os.path.abspath(video_path))
            self.logger.info("Video file selected")

            # Wait for upload to complete (the "uploading" label disappears)
            WebDriverWait(self.driver, 300).until(
                EC.invisibility_of_element_located((By.XPATH, '//div[contains(text(),"上传中")]'))
            )
            self.logger.info("Video upload completed")

            # Set video title
            if title:
                title_input = WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, '//textarea[@placeholder="填写更丰富的标题,让更多人看到你"]'))
                )
                title_input.clear()
                title_input.send_keys(title)
                self.logger.info(f"Video title set: {title}")

            # Add tags
            if tags:
                for tag in tags:
                    tag_input = WebDriverWait(self.driver, 10).until(
                        EC.element_to_be_clickable((By.XPATH, '//input[@placeholder="#添加话题,让更多人看到"]'))
                    )
                    tag_input.send_keys(tag)
                    tag_input.send_keys(Keys.ENTER)
                    time.sleep(1)
                self.logger.info(f"Tags added: {', '.join(tags)}")

            # Set schedule time if provided
            if schedule_time:
                schedule_btn = WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, '//span[contains(text(),"定时发布")]'))
                )
                schedule_btn.click()
                time.sleep(1)

                # Set date and time
                # Note: This part may need adjustment based on actual UI elements
                date_input = self.driver.find_element(By.XPATH, '//input[@placeholder="选择日期"]')
                time_input = self.driver.find_element(By.XPATH, '//input[@placeholder="选择时间"]')

                date_input.clear()
                date_input.send_keys(schedule_time.strftime('%Y-%m-%d'))
                time_input.clear()
                time_input.send_keys(schedule_time.strftime('%H:%M'))
                self.logger.info(f"Scheduled for: {schedule_time}")

            # Publish video
            publish_btn = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, '//button[contains(text(),"发布")]'))
            )
            publish_btn.click()
            self.logger.info("Publish button clicked")

            # Wait for confirmation
            WebDriverWait(self.driver, 30).until(
                EC.presence_of_element_located((By.XPATH, '//div[contains(text(),"发布成功")]'))
            )
            self.logger.info("Video published successfully")
            return True
        except Exception as e:
            self.logger.error(f"Video upload failed: {str(e)}")
            return False

    def batch_upload(self, video_dir):
        """Upload every ``.mp4``/``.mov``/``.avi`` file found directly in ``video_dir``.

        Each file is published with its base name as title and the tags from
        :meth:`generate_tags`. A random 30-120 second pause follows each
        successful upload, except after the final file.

        Returns:
            bool: True when at least one video was uploaded; False when the
            directory is missing, holds no videos, or every upload failed.
        """
        self.logger.info(f"Starting batch upload from directory: {video_dir}")
        if not os.path.isdir(video_dir):
            self.logger.error(f"Directory not found: {video_dir}")
            return False

        # Sorted for a reproducible order; directories with a video-like
        # name are skipped.
        video_files = sorted(
            name for name in os.listdir(video_dir)
            if name.lower().endswith(self.VIDEO_EXTENSIONS)
            and os.path.isfile(os.path.join(video_dir, name))
        )
        if not video_files:
            self.logger.warning(f"No video files found in directory: {video_dir}")
            return False

        uploaded_count = 0
        last_index = len(video_files) - 1
        for index, video_file in enumerate(video_files):
            video_path = os.path.join(video_dir, video_file)
            title = os.path.splitext(video_file)[0]
            tags = self.generate_tags(title)

            if not self.upload_video(video_path, title, tags):
                self.logger.warning(f"Failed to upload video: {video_file}")
                continue
            uploaded_count += 1

            # Random delay between uploads; pointless after the last one.
            if index < last_index:
                delay = random.randint(30, 120)
                self.logger.info(f"Waiting {delay} seconds before next upload...")
                time.sleep(delay)

        self.logger.info("Batch upload completed")
        self.logger.info(f"Uploaded {uploaded_count}/{len(video_files)} videos")
        return uploaded_count > 0

    def generate_tags(self, title):
        """Return eight fixed tags followed by three randomly sampled ones.

        Args:
            title: Accepted for interface compatibility; currently unused.
        """
        common_tags = [
            "抖音", "短视频", "原创", "热门",
            "推荐", "爆款", "上热门", "流量"
        ]

        # Add some random tags
        random_tags = random.sample([
            "生活", "日常", "搞笑", "美食",
            "旅行", "摄影", "音乐", "舞蹈",
            "宠物", "科技", "教育", "时尚"
        ], 3)

        return common_tags + random_tags

    def start_scheduler(self):
        """Register daily batch uploads from the config and poll them in a daemon thread.

        Every item of ``config['schedule']`` must provide ``time`` (passed to
        ``schedule.every().day.at``) and ``video_dir``.
        """
        self.logger.info("Starting scheduler...")
        self.is_running = True

        # Schedule uploads based on config
        for schedule_item in self.config.get('schedule', []):
            time_str = schedule_item['time']
            video_dir = schedule_item['video_dir']

            schedule.every().day.at(time_str).do(
                self.batch_upload,
                video_dir=video_dir
            )
            self.logger.info(f"Scheduled upload at {time_str} for directory: {video_dir}")

        # Run scheduler in a separate thread
        def run_scheduler():
            while self.is_running:
                try:
                    schedule.run_pending()
                except Exception:
                    # One failing job must not silently end the polling thread.
                    self.logger.exception("Scheduled job raised an exception")
                time.sleep(1)

        scheduler_thread = threading.Thread(target=run_scheduler)
        scheduler_thread.daemon = True
        scheduler_thread.start()

    def stop_scheduler(self):
        """Ask the polling thread to exit; registered jobs are not removed."""
        self.logger.info("Stopping scheduler...")
        self.is_running = False

    def __del__(self):
        """Best-effort browser shutdown when the instance is finalised."""
        # getattr: __init__ may have failed before the attributes existed.
        driver = getattr(self, 'driver', None)
        if driver is None:
            return
        try:
            driver.quit()
        except Exception:
            # Finalisers must not raise; the browser may already be gone.
            return
        self.driver = None
        logger = getattr(self, 'logger', None)
        if logger is not None:
            logger.info("WebDriver closed")
代码语言:txt
复制
import cv2
import numpy as np
import os
import moviepy.editor as mp
from PIL import Image
import random
import time
from datetime import datetime
import logging

class VideoProcessor:
    """Apply config-driven MoviePy transformations to video files.

    Config keys read by this class:
        resize: mapping with ``width`` and ``height``.
        watermark: mapping with ``type`` (``'text'`` or anything else for an
            image), ``opacity``, ``position`` and either ``text``/``size``/
            ``color``/``font`` or ``image_path``.
        intro / outro: mapping with ``type`` (``'video'`` or anything else
            for an image), ``path`` and, for images, ``duration``.
        speed: factor passed to ``mp.vfx.speedx``.
    """

    VIDEO_EXTENSIONS = ('.mp4', '.mov', '.avi')

    def __init__(self, config):
        """Store the processing ``config`` mapping and obtain the class logger."""
        self.config = config
        self.logger = logging.getLogger('VideoProcessor')

    def process_video(self, input_path, output_dir):
        """Process one video and write ``<name>_processed.mp4`` into ``output_dir``.

        Steps are applied in this order when configured: resize, watermark,
        intro/outro, speed change.

        Returns:
            str | None: path of the written file, or None if any step failed
            (the error is logged).
        """
        video = None
        processed = None
        try:
            self.logger.info(f"Processing video: {input_path}")

            # Create output directory if not exists
            os.makedirs(output_dir, exist_ok=True)

            # Generate output filename
            base_name = os.path.splitext(os.path.basename(input_path))[0]
            output_path = os.path.join(output_dir, f"{base_name}_processed.mp4")

            # Load video
            video = mp.VideoFileClip(input_path)
            processed = video

            # Resize if needed
            if 'resize' in self.config:
                width = self.config['resize']['width']
                height = self.config['resize']['height']
                processed = processed.resize((width, height))
                self.logger.info(f"Resized video to {width}x{height}")

            # Add watermark if configured
            if 'watermark' in self.config:
                processed = self.add_watermark(processed)
                self.logger.info("Watermark added")

            # Add intro/outro if configured; an outro alone is enough.
            if 'intro' in self.config or 'outro' in self.config:
                processed = self.add_intro_outro(processed)
                self.logger.info("Intro/outro added")

            # Adjust speed if configured
            if 'speed' in self.config:
                speed_factor = self.config['speed']
                processed = processed.fx(mp.vfx.speedx, speed_factor)
                self.logger.info(f"Speed adjusted by factor {speed_factor}")

            # Write processed video at the source frame rate
            processed.write_videofile(
                output_path,
                codec='libx264',
                audio_codec='aac',
                threads=4,
                fps=video.fps
            )

            self.logger.info(f"Video processing completed: {output_path}")
            return output_path

        except Exception as e:
            self.logger.error(f"Video processing failed: {str(e)}")
            return None
        finally:
            # Release file readers whether or not processing succeeded;
            # otherwise a batch run accumulates open readers.
            self._close_clips(processed, video)

    def _close_clips(self, *clips):
        """Close each distinct non-None clip, ignoring errors raised by close()."""
        seen = set()
        for clip in clips:
            if clip is None or id(clip) in seen:
                continue
            seen.add(id(clip))
            close = getattr(clip, 'close', None)
            if close is None:
                continue
            try:
                close()
            except Exception as e:
                self.logger.debug(f"Ignoring error while closing clip: {str(e)}")

    def add_watermark(self, video_clip):
        """Overlay the configured text or image watermark for the clip's full duration.

        Returns:
            A ``CompositeVideoClip`` of ``video_clip`` with the watermark on top.

        Raises:
            KeyError: if a required ``watermark`` config key is missing.
        """
        watermark_config = self.config['watermark']

        # Create watermark clip
        if watermark_config['type'] == 'text':
            watermark = mp.TextClip(
                watermark_config['text'],
                fontsize=watermark_config['size'],
                color=watermark_config['color'],
                font=watermark_config['font']
            )
        else:
            # Image watermark
            watermark = mp.ImageClip(watermark_config['image_path'])

        watermark = watermark.set_opacity(watermark_config['opacity'])
        watermark = watermark.set_position(watermark_config['position'])
        watermark = watermark.set_duration(video_clip.duration)

        return mp.CompositeVideoClip([video_clip, watermark])

    def _load_bumper(self, bumper_config):
        """Build an intro/outro clip: a video file, or an image shown for ``duration``."""
        if bumper_config['type'] == 'video':
            return mp.VideoFileClip(bumper_config['path'])
        image = mp.ImageClip(bumper_config['path'])
        return image.set_duration(bumper_config['duration'])

    def add_intro_outro(self, video_clip):
        """Concatenate the configured intro and/or outro around ``video_clip``.

        Either part may be absent or empty. When neither is configured the
        input clip is returned unchanged.
        """
        intro_config = self.config.get('intro') or {}
        outro_config = self.config.get('outro') or {}

        if not intro_config and not outro_config:
            return video_clip

        clips = []
        if intro_config:
            clips.append(self._load_bumper(intro_config))
        clips.append(video_clip)
        if outro_config:
            clips.append(self._load_bumper(outro_config))

        return mp.concatenate_videoclips(clips)

    def batch_process(self, input_dir, output_dir):
        """Process every ``.mp4``/``.mov``/``.avi`` file found directly in ``input_dir``.

        Returns:
            bool: True when at least one video was processed successfully.
        """
        self.logger.info(f"Starting batch processing from {input_dir} to {output_dir}")

        if not os.path.isdir(input_dir):
            self.logger.error(f"Input directory not found: {input_dir}")
            return False

        os.makedirs(output_dir, exist_ok=True)

        # Sorted for a reproducible order; directories with a video-like
        # name are skipped.
        video_files = sorted(
            name for name in os.listdir(input_dir)
            if name.lower().endswith(self.VIDEO_EXTENSIONS)
            and os.path.isfile(os.path.join(input_dir, name))
        )
        if not video_files:
            self.logger.warning(f"No video files found in directory: {input_dir}")
            return False

        processed_count = sum(
            1 for video_file in video_files
            if self.process_video(os.path.join(input_dir, video_file), output_dir)
        )

        self.logger.info(f"Batch processing completed. Processed {processed_count}/{len(video_files)} videos")
        return processed_count > 0

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
作者已关闭评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档