作为 DevOps 工程师、系统管理员或 Python 开发者,你是不是每天都在重复这些“搬砖”操作:凌晨三点被告警叫醒,手动登录十几台服务器查日志;批量修改配置文件时,一台台敲 SSH 命令改到眼花;服务出问题时,手忙脚乱重装服务,还得担心漏改参数……运维人的日常,仿佛永远在和重复的“体力活”较劲,既耗时又容易出错,一不小心还得“背锅”。
但其实,Python 早已为我们准备了一套“自动化工具箱”——用好这些工具,你能把 80% 的重复工作交给脚本,从“救火队员”变成“甩手掌柜”,把精力放在更有价值的架构优化、性能调优上。今天就给大家盘点 5 个运维自动化必备的 Python 工具,手把手教你落地使用。
Paramiko 是 Python 实现的 SSHv2 协议库,让你能在代码里直接建立 SSH 连接、执行远程命令、传输文件,不用再手动敲 ssh 命令。它是所有远程自动化运维的基础,适合批量登录服务器执行命令、批量上传/下载文件、自动化部署脚本等场景——比如你要检查 50 台服务器的磁盘使用率,用 Paramiko 写个脚本,1 分钟就能出结果,不用逐台登录。
pip install paramiko # 推荐安装 3.4.0 最新稳定版import paramiko
import time
from typing import List
def ssh_execute_command(hosts: List[dict], command: str) -> dict:
"""
批量登录服务器执行指定命令
:param hosts: 服务器列表,格式 [{"host": "IP", "port": 22, "user": "用户名", "password": "密码"}, ...]
:param command: 要执行的命令
:return: 执行结果,key 为服务器IP,value 为命令输出
"""
results = {}
# 创建 SSH 客户端配置
ssh_config = paramiko.SSHClient()
# 自动添加未知主机密钥(生产环境建议手动配置 known_hosts)
ssh_config.set_missing_host_key_policy(paramiko.AutoAddPolicy())
for host in hosts:
try:
# 建立 SSH 连接
ssh_config.connect(
hostname=host["host"],
port=host["port"],
username=host["user"],
password=host["password"],
timeout=10 # 超时时间 10 秒
)
# 执行命令
stdin, stdout, stderr = ssh_config.exec_command(command)
# 获取输出结果(注意编码)
output = stdout.read().decode("utf-8").strip()
error = stderr.read().decode("utf-8").strip()
if error:
results[host["host"]] = f"执行失败: {error}"
else:
results[host["host"]] = output
# 关闭连接
ssh_config.close()
time.sleep(0.5) # 避免连接过快被防火墙拦截
except Exception as e:
results[host["host"]] = f"连接失败: {str(e)}"
return results
# 示例:批量执行 df -h
if __name__ == "__main__":
# 配置要操作的服务器列表(生产环境建议从配置文件读取,不要硬编码密码)
servers = [
{"host": "192.168.1.10", "port": 22, "user": "root", "password": "your_password"},
{"host": "192.168.1.11", "port": 22, "user": "root", "password": "your_password"},
]
# 执行磁盘查看命令
disk_results = ssh_execute_command(servers, "df -h")
# 打印结果
for ip, result in disk_results.items():
print(f"=== 服务器 {ip} 磁盘信息 ===")
print(result)
print("-" * 50)
Fabric 基于 Paramiko 封装,进一步简化了远程命令执行、批量操作的代码写法,还支持并行执行、任务封装——比如你可以把“部署服务”拆成“拉取代码→停止服务→更新配置→重启服务”几个子任务,用 Fabric 一键执行。它适合日常高频的批量运维操作,比如批量重启 Nginx、批量更新系统包、自动化部署应用,比直接用 Paramiko 代码更简洁,可读性更高。
pip install fabric # 推荐安装 3.2.2 最新稳定版# 文件名:fabfile.py(Fabric 约定的默认文件名)
from fabric import Connection, task
from invoke import Responder
# 定义服务器列表(生产环境建议用环境变量/配置文件)
SERVERS = ["192.168.1.10", "192.168.1.11"]
USER = "root"
PASSWORD = "your_password"
# 定义 sudo 密码响应器(避免手动输入密码)
sudo_responder = Responder(
pattern=r"\[sudo\] password for .*:",
response=f"{PASSWORD}\n"
)
@task
def restart_nginx(c):
"""
批量重启 Nginx 服务的 Fabric 任务
:param c: Fabric 自动传入的 Connection 对象
"""
# 循环连接每台服务器
for host in SERVERS:
print(f"=== 操作服务器 {host} ===")
# 建立连接
conn = Connection(
host=host,
user=USER,
connect_kwargs={"password": PASSWORD}
)
# 执行重启命令(需要 sudo 权限)
result = conn.sudo(
"systemctl restart nginx",
watchers=[sudo_responder], # 自动输入 sudo 密码
warn=True # 命令失败时不中断后续操作
)
# 输出结果
if result.ok:
print(f"{host} Nginx 重启成功")
else:
print(f"{host} Nginx 重启失败: {result.stderr}")
# 关闭连接
conn.close()
# 执行方式:在终端运行
# fab restart_nginx
Ansible 是主流的声明式配置管理工具,本身基于 Python 开发,通过 YAML 定义“目标状态”(比如“所有服务器都要安装 Nginx 1.24.0”),就能自动将服务器配置到目标状态,不用写复杂的流程代码。而 Ansible Python API 则让你能在 Python 脚本里调用 Ansible 的能力,适合大规模服务器的配置标准化、批量部署服务、环境一致性管理——比如你要给 100 台服务器配置相同的防火墙规则,用 Ansible 只需写一份 Playbook,再通过 API 调用执行,全程无需手动操作。
pip install ansible # 推荐安装 9.1.0 最新稳定版from ansible import context
from ansible.cli import CLI
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
import os
def run_ansible_playbook(playbook_path: str, inventory_path: str):
"""
用 Python API 执行 Ansible Playbook
:param playbook_path: Playbook 文件路径
:param inventory_path: 主机清单文件路径
"""
# 初始化 Ansible 上下文
context.CLIARGS = CLI.setup_cli_args({
"connection": "ssh",
"module_path": "",
"forks": 10, # 并行执行的进程数
"become": True,
"become_method": "sudo",
"become_user": "root",
"check": False,
"diff": False,
"verbosity": 1
})
# 加载数据(如 Playbook、清单)
loader = DataLoader()
# 加载主机清单
inventory = InventoryManager(loader=loader, sources=[inventory_path])
# 变量管理
variable_manager = VariableManager(loader=loader, inventory=inventory)
# 初始化 Playbook 执行器
pbex = PlaybookExecutor(
playbooks=[playbook_path],
inventory=inventory,
variable_manager=variable_manager,
loader=loader,
passwords={"conn_pass": "your_password", "become_pass": "your_password"}
)
# 执行 Playbook
result = pbex.run()
# 输出执行结果
if result == 0:
print("Playbook 执行成功!")
else:
print(f"Playbook 执行失败,返回码: {result}")
# 示例:执行安装 Nginx 的 Playbook
if __name__ == "__main__":
# 1. 先创建 Playbook 文件(nginx_install.yml)
playbook_content = """
- hosts: all
tasks:
- name: 安装 Nginx
apt:
name: nginx=1.24.0
state: present
update_cache: yes
when: ansible_os_family == "Debian"
- name: 启动并启用 Nginx
service:
name: nginx
state: started
enabled: yes
"""
with open("nginx_install.yml", "w") as f:
f.write(playbook_content)
# 2. 创建主机清单文件(inventory.ini)
inventory_content = """
[web_servers]
192.168.1.10
192.168.1.11
"""
with open("inventory.ini", "w") as f:
f.write(inventory_content)
# 3. 执行 Playbook
run_ansible_playbook("nginx_install.yml", "inventory.ini")
# 4. 清理临时文件(可选)
os.remove("nginx_install.yml")
os.remove("inventory.ini")
psutil(process and system utilities)是跨平台的系统监控库,能轻松获取 CPU、内存、磁盘、网络、进程等系统指标,不用再解析 top free df 等命令的输出。它适合做服务器健康监控、资源告警、进程管理——比如 CPU 使用率超过 80% 时自动发告警,内存不足时清理缓存,进程挂掉时自动重启。
pip install psutil # 推荐安装 5.9.6 最新稳定版import psutil
import time
import requests
def get_cpu_usage(interval: int = 5) -> float:
"""
获取 CPU 平均使用率(按指定间隔采样)
:param interval: 采样间隔(秒)
:return: CPU 使用率(百分比)
"""
return psutil.cpu_percent(interval=interval)
def send_wechat_alert(content: str, webhook_url: str):
"""
发送企业微信告警
:param content: 告警内容
:param webhook_url: 企业微信机器人 webhook 地址
"""
headers = {"Content-Type": "application/json"}
data = {
"msgtype": "text",
"text": {"content": content}
}
try:
response = requests.post(webhook_url, json=data, headers=headers, timeout=10)
if response.json()["errcode"] == 0:
print("告警发送成功")
else:
print(f"告警发送失败: {response.text}")
except Exception as e:
print(f"告警发送异常: {str(e)}")
# 监控主逻辑
if __name__ == "__main__":
# 企业微信机器人 webhook(替换成你的)
WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=your_key"
# CPU 告警阈值
CPU_THRESHOLD = 80.0
print("开始监控 CPU 使用率...")
while True:
cpu_usage = get_cpu_usage()
print(f"当前 CPU 使用率: {cpu_usage}%")
# 超过阈值发送告警
if cpu_usage > CPU_THRESHOLD:
alert_content = f"【服务器告警】CPU 使用率超过阈值!\n当前使用率: {cpu_usage}%\n阈值: {CPU_THRESHOLD}%\n时间: {time.strftime('%Y-%m-%d %H:%M:%S')}"
send_wechat_alert(alert_content, WEBHOOK_URL)
# 每 60 秒检查一次
time.sleep(60)
schedule 是轻量级的定时任务库,语法简单易懂,不用配置 crontab 就能实现“每天凌晨 2 点清理日志”“每小时备份数据库”;logging 是 Python 内置的日志模块,能规范记录脚本执行日志;结合自定义脚本,就能实现“定时执行运维任务 + 日志全记录”,适合所有需要定时自动化的场景——比如定时清理日志文件、定时备份数据、定时检查服务状态,再也不用手动记着“到点要做什么”。
pip install schedule # 推荐安装 1.2.0 最新稳定版import schedule
import time
import logging
import os
from datetime import datetime
# 配置日志(聚合脚本执行日志)
def setup_logging():
"""配置日志输出:同时输出到文件和控制台"""
# 日志文件路径
log_file = f"ops_automation_{datetime.now().strftime('%Y%m%d')}.log"
# 日志格式
log_format = "%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s"
# 配置根日志器
logging.basicConfig(
level=logging.INFO,
format=log_format,
handlers=[
# 输出到文件
logging.FileHandler(log_file, encoding="utf-8"),
# 输出到控制台
logging.StreamHandler()
]
)
def clean_log_files(log_dir: str, keep_days: int = 7):
"""
清理指定目录下的旧日志文件
:param log_dir: 日志目录
:param keep_days: 保留最近几天的日志
"""
logger = logging.getLogger(__name__)
try:
# 检查目录是否存在
if not os.path.exists(log_dir):
logger.warning(f"日志目录不存在: {log_dir}")
return
# 计算过期时间(秒)
expire_time = time.time() - (keep_days * 24 * 3600)
# 遍历目录下的文件
for filename in os.listdir(log_dir):
file_path = os.path.join(log_dir, filename)
# 只处理文件(跳过目录)
if os.path.isfile(file_path):
# 获取文件最后修改时间
file_mtime = os.path.getmtime(file_path)
# 判断是否过期
if file_mtime < expire_time:
# 删除文件
os.remove(file_path)
logger.info(f"已删除过期日志文件: {file_path}")
logger.info("日志清理任务执行完成")
except Exception as e:
logger.error(f"日志清理任务执行失败: {str(e)}", exc_info=True)
# 定时任务配置
if __name__ == "__main__":
# 初始化日志
setup_logging()
logger = logging.getLogger(__name__)
# 配置定时任务:每天凌晨 2 点执行日志清理
schedule.every().day.at("02:00").do(
clean_log_files,
log_dir="/var/log/nginx", # Nginx 日志目录
keep_days=7 # 保留 7 天日志
)
logger.info("定时任务已启动,每天凌晨 2 点清理日志文件...")
# 循环执行定时任务
while True:
schedule.run_pending()
time.sleep(60) # 每分钟检查一次任务
以上 5 个工具并非孤立使用,而是可以组合成一套完整的自动化运维体系:
不用一开始就追求“大而全”,建议你从一个小场景入手:比如先写一个 Paramiko 脚本批量查磁盘使用率,再给它加上 schedule 定时执行,最后用 psutil 补充内存监控——一步步把重复的“搬砖”工作替换成脚本,你会发现运维工作会轻松很多。
paramiko.RSAKey 加载私钥;