以下代码从 Binance 下载指定开始日期到结束日期之间的历史 OHLCV 数据。由于 Binance 每次请求最多只返回 1000 根蜡烛(K 线),我编写了 DownloadAsync 来分批循环下载。也欢迎对代码提出任何建议。
我真正的问题是如何让 DownloadAsync 以多线程(并行)方式运行以加快下载速度——想象一下以 5 分钟间隔下载 2018 年到 2021 年的全部蜡烛数据。我更倾向于使用 System.Reactive,但也欢迎其他解决方案,因为我很难把这段代码改写成多线程版本。
下面的代码可以直接运行测试。
using Binance.Net;
using Binance.Net.Enums;
using Binance.Net.Interfaces;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using System.Linq;
using System.Text.RegularExpressions;
using System.Reactive.Linq;
using System.Threading;
namespace DownloadCandleDataTest
{
/// <summary>
/// Downloads historical candles page by page through an <see cref="Exchange"/>.
/// </summary>
public class DataProvider
{
    private readonly Exchange _exchange;

    /// <summary>Creates a provider that fetches candles through <paramref name="exchange"/>.</summary>
    /// <param name="exchange">Exchange wrapper used for every page request.</param>
    /// <exception cref="ArgumentNullException"><paramref name="exchange"/> is null.</exception>
    public DataProvider(Exchange exchange)
    {
        _exchange = exchange ?? throw new ArgumentNullException(nameof(exchange));
    }

    /// <summary>
    /// Sequentially downloads candles for <paramref name="pair"/> from <paramref name="startDate"/>
    /// towards <paramref name="endDate"/>, requesting up to <paramref name="limit"/> candles per call
    /// and starting each call at the timestamp of the last candle received.
    /// </summary>
    /// <param name="pair">Symbol to download, e.g. "TRXUSDT".</param>
    /// <param name="interval">Kline interval passed through to the exchange.</param>
    /// <param name="startDate">Start time of the first request.</param>
    /// <param name="endDate">End time passed to every request; paging stops once the cursor reaches it.</param>
    /// <param name="startupCandleCount">Currently unused.</param>
    /// <param name="limit">Maximum candles per request (defaults to 100, the value previously hard-coded).</param>
    /// <returns>
    /// The downloaded candles in the order received, skipping timestamps already collected.
    /// Paging stops early, returning what was collected so far, when a request fails (the exchange
    /// returns null) or yields no new candles. Never returns null.
    /// </returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="limit"/> is not positive.</exception>
    public async Task<List<OHLCV>> DownloadAsync(string pair, KlineInterval interval, DateTime startDate, DateTime endDate, int startupCandleCount, int limit = 100)
    {
        if (limit <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(limit), limit, "limit must be positive.");
        }

        var candles = new List<OHLCV>();
        var cursor = startDate;
        DateTime? lastTimestamp = null;

        while (cursor < endDate)
        {
            var page = await _exchange.GetCandlesAsync(pair, interval, cursor, endDate, limit).ConfigureAwait(false);

            // GetCandlesAsync returns null when the request fails, and an empty page means there is
            // nothing more to fetch. Stop rather than re-requesting the same cursor forever.
            if (page == null || page.Count == 0)
            {
                break;
            }

            // Each request starts at the previous page's last timestamp, so that candle comes back again.
            // Filtering on the timestamp (instead of blindly dropping element 0) also guarantees progress.
            var fresh = lastTimestamp.HasValue
                ? page.Where(c => c.Timestamp > lastTimestamp.Value).ToList()
                : page;

            if (fresh.Count == 0)
            {
                // Only already-seen candles came back: no newer data is available before endDate.
                break;
            }

            var first = fresh[0];
            var last = fresh[fresh.Count - 1];
            Console.WriteLine($"First: {first.Timestamp} | Last: {last.Timestamp}");

            candles.AddRange(fresh);

            // NOTE(review): assumes each page is ordered by ascending timestamp, as the original loop did.
            lastTimestamp = last.Timestamp;
            cursor = last.Timestamp;
        }

        return candles;
    }
}
class Program
{
    /// <summary>
    /// Rx demo: runs a simulated three-second computation through <c>Observable.Start</c>,
    /// then blocks the calling thread until the resulting observable completes.
    /// </summary>
    public static void StartBackgroundWork()
    {
        Console.WriteLine("Shows use of Start to start on a background thread:");

        Action backgroundJob = () =>
        {
            // Executed by Observable.Start on a background thread, not on the caller's thread.
            Console.WriteLine("From background thread. Does not block main thread.");
            Console.WriteLine("Calculating...");
            Thread.Sleep(3000);
            Console.WriteLine("Background work completed.");
        };

        var completion = Observable
            .Start(backgroundJob)
            .Finally(() => Console.WriteLine("Main thread completed."));

        Console.WriteLine("\r\n\t In Main Thread...\r\n");

        // Block until the background observable completes.
        completion.Wait();
    }

    /// <summary>Entry point: downloads one day of 5-minute TRXUSDT candles, then waits for Enter.</summary>
    static async Task Main(string[] args)
    {
        using var exchange = new Exchange();
        var provider = new DataProvider(exchange);

        var rangeStart = new DateTime(2019, 1, 1);
        var rangeEnd = new DateTime(2019, 1, 2);
        const int startupCandles = 100;

        await provider
            .DownloadAsync("TRXUSDT", KlineInterval.FiveMinutes, rangeStart, rangeEnd, startupCandles)
            .ConfigureAwait(false);

        // Keep the console window open until the user presses Enter.
        Console.ReadLine();
    }
}
/// <summary>
/// One candle: its time plus open, high, low and close prices and a volume figure.
/// </summary>
/// <remarks>
/// Plain mutable data holder. Property names and declaration order are kept as-is because
/// reflection-based serializers may depend on them — NOTE(review): confirm before reshaping.
/// </remarks>
public class OHLCV
{
/// <summary>Candle time; <c>ToCandle</c> fills it from the kline's <c>OpenTime</c>.</summary>
public DateTime Timestamp { get; set; }
/// <summary>Opening price (from the kline's <c>Open</c>).</summary>
public decimal Open { get; set; }
/// <summary>Highest price (from the kline's <c>High</c>).</summary>
public decimal High { get; set; }
/// <summary>Lowest price (from the kline's <c>Low</c>).</summary>
public decimal Low { get; set; }
/// <summary>Closing price (from the kline's <c>Close</c>).</summary>
public decimal Close { get; set; }
/// <summary>Volume; <c>ToCandle</c> fills it from the kline's <c>BaseVolume</c>.</summary>
public decimal Volume { get; set; }
}
/// <summary>Conversions from Binance.Net types to the local candle model.</summary>
public static class Extensions
{
    /// <summary>
    /// Maps a Binance kline onto an <see cref="OHLCV"/>: <c>OpenTime</c> becomes <c>Timestamp</c>,
    /// the four prices are copied directly, and <c>BaseVolume</c> becomes <c>Volume</c>.
    /// </summary>
    /// <param name="candle">Kline returned by the Binance client.</param>
    /// <returns>A new <see cref="OHLCV"/> instance.</returns>
    public static OHLCV ToCandle(this IBinanceKline candle) => new OHLCV
    {
        Timestamp = candle.OpenTime,
        Open = candle.Open,
        High = candle.High,
        Low = candle.Low,
        Close = candle.Close,
        Volume = candle.BaseVolume,
    };
}
/// <summary>Thin wrapper around a Binance client that it creates and disposes.</summary>
public class Exchange : IDisposable
{
    private readonly IBinanceClient _client;

    /// <summary>Creates the wrapper together with its own <c>BinanceClient</c>.</summary>
    public Exchange() => _client = new BinanceClient();

    /// <summary>Requests spot klines and converts them to <see cref="OHLCV"/> candles.</summary>
    /// <param name="pair">Symbol, e.g. "TRXUSDT".</param>
    /// <param name="interval">Kline interval.</param>
    /// <param name="startTime">Optional start time forwarded to the client.</param>
    /// <param name="endTime">Optional end time forwarded to the client.</param>
    /// <param name="limit">Optional maximum number of klines forwarded to the client.</param>
    /// <returns>
    /// The converted candles, or <c>null</c> when the call is unsuccessful or the response has no data.
    /// </returns>
    public async Task<List<OHLCV>> GetCandlesAsync(string pair, KlineInterval interval, DateTime? startTime = null, DateTime? endTime = null, int? limit = null)
    {
        var response = await _client.Spot.Market
            .GetKlinesAsync(pair, interval, startTime, endTime, limit)
            .ConfigureAwait(false);

        // Failures are reported to the caller as null rather than as an exception.
        if (!response.Success)
        {
            return null;
        }

        return response.Data?.Select(kline => kline.ToCandle()).ToList();
    }

    /// <summary>Disposes the owned Binance client.</summary>
    public void Dispose() => _client?.Dispose();
}
}发布于 2021-02-06 10:12:02
你把问题想得太复杂了。
由于蜡烛在时间上是等间隔分布的,而且您知道每次调用 GetKlinesAsync 会返回多少根蜡烛,因此可以预先计算出所有请求所需的开始日期。
// Parameters for the example download (one day of 5-minute TRXUSDT candles).
var pair = "TRXUSDT";
var interval = KlineInterval.FiveMinutes;
var startDate = new DateTime(2019, 1, 1);
var endDate = new DateTime(2019, 1, 2);
var gap = 5.0; // same as `interval` for purpose of computing start dates.
// NOTE(review): `gap` is entered by hand in minutes and must be kept in sync with `interval`.
var limit = 100;
// One start time every `gap * limit` minutes, up to and including endDate. Each request below
// fetches up to `limit` candles from its start time, so when `gap` matches `interval`
// consecutive pages neither overlap nor leave holes.
IObservable<DateTime> startDates =
Observable
.Generate(startDate, x => x <= endDate, x => x.AddMinutes(gap * limit), x => x);现在很容易生成您的查询:
// For every page start time, open a BinanceClient scoped to that one request (Observable.Using),
// request up to `limit` klines from that start time to endDate, and flatten successful
// responses into individual candles.
// NOTE(review): the nested `from` (SelectMany) places no bound on concurrency, so every page
// request may be in flight at once — consider Merge(maxConcurrent) for long date ranges.
// NOTE(review): `where rs.Success` silently drops failed pages, leaving undetected gaps.
IObservable<OHLCV> query =
from s in startDates
from rs in
Observable
.Using(
() => new BinanceClient(),
bc => Observable.FromAsync(ct =>
bc.Spot.Market.GetKlinesAsync(pair, interval, s, endDate, limit, ct)))
where rs.Success
from c in rs.Data.Select(x => x.ToCandle())
select c;因为这是并行查询,所以您可能会得到无序的结果,因此您需要对查询执行一个.ToArray(),以便能够处理最后生成的所有数据,而不是在每次出现时。
// Gather every candle into a single array once the query completes, then sort it by Timestamp,
// because pages fetched in parallel can arrive in any order.
// NOTE(review): no onError handler is passed to Subscribe, and `subscription` is never disposed
// in this snippet — confirm how errors and cancellation should be handled.
IDisposable subscription =
query
.ToArray()
.Select(xs => xs.OrderBy(x => x.Timestamp).ToArray())
.Subscribe(cs =>
{
/* candles downloaded using multiple threads */
/* and sorted in `Timestamp` order */
});这会使用多个线程按顺序生成所有的蜡烛,而不会有任何重复。
如果您希望把它封装成 DownloadAsync 方法,可以这样写:
/// <summary>
/// Downloads candles in parallel. Every page start time is computed up front, one kline request
/// is issued per page, and all candles are returned sorted by <see cref="OHLCV.Timestamp"/>.
/// </summary>
/// <param name="pair">Symbol to download, e.g. "TRXUSDT".</param>
/// <param name="interval">Kline interval requested from Binance.</param>
/// <param name="gap">
/// Length of one <paramref name="interval"/> in minutes (e.g. 5.0 for five-minute candles). It is only
/// used to compute page start times, so it must match <paramref name="interval"/>.
/// </param>
/// <param name="startDate">Start time of the first page.</param>
/// <param name="endDate">End time passed to every request; pages are generated while start &lt;= endDate.</param>
/// <param name="limit">Maximum candles per request; page starts are <c>gap * limit</c> minutes apart.</param>
/// <param name="maxConcurrency">Maximum number of requests in flight at once (defaults to 4).</param>
/// <returns>All downloaded candles in ascending timestamp order.</returns>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="gap"/>, <paramref name="limit"/> or <paramref name="maxConcurrency"/> is not positive.
/// </exception>
/// <exception cref="InvalidOperationException">A page request was not successful.</exception>
public async Task<List<OHLCV>> DownloadAsync(string pair, KlineInterval interval, double gap, DateTime startDate, DateTime endDate, int limit, int maxConcurrency = 4)
{
    // A zero, negative or NaN step would make Observable.Generate below never advance
    // (an endless stream of requests) or fail deep inside DateTime.AddMinutes.
    if (double.IsNaN(gap) || gap <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(gap), gap, "gap must be a positive number of minutes.");
    }
    if (limit <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(limit), limit, "limit must be positive.");
    }
    if (maxConcurrency <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(maxConcurrency), maxConcurrency, "maxConcurrency must be positive.");
    }

    // Candles are evenly spaced, so each page start is simply `gap * limit` minutes after the previous one.
    IObservable<DateTime> startDates =
        Observable
            .Generate(startDate, x => x <= endDate, x => x.AddMinutes(gap * limit), x => x);

    IObservable<OHLCV> query =
        startDates
            .Select(s =>
                Observable
                    .Using(
                        () => new BinanceClient(),
                        bc => Observable.FromAsync(ct =>
                            bc.Spot.Market.GetKlinesAsync(pair, interval, s, endDate, limit, ct)))
                    // A failed page would otherwise leave a silent gap in the series; fail the download instead.
                    .Select(rs => rs.Success
                        ? rs.Data
                        : throw new InvalidOperationException($"Kline request for {pair} starting at {s:O} failed: {rs.Error}")))
            // Bound the number of simultaneous requests instead of firing every page at once.
            .Merge(maxConcurrency)
            .SelectMany(data => data.Select(x => x.ToCandle()));

    // Pages complete in any order, so gather everything and sort once at the end.
    IList<OHLCV> candles = await query.ToList();
    return candles.OrderBy(x => x.Timestamp).ToList();
}
请注意,签名略有更改。
发布于 2021-02-04 19:00:15
并行发出更多web请求的关键是创建多个任务,并使用Task.WhenAll()等待所有任务,而不是在循环中等待每个任务。
如果你在一个循环中等待每一个请求,它们将被顺序处理(尽管在发出web请求时UI线程不会被阻塞)。
https://stackoverflow.com/questions/66043361
复制相似问题