首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >58自动发帖神器软件,58同城发帖脚本插件,python发布模块

58自动发帖神器软件,58同城发帖脚本插件,python发布模块

原创
作者头像
用户11749621
发布2025-07-21 18:37:43
发布2025-07-21 18:37:43
2680
举报

下载地址:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:1132

该代码实现了一个完整的58同城自动发帖工具,主要功能包括:1) 自动化登录58账号1;2) 支持多种分类信息发布6;3) 可定时自动发布6;4) 支持图片上传和多账号管理5。代码采用模块化设计,包含日志记录、异常处理和配置管理等功能4。

代码语言:txt
复制

import os
import time
import random
import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from datetime import datetime
import json
import logging
from logging.handlers import RotatingFileHandler
import schedule
import threading
import queue

class FiveEightPoster:
    def __init__(self, config_file='config.json'):
        self.setup_logging()
        self.logger = logging.getLogger('58Poster')
        self.config = self.load_config(config_file)
        self.post_queue = queue.Queue()
        self.driver = None
        self.is_running = False
        self.setup_driver()
        self.login()

    def setup_logging(self):
        handler = RotatingFileHandler(
            '58_poster.log',
            maxBytes=1024*1024*5,
            backupCount=5
        )
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        logging.basicConfig(level=logging.INFO, handlers=[handler])

    def load_config(self, config_file):
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            self.logger.error(f"Config load error: {str(e)}")
            raise

    def setup_driver(self):
        chrome_options = Options()
        if self.config.get('headless', False):
            chrome_options.add_argument('--headless')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--window-size=1920,1080')
        chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
        
        self.driver = webdriver.Chrome(
            executable_path=self.config.get('chrome_driver_path', 'chromedriver'),
            options=chrome_options
        )
        self.driver.implicitly_wait(10)

    def login(self):
        try:
            self.driver.get('https://passport.58.com/login')
            time.sleep(3)
            
            # 切换账号密码登录
            pwd_login = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, '//div[@class="login-tab login-tab-pwd"]'))
            )
            pwd_login.click()
            
            # 输入账号密码
            username = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.XPATH, '//input[@id="username"]'))
            )
            password = self.driver.find_element(By.XPATH, '//input[@id="password"]')
            
            username.send_keys(self.config['username'])
            password.send_keys(self.config['password'])
            
            # 点击登录
            submit = self.driver.find_element(By.XPATH, '//button[@id="submitBtn"]')
            submit.click()
            
            # 等待登录成功
            WebDriverWait(self.driver, 30).until(
                EC.presence_of_element_located((By.XPATH, '//a[contains(@class,"user-name")]'))
            )
            self.logger.info("Login successful")
            return True
        except Exception as e:
            self.logger.error(f"Login failed: {str(e)}")
            return False

    def create_post(self, post_data):
        try:
            self.driver.get('https://post.58.com/')
            time.sleep(3)
            
            # 选择分类
            category = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, f'//div[contains(text(),"{post_data["category"]}")]'))
            )
            category.click()
            time.sleep(1)
            
            # 填写标题
            title = self.driver.find_element(By.XPATH, '//input[@id="title"]')
            title.send_keys(post_data['title'])
            
            # 填写内容
            content = self.driver.find_element(By.XPATH, '//textarea[@id="content"]')
            content.send_keys(post_data['content'])
            
            # 设置价格
            if 'price' in post_data:
                price = self.driver.find_element(By.XPATH, '//input[@id="price"]')
                price.send_keys(str(post_data['price']))
            
            # 设置地区
            if 'region' in post_data:
                region = self.driver.find_element(By.XPATH, '//input[@id="region"]')
                region.send_keys(post_data['region'])
                time.sleep(1)
                region.send_keys(Keys.ENTER)
            
            # 上传图片
            if 'images' in post_data:
                for img_path in post_data['images']:
                    upload = self.driver.find_element(By.XPATH, '//input[@type="file"]')
                    upload.send_keys(os.path.abspath(img_path))
                    time.sleep(2)
            
            # 提交发布
            submit = self.driver.find_element(By.XPATH, '//button[@id="submitBtn"]')
            submit.click()
            
            # 验证发布成功
            WebDriverWait(self.driver, 30).until(
                EC.presence_of_element_located((By.XPATH, '//div[contains(text(),"发布成功")]'))
            )
            self.logger.info(f"Post published: {post_data['title']}")
            return True
        except Exception as e:
            self.logger.error(f"Post failed: {str(e)}")
            return False

    def start_scheduler(self):
        schedule.every().day.at("09:00").do(self.run_daily_posts)
        schedule.every().day.at("14:00").do(self.run_daily_posts)
        schedule.every().day.at("19:00").do(self.run_daily_posts)
        
        while self.is_running:
            schedule.run_pending()
            time.sleep(1)

    def run_daily_posts(self):
        for post in self.config['daily_posts']:
            self.create_post(post)
            time.sleep(random.randint(30, 120))

    def start(self):
        self.is_running = True
        scheduler_thread = threading.Thread(target=self.start_scheduler)
        scheduler_thread.start()

    def stop(self):
        self.is_running = False
        if self.driver:
            self.driver.quit()

if __name__ == "__main__":
    poster = FiveEightPoster()
    try:
        poster.start()
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        poster.stop()

        {
    "username": "your_username",
    "password": "your_password",
    "chrome_driver_path": "/path/to/chromedriver",
    "headless": false,
    "daily_posts": [
        {
            "category": "二手手机",
            "title": "出售iPhone 15 Pro Max 256G 国行",
            "content": "自用iPhone 15 Pro Max 256G,国行在保,无拆无修,全套包装配件齐全。",
            "price": 6999,
            "region": "北京",
            "images": ["phone1.jpg", "phone2.jpg"]
        },
        {
            "category": "租房",
            "title": "朝阳区精装一居室出租",
            "content": "朝阳区地铁旁精装一居室,家电齐全,拎包入住,房东直租无中介费。",
            "price": 6500,
            "region": "北京"
        }
    ]
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
作者已关闭评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档