首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >连接到docker.sock,仅使用python获取容器状态。

连接到docker.sock,仅使用python获取容器状态。
EN

Stack Overflow用户
提问于 2020-07-06 04:44:44
回答 1查看 1.3K关注 0票数 0

所以,我试图为服务器上运行的码头集装箱获取一些统计数据。我尝试过多种方法:

用于python module

  • Socket

  • Docker使用子进程
  • 发出卷曲请求

前两种方法工作得很好,但我不能使用套接字方法。我想比较一下这些方法,找出哪种方法对我的使用来说更快更好。基本上,我试图将curl命令转换为python请求。下面是我使用的curl命令:curl -H --unix-socket /var/run/docker.sock http://localhost/containers/CONTAINER_ID/stats?stream=false

有人能帮我吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-07-06 06:00:07

  1. 确保要运行Python的进程具有对/var/run/docker.sock的访问权限。例如,我将它作为卷挂载到container
  2. python代码中,然后无论如何使用套接字,因此不需要生成额外的子进程来执行curl。您可以完全访问所有状态信息。如果需要更详细的内容,请使用

代码语言:javascript
复制
  htpc:
    container_name: htpc
    build:
      context: .
      dockerfile: flask-Dockerfile
    ports: 
      - "5000:5000"
    restart: unless-stopped
    volumes: 
      - /var/run/docker.sock:/var/run/docker.sock  # map through socket
代码语言:javascript
复制
def docker_api():
    client = docker.from_env()
    if request.args["action"] == "list":
        l = []
        for c in client.containers.list(all=True):
            # string conversion only works for 6 digit ms,  strip back to 26 chars
            if c.status == "running":
                utc_time = datetime.strptime(client.api.inspect_container(c.short_id)["State"]["StartedAt"][:26]+"Z", "%Y-%m-%dT%H:%M:%S.%fZ")
            else:
                utc_time = datetime.utcnow()
            l.append({"id":c.short_id, "name":c.name, "image":c.image.tags[0] if len(c.image.tags)>0 else "unknown", 
                      "status":c.status, "logs":True, "inspect":True,
                      "stats":c.status=="running", "restart":c.status=="running", "stop":c.status=="running", "start":c.status!="running", "delete":True,
                      "uptime":(utc_time - datetime(1970, 1, 1)).total_seconds(),
                      })
        return Response(json.dumps(l), mimetype="text/json")
    elif request.args["action"] == "logs":
        return Response(client.containers.get(request.args["id"]).logs().decode(), mimetype="text/plain")
    elif request.args["action"] == "stats":
        return Response(json.dumps(client.containers.get(request.args["id"]).stats(stream=False)), mimetype="text/json")
    elif request.args["action"] == "inspect":
        js = client.api.inspect_container(request.args["id"])
        m = {"StartedAt":js["State"]["StartedAt"], "Mounts":js["Mounts"]}
        for i, mp in enumerate(m["Mounts"]):
            m["Mounts"][i] = {k: mp[k] for k in ('Source', 'Destination')}
        e = {"Env":js["Config"]["Env"], 'RestartPolicy':js['HostConfig']['RestartPolicy']}
        p = {"Ports":js["NetworkSettings"]["Ports"]}
        js = {**m, **e, **p} #, **js}
        return Response(json.dumps(js), mimetype="text/json")
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62749397

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档