首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >AnyEvent::STOMP::Client + AnyEvent::ForkManger =间歇错误

AnyEvent::STOMP::Client + AnyEvent::ForkManger =间歇错误
EN

Stack Overflow用户
提问于 2016-09-05 21:03:31
回答 1查看 93关注 0票数 2

我正在尝试编写一个进程,它侦听ActiveMQ并基于消息,从then服务中取出数据,进行一些处理,然后将进程数据放到另一个then服务中。(REST/JSON)

下面的模块可以正常工作,直到我与之交谈的一个摇摇欲坠的The服务返回一个错误。我已经尝试了很多方法来捕捉错误,但都没有结果。但是,一旦发生webservice错误,我会收到以下消息:

未处理的事件回调异常(MESSAGE,AnyEvent::STOMP::Client=HASH(0x3ad5e48),散列(0x3a6bbb0) {"action":"created","data":{"id":40578737,“type”:“警告”,"who":null},"guid":"ADCCEE0C-73A7-11E6-8084-74B346D1CA67","hostname":"myserver","pid":48632}):$fork_ manager ->start()应该在管理进程中调用

好的,我从概念上理解子进程正在尝试启动另一个进程,而那个分叉管理器说的是“不”。但是,在下面的模块中,启动新进程以处理长时间运行的处理的正确方法是什么。或者为什么一个子进程会导致此异常,以及如何防止此异常。

这是模块(简化后)

代码语言:javascript
复制
package consumer;

use AnyEvent::ForkManager;
use AnyEvent::STOMP::Client;
use JSON;
use Data::Dumper;
use v5.18;
use Moose;

sub run {
    my $self    = shift;
    my $pm      = AnyEvent::ForkManager->new(max_workers => 20);
    my $stomp   = AnyEvent::STOMP::Client->new();

    $stomp->connect();
    $stomp->on_connected(sub {
        my $stomp   = shift;
        $stomp->subscribe('/topic/test');
        say "Connected to STOMP";
    });

    $pm->on_start(sub {
        my ($pm,$pid,@params) = @_;
        say "Starting $pid worker";
    });

    $pm->on_finish(sub {
        my ($pm, $pid,@params) = @_;
        say "Finished $pid worker";
    });

    $pm->on_error(sub {
        say Dumper(\@_);
    });

    $stomp->on_message(sub {
        my ($stomp, $header, $body) = @_;
        my $href    = decode_json $body;
        $pm->start(cb => sub {
            my ($pm, @params) = @_;
            $self->process(@params);
        },
        args    => [ $href->{id}, $href->{data}->{type}, $href->{data}->{who} ],
        );
    });

    my $cv = AnyEvent->condvar;
    $cv->recv;
}

sub process {
    say "Processing ".Dumper(\@_);
    sleep 5;
    if ( int(rand(10)) < 5 ) {
        die "OOPS";  # this triggers the error message above
    }
    say "Done Processing $_[1]";
}

1;

下面是上面模块的驱动程序:

代码语言:javascript
复制
#!/usr/bin/env perl

use v5.18;
use lib '.';
use consumer;

my $c   = consumer->new();
$c->run;

最后,您可以使用一个流量生成器来查看它的运行情况:

代码语言:javascript
复制
#!/usr/bin/env perl

use lib '../lib';
use lib '../../lib';
use v5.18;

use Data::Dumper;
use JSON;
use Net::STOMP::Client;

$ENV{'scot_mode'} = "testing";

my $stomp = Net::STOMP::Client->new(
    host    => "127.0.0.1",
    port    => 61613
);
$stomp->connect();

for (my $i = 1; $i < 1000000; $i++) {
    my $href    = {
        id      => $i,
        type    => "event",
        what    => "foo",
    };
    my $json    = encode_json $href;
    say "Sending ".Dumper($href);
    $stomp->send(
        destination => "/topic/test",
        body        => $json,
    );
}

$stomp->disconnect();
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-03-01 23:29:20

我能够通过使用Try::Catch来解决这一问题,并将对self->process的调用包装为一个try catch,如下所示:

代码语言:javascript
复制
$stomp->on_message(sub {
        my ($stomp, $header, $body) = @_;
        my $href    = decode_json $body;
        $pm->start(cb => sub {
            my ($pm, @params) = @_;
            try {
                $self->process(@params);
            }
            catch {
                # error handling stuff
            };
        },
        args    => [ $href->{id}, $href->{data}->{type}, $href->{data}->{who} ],
        );
    }
);
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/39337677

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档