首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >ESP8266实时显现你的服务器信息

ESP8266实时显现你的服务器信息

作者头像
逍遥子大表哥
发布2025-12-21 15:01:47
发布2025-12-21 15:01:47
2400
举报
文章被收录于专栏:kali blogkali blog

当你有多个服务器,如何统一去查看服务器的运行状态呢?本文为大家带来基于ESP8266显示服务器相关状态信息,赶紧收藏吧!

效果预览
效果预览

效果预览

搭建哪吒探针

哪吒探针可以方便的帮助我们监控服务器性能。在前面的文章中,讲到过最新版本的安装。详情参考文章:

哪吒探针V1 全新版安装
哪吒探针V1 全新版安装

哪吒探针V1 全新版安装

通过对其接口分析,我们可以看到他是通过wss通信的。

接下来,我们通过Python脚本对其进行简单的处理。

代码语言:javascript
复制
import asyncio
import websockets
import json
from datetime import datetime, timezone, timedelta
from aiohttp import web
import sys
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class SimpleServerMonitor:
    def __init__(self):
        self.websocket_url = "改为你的WSS地址"
        self.http_host = "192.168.50.48"#为了安全,我们在本地运行。改为本地IP地址。
        self.http_port = 8080
        self.data_cache = None
        self.last_fetch_time = None
        
    asyncdef fetch_data(self):
        """获取数据 - 最简单的方式"""
        try:
            # 使用最简单的连接方式
            asyncwith websockets.connect(self.websocket_url) as websocket:
                response = await websocket.recv()
                return json.loads(response)
        except Exception as e:
            logger.error(f"获取数据失败: {e}")
            returnNone
    
    def is_server_online(self, last_active_str):
        """判断服务器是否在线"""
        ifnot last_active_str or last_active_str == "0001-01-01T00:00:00Z":
            returnFalse
        
        try:
            if last_active_str.endswith('Z'):
                last_active = datetime.fromisoformat(last_active_str.replace('Z', '+00:00'))
            else:
                last_active = datetime.fromisoformat(last_active_str)
            
            if last_active.tzinfo isNone:
                last_active_utc = last_active.replace(tzinfo=timezone.utc)
            else:
                last_active_utc = last_active.astimezone(timezone.utc)
            
            now_utc = datetime.now(timezone.utc)
            return (now_utc - last_active_utc) <= timedelta(days=1)
        except Exception as e:
            logger.error(f"解析时间错误: {e}")
            returnFalse
    
    def process_data(self, raw_data):
        """处理数据"""
        ifnot raw_data:
            return {"error": "无法获取数据"}
        
        result = {
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "total_servers": len(raw_data['servers']),
            "online_servers": 0,
            "offline_servers": 0,
            "servers": []
        }
        
        for server in raw_data['servers']:
            is_online = self.is_server_online(server.get('last_active'))
            
            if is_online:
                result['online_servers'] += 1
                host = server.get('host', {})
                state = server.get('state', {})
                
                mem_total = host.get('mem_total', 0)
                mem_used = state.get('mem_used', 0)
                memory_usage = (mem_used / mem_total * 100) if mem_total > 0else0
                
                disk_total = host.get('disk_total', 0)
                disk_total_gb = disk_total / (1024**3) if disk_total > 0else0
                
                upload_speed_kb = state.get('net_out_speed', 0) / 1024
                download_speed_kb = state.get('net_in_speed', 0) / 1024
                
                server_info = {
                    "id": server['id'],
                    "name": server['name'],
                    "platform": host.get('platform', '未知'),
                    "cpu_usage": round(state.get('cpu', 0), 2),
                    "memory_usage": round(memory_usage, 2),
                    "disk_total_gb": round(disk_total_gb, 2),
                    "upload_speed_kb": round(upload_speed_kb, 2),
                    "download_speed_kb": round(download_speed_kb, 2)
                }
                
                result['servers'].append(server_info)
            else:
                result['offline_servers'] += 1
        
        return result
    
    asyncdef handle_request(self, request):
        """处理请求"""
        # 如果缓存为空或超过30秒,更新数据
        if (self.data_cache isNoneor
            self.last_fetch_time isNoneor
            (datetime.now() - self.last_fetch_time).total_seconds() > 30):
            
            raw_data = await self.fetch_data()
            if raw_data:
                self.data_cache = self.process_data(raw_data)
                self.last_fetch_time = datetime.now()
        
        if self.data_cache:
            response = web.json_response(self.data_cache)
        else:
            response = web.json_response({"error": "无数据"}, status=500)
        
        response.headers['Access-Control-Allow-Origin'] = '*'
        return response
    
    asyncdef run(self):
        """运行服务"""
        app = web.Application()
        app.router.add_get('/', self.handle_request)
        app.router.add_get('/api', self.handle_request)
        
        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, self.http_host, self.http_port)
        
        print(f"服务器监控API运行在: http://{self.http_host}:{self.http_port}")
        print("访问 / 或 /api 获取数据")
        
        await site.start()
        await asyncio.Future()

asyncdef main():
    monitor = SimpleServerMonitor()
    await monitor.run()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n服务已停止")

完成后,我们访问api接口,效果如下:

如上,我们对数据进行了清洗。设备离线后,不再显示数据!

编写ESP8266代码

接下来,要做的事情就很简单了。让8266访问api接口,并将数据展示出来。考虑到屏幕比较少,可以采取轮播的形式,将得到的服务器的cpu、内存、磁盘、以及设备名称,在oled屏幕显示出来。 OLED配置如下:SCL=D6, SDA=D5。 完整代码如下:

代码语言:javascript
复制
#include <Arduino.h>
#include <ESP8266WiFi.h>
#include <ESP8266HTTPClient.h>
#include <ArduinoJson.h>
#include <U8g2lib.h>
#include <Wire.h>

// WiFi配置
constchar* ssid = "nbxiaoyaozi";
constchar* password = "kali笔记";

// API配置
constchar* apiUrl = "http://192.168.50.48:8080/";//你的内网api接口
constunsignedlong updateInterval = 30000; // 30秒更新一次数据
constunsignedlong displayInterval = 3000; // 3秒切换一次显示

// OLED配置 - SCL=D6, SDA=D5
U8G2_SSD1306_128X64_NONAME_F_SW_I2C u8g2(U8G2_R0, /* clock=*/ D6, /* data=*/ D5, /* reset=*/ U8X8_PIN_NONE);

// 服务器数据结构体
struct ServerData {
  String name;
float cpuUsage;
float memoryUsage;
float diskTotal;
};

// 全局变量
ServerData servers[10]; // 最多存储10台服务器数据
int serverCount = 0;
unsignedlong lastUpdateTime = 0;
unsignedlong lastDisplayTime = 0;
int currentDisplayIndex = 0;

// 连接WiFi
void connectWiFi() {
  WiFi.begin(ssid, password);
  u8g2.clearBuffer();
  u8g2.setFont(u8g2_font_wqy12_t_gb2312);
  u8g2.setCursor(0, 20);
  u8g2.print("连接WiFi...");
  u8g2.sendBuffer();

while (WiFi.status() != WL_CONNECTED) {
    delay(500);
  }

  u8g2.clearBuffer();
  u8g2.setCursor(0, 20);
  u8g2.print("WiFi已连接");
  u8g2.setCursor(0, 40);
  u8g2.print("IP: ");
  u8g2.print(WiFi.localIP());
  u8g2.sendBuffer();
  delay(2000);
}

// 从API获取数据
void fetchServerData() {
if (WiFi.status() != WL_CONNECTED) {
    connectWiFi();
  }

  HTTPClient http;
  http.begin(apiUrl);
int httpCode = http.GET();

if (httpCode == HTTP_CODE_OK) {
    String payload = http.getString();
    
    // 解析JSON
    DynamicJsonDocument doc(2048);
    deserializeJson(doc, payload);
    
    // 获取服务器数量
    JsonArray serverArray = doc["servers"];
    serverCount = serverArray.size();
    
    // 提取服务器数据
    for (int i = 0; i < serverCount; i++) {
      JsonObject server = serverArray[i];
      servers[i].name = server["name"].as<String>();
      servers[i].cpuUsage = server["cpu_usage"].as<float>();
      servers[i].memoryUsage = server["memory_usage"].as<float>();
      servers[i].diskTotal = server["disk_total_gb"].as<float>();
    }
  }

  http.end();
}

// 显示服务器信息
void displayServerInfo(int index) {
if (index >= serverCount) return;

  u8g2.clearBuffer();
  u8g2.setFont(u8g2_font_wqy12_t_gb2312);

// 显示服务器名称
  u8g2.setCursor(0, 15);
  u8g2.print(servers[index].name);

// 显示CPU使用率
  u8g2.setCursor(0, 35);
  u8g2.print("CPU: ");
  u8g2.print(servers[index].cpuUsage);
  u8g2.print("%");

// 显示内存使用率
  u8g2.setCursor(64, 35);
  u8g2.print("内存: ");
  u8g2.print(servers[index].memoryUsage);
  u8g2.print("%");

// 显示磁盘总量
  u8g2.setCursor(0, 55);
  u8g2.print("磁盘: ");
  u8g2.print(servers[index].diskTotal);
  u8g2.print("GB");

  u8g2.sendBuffer();
}

void setup() {
// 初始化OLED
  u8g2.begin();
  u8g2.enableUTF8Print(); // 启用UTF8打印

// 连接WiFi
  connectWiFi();

// 初始获取数据
  fetchServerData();
}

void loop() {
unsignedlong currentTime = millis();

// 定期更新数据
if (currentTime - lastUpdateTime >= updateInterval) {
    fetchServerData();
    lastUpdateTime = currentTime;
  }

// 轮播显示服务器信息
if (currentTime - lastDisplayTime >= displayInterval) {
    displayServerInfo(currentDisplayIndex);
    currentDisplayIndex = (currentDisplayIndex + 1) % serverCount;
    lastDisplayTime = currentTime;
  }
}

这样,我们就可以在oled屏幕中看到服务器信息了,每 3 秒切换到下一台服务器,每 30秒重新从 API 获取最新数据。 如果要修改数据,可以修改updateInterval - 数据更新间隔和displayInterval - 显示切换间隔的值。

更多精彩文章 欢迎关注我们

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-12-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 kali笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 搭建哪吒探针
  • 编写ESP8266代码
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档