下载地址:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:1133
这个完整实现包含 API 封装、任务管理、多线程处理和配置文件系统。使用时需要先配置 cookie 信息,然后按照示例格式创建任务文件。代码实现了视频分片上传、错误处理和任务归档功能。下文依次给出 bilibili_uploader.py、task_manager.py、命令行入口脚本和 config.yaml 示例。
import json
import os
import time
from typing import Dict, List, Optional

import requests
class BilibiliUploader:
    """Upload a local video file in chunks and submit its metadata.

    The three login cookies are stored once at construction time and passed
    explicitly on every request made through the shared ``requests.Session``.
    """

    # Size in bytes of each slice read from the file and uploaded (5 MB).
    CHUNK_SIZE = 1024 * 1024 * 5

    def __init__(self, sessdata: str, bili_jct: str, dedeuserid: str):
        """Store the session, login cookies and default request headers.

        Args:
            sessdata: Value for the ``SESSDATA`` cookie.
            bili_jct: Value for the ``bili_jct`` cookie.
            dedeuserid: Value for the ``DedeUserID`` cookie.
        """
        self.session = requests.Session()
        self.cookies = {
            'SESSDATA': sessdata,
            'bili_jct': bili_jct,
            'DedeUserID': dedeuserid,
        }
        self.headers = {
            'User-Agent': 'Mozilla/5.0',
            'Referer': 'https://www.bilibili.com',
        }

    def _get_upload_id(self) -> str:
        """Request a new upload id and return ``data.upload_id`` from the reply."""
        url = "https://api.bilibili.com/x/upload/web/upload/add"
        params = {
            "profile": "ugcfr/pc3",
        }
        response = self.session.get(
            url, params=params, cookies=self.cookies, headers=self.headers
        )
        return response.json()['data']['upload_id']

    def _upload_chunk(self, upload_id: str, chunk: bytes, chunk_index: int, chunks: int) -> bool:
        """POST one slice of the file as a multipart form.

        Args:
            upload_id: Id returned by ``_get_upload_id``.
            chunk: Raw bytes of this slice.
            chunk_index: Zero-based index of the slice.
            chunks: Total number of slices in the file.

        Returns:
            True when the server answered with HTTP 200, False otherwise.
        """
        url = "https://upos-sz-upcdnbda2.bilivideo.com/upload"
        # ``(None, value)`` makes requests send a plain form field instead of
        # a file part; only 'file' carries binary content.
        files = {
            'version': (None, '2.0.0.1054'),
            'filesize': (None, str(len(chunk))),
            'chunk': (None, str(chunk_index)),
            'chunks': (None, str(chunks)),
            'uploadId': (None, upload_id),
            'partNumber': (None, str(chunk_index + 1)),  # one-based counterpart of 'chunk'
            'file': ('blob', chunk, 'application/octet-stream'),
        }
        response = self.session.post(url, files=files, cookies=self.cookies)
        return response.status_code == 200

    def upload_video(self, file_path: str, title: str, desc: str, tid: int, tags: List[str]) -> Dict:
        """Upload ``file_path`` slice by slice, then submit the video metadata.

        Args:
            file_path: Path of the local video file.
            title: Video title.
            desc: Video description.
            tid: Category id sent as ``tid``.
            tags: Tags, joined with commas into the ``tag`` field.

        Returns:
            The decoded JSON body of the submit response.

        Raises:
            ValueError: If the file is empty.
            RuntimeError: If any slice is rejected (non-200 response).
            OSError: If the file cannot be stat'ed or read.
        """
        # Validate the file before touching the network so that a bad path or
        # an empty file does not waste an upload id.
        file_size = os.path.getsize(file_path)
        if file_size == 0:
            raise ValueError(f"video file is empty: {file_path}")

        chunk_size = self.CHUNK_SIZE
        chunks = (file_size + chunk_size - 1) // chunk_size  # ceiling division

        upload_id = self._get_upload_id()
        with open(file_path, 'rb') as f:
            for i in range(chunks):
                chunk = f.read(chunk_size)
                # Stop at the first rejected slice instead of submitting a
                # video whose content was only partially uploaded.
                if not self._upload_chunk(upload_id, chunk, i, chunks):
                    raise RuntimeError(
                        f"chunk {i + 1}/{chunks} upload failed for {file_path}"
                    )

        # Submit the video metadata.
        submit_url = "https://member.bilibili.com/x/vu/web/add"
        data = {
            "copyright": 2,
            "videos": [{"filename": os.path.basename(file_path)}],
            "source": "",
            "tid": tid,
            "cover": "",
            "title": title,
            "desc": desc,
            "dynamic": "",
            "subtitle": {"open": 0, "lan": ""},
            "tag": ",".join(tags),
            "dtime": 0,
            "interactive": 0,
            "no_reprint": 1,
            "upload_id": upload_id,
        }
        response = self.session.post(submit_url, json=data, cookies=self.cookies)
        return response.json()
import os
import yaml
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from bilibili_uploader import BilibiliUploader
class TaskManager:
    """Load upload tasks from YAML files and process them on a thread pool.

    The configuration is a YAML mapping with a required ``cookies`` section
    and optional ``tasks_dir``, ``archive_dir`` and ``defaults`` entries.
    """

    def __init__(self, config_path: str = 'config.yaml'):
        """Read the configuration, build the uploader and load all tasks.

        Args:
            config_path: Path of the YAML configuration file.
        """
        # Explicit UTF-8: the config may contain non-ASCII text and the
        # platform default encoding is not UTF-8 everywhere.
        with open(config_path, encoding='utf-8') as f:
            self.config = yaml.safe_load(f)
        cookies = self.config['cookies']
        self.uploader = BilibiliUploader(
            cookies['sessdata'],
            cookies['bili_jct'],
            cookies['dedeuserid'],
        )
        self.task_queue = []
        self.load_tasks()

    def load_tasks(self):
        """Append every ``*.yaml`` task found in the tasks directory to the queue.

        Files are visited in sorted name order so runs are reproducible.
        Files that do not contain a YAML mapping (for example empty files)
        are reported and skipped.
        """
        tasks_dir = self.config.get('tasks_dir', 'tasks')
        for filename in sorted(os.listdir(tasks_dir)):
            if not filename.endswith('.yaml'):
                continue
            with open(os.path.join(tasks_dir, filename), encoding='utf-8') as f:
                task = yaml.safe_load(f)
            if not isinstance(task, dict):
                print(f"跳过无效任务文件: {filename}")
                continue
            self.task_queue.append(task)

    def process_task(self, task: dict):
        """Upload one task's video and archive the file on success.

        ``tid`` and ``tags`` fall back to the ``defaults`` section of the
        configuration, and then to 174 and an empty list, when the task does
        not set them. Any exception is caught and reported so that one bad
        task does not stop the others.

        Args:
            task: Mapping with ``title`` and ``video_path`` and optional
                ``desc``, ``tid`` and ``tags``.
        """
        # Resolved up front so the error handler below never raises itself
        # when the task has no title.
        title = task.get('title', '')
        defaults = self.config.get('defaults') or {}
        try:
            print(f"开始处理任务: {title}")
            result = self.uploader.upload_video(
                file_path=task['video_path'],
                title=task['title'],
                desc=task.get('desc', ''),
                tid=task.get('tid', defaults.get('tid', 174)),
                tags=task.get('tags', defaults.get('tags', [])),
            )
            if result['code'] == 0:
                print(f"上传成功: {title} (AID: {result['data']['aid']})")
                self._move_to_archive(task)
            else:
                print(f"上传失败: {title} - {result['message']}")
        except Exception as e:
            print(f"任务处理异常: {title} - {str(e)}")

    def _move_to_archive(self, task: dict):
        """Rename the task's video into the archive directory.

        The archive directory is created if needed and the file name is
        prefixed with the current local time as ``YYYYmmdd_HHMMSS_``.
        """
        archive_dir = self.config.get('archive_dir', 'archive')
        os.makedirs(archive_dir, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        new_path = os.path.join(
            archive_dir, f"{timestamp}_{os.path.basename(task['video_path'])}"
        )
        os.rename(task['video_path'], new_path)

    def run(self, max_workers: int = 3):
        """Process every queued task using up to ``max_workers`` threads.

        Returns once all tasks have finished: leaving the ``with`` block
        waits for the pool to drain.
        """
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            executor.map(self.process_task, self.task_queue)
import argparse

from task_manager import TaskManager
def parse_args(argv=None):
    """Parse the command-line options of the batch upload tool.

    Args:
        argv: Argument list to parse. ``None`` (the default) lets argparse
            read the process command line, which is the previous behaviour.

    Returns:
        An ``argparse.Namespace`` with ``config`` (str, default
        ``'config.yaml'``) and ``workers`` (int, default 3).
    """
    parser = argparse.ArgumentParser(description='B站批量上传工具')
    parser.add_argument('--config', default='config.yaml', help='配置文件路径')
    parser.add_argument('--workers', type=int, default=3, help='并发上传数量')
    return parser.parse_args(argv)
# Script entry point: parse the options, then build the task manager and run
# every queued task, printing a banner before and after.
if __name__ == '__main__':
    cli_options = parse_args()
    print("=== B站批量上传工具启动 ===")
    TaskManager(cli_options.config).run(cli_options.workers)
    print("=== 所有任务处理完成 ===")
cookies:
  sessdata: "你的SESSDATA"
  bili_jct: "你的bili_jct"
  dedeuserid: "你的DedeUserID"
tasks_dir: "tasks" # 任务文件目录
archive_dir: "archive" # 完成归档目录
# 可选全局默认值
defaults:
  tid: 174 # 科技区
  tags: ["科技", "编程", "Python"]
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。