首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用RateLimiter的ExecutionContext

使用RateLimiter的ExecutionContext
EN

Stack Overflow用户
提问于 2021-05-12 19:10:59
回答 2查看 77关注 0票数 4

假设我有一个HTTP客户端来调用具有请求速率限制的服务器,例如1000个请求/秒。我用ExecutionContext实现了一个速率限制器,如下所示:

已创建RateLimiter为Guava的有界阻塞队列

代码语言:javascript
复制
class MyBlockingQueue[A](capacity: Int, permitsPerSecond: Int) 
  extends ArrayBlockingQueue[A](capacity) {

  private val rateLimiter = RateLimiter.create(permitsPerSecond.toDouble)

  override def take(): A = {
    rateLimiter.acquire()
    super.take()
  }

  override def poll(timeout: Long, unit: TimeUnit): A = {
    rateLimiter.tryAcquire(timeout, unit) // todo: fix it
    super.poll(timeout, unit)
  }
}

已使用此队列从ThreadPoolExecutor创建ExecutionContext

代码语言:javascript
复制
def createRateLimitingExecutionContext(numThreads: Int,
                                       capacity: Int,
                                       permitsPerSecond: Int): ExecutionContext = {
  val queue = new MyBlockingQueue[Runnable](capacity, permitsPerSecond)
  val executor = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, queue)
  ExecutionContext.fromExecutor(executor)
}

现在我可以创建一个有速率限制的ExecutionContext并将其传递给客户端:

代码语言:javascript
复制
implicit val ec = createRateLimitingThreadPoolExecutionContext(
  numThreads = 100,
  capacity = 1000,
  permitsPerSecond = 1000
)
httpGet("http://myserver.com/xyz") // create Futures with "ec"  

这有意义吗?您将如何测试此ExecutionContext

EN

回答 2

Stack Overflow用户

发布于 2021-05-12 21:00:20

这似乎还可以,除了自定义执行上下文应该是显式的和/或在httpGet或它的封装类中管理的,而不是全局隐式的。

因为不然的话,当你像这样写东西的时候:

代码语言:javascript
复制
    httpGet(foo)
     .recover("")
     .map(_.split(","))
     .map(_.map(_.toInt))
     .map(_.max)
     .foreach(println)

你最终消耗了6(!)许可,而不是一个-也就是说,如果你提出了6个请求,这可能不是你想要的。

票数 3
EN

Stack Overflow用户

发布于 2021-05-12 20:30:14

为了测试这个自定义的ExecutionContext,您应该能够创建一个行为类似于以下内容的测试:

  • 调度触发一堆Future,持续几秒钟
  • 每个Future修改一个原子值(如计数器),您可以在该值上断言并检查原子值:假设您允许10个未来/秒,然后检查每一秒您的计数器应该小于或等于之前的值+ 10

代码语言:javascript
复制
// Pseudo-code

implicit val ec: ExecutionContext = ??? // your custom ExecutionContext allowing only 10 futures/second

val counter = new AtomicInteger()

// Fire some Futures
val start = Instant.now
val futures = (1 to 100).map(_ => Future { counter.getAndIncrement() }) )

// Check every second
(0 to 10).foreach { i =>
  counter.get() shouldBe between (i-1)*10 and (i+1)*10
  Thread.sleep(1000)
}

// Final check
Await.result(Future.sequence(futures))
val end = Instant.now
(end - start) shouldBe > 10s

这只是一个粗略的基本想法,你可以适应不同的场景。

也许计数器太基础了,你需要更细粒度的断言。此外,Future几乎可以立即完成,您还可以模拟持续时间更长的操作。

请记住,对于对时间敏感的操作,您可能无法对特定值进行断言,而是对一些具有可接受误差的值进行断言。

最后,您还可以依赖于经过广泛测试的Guava RateLimiter。因此,您可能希望将其视为测试的边界,并且只测试与它的不同交互,而不是所有可能的计时场景。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67502492

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档