首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >小红书自动发布笔记工具,上传图文视频批量软件,python框架分享

小红书自动发布笔记工具,上传图文视频批量软件,python框架分享

原创
作者头像
用户11719788
发布2025-07-13 18:59:45
发布2025-07-13 18:59:45
1.1K0
举报

成品下载地址:https://www.pan38.com/yun/share.php?code=JCnzE 提取密码:1162

之前用python对接比特指纹浏览器实现开发的一个小红书批量上传工具,就是可以登录很多小红书账号,然后本地读取素材自动发布出去的工具,功能的话这个小红书自动化工具包含5个核心模块:主控制模块负责整体流程调度;API封装模块处理与小红书服务器的通信;内容管理模块处理媒体文件;定时调度模块控制发布时间;日志模块记录运行状态。使用时需要安装requests、Pillow、opencv-python等依赖库。

源码部分:【仅供学习测试】

代码语言:txt
复制

import os
import time
from xiaohongshu_api import XiaohongshuAPI
from content_manager import ContentManager
from scheduler import Scheduler
from logger import setup_logger

class XiaohongshuBot:
    def __init__(self):
        self.logger = setup_logger('xiaohongshu')
        self.api = XiaohongshuAPI()
        self.content_mgr = ContentManager()
        self.scheduler = Scheduler()
        
    def login(self, username, password):
        self.api.login(username, password)
        
    def upload_post(self, content_type, **kwargs):
        if content_type == 'image':
            return self._upload_image_post(**kwargs)
        elif content_type == 'video':
            return self._upload_video_post(**kwargs)
            
    def _upload_image_post(self, images, title, desc, tags):
        media_ids = []
        for img_path in images:
            media_id = self.api.upload_image(img_path)
            media_ids.append(media_id)
            
        return self.api.create_note(
            title=title,
            desc=desc,
            media_ids=media_ids,
            tags=tags
        )
        
    def batch_upload(self, task_file):
        tasks = self.content_mgr.load_tasks(task_file)
        for task in tasks:
            try:
                self.scheduler.wait_until(task['publish_time'])
                result = self.upload_post(**task)
                self.logger.info(f"发布成功: {result}")
            except Exception as e:
                self.logger.error(f"发布失败: {str(e)}")

if __name__ == '__main__':
    bot = XiaohongshuBot()
    bot.login('your_username', 'your_password')
    bot.batch_upload('tasks.json')
代码语言:txt
复制
import requests
import json
from hashlib import md5
import time

class XiaohongshuAPI:
    BASE_URL = 'https://edith.xiaohongshu.com/api'
    
    def __init__(self):
        self.session = requests.Session()
        self.headers = {
            'User-Agent': 'Mozilla/5.0...',
            'Referer': 'https://www.xiaohongshu.com/'
        }
        
    def login(self, username, password):
        password_md5 = md5(password.encode()).hexdigest()
        payload = {
            'username': username,
            'password': password_md5,
            'captcha': '',
            'remember': True
        }
        resp = self._request('POST', '/login', payload)
        self.token = resp['data']['token']
        self.headers['Authorization'] = f'Bearer {self.token}'
        
    def upload_image(self, file_path):
        with open(file_path, 'rb') as f:
            files = {'file': (os.path.basename(file_path), f)}
            resp = self._request(
                'POST', 
                '/media/upload/image', 
                files=files
            )
        return resp['data']['media_id']
        
    def create_note(self, title, desc, media_ids, tags):
        payload = {
            'title': title,
            'desc': desc,
            'media_ids': media_ids,
            'tags': tags,
            'type': 'normal'
        }
        return self._request('POST', '/note/create', payload)
        
    def _request(self, method, endpoint, data=None, files=None):
        url = self.BASE_URL + endpoint
        try:
            if method == 'GET':
                resp = self.session.get(url, params=data, headers=self.headers)
            else:
                if files:
                    resp = self.session.post(url, files=files, headers=self.headers)
                else:
                    resp = self.session.post(url, json=data, headers=self.headers)
                    
            resp.raise_for_status()
            return resp.json()
        except Exception as e:
            raise Exception(f"API请求失败: {str(e)}")
代码语言:txt
复制
 json
import os
from PIL import Image
import cv2

class ContentManager:
    def __init__(self):
        self.image_formats = ['.jpg', '.png', '.jpeg']
        self.video_formats = ['.mp4', '.mov']
        
    def load_tasks(self, file_path):
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
            
    def scan_directory(self, dir_path):
        media_files = []
        for root, _, files in os.walk(dir_path):
            for file in files:
                ext = os.path.splitext(file)[1].lower()
                if ext in self.image_formats + self.video_formats:
                    media_files.append(os.path.join(root, file))
        return media_files
        
    def process_image(self, img_path, max_size=1080):
        img = Image.open(img_path)
        if max(img.size) > max_size:
            ratio = max_size / max(img.size)
            new_size = tuple(int(x*ratio) for x in img.size)
            img = img.resize(new_size, Image.ANTIALIAS)
            img.save(img_path)
            
    def get_video_info(self, video_path):
        cap = cv2.VideoCapture(video_path)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        duration = cap.get(cv2.CAP_PROP_FRAME_COUNT) / cap.get(cv2.CAP_PROP_FPS)
        cap.release()
        return {
            'width': width,
            'height': height,
            'duration': duration
        }
代码语言:txt
复制
 time
from datetime import datetime
import random

class Scheduler:
    def __init__(self):
        self.min_interval = 3600  # 1小时最小间隔
        
    def wait_until(self, target_time):
        now = datetime.now()
        target = datetime.strptime(target_time, '%Y-%m-%d %H:%M')
        
        if target < now:
            raise ValueError("目标时间已过期")
            
        sleep_seconds = (target - now).total_seconds()
        while sleep_seconds > 0:
            print(f"等待发布,剩余时间: {sleep_seconds//3600}小时")
            time.sleep(min(300, sleep_seconds))  # 每5分钟检查一次
            sleep_seconds = (target - datetime.now()).total_seconds()
            
    def random_delay(self):
        delay = random.randint(300, 1800)  # 5-30分钟随机延迟
        time.sleep(delay)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
作者已关闭评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档