首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >OpenWhisk发送消息给卡夫卡超时

OpenWhisk发送消息给卡夫卡超时
EN

Stack Overflow用户
提问于 2021-04-27 09:23:25
回答 1查看 104关注 0票数 1

环境细节

CentOS7 7、独立OpenWhisk

问题描述

我计划在openwhisk中向Kafka发送一条消息,数据流过程是:WSK -> OpenWhisk action -> kafka-console-

但在此过程中会出现间歇性的故障,如:我发送"test01"~"test06",只得到“test02”、“test04”“、”“test06”。

根据日志,故障的原因是超时.

这是我的动作剧本:

代码语言:javascript
复制
// js
const kafka = require('kafka-node');

const Producer = kafka.Producer;
const client = new kafka.KafkaClient({
    kafkaHost: "192.168.68.132:3093"
});

const producer = new Producer(client);

function main(actionObj) {
    var message = JSON.stringify(actionObj);
    const payloads = [
        {
            topic: 'beforeReductionTopic',
            messages: message,
            partition: 0
        }
    ];
    return new Promise(function (resolve, reject) {
        producer
        .on('ready', () => {
            producer.send(payloads, function (err, data) {
                if (err) {
                    console.log("++++++ producer err: ", err);
                    return reject(err);
                } else {
                    console.log("++++++ producer data: ", data);
                    return resolve(data);
                }
            });
        })
        .on("error", err => {
            console.error(err);
            return reject(err);
        });
    });
}

exports.main = main;

根据日志,它是"action“

我使用下面的代码来测试动作脚本,kafka可以接收所有的消息。

代码语言:javascript
复制
const action = require("./openwhisk-kafka.js");
for(let i = 0; i < 3; i++) {
    action.main("ccc");
}

我不知道如何修改动作脚本。

日志细节

用WSK CLI实现调用openwhisk动作的操作记录

代码语言:javascript
复制
[root@localhost openwhisk-kafka]# wsk action update scicatTest --kind nodejs:10 /test/actions/openwhisk-kafka/openwhisk-kafka.zip 
ok: updated action scicatTest
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test01"
ok: invoked /_/scicatTest, but the request has not yet finished, with id 78e633791c114d29a633791c11fd29c2
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test02"
{
    "beforeReductionTopic": {
        "0": 57
    }
}
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test03"
ok: invoked /_/scicatTest, but the request has not yet finished, with id 09165a45a0f94011965a45a0f920110b
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test04"
{
    "beforeReductionTopic": {
        "0": 58
    }
}
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test05"
ok: invoked /_/scicatTest, but the request has not yet finished, with id 8a9f38f9d5ab4fa49f38f9d5ab6fa45a
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test06"
{
    "beforeReductionTopic": {
        "0": 59
    }
}
[root@localhost openwhisk-kafka]# 

失败操作日志

代码语言:javascript
复制
[root@localhost openwhisk-kafka]# wsk activation get 8a9f38f9d5ab4fa49f38f9d5ab6fa45a
ok: got activation 8a9f38f9d5ab4fa49f38f9d5ab6fa45a
{
    "namespace": "guest",
    "name": "scicatTest",
    "version": "0.0.4",
    "subject": "guest",
    "activationId": "8a9f38f9d5ab4fa49f38f9d5ab6fa45a",
    "start": 1619540684355,
    "end": 1619540744418,
    "duration": 60063,
    "statusCode": 0,
    "response": {
        "status": "action developer error",
        "statusCode": 0,
        "success": false,
        "result": {
            "error": "The action exceeded its time limits of 60000 milliseconds."
        }
    },
    "logs": [],
    "annotations": [
        {
            "key": "path",
            "value": "guest/scicatTest"
        },
        {
            "key": "waitTime",
            "value": 652
        },
        {
            "key": "kind",
            "value": "nodejs:10"
        },
        {
            "key": "timeout",
            "value": true
        },
        {
            "key": "limits",
            "value": {
                "concurrency": 1,
                "logs": 10,
                "memory": 256,
                "timeout": 60000
            }
        }
    ],
    "publish": false
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-05-02 12:04:19

不要使用“卡夫卡节点”。用"kafkajs“代替

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67280339

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档