I'm trying to retrieve historical data from Binance concurrently, for every crypto pair in my database. I keep hitting API bans with the error: "APIError(code=-1003): Way too much request weight used; IP banned until 1629399758399. Please use the websocket for live updates to avoid bans."
How can I add a delay so that I stay under the limit of 1200 request weight per minute?
This is what I have so far:
import numpy as np
import pandas as pd
import json
import requests
import datetime, time
import aiohttp, asyncpg, asyncio
from asyncio import gather, create_task
from binance.client import AsyncClient
from multiprocessing import Process
import config
async def main():
    # create database connection pool
    pool = await asyncpg.create_pool(user=config.DB_USER, password=config.DB_PASS, database=config.DB_NAME, host=config.DB_HOST, command_timeout=60)
    # get a connection
    async with pool.acquire() as connection:
        cryptos = await connection.fetch("SELECT * FROM crypto")
        symbols = {}
        for crypto in cryptos:
            symbols[crypto['id']] = crypto['symbol']
        await get_prices(pool, symbols)
async def get_prices(pool, symbols):
    try:
        # schedule requests to run concurrently for all symbols
        tasks = [create_task(get_price(pool, crypto_id, symbols[crypto_id])) for crypto_id in symbols]
        await gather(*tasks)
        print("Finalized all. Retrieved price data of {} outputs.".format(len(tasks)))
    except Exception as e:
        print("Unable to fetch crypto prices due to {}.".format(e.__class__))
        print(e)
async def get_price(pool, crypto_id, url):
    try:
        candlesticks = []
        client = await AsyncClient.create(config.BINANCE_API_KEY, config.BINANCE_SECRET_KEY)
        async for kline in await client.get_historical_klines_generator(crypto_id, AsyncClient.KLINE_INTERVAL_1HOUR, "18 Aug, 2021", "19 Aug, 2021"):
            candlesticks.append(kline)
        df = pd.DataFrame(candlesticks, columns=["date", "open", "high", "low", "close", "volume", "Close time", "Quote Asset Volume", "Number of Trades", "Taker buy base asset volume", "Taker buy quote asset volume", "Ignore"])
        df["date"] = pd.to_datetime(df.loc[:, "date"], unit='ms')
        df.drop(columns=['Close time', 'Ignore', 'Quote Asset Volume', 'Number of Trades', 'Taker buy base asset volume', 'Taker buy quote asset volume'], inplace=True)
        df.loc[:, "id"] = crypto_id
        print(df)
    except Exception as e:
        print("Unable to get {} prices due to {}.".format(url, e.__class__))
        print(e)
start = time.time()
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
end = time.time()
print("Took {} seconds.".format(end - start))
Posted on 2021-08-19 20:38:36
You can create an instance of a custom class that keeps a count of the currently active requests (and of when they were made) - and only lets a request proceed when the guard agrees.
Python's async with construct is a nice fit for this, as it can both guard a block of code and decrement the active-request count on exit, while requiring minimal changes to the code you already have.
It can work like this - the line in your code that actually triggers a request is:
`client = await AsyncClient.create(config.BINANCE_API_KEY, config.BINANCE_SECRET_KEY)`
So, if we can ensure this line runs at most 1200 times per minute, yielding to the main loop whenever it can't, we are fine. While it would be possible to burst 1200 (-1) calls and then have them all back off for a minute, the code is easier to write, and the spirit of the API limit better respected, if we issue one call every (60 s / 1200) seconds (times 90%, to keep a 10% safety margin).
async with calls the class's __aenter__ method. There, we can simply check the time elapsed since the last API call and sleep until that interval has passed. (Each task actually needs its own instance of the class, since __aenter__ is invoked once per async with block.) But so as not to depend on a global "counter", we can write a factory function that creates one guard class per API that needs limiting - and keep that guard in a global variable.
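For reference, a minimal self-contained illustration (class and variable names invented for this sketch) of how async with drives __aenter__ and __aexit__ around the guarded block:

```python
import asyncio

events = []  # records the order in which the hooks fire

class Tracer:
    """Toy async context manager; a real guard would sleep in __aenter__."""
    async def __aenter__(self):
        events.append("enter")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        events.append("exit")

async def main():
    async with Tracer():
        events.append("body")

asyncio.run(main())
print(events)  # ['enter', 'body', 'exit']
```

Because both hooks are coroutines, the guard can await asyncio.sleep inside __aenter__ without blocking other tasks.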
So, add this factory function to your program, create the guard class just before main runs, and use async with in the task code:
def create_rate_limit_guard(rate_limit=1200, safety_margin=0.9):
    """Rate limit is given in maximum requests per minute.
    """
    # TBD: it would be easy to have code throttle by maximum active requests
    # instead of total requests per minute.
    # I will just leave the accounting of concurrent_requests in place, though
    class Guard:
        request_interval = (60 / rate_limit) * safety_margin
        current_requests = 0
        max_concurrent_requests = 0
        last_request = 0

        async def __aenter__(self):
            cls = self.__class__
            cls.current_requests += 1
            # re-check after each sleep: another task may have just gone through
            while (throttle_wait := cls.last_request + cls.request_interval - time.time()) > 0:
                await asyncio.sleep(throttle_wait)
            cls.last_request = time.time()

        async def __aexit__(self, exc_type, exc, tb):
            cls = self.__class__
            cls.max_concurrent_requests = max(cls.max_concurrent_requests, cls.current_requests)
            cls.current_requests -= 1

    return Guard
In your code, change get_price to the following, and create the guard class as the last line before the if ...__main__ block:
async def get_price(pool, crypto_id, url):
    try:
        candlesticks = []
        # consider having a single client application-wide - you are creating one per task
        async with BinanceLimitGuard():
            client = await AsyncClient.create(config.BINANCE_API_KEY, config.BINANCE_SECRET_KEY)
        # as the actual calls to the remote endpoint are made inside the client code itself,
        # we can't just run "async for" on the generator - instead we have to throttle
        # each "for" iteration. So we "unfold" the async for into a while/__anext__
        # structure so that we can place the guard before each iteration:
        klines_generator = await client.get_historical_klines_generator(
            crypto_id, AsyncClient.KLINE_INTERVAL_1HOUR, "18 Aug, 2021", "19 Aug, 2021")
        while True:
            try:
                async with BinanceLimitGuard():
                    kline = await klines_generator.__anext__()
            except StopAsyncIteration:
                break
            candlesticks.append(kline)
        df = pd.DataFrame(candlesticks, columns=["date", "open", "high", "low", "close", "volume", "Close time", "Quote Asset Volume", "Number of Trades", "Taker buy base asset volume", "Taker buy quote asset volume", "Ignore"])
        df["date"] = pd.to_datetime(df.loc[:, "date"], unit='ms')
        df.drop(columns=['Close time', 'Ignore', 'Quote Asset Volume', 'Number of Trades', 'Taker buy base asset volume', 'Taker buy quote asset volume'], inplace=True)
        df.loc[:, "id"] = crypto_id
        print(df)
    except Exception as e:
        print("Unable to get {} prices due to {}.".format(url, e.__class__))
        print(e)
BinanceLimitGuard = create_rate_limit_guard(300)
if __name__ == "__main__":
    # all code that is meant to run when your file is executed as a program
    # should be guarded in this if block. Importing your file should not "print"
    start = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    end = time.time()
    print("Took {} seconds.".format(end - start))
Note that although I designed the guard in terms of "1200 requests per minute", I suggested a limit of "300" in BinanceLimitGuard = create_rate_limit_guard(300) above. That is because, checking the source code of the binance client itself, each call to "get_historical_klines" performs several requests of its own, with an internal limit of 3 calls per second - but those happen per generator, so we cannot account for them in the outer code.
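To sanity-check the pacing, here is a self-contained run of the guard with the concurrency bookkeeping trimmed away and a toy limit of 600 requests/minute so it finishes quickly. The re-check loop in __aenter__ matters: several tasks woken at the same moment must not all slip through together.

```python
import asyncio
import time

def create_rate_limit_guard(rate_limit=1200, safety_margin=0.9):
    """Rate limit is given in maximum requests per minute."""
    class Guard:
        request_interval = (60 / rate_limit) * safety_margin
        last_request = 0.0

        async def __aenter__(self):
            cls = self.__class__
            # re-check after each sleep: another task may have just gone through
            while (wait := cls.last_request + cls.request_interval - time.monotonic()) > 0:
                await asyncio.sleep(wait)
            cls.last_request = time.monotonic()

        async def __aexit__(self, exc_type, exc, tb):
            pass

    return Guard

async def fake_call(guard_cls, i):
    async with guard_cls():
        return i  # the real API call would go here

async def demo():
    Guard = create_rate_limit_guard(rate_limit=600)  # one entry per 0.09 s
    t0 = time.monotonic()
    results = await asyncio.gather(*(fake_call(Guard, i) for i in range(4)))
    return sorted(results), time.monotonic() - t0

results, elapsed = asyncio.run(demo())
print(results, round(elapsed, 2))  # 4 entries take at least 3 * 0.09 s
```

The first entry goes through immediately; each subsequent one is spaced by request_interval, so four calls take at least three intervals in total.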
If this still doesn't work, you can make it work by subclassing (or patching) AsyncClient itself and rate-limiting its internal _request_api method, at this location: https://github.com/sammchardy/python-binance/blob/a6f3048527f0f2fd9bc6591ac1fdd926b2a29f3e/binance/client.py#L330 - then you can go back to the "1200" limit, since all internal calls would be accounted for. (If you need to resort to this, leave a comment and I can complete this answer or add another one.)
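Should you go that route, here is a sketch of the idea. Hedged: _request_api is an internal method of python-binance and may change between versions, and a stub base class stands in for AsyncClient here so the snippet runs on its own - in real code you would subclass AsyncClient directly.

```python
import asyncio
import time

REQUEST_INTERVAL = (60 / 1200) * 0.9  # back at the 1200/min limit, with a 10% margin

class _StubAsyncClient:
    """Stand-in for binance.client.AsyncClient so this sketch is self-contained."""
    async def _request_api(self, method, path, **kwargs):
        return {"method": method, "path": path}

class ThrottledClient(_StubAsyncClient):
    _last_request = 0.0  # shared across instances: one pace for the whole app

    async def _request_api(self, *args, **kwargs):
        # every request, including the client's internal ones, funnels through here
        cls = ThrottledClient
        while (wait := cls._last_request + REQUEST_INTERVAL - time.monotonic()) > 0:
            await asyncio.sleep(wait)
        cls._last_request = time.monotonic()
        return await super()._request_api(*args, **kwargs)

async def demo():
    client = ThrottledClient()
    t0 = time.monotonic()
    replies = [await client._request_api("GET", f"/path/{i}") for i in range(3)]
    return replies, time.monotonic() - t0

replies, elapsed = asyncio.run(demo())
print(replies[0], round(elapsed, 3))
```

Since higher-level calls like get_historical_klines all bottom out in _request_api, throttling at this single choke point accounts for the client's internal requests as well.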
https://stackoverflow.com/questions/68853407