成品下载地址:https://www.pan38.com/yun/share.php?code=JCnzE 提取密码:9998
该框架由三部分组成:主上传程序、视频预处理工具和配置文件(config.ini)。使用前需要安装 selenium、moviepy、Pillow、numpy 等依赖库,并确保本机已安装 Chrome 浏览器。代码实现了自动登录、批量上传、视频压缩、水印添加和缩略图生成等功能,可根据实际需求进行扩展。
import os
import time
import random
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from moviepy.editor import VideoFileClip
import configparser
class KuaishouUploader:
    """Drive a Chrome session that logs in to kuaishou.com and uploads videos.

    Settings are read from ``config.ini`` in the current working directory:
    ``[ACCOUNT] username/password`` are used by :meth:`login`, and the
    optional ``[SETTINGS] headless`` flag controls whether Chrome is started
    without a visible window.
    """

    # Lower-case file extensions that batch_upload() treats as videos.
    VIDEO_EXTENSIONS = ('.mp4', '.mov', '.avi')

    def __init__(self):
        """Load ``config.ini`` and start the Chrome driver."""
        self.config = configparser.ConfigParser()
        self.config.read('config.ini')
        self.driver = self._init_driver()

    def _init_driver(self):
        """Create and return a maximized Chrome WebDriver.

        Notifications are disabled, the UI language is set to zh-CN and
        microphone/camera permission prompts are pre-approved.
        """
        chrome_options = Options()
        chrome_options.add_argument("--disable-notifications")
        chrome_options.add_argument("--lang=zh-CN")
        # Honor the documented [SETTINGS] headless switch; when the option is
        # absent the browser stays visible, as before.
        if self.config.getboolean('SETTINGS', 'headless', fallback=False):
            chrome_options.add_argument("--headless")
        chrome_options.add_experimental_option("prefs", {
            "profile.default_content_setting_values.media_stream_mic": 1,
            "profile.default_content_setting_values.media_stream_camera": 1,
        })
        driver = webdriver.Chrome(options=chrome_options)
        driver.maximize_window()
        return driver

    def login(self):
        """Log in with the username/password stored in the ``[ACCOUNT]`` section.

        Raises:
            KeyError: if the ``[ACCOUNT]`` section or one of its keys is
                missing from the loaded configuration.
        """
        # Read the credentials first so a broken config fails before any
        # page is opened.
        account = self.config['ACCOUNT']
        username_value = account['username']
        password_value = account['password']

        wait = WebDriverWait(self.driver, 10)
        self.driver.get("https://www.kuaishou.com")
        time.sleep(3)

        # Open the login dialog.
        login_btn = wait.until(
            EC.element_to_be_clickable((By.XPATH, '//*[@id="login"]'))
        )
        login_btn.click()
        time.sleep(2)

        # Switch to username/password login.
        switch_btn = wait.until(
            EC.element_to_be_clickable((By.XPATH, '//div[contains(text(),"密码登录")]'))
        )
        switch_btn.click()
        time.sleep(1)

        # Wait for the form fields instead of assuming they have rendered
        # once the fixed sleep is over.
        username = wait.until(
            EC.visibility_of_element_located((By.NAME, 'phone'))
        )
        username.send_keys(username_value)
        password = wait.until(
            EC.visibility_of_element_located((By.NAME, 'password'))
        )
        password.send_keys(password_value)

        # Submit the form and give the site time to finish signing in.
        submit_btn = wait.until(
            EC.element_to_be_clickable((By.XPATH, '//button[@type="submit"]'))
        )
        submit_btn.click()
        time.sleep(5)

    def upload_video(self, video_path, title, tags):
        """Upload a single video through the web upload page.

        Args:
            video_path: Path of the video file; it is converted to an
                absolute path before being handed to the file input.
            title: Text typed into the title box.
            tags: Iterable of tag strings; each is typed and confirmed with
                ENTER. Falsy values skip tagging.
        """
        wait = WebDriverWait(self.driver, 10)

        # Open the upload page.
        self.driver.get("https://www.kuaishou.com/upload")
        time.sleep(3)

        # Hand the file to the <input type="file"> element.
        upload_input = wait.until(
            EC.presence_of_element_located((By.XPATH, '//input[@type="file"]'))
        )
        upload_input.send_keys(os.path.abspath(video_path))
        time.sleep(5)

        # Fill in the title.
        title_input = wait.until(
            EC.presence_of_element_located((By.XPATH, '//textarea[@placeholder="添加标题..."]'))
        )
        title_input.send_keys(title)

        # Add the tags one by one.
        if tags:
            tag_input = wait.until(
                EC.presence_of_element_located((By.XPATH, '//input[@placeholder="添加标签..."]'))
            )
            for tag in tags:
                tag_input.send_keys(tag)
                tag_input.send_keys(Keys.ENTER)
                time.sleep(0.5)

        # Publish and wait for the page to process the submission.
        publish_btn = wait.until(
            EC.element_to_be_clickable((By.XPATH, '//button[contains(text(),"发布")]'))
        )
        publish_btn.click()
        time.sleep(10)

    @staticmethod
    def _list_videos(video_dir, extensions=VIDEO_EXTENSIONS):
        """Return the sorted names of the video files directly inside *video_dir*.

        Extensions are matched case-insensitively and entries that are not
        regular files (e.g. a directory called ``clips.mp4``) are skipped.
        """
        return sorted(
            name for name in os.listdir(video_dir)
            if name.lower().endswith(extensions)
            and os.path.isfile(os.path.join(video_dir, name))
        )

    def batch_upload(self, video_dir):
        """Upload every video found in *video_dir*, one after another.

        The file name without extension is used as the title. A failed upload
        is reported and the loop continues with the next file. After each
        successful upload except the last, the method pauses for a random
        30-60 seconds.
        """
        videos = self._list_videos(video_dir)
        last_index = len(videos) - 1
        for index, video in enumerate(videos):
            video_path = os.path.join(video_dir, video)
            title = os.path.splitext(video)[0]
            tags = ["自动上传", "Python脚本"]
            try:
                self.upload_video(video_path, title, tags)
                print(f"成功上传视频: {video}")
            except Exception as e:
                print(f"上传视频 {video} 失败: {str(e)}")
                continue
            # Pause between uploads; there is nothing to wait for after the
            # final one.
            if index < last_index:
                time.sleep(random.randint(30, 60))

    def close(self):
        """Quit the browser and end the WebDriver session."""
        self.driver.quit()
if __name__ == "__main__":
    # Script entry point: log in, upload every video in the configured
    # directory, and always shut the browser down afterwards.
    uploader = KuaishouUploader()
    try:
        # Use [PATHS] video_dir from config.ini when present; fall back to
        # the previously hard-coded "videos" directory otherwise.
        video_dir = uploader.config.get("PATHS", "video_dir", fallback="videos")
        uploader.login()
        uploader.batch_upload(video_dir)
    finally:
        uploader.close()
import os
import random

import numpy as np
from moviepy.editor import CompositeVideoClip, TextClip, VideoFileClip
from PIL import Image
class VideoProcessor:
    """Stateless moviepy-based helpers for preparing videos before upload."""

    @staticmethod
    def _target_bitrate_kbps(target_size_mb, duration):
        """Return the bitrate in kbps that spreads *target_size_mb* over *duration* seconds.

        Uses 1 MB = 8000 kilobits. The result is never below 1 so that the
        encoder is not handed a ``0k`` bitrate for very long clips.

        Raises:
            ValueError: if *duration* is missing or not positive.
        """
        if not duration or duration <= 0:
            raise ValueError(f"video duration must be positive, got {duration!r}")
        return max(1, int((target_size_mb * 8000) / duration))

    @staticmethod
    def _fit_scale(width, height, max_width=1920, max_height=1080):
        """Return the factor (<= 1.0) that fits width x height inside the limits.

        The aspect ratio is preserved; 1.0 means the frame already fits.
        """
        return min(max_width / width, max_height / height, 1.0)

    @staticmethod
    def compress_video(input_path, output_path, target_size_mb=50, fps=30):
        """Re-encode a video with a bitrate chosen to approach *target_size_mb*.

        Frames larger than 1920x1080 are scaled down (keeping the aspect
        ratio) until both dimensions fit. The whole size budget is given to
        the video bitrate, so the output can end up somewhat larger once the
        audio track and container overhead are added.

        Args:
            input_path: Source video file.
            output_path: Destination file for the H.264 encoded result.
            target_size_mb: Desired output size in megabytes.
            fps: Frame rate of the output.

        Raises:
            ValueError: if the source reports a non-positive duration.
        """
        clip = VideoFileClip(input_path)
        try:
            # Target bitrate in kbps.
            target_bitrate = VideoProcessor._target_bitrate_kbps(
                target_size_mb, clip.duration
            )
            # Scale by a single factor so that wide sources (e.g. 3840x1080)
            # are also brought under the width limit, not only tall ones.
            scale = VideoProcessor._fit_scale(*clip.size)
            output_clip = clip.resize(scale) if scale < 1.0 else clip
            # Write the compressed video.
            output_clip.write_videofile(
                output_path,
                fps=fps,
                bitrate=f"{target_bitrate}k",
                threads=4,
                preset="fast",
                codec="libx264",
            )
        finally:
            # Release the underlying reader even if encoding fails.
            clip.close()

    @staticmethod
    def add_watermark(input_path, output_path, watermark_text="自动上传"):
        """Write a copy of the video with a white text watermark at the bottom right.

        Args:
            input_path: Source video file.
            output_path: Destination file for the watermarked video.
            watermark_text: Text rendered for the full duration of the video.
        """
        clip = VideoFileClip(input_path)
        txt_clip = None
        final_clip = None
        try:
            # Build the watermark text clip.
            txt_clip = (TextClip(watermark_text, fontsize=30, color='white')
                        .set_position(('right', 'bottom'))
                        .set_duration(clip.duration))
            # Overlay the text on the source video.
            final_clip = CompositeVideoClip([clip, txt_clip])
            final_clip.write_videofile(output_path)
        finally:
            # Close everything that was created, including on failure.
            for opened in (final_clip, txt_clip, clip):
                if opened is not None:
                    opened.close()

    @staticmethod
    def generate_thumbnails(video_path, output_dir, count=3):
        """Save *count* JPEG thumbnails taken at random timestamps of the video.

        Files are named ``<video name>_thumb_<i>.jpg`` inside *output_dir*,
        which is created when it does not exist.
        """
        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(output_dir, exist_ok=True)
        stem = os.path.splitext(os.path.basename(video_path))[0]
        clip = VideoFileClip(video_path)
        try:
            duration = clip.duration
            for i in range(count):
                time_point = random.uniform(0, duration)
                frame = clip.get_frame(time_point)
                thumbnail_path = os.path.join(output_dir, f"{stem}_thumb_{i}.jpg")
                Image.fromarray(frame).save(thumbnail_path)
        finally:
            clip.close()
[ACCOUNT]
username = your_username
password = your_password
[SETTINGS]
headless = False
upload_interval = 60
max_retry = 3
[PATHS]
video_dir = ./videos
processed_dir = ./processed
thumbnails_dir = ./thumbnails
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。