我有一个函数,它构建多个发布服务器,并在一个带有MergeMany的发布服务器中返回它们。问题是,一些用户可能在这个发布服务器中有很多端点,同时频繁地访问所有这些端点会导致服务器超时。是否有一种方法来限制并发网络请求(如DispatchSemaphore)?
let mergedPubs = Publishers.MergeMany(urlRequests.map { dataTaskPublisher(for: $0)
.decode(type: RawJSON.self, decoder: JSONDecoder())
.mapError { _ in
return URLError(URLError.Code.badServerResponse)
}
})
.collect()
.eraseToAnyPublisher()发布于 2022-06-29 14:57:02
这方面没有现成的解决方案,但是我们可以在现有的Publishers.MergeMany和Publishers.Concatenate的基础上构建它。
这样做的目的是:
Int数组[1, 2, 3, 4, 5, 6]和maxConcurrent = 3,我们将拥有[[1, 2, 3], [4, 5, 6]],因此请求1、2、3将并行执行,但4、5、6只在前一个块完成时才会启动。Publishers.MergeMany。因此,我们将让[Publisher([1, 2, 3]), Publisher([4, 5, 6])]为了实现这一点,我们需要从本质上实现Publishers.ConcatenateMany,利用只需要2个输入流的Publishers.Concatenate。如果您想遵循组合风格,这应该在一个全新的结构中实现,但我现在是在静态功能中实现的。
extension Publishers {
static func concatenateMany<Output, Failure>(_ publishers: [AnyPublisher<Output, Failure>]) -> AnyPublisher<Output, Failure> {
return publishers.reduce(Empty().eraseToAnyPublisher()) { acc, elem in
Publishers.Concatenate(prefix: acc, suffix: elem).eraseToAnyPublisher()
}
}用于块数组的实用程序:
extension Array {
func chunked(into size: Int) -> [[Element]] {
return stride(from: 0, to: count, by: size).map {
Array(self[$0 ..< Swift.min($0 + size, count)])
}
}
}现在我们可以实现一个新版本的MergeMany,它也带有一个maxConcurrent参数。
extension Publishers {
static func mergeMany<Output, Failure>(maxConcurrent: Int, _ publishers: [AnyPublisher<Output, Failure>]) -> AnyPublisher<Output, Failure> {
return Publishers.concatenateMany(
publishers.chunked(into: maxConcurrent)
.map {
Publishers.MergeMany($0)
.eraseToAnyPublisher()
}
)
}
}最后,您的代码将如下所示:
let mergedPubs = Publishers.mergeMany(maxConcurrent: 3, requests)
.collect()
.eraseToAnyPublisher()这只是一个想法,也许还有其他方法来达到同样的结果!
https://stackoverflow.com/questions/72792881
复制相似问题