我正在尝试使用来自IG索引的流API,他们的文档是这里。Api要求在应用程序中包含光流客户端。因此,我使用了这版本并将其添加到我的项目中。我创建了一个连接到服务器的函数。(我相信)
def connect_light_stream_client():
if cst == None or xt == None:
create_session()
global client
client = lsc.LightstreamerClient(lightstreamer_username=stream_ident,
lightstreamer_password=stream_password,
lightstreamer_url=light_stream_server)
try:
client.connect()
except Exception as e:
print("Unable to connect to Lightstreamer Server")
return然后,我调用第二个函数,它应该获取一个股票数据流,在每个滴答之后打印结果。
def listner(item_info):
print(item_info)
def light_stream_chart_tick():
sub = lsc.LightstreamerSubscription(mode="DISTINCT", items={"CHART:CS.D.XRPUSD.TODAY.IP:TICK"},
fields={"BID"})
sub.addlistener(listner)
sub_key = client.subscribe(sub)
print(sub_key)最后的打印输出为1,我没有从侦听器那里得到任何信息。有什么建议我做错了吗?
发布于 2022-11-06 14:19:42
有几件事不对:
light_stream_chart_tick()的代码放入connect方法中,请求输入作为等待。items和fields参数需要是列表而不是dictsdef connect_light_stream_client():
if cst == None or xt == None:
create_session()
global client
client = lsc.LightstreamerClient(lightstreamer_username=stream_ident,
lightstreamer_password=stream_password,
lightstreamer_url=light_stream_server)
try:
client.connect()
except Exception as e:
print("Unable to connect to Lightstreamer Server")
return
sub = lsc.LightstreamerSubscription(
mode="DISTINCT",
items=["CHART:CS.D.BITCOIN.TODAY.IP:TICK"],
fields=["BID"]
)
sub.addlistener(listner)
sub_key = client.subscribe(sub)
print(sub_key)
input("{0:-^80}\n".format("Hit CR to unsubscribe and disconnect"))
client.disconnect()
def listner(item_info):
print(item_info)有一个python项目这里,它使与IG的交互变得更加容易,并包含了一个流样例。该项目是最新的,并积极维护。
充分披露:我是这个项目的维护者
https://stackoverflow.com/questions/65480735
复制相似问题