首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Amphp :运行多个具有相同连接的异步循环(事件存储客户端)

Amphp :运行多个具有相同连接的异步循环(事件存储客户端)
EN

Stack Overflow用户
提问于 2020-07-07 22:41:35
回答 2查看 656关注 0票数 0

我使用事件存储客户端,它使用amphp。我需要在我的应用程序内部重用连接的许多部分。

因此,我创建了一个连接提供程序:

代码语言:javascript
复制
public function getConnection(): EventStoreConnection
{
    if ($this->connection) {
        return $this->connection;
    }
    $this->connection = $this->createConnection();
    wait($this->connection->connectAsync());

    return $this->connection;
}

然后,我在许多地方使用这种连接:

代码语言:javascript
复制
\Amp\Loop::run(function () use ($eventStoreEvents, $streamName) {
    $connection = $this->connectionProvider->getConnection();

    // Creation of an event stream
    yield $connection->appendToStreamAsync($streamName, ExpectedVersion::ANY, $eventStoreEvents);
    // sleep(10); // This sleep does not work, code continue like nothing happend
});

\Amp\Loop::run(function () use ($streamName, $aggregateFqcn, &$aggregateRoot) {

    $start = 0;
    $count = \Prooph\EventStore\Internal\Consts::MAX_READ_SIZE;

    $connection = $this->connectionProvider->getConnection();

    do {
        $events = [];
        /** @var StreamEventsSlice $streamEventsSlice */
        $streamEventsSlice = yield $connection
            ->readStreamEventsForwardAsync(
                $streamName,
                $start,
                $count,
                true
            );

        if (!$streamEventsSlice->status()->equals(SliceReadStatus::success())) {
            dump($streamEventsSlice); // Event stream does not exist
            // Error here: the event stream doesn't exist at this point.
            throw new RuntimeGangxception('Impossible to generate the aggregate');
        }
    } while (! $streamEventsSlice->isEndOfStream());
});

问题:,第一个请求似乎还没有结束,但是第二个循环已经启动了。没有评论的睡眠没有任何影响!

但是事件流最终是与内部的相关事件一起创建的,因此第一个请求可以工作。

如果我启动一个连接,然后关闭,然后启动一个新的连接,它可以工作。但这是缓慢的,因为每个新连接的握手开销。

我在Amphp的WebSocket库中尝试了一个类似的例子,它起了作用。你看到什么不对劲了吗?

下面是我使用websocket进行的测试:

代码语言:javascript
复制
$connection = \Amp\Promise\wait(connect('ws://localhost:8080'));
Amp\Loop::run(function () use ($connection) {
   /** @var Connection $connection */
   yield $connection->send("Hello...");
   sleep(10); // This sleep works!
});

Amp\Loop::run(function () use ($connection) {
   /** @var Connection $connection */
   yield $connection->send("... World !");
});

$connection->close();
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-08-14 15:27:53

Prooph事件存储库基于amphp,但并不遵循所有原则:您不能等待连接准备就绪。如果你试着在规模上使用它,那就更糟糕了,所以不要试图等待承诺完成。

作为另一种选择,您可以为以后设置一个承诺,并检查连接是否为空。实际上,这就是库内部处理进一步步骤的方法。

在我这一边,我决定停止使用这个图书馆。但作为一种替代,您可以使用使用HTTP客户端的库,它也来自prooph团队。

票数 0
EN

Stack Overflow用户

发布于 2020-07-09 02:34:23

你想做的事毫无意义。你应该读amphp文档化

Amp为事件循环使用一个全局访问器,因为每个应用程序只有一个事件循环。同时运行两个循环是没有意义的,因为它们只需要以忙碌的等待方式来安排彼此,才能正确运行。

也就是说,根本就没有第二个循环。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62784959

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档