我试图通过UDS套接字和UDP协议从一个进程发送长json字符串到另一个进程,如下所示:
# server.py
def main():
if os.path.exists(UDSFILE):
os.remove(UDSFILE)
sock = socket(AF_UNIX, SOCK_DGRAM)
sock.bind(UDSFILE)
file = open(CSVFILE, 'ab')
msg = b''
while True:
while True:
try:
package, *_ = sock.recvfrom(4096)
msg += package
except ConnectionError:
log.exception()
raise
if b'\t' in msg:
data, msg = msg.split(b'\t', 1)
break
data = json.loads(data.decode())
exchange, ts, data = data
file.write(("%s\t%s\t%s\n" % (exchange, ts, data)).encode())
# Client.py
def main():
wss = GeminiWSS(args.pair)
wss.start()
sock = socket(AF_UNIX, SOCK_DGRAM)
while True:
try:
sock.sendto((json.dumps(wss.get()) + '\t').encode(), socket_addr.encode())
except Exception as e:
log.exception(e)
log.info("Attempting to restart..")
wss.restart()
except OSError as e:
log.exception(e)
print("OSError on connecting, trying again..")服务器在解码时崩溃,跟踪如下:
Traceback (most recent call last):
File "scribe.py", line 62, in <module>
main()
File "scribe.py", line 49, in main
data = json.loads(data.decode())
File "/opt/anaconda3/lib/python3.5/json/__init__.py", line 319, in loads
return _default_decoder.decode(s)
File "/opt/anaconda3/lib/python3.5/json/decoder.py", line 339, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/opt/anaconda3/lib/python3.5/json/decoder.py", line 355, in raw_decode
obj, end = self.scan_once(s, idx)
json.decoder.JSONDecodeError: Expecting ':' delimiter: line 1 column 4099 (char 4098)如您所见,我尝试使用自定义分隔器来读取接收到的数据(\t),但这不起作用(因此产生了JSONDecodeError)。
那么,我如何才能确保我的数据被正确地组装呢?不幸的是,简单地增加缓冲区是无效的,因为据我所知,我没有最大的消息长度。
编辑生成从wss.get()返回的数据的代码
def _subscription_thread(self):
"""
Thread Method, running the connection for each endpoint.
:param endpoint:
:return:
"""
try:
conn = create_connection(self.addr + self.endpoint, timeout=5)
except WebSocketTimeoutException:
self._controller_q.put('restart')
return
while self.endpoint_thread_running:
try:
msg = conn.recv()
except WebSocketTimeoutException:
log.exception()
raise
log.debug("_subscription_thread(): Putting data on q..")
try:
self.data_q.put(('Gemini', json.loads(msg), time.time()), timeout=1)
except TimeoutError:
continue
finally:
log.debug("_subscription_thread(): Data Processed, looping back..")
conn.close()
log.debug("_subscription_thread(): Thread Loop Ended.")数据样本
引发bytes的'JSONDecodeError对象
b'["Gemini", {"eventId": 609771498, "events": [{"remaining": "21914.3", "reason": "initial", "price": "0.01", "side": "bid", "type": "change", "delta": "21914.3"}, {"remaining": "0.000047", "reason": "initial", "price": "0.06", "side": "bid", "type": "change", "delta": "0.000047"}, {"remaining": "110", "reason": "initial", "price": "0.10", "side": "bid", "type": "change", "delta": "110"}, {"remaining": "1053.33333333", "reason": "initial", "price": "0.15", "side": "bid", "type": "change", "delta": "1053.33333333"}, {"remaining": "1", "reason": "initial", "price": "0.90", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "744", "reason": "initial", "price": "1.00", "side": "bid", "type": "change", "delta": "744"}, {"remaining": "2400", "reason": "initial", "price": "1.10", "side": "bid", "type": "change", "delta": "2400"}, {"remaining": "1", "reason": "initial", "price": "5.00", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "1", "reason": "initial", "price": "8.00", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "0.022", "reason": "initial", "price": "9.00", "side": "bid", "type": "change", "delta": "0.022"}, {"remaining": "1", "reason": "initial", "price": "11.00", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "30", "reason": "initial", "price": "16.00", "side": "bid", "type": "change", "delta": "30"}, {"remaining": "2", "reason": "initial", "price": "24.00", "side": "bid", "type": "change", "delta": "2"}, {"remaining": "400", "reason": "initial", "price": "26.00", "side": "bid", "type": "change", "delta": "400"}, {"remaining": "0.03015", "reason": "initial", "price": "30.00", "side": "bid", "type": "change", "delta": "0.03015"}, {"remaining": "0.97", "reason": "initial", "price": "31.85", "side": "bid", "type": "change", "delta": "0.97"}, {"remaining": "0.029", "reason": "initial", "price": "33.85", "side": "bid", "type": "change", "delta": "0.029"}, {"remaining": "1", "reason": "initial", "price": "36.97", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "2.9975", "reason": "initial", "price": "40.00", "side": "bid", "type": "change", "delta": "2.9975"}, {"remaining": "0.04460443", "reason": "initial", "price": "67.00", "side": "bid", "type": "change", "delta": "0.04460443"}, {"remaining": "16.20200501", "reason": "initial", "price": "99.75", "side": "bid", "type": "change", "delta": "16.20200501"}, {"remaining": "62.08649948", "reason": "initial", "price": "100.00", "side": "bid", "type": "change", "delta": "62.08649948"}, {"remaining": "1", "reason": "initial", "price": "151.00", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "1", "reason": "initial", "price": "159.00", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "2.60664747", "reason": "initial", "price": "191.05", "side": "bid", "type": "change", "delta": "2.60664747"}, {"remaining": "1", "reason": "initial", "price": "200.00", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "0.03976369", "reason": "initial", "price": "220.05", "side": "bid", "type": "change", "delta": "0.03976369"}, {"remaining": "2.6635", "reason": "initial", "price": "300.00", "side": "bid", "type": "change", "delta": "2.6635"}, {"remaining": "0.855", "reason": "initial", "price": "350.00", "side": "bid", "type": "change", "delta": "0.855"}, {"remaining": "0.04172229", "reason": "initial", "price": "359.52", "side": "bid", "type": "change", "delta": "0.04172229"}, {"remaining": "0.02773771", "reason": "initial", "price": "360.52", "side": "bid", "type": "change", "delta": "0.02773771"}, {"remaining": "2.59220779", "reason": "initial", "price": "385.00", "side": "bid", "type": "change", "delta": "2.59220779"}, {"remaining": "3.740625", "reason": "initial", "price": "400.00", "side": "bid", "type": "change", "delta": "3.740625"}, {"remaining": "1.21646341", "reason": "initial", "price": "410.00", "side": "bid", "type": "change", "delta": "1.21646341"}, {"remaining": "1.20471014", "reason": "initial", "price": "414.00", "side": "bid", "type": "change", "delta": "1.20471014"}, {"remai["Gemini", {"eventId": 609771500, "events": [{"remaining": "0", "reason": "cancel", "price": "2053.62", "side": "ask", "type": "change", "delta": "-2"}], "type": "update"}, 1495312354.6720355]'从这一来源产生:
巴斯丁 (由于文本限制,移动到pastebin )
发布于 2017-06-09 09:38:20
最终,我不再试图处理UDS连接和发送,而只是让python完成它的工作,使用multiprocessing库的Client和Listener类,并提供文件路径。现在看来,这是不可能做到的。
> listener.py
from multiprocessing.connection import Listener
serv = Listener('/path/to/uds')
while True:
conn = serv.accept()
try:
while True:
msg = conn.recv()
print(msg)
conn.send(msg.upper())
except EOFError:
print('Conn closed!')
finally:
conn.close()
serv.close()
> client.py
from multiprocessing.connection import Client
large_data = ''.join(['a' for i in range(1000000)])
sock = Client('/path/to/uds')
sock.send(large_data)
answer = sock.recv()
print(answer)
sock.close()更不用说我无缘无故地分裂消息了,因为UDS连接是文件描述符。
发布于 2017-05-30 11:18:38
这与UDS无关,但数据本身并不是有效的JSON字符串。
您的错误消息说:
json.decoder.JSONDecodeError: Expecting ':' delimiter: line 1 column 4099 (char 4098)这意味着json.loads由于JSON解码错误而失败,因为输入数据不是有效的JSON字符串。原因被指定为
期待:“划界者”
所以你知道数据不对但在哪里?答案:
第1栏4099 (char 4098)
现在让我们获取输入
>>> input_data = b'["Gemini", {"eventId": 609771498, "events": [{"remaining": "21914.3", "reason": "initial", "price": "0.01", "side": "bid", "type": "change", "delta": "21914.3"}, {"remaining": "0.000047", "reason": "initial", "price": "0.06", "side": "bid", "type": "change", "delta": "0.000047"}, {"remaining": "110", "reason": "initial", "price": "0.10", "side": "bid", "type": "change", "delta": "110"}, {"remaining": "1053.33333333", "reason": "initial", "price": "0.15", "side": "bid", "type": "change", "delta": "1053.33333333"}, {"remaining": "1", "reason": "initial", "price": "0.90", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "744", "reason": "initial", "price": "1.00", "side": "bid", "type": "change", "delta": "744"}, {"remaining": "2400", "reason": "initial", "price": "1.10", "side": "bid", "type": "change", "delta": "2400"}, {"remaining": "1", "reason": "initial", "price": "5.00", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "1", "reason": "initial", "price": "8.00", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "0.022", "reason": "initial", "price": "9.00", "side": "bid", "type": "change", "delta": "0.022"}, {"remaining": "1", "reason": "initial", "price": "11.00", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "30", "reason": "initial", "price": "16.00", "side": "bid", "type": "change", "delta": "30"}, {"remaining": "2", "reason": "initial", "price": "24.00", "side": "bid", "type": "change", "delta": "2"}, {"remaining": "400", "reason": "initial", "price": "26.00", "side": "bid", "type": "change", "delta": "400"}, {"remaining": "0.03015", "reason": "initial", "price": "30.00", "side": "bid", "type": "change", "delta": "0.03015"}, {"remaining": "0.97", "reason": "initial", "price": "31.85", "side": "bid", "type": "change", "delta": "0.97"}, {"remaining": "0.029", "reason": "initial", "price": "33.85", "side": "bid", "type": "change", "delta": "0.029"}, {"remaining": "1", "reason": "initial", "price": "36.97", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "2.9975", "reason": "initial", "price": "40.00", "side": "bid", "type": "change", "delta": "2.9975"}, {"remaining": "0.04460443", "reason": "initial", "price": "67.00", "side": "bid", "type": "change", "delta": "0.04460443"}, {"remaining": "16.20200501", "reason": "initial", "price": "99.75", "side": "bid", "type": "change", "delta": "16.20200501"}, {"remaining": "62.08649948", "reason": "initial", "price": "100.00", "side": "bid", "type": "change", "delta": "62.08649948"}, {"remaining": "1", "reason": "initial", "price": "151.00", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "1", "reason": "initial", "price": "159.00", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "2.60664747", "reason": "initial", "price": "191.05", "side": "bid", "type": "change", "delta": "2.60664747"}, {"remaining": "1", "reason": "initial", "price": "200.00", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "0.03976369", "reason": "initial", "price": "220.05", "side": "bid", "type": "change", "delta": "0.03976369"}, {"remaining": "2.6635", "reason": "initial", "price": "300.00", "side": "bid", "type": "change", "delta": "2.6635"}, {"remaining": "0.855", "reason": "initial", "price": "350.00", "side": "bid", "type": "change", "delta": "0.855"}, {"remaining": "0.04172229", "reason": "initial", "price": "359.52", "side": "bid", "type": "change", "delta": "0.04172229"}, {"remaining": "0.02773771", "reason": "initial", "price": "360.52", "side": "bid", "type": "change", "delta": "0.02773771"}, {"remaining": "2.59220779", "reason": "initial", "price": "385.00", "side": "bid", "type": "change", "delta": "2.59220779"}, {"remaining": "3.740625", "reason": "initial", "price": "400.00", "side": "bid", "type": "change", "delta": "3.740625"}, {"remaining": "1.21646341", "reason": "initial", "price": "410.00", "side": "bid", "type": "change", "delta": "1.21646341"}, {"remaining": "1.20471014", "reason": "initial", "price": "414.00", "side": "bid", "type": "change", "delta": "1.20471014"}, {"remai["Gemini", {"eventId": 609771500, "events": [{"remaining": "0", "reason": "cancel", "price": "2053.62", "side": "ask", "type": "change", "delta": "-2"}], "type": "update"}, 1495312354.6720355]'
>>> print(input_data[4060:4140])
b'ge", "delta": "1.20471014"}, {"remai["Gemini", {"eventId": 609771500, "events": '现在清楚了,您的数据在此部分已被损坏:
{remai[“双子座”,{"eventId“)
现在,您需要弄清楚为什么您的数据不是有效的json。
https://stackoverflow.com/questions/44090150
复制相似问题