当你有多个服务器,如何统一去查看服务器的运行状态呢?本文为大家带来基于ESP8266显示服务器相关状态信息,赶紧收藏吧!

效果预览
哪吒探针可以方便的帮助我们监控服务器性能。在前面的文章中,讲到过最新版本的安装。详情参考文章:

哪吒探针V1 全新版安装
通过对其接口分析,我们可以看到他是通过wss通信的。

接下来,我们通过Python脚本对其进行简单的处理。
import asyncio
import websockets
import json
from datetime import datetime, timezone, timedelta
from aiohttp import web
import sys
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class SimpleServerMonitor:
def __init__(self):
self.websocket_url = "改为你的WSS地址"
self.http_host = "192.168.50.48"#为了安全,我们在本地运行。改为本地IP地址。
self.http_port = 8080
self.data_cache = None
self.last_fetch_time = None
asyncdef fetch_data(self):
"""获取数据 - 最简单的方式"""
try:
# 使用最简单的连接方式
asyncwith websockets.connect(self.websocket_url) as websocket:
response = await websocket.recv()
return json.loads(response)
except Exception as e:
logger.error(f"获取数据失败: {e}")
returnNone
def is_server_online(self, last_active_str):
"""判断服务器是否在线"""
ifnot last_active_str or last_active_str == "0001-01-01T00:00:00Z":
returnFalse
try:
if last_active_str.endswith('Z'):
last_active = datetime.fromisoformat(last_active_str.replace('Z', '+00:00'))
else:
last_active = datetime.fromisoformat(last_active_str)
if last_active.tzinfo isNone:
last_active_utc = last_active.replace(tzinfo=timezone.utc)
else:
last_active_utc = last_active.astimezone(timezone.utc)
now_utc = datetime.now(timezone.utc)
return (now_utc - last_active_utc) <= timedelta(days=1)
except Exception as e:
logger.error(f"解析时间错误: {e}")
returnFalse
def process_data(self, raw_data):
"""处理数据"""
ifnot raw_data:
return {"error": "无法获取数据"}
result = {
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"total_servers": len(raw_data['servers']),
"online_servers": 0,
"offline_servers": 0,
"servers": []
}
for server in raw_data['servers']:
is_online = self.is_server_online(server.get('last_active'))
if is_online:
result['online_servers'] += 1
host = server.get('host', {})
state = server.get('state', {})
mem_total = host.get('mem_total', 0)
mem_used = state.get('mem_used', 0)
memory_usage = (mem_used / mem_total * 100) if mem_total > 0else0
disk_total = host.get('disk_total', 0)
disk_total_gb = disk_total / (1024**3) if disk_total > 0else0
upload_speed_kb = state.get('net_out_speed', 0) / 1024
download_speed_kb = state.get('net_in_speed', 0) / 1024
server_info = {
"id": server['id'],
"name": server['name'],
"platform": host.get('platform', '未知'),
"cpu_usage": round(state.get('cpu', 0), 2),
"memory_usage": round(memory_usage, 2),
"disk_total_gb": round(disk_total_gb, 2),
"upload_speed_kb": round(upload_speed_kb, 2),
"download_speed_kb": round(download_speed_kb, 2)
}
result['servers'].append(server_info)
else:
result['offline_servers'] += 1
return result
asyncdef handle_request(self, request):
"""处理请求"""
# 如果缓存为空或超过30秒,更新数据
if (self.data_cache isNoneor
self.last_fetch_time isNoneor
(datetime.now() - self.last_fetch_time).total_seconds() > 30):
raw_data = await self.fetch_data()
if raw_data:
self.data_cache = self.process_data(raw_data)
self.last_fetch_time = datetime.now()
if self.data_cache:
response = web.json_response(self.data_cache)
else:
response = web.json_response({"error": "无数据"}, status=500)
response.headers['Access-Control-Allow-Origin'] = '*'
return response
asyncdef run(self):
"""运行服务"""
app = web.Application()
app.router.add_get('/', self.handle_request)
app.router.add_get('/api', self.handle_request)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, self.http_host, self.http_port)
print(f"服务器监控API运行在: http://{self.http_host}:{self.http_port}")
print("访问 / 或 /api 获取数据")
await site.start()
await asyncio.Future()
asyncdef main():
monitor = SimpleServerMonitor()
await monitor.run()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n服务已停止")
完成后,我们访问api接口,效果如下:

如上,我们对数据进行了清洗。设备离线后,不再显示数据!
接下来,要做的事情就很简单了。让8266访问api接口,并将数据展示出来。考虑到屏幕比较少,可以采取轮播的形式,将得到的服务器的cpu、内存、磁盘、以及设备名称,在oled屏幕显示出来。 OLED配置如下:SCL=D6, SDA=D5。 完整代码如下:
#include <Arduino.h>
#include <ESP8266WiFi.h>
#include <ESP8266HTTPClient.h>
#include <ArduinoJson.h>
#include <U8g2lib.h>
#include <Wire.h>
// WiFi配置
constchar* ssid = "nbxiaoyaozi";
constchar* password = "kali笔记";
// API配置
constchar* apiUrl = "http://192.168.50.48:8080/";//你的内网api接口
constunsignedlong updateInterval = 30000; // 30秒更新一次数据
constunsignedlong displayInterval = 3000; // 3秒切换一次显示
// OLED配置 - SCL=D6, SDA=D5
U8G2_SSD1306_128X64_NONAME_F_SW_I2C u8g2(U8G2_R0, /* clock=*/ D6, /* data=*/ D5, /* reset=*/ U8X8_PIN_NONE);
// 服务器数据结构体
struct ServerData {
String name;
float cpuUsage;
float memoryUsage;
float diskTotal;
};
// 全局变量
ServerData servers[10]; // 最多存储10台服务器数据
int serverCount = 0;
unsignedlong lastUpdateTime = 0;
unsignedlong lastDisplayTime = 0;
int currentDisplayIndex = 0;
// 连接WiFi
void connectWiFi() {
WiFi.begin(ssid, password);
u8g2.clearBuffer();
u8g2.setFont(u8g2_font_wqy12_t_gb2312);
u8g2.setCursor(0, 20);
u8g2.print("连接WiFi...");
u8g2.sendBuffer();
while (WiFi.status() != WL_CONNECTED) {
delay(500);
}
u8g2.clearBuffer();
u8g2.setCursor(0, 20);
u8g2.print("WiFi已连接");
u8g2.setCursor(0, 40);
u8g2.print("IP: ");
u8g2.print(WiFi.localIP());
u8g2.sendBuffer();
delay(2000);
}
// 从API获取数据
void fetchServerData() {
if (WiFi.status() != WL_CONNECTED) {
connectWiFi();
}
HTTPClient http;
http.begin(apiUrl);
int httpCode = http.GET();
if (httpCode == HTTP_CODE_OK) {
String payload = http.getString();
// 解析JSON
DynamicJsonDocument doc(2048);
deserializeJson(doc, payload);
// 获取服务器数量
JsonArray serverArray = doc["servers"];
serverCount = serverArray.size();
// 提取服务器数据
for (int i = 0; i < serverCount; i++) {
JsonObject server = serverArray[i];
servers[i].name = server["name"].as<String>();
servers[i].cpuUsage = server["cpu_usage"].as<float>();
servers[i].memoryUsage = server["memory_usage"].as<float>();
servers[i].diskTotal = server["disk_total_gb"].as<float>();
}
}
http.end();
}
// 显示服务器信息
void displayServerInfo(int index) {
if (index >= serverCount) return;
u8g2.clearBuffer();
u8g2.setFont(u8g2_font_wqy12_t_gb2312);
// 显示服务器名称
u8g2.setCursor(0, 15);
u8g2.print(servers[index].name);
// 显示CPU使用率
u8g2.setCursor(0, 35);
u8g2.print("CPU: ");
u8g2.print(servers[index].cpuUsage);
u8g2.print("%");
// 显示内存使用率
u8g2.setCursor(64, 35);
u8g2.print("内存: ");
u8g2.print(servers[index].memoryUsage);
u8g2.print("%");
// 显示磁盘总量
u8g2.setCursor(0, 55);
u8g2.print("磁盘: ");
u8g2.print(servers[index].diskTotal);
u8g2.print("GB");
u8g2.sendBuffer();
}
void setup() {
// 初始化OLED
u8g2.begin();
u8g2.enableUTF8Print(); // 启用UTF8打印
// 连接WiFi
connectWiFi();
// 初始获取数据
fetchServerData();
}
void loop() {
unsignedlong currentTime = millis();
// 定期更新数据
if (currentTime - lastUpdateTime >= updateInterval) {
fetchServerData();
lastUpdateTime = currentTime;
}
// 轮播显示服务器信息
if (currentTime - lastDisplayTime >= displayInterval) {
displayServerInfo(currentDisplayIndex);
currentDisplayIndex = (currentDisplayIndex + 1) % serverCount;
lastDisplayTime = currentTime;
}
}
这样,我们就可以在oled屏幕中看到服务器信息了,每 3 秒切换到下一台服务器,每 30秒重新从 API 获取最新数据。 如果要修改数据,可以修改updateInterval - 数据更新间隔和displayInterval - 显示切换间隔的值。
更多精彩文章 欢迎关注我们