成品下载地址:https://www.pan38.com/yun/share.php?code=JCnzE 提取密码:1162
之前用python对接比特指纹浏览器实现开发的一个小红书批量上传工具,就是可以登录很多小红书账号,然后本地读取素材自动发布出去的工具,功能的话这个小红书自动化工具包含5个核心模块:主控制模块负责整体流程调度;API封装模块处理与小红书服务器的通信;内容管理模块处理媒体文件;定时调度模块控制发布时间;日志模块记录运行状态。使用时需要安装requests、Pillow、opencv-python等依赖库。
源码部分:【仅供学习测试】
import os
import time
from xiaohongshu_api import XiaohongshuAPI
from content_manager import ContentManager
from scheduler import Scheduler
from logger import setup_logger
class XiaohongshuBot:
def __init__(self):
self.logger = setup_logger('xiaohongshu')
self.api = XiaohongshuAPI()
self.content_mgr = ContentManager()
self.scheduler = Scheduler()
def login(self, username, password):
self.api.login(username, password)
def upload_post(self, content_type, **kwargs):
if content_type == 'image':
return self._upload_image_post(**kwargs)
elif content_type == 'video':
return self._upload_video_post(**kwargs)
def _upload_image_post(self, images, title, desc, tags):
media_ids = []
for img_path in images:
media_id = self.api.upload_image(img_path)
media_ids.append(media_id)
return self.api.create_note(
title=title,
desc=desc,
media_ids=media_ids,
tags=tags
)
def batch_upload(self, task_file):
tasks = self.content_mgr.load_tasks(task_file)
for task in tasks:
try:
self.scheduler.wait_until(task['publish_time'])
result = self.upload_post(**task)
self.logger.info(f"发布成功: {result}")
except Exception as e:
self.logger.error(f"发布失败: {str(e)}")
if __name__ == '__main__':
bot = XiaohongshuBot()
bot.login('your_username', 'your_password')
bot.batch_upload('tasks.json')
import requests
import json
from hashlib import md5
import time
class XiaohongshuAPI:
BASE_URL = 'https://edith.xiaohongshu.com/api'
def __init__(self):
self.session = requests.Session()
self.headers = {
'User-Agent': 'Mozilla/5.0...',
'Referer': 'https://www.xiaohongshu.com/'
}
def login(self, username, password):
password_md5 = md5(password.encode()).hexdigest()
payload = {
'username': username,
'password': password_md5,
'captcha': '',
'remember': True
}
resp = self._request('POST', '/login', payload)
self.token = resp['data']['token']
self.headers['Authorization'] = f'Bearer {self.token}'
def upload_image(self, file_path):
with open(file_path, 'rb') as f:
files = {'file': (os.path.basename(file_path), f)}
resp = self._request(
'POST',
'/media/upload/image',
files=files
)
return resp['data']['media_id']
def create_note(self, title, desc, media_ids, tags):
payload = {
'title': title,
'desc': desc,
'media_ids': media_ids,
'tags': tags,
'type': 'normal'
}
return self._request('POST', '/note/create', payload)
def _request(self, method, endpoint, data=None, files=None):
url = self.BASE_URL + endpoint
try:
if method == 'GET':
resp = self.session.get(url, params=data, headers=self.headers)
else:
if files:
resp = self.session.post(url, files=files, headers=self.headers)
else:
resp = self.session.post(url, json=data, headers=self.headers)
resp.raise_for_status()
return resp.json()
except Exception as e:
raise Exception(f"API请求失败: {str(e)}")
json
import os
from PIL import Image
import cv2
class ContentManager:
def __init__(self):
self.image_formats = ['.jpg', '.png', '.jpeg']
self.video_formats = ['.mp4', '.mov']
def load_tasks(self, file_path):
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
def scan_directory(self, dir_path):
media_files = []
for root, _, files in os.walk(dir_path):
for file in files:
ext = os.path.splitext(file)[1].lower()
if ext in self.image_formats + self.video_formats:
media_files.append(os.path.join(root, file))
return media_files
def process_image(self, img_path, max_size=1080):
img = Image.open(img_path)
if max(img.size) > max_size:
ratio = max_size / max(img.size)
new_size = tuple(int(x*ratio) for x in img.size)
img = img.resize(new_size, Image.ANTIALIAS)
img.save(img_path)
def get_video_info(self, video_path):
cap = cv2.VideoCapture(video_path)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
duration = cap.get(cv2.CAP_PROP_FRAME_COUNT) / cap.get(cv2.CAP_PROP_FPS)
cap.release()
return {
'width': width,
'height': height,
'duration': duration
}
time
from datetime import datetime
import random
class Scheduler:
def __init__(self):
self.min_interval = 3600 # 1小时最小间隔
def wait_until(self, target_time):
now = datetime.now()
target = datetime.strptime(target_time, '%Y-%m-%d %H:%M')
if target < now:
raise ValueError("目标时间已过期")
sleep_seconds = (target - now).total_seconds()
while sleep_seconds > 0:
print(f"等待发布,剩余时间: {sleep_seconds//3600}小时")
time.sleep(min(300, sleep_seconds)) # 每5分钟检查一次
sleep_seconds = (target - datetime.now()).total_seconds()
def random_delay(self):
delay = random.randint(300, 1800) # 5-30分钟随机延迟
time.sleep(delay)
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。