我需要一种快速的方式,在python多处理进程之间每秒通过zeromq发送300条短消息。每条消息都需要包含一个ID和time.time()
在通过msgpack发送数据之前,msgpack似乎是序列化dict的最好方法,而且msgpack有一个示例说明了我需要什么,但它有一个datetime.datetime.now()。
import datetime
import msgpack
useful_dict = {
"id": 1,
"created": datetime.datetime.now(),
}
def decode_datetime(obj):
if b'__datetime__' in obj:
obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
return obj
def encode_datetime(obj):
if isinstance(obj, datetime.datetime):
return {'__datetime__': True, 'as_str': obj.strftime("%Y%m%dT%H:%M:%S.%f")}
return obj
packed_dict = msgpack.packb(useful_dict, default=encode_datetime)
this_dict_again = msgpack.unpackb(packed_dict, object_hook=decode_datetime)问题是他们的例子不起作用,我得到了这个错误:
obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
KeyError: 'as_str'也许是因为我正在使用python3.4,但我不知道strptime有什么问题。会很感激你的帮助。
发布于 2015-05-19 19:06:06
Python3和Python2管理不同的字符串编码:编码和解码.字符串在python-3-x中
然后需要:
b'as_str' (而不是'as_str')作为字典键encode和decode像这样修改代码适用于python2和python3:
import datetime
import msgpack
useful_dict = {
"id": 1,
"created": datetime.datetime.now(),
}
def decode_datetime(obj):
if b'__datetime__' in obj:
obj = datetime.datetime.strptime(obj[b'as_str'].decode(), "%Y%m%dT%H:%M:%S.%f")
return obj
def encode_datetime(obj):
if isinstance(obj, datetime.datetime):
obj = {'__datetime__': True, 'as_str': obj.strftime("%Y%m%dT%H:%M:%S.%f").encode()}
return obj
packed_dict = msgpack.packb(useful_dict, default=encode_datetime)
this_dict_again = msgpack.unpackb(packed_dict, object_hook=decode_datetime)发布于 2016-03-24 21:19:09
考虑到messagepack ( import msgpack )擅长序列化整数,我创建了一个只使用整数的解决方案:
_datetime_ExtType = 42
def _unpacker_hook(code, data):
if code == _datetime_ExtType:
values = unpack(data)
if len(values) == 8: # we have timezone
return datetime.datetime(*values[:-1], dateutil.tz.tzoffset(None, values[-1]))
else:
return datetime.datetime(*values)
return msgpack.ExtType(code, data)
# This will only get called for unknown types
def _packer_unknown_handler(obj):
if isinstance(obj, datetime.datetime):
if obj.tzinfo:
components = (obj.year, obj.month, obj.day, obj.hour, obj.minute, obj.second, obj.microsecond, int(obj.utcoffset().total_seconds()))
else:
components = (obj.year, obj.month, obj.day, obj.hour, obj.minute, obj.second, obj.microsecond)
# we effectively double pack the values to "compress" them
data = msgpack.ExtType(_datetime_ExtType, pack(components))
return data
raise TypeError("Unknown type: {}".format(obj))
def pack(obj, **kwargs):
# we don't use a global packer because it wouldn't be re-entrant safe
return msgpack.packb(obj, use_bin_type=True, default=_packer_unknown_handler, **kwargs)
def unpack(payload):
try:
# we temporarily disable gc during unpack to bump up perf: https://pypi.python.org/pypi/msgpack-python
gc.disable()
# This must match the above _packer parameters above. NOTE: use_list is faster
return msgpack.unpackb(payload, use_list=False, encoding='utf-8', ext_hook=_unpacker_hook)
finally:
gc.enable()发布于 2017-08-24 14:59:08
在文档中,它说packb (和pack)的unicode字符串的默认编码是utf-8。您只需在'__datetime__'函数中搜索unicode字符串b'__datetime__',而不是字节对象b'__datetime__',并将encoding='utf-8'参数添加到unpack中。就像这样:
def decode_datetime(obj):
if '__datetime__' in obj:
obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
return obj
packed_dict = msgpack.packb(useful_dict, default=encode_datetime)
this_dict_again = msgpack.unpackb(packed_dict, object_hook=decode_datetime, encoding='utf-8')https://stackoverflow.com/questions/30313243
复制相似问题