kazoo是Python连接操作ZooKeeper的客户端库。我们可以通过kazoo来使用ZooKeeper。 1. 安装 pip install kazoo 2. 使用 连接ZooKeeper from kazoo.client import KazooClient zk = KazooClient(hosts='127.0.0.1:2181') # 启动连接
使用的库是kazoo,安装方式 pip install kazoo 应用场景: 多个实例部署,但不是“去中心化”的部署方式; 有且只有一个节点作为master,履行master的职责,在例子中是注册调度器 # -*- coding:utf-8 -*- import socket import traceback from kazoo.client import KazooClient from kazoo.client
_conn_retry.copy(), logger=self.logger) 初始化了这个连接对象,进入该连接对象定义:kazoo.protocol.connection.py class ConnectionHandler This function will be called with a :class:`~kazoo.protocol.states.KazooState` instance indicating :type client: :class:`~kazoo.client.KazooClient` :param path: The path to watch for data changes :rtype: :class:`~kazoo.interfaces.IAsyncResult` """ if not isinstance(path, string_types): raise 类 class Lock(object): """Kazoo Lock Example usage with a :class:`~kazoo.client.KazooClient`
进行讲解 注册监控 通过kazoo提供的装饰器进行注册 @zookeeper.DataWatch(path) def changed(data, stat): fun(data) type(stat :rtype: :class:`~kazoo.interfaces.IAsyncResult` """ if not isinstance(path, string_types): raise Close): self.logger.log(BLATHER, 'Read close response') return CLOSE_RESPONSE 从上面的代码可以看出:1、如果接口正常返回,kazoo 执行方式 kazoo提供了两种请求方式,一种是异步执行,一种是同步执行 异步类 class AsyncResult(object): """A one-time event that stores 注:这里的async_object,请求回复一定要是同一个对象,这也就是kazoo在创建请求的时候,生成了一个async_object对象,并将其同request对象一起放入队列的原因。
环境配置 ZooKeeper集群的安装可以参考http://blog.csdn.net/mrbcy/article/details/54767484 使用下面的命令安装kazoo pip install kazoo 基本使用 这一部分可参考官方文档:http://kazoo.readthedocs.io/en/latest/basic_usage.htm 监听子节点变化 下面的代码实现了创建一个临时、 #-*- coding: utf-8 -*- import time from kazoo.client import KazooClient from kazoo.recipe.watchers import 而kazoo则在这个基础上封装了更上层的API,可以持续的触发。这就是上面的ChildrenWatch,除此之外kazoo还封装了一个DataWatch,用于监听数据的变化。下面我们也会用到。 #-*- coding: utf-8 -*- import threading import time from kazoo.client import KazooClient from kazoo.protocol.states
/usr/bin/env python # -*- coding: utf-8 -*- import sys from kazoo.client import KazooClient def main /usr/bin/env python # -*- coding: utf-8 -*- import sys import time from kazoo.client import KazooClient from kazoo.client import ChildrenWatch from kazoo.client import DataWatch """ Watcher可以通过两种方式设置,一种是在调用 一次性事件关注是zookeeper默认的即便在JAVA客户端里也是,这种高级别 API在JAVA里是zkclient,而在Python里面就是kazoo。
安装:pip install kazoo 1、链接 from kazoo.client import KazooClient zk = KazooClient(hosts='127.0.0.1:2181 Version: %s, data: %s" % (stat.version, data.decode("utf-8"))) 7、监听事件:LOST/CONNECTED/SUSPENDED from kazoo.client Zookeeper else: # Handle being connected/reconnected to Zookeeper zk.add_listener(my_listener) 8、kazoo 支持异步 9、API DOCUMENT:https://kazoo.readthedocs.io/en/latest/api/client.html
python3相关的zookeeper包支持的少,互联网上大量的文章都是使用的zkpython,可是zkpython只支持到python2.7,找了半天才寻找到 kazoo,也很好用 环境信息 python3.5 用到的包 kazoo base64 rsa #! get.py # @Version : 1.0 # 说明: code后有'#'是测试时加的或者需要修改的code # 用法:python3使用zookeeper和公私钥加解密传输机器人配置文件 from kazoo.client
测试环境 Win7 64位 Python 3.3.4 kazoo-2.6.1-py2.py3-none-any.whl(windows) kazoo-2.6.1.tar.gz (linux) https ://pypi.org/project/kazoo/#files zookeeper-3.4.13.tar.gz 下载地址: http://zookeeper.apache.org/releases.html import KazooClient from kazoo.client import KazooState from kazoo.retry import KazooRetry def restart_zk_client /usr/bin/env python #-*- encoding:utf-8 -*- __author__ = 'shouke' import time from kazoo.client import 参考链接: https://kazoo.readthedocs.io/en/latest/basic_usage.html
1.引用kazoo lib API DOC: http://kazoo.readthedocs.io/en/latest/install.html Code: # -*- coding:utf-8 - *- __author__ = 'yangxin' from kazoo.client import KazooClient class PyZooConn(object): # init
#先安装pip工具 #再安装python支持zk的库 yum -y install python-pip && pip install kazoo 演示工作 这里设计了2个服务,服务a和服务b,他们之间有调用关系 import json import socket import time import threading from kazoo.client import KazooClient from kazoo.exceptions 9999, 'timestamp': 1745683444.3150668} 服务b import json import socket import time import threading from kazoo.client
+行,大致架构如下: [image.png] Kingpin > go的一个命令行库,处理用户输入的参数 sarama(核心) > go实现的kafka客户端,连接broker获取相关的指标与元数据 kazoo kafka的zk集群,主要用于zk消费组的lag计算 promhttp > 用于生成 Prometheus HTTP服务器,供prometheus pull指标 其他组件 > 协助将 sarama 和kazoo 分析代码发现,kafka exporter使用zk库kazoo的姿势不太对,使用NewKazooFromConnectionString代替NewKazoo方法就能兼容我们的场景,目前这种改进方案已经提交
pip install D:\Dev\Python\pkg\six-*.whl pip install D:\Dev\Python\pkg\kazoo-*.whl pip list 打开
deployUser - 部署用户,需要有操作HDFS的权限 要使用HDFS作为资源中心,HA情况下,需要将集群的core-site.xml文件和hdfs-site.xml文件拷贝到conf目录 部署安装kazoo 安装python的zk工具 CDH集群默认是python2.7 yum -y install python-pip; pip install kazoo; ?
import os import time from kazoo.client import KazooClient from prometheus_client import Info from
import redisfrom kazoo.client import KazooClientfrom kazoo.exceptions import NoNodeError# 初始化 zk 客户端和临时存储
_watcher of <kazoo.recipe.watchers.DataWatch object at 0x7ff860ba3438>>) 2020-04-21 20:25:13.461 localhost 448 _read_socket] test: Reading for header ReplyHeader(xid=4, zxid=17179869239, err=0) 序列化和反序列化 下面看下kazoo 这里的request就是kazoo/protocol/serialization.py定义的各个类实例 比如连接类: class Connect(namedtuple('Connect', 'protocol_version
import json import time import random from kazoo.client import KazooClient class ClusterMessagePublisher import json import time from kazoo.client import KazooClient class ClusterMessageWriter: def __init
import logging import os import re import socket import pymysql import thriftpy2 import yaml from kazoo.client functools import json import logging import os import random import re import uuid import yaml from kazoo.client probe', 'probe/config' ], package_data={'': ['*.*']}, install_requires=[ 'kazoo
每个三秒钟输出一次,这说明我们定义的配置变更的通知已经生效了 MyConfigurationProvider 中我们只是通过赋值一个 DateTime 来模拟配置源 实际上可以从远程来说,比如阿波罗的配置中心,Kazoo