首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用Azure函数通过Apache-Kafka扩展连接到EventHubs?

如何使用Azure函数通过Apache-Kafka扩展连接到EventHubs?
EN

Stack Overflow用户
提问于 2020-04-29 21:51:13
回答 1查看 522关注 0票数 1

是否可以使用SharedAccessKey连接到代理(EventHubs)?

我无法连接到我的Azure EventHubs。

我们使用SharedAccessKey而不是SSL来连接,我有这样的配置。

代码语言:javascript
复制
"EventBusConfig": {

    "BootstrapServers": "anyname.servicebus.windows.net:9093",

    "SecurityProtocol": "SaslSsl",

    "SaslMechanism": "Plain",

    "SaslUsername": "$ConnectionString",

    "SaslPassword": 
    "Endpoint=sb://anyname.servicebus.windows.net/;SharedAccessKeyName=anyname.;SharedAccessKey=CtDbJ/Kfjs749
    8s--anypassword--SkSk749/z2Z5Fr9///33/qQ+R6Cyg=",

    "SocketTimeoutMs": "60000",

    "SessionTimeoutMs": "30000",

    "GroupId": "NameOfTheGroup",

    "AutoOffsetReset": "Earliest",

    "BrokerVersionFallback": "1.0.0",

    "Debug": "cgrp"
}

但是我似乎需要认证路径( pem文件)。

我想生成一条简单的消息,如下所示

我正在使用https://github.com/Azure/azure-functions-kafka-extension,但我不知道这个测试库是否能处理SharedAccessKey。

我在尝试连接时遇到以下错误:

任何帮助我们都将不胜感激。

EN

回答 1

Stack Overflow用户

发布于 2020-04-30 07:11:01

我能够使用扩展"https://github.com/Azure/azure-functions-kafka-extension“来生成和使用消息。

使用消息很容易,因为"EventHubConnectionString“属性非常直观。

要生成消息,您需要配置CA证书,我认为我需要来自Azure的证书,但我错了,我只是遵循这些说明来使其工作。

下载并设置CA证书位置。正如Confluent文档中所述,.NET库不能访问根CA证书。缺少此步骤将导致您的函数引发错误"sasl_ssl://xyz-xyzxzy.westeurope.azure.confluent.cloud:9092/bootstrap: Failed to verify broker certificate: unable to get local issuer certificate (在135ms in state CONNECT之后)“

为了克服这个问题,我们需要:

为避免与项目中触发器文件的一部分的现有Kafka证书发生冲突,请设置"copy to extension.

  • Include directory"

  • Set
  • cacert.pem EventHubs“属性,以下载CA证书(即从证书文件复制到触发器以外的任何内容)。在本例中,我们设置为confluent_cloud_cacert.pem

这是我的生产者Azure函数,带有Kafka绑定

代码语言:javascript
复制
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Azure.WebJobs.Extensions.Kafka;

namespace EY.Disruptor.AzureFunctionsWithKafka
{
public static class Function
{
    [FunctionName("Producer")]
    public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
            [Kafka("BootstrapServer",
            "topic.event",
            Username = "ConfluentCloudUsername",
            Password = "ConfluentCloudPassword",
            SslCaLocation = "confluent_cloud_cacert.pem",
            AuthenticationMode = BrokerAuthenticationMode.Plain,
            Protocol = BrokerProtocol.SaslSsl
        )] IAsyncCollector<KafkaEventData<string>> events,
            ILogger log)
    {
        log.LogInformation("C# HTTP trigger function processed a request.");

        string name = req.Query["name"];

        string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
        dynamic data = JsonConvert.DeserializeObject(requestBody);
        name ??= data?.name;

        string responseMessage = string.IsNullOrEmpty(name)
            ? "This HTTP triggered function executed successfully. Pass a name in the query string or in the request body for a personalized response."
            : $"Hello, {name}. This HTTP triggered function executed successfully.";

        var kafkaEvent = new KafkaEventData<string>()
        {
            Value = name
        };

        await events.AddAsync(kafkaEvent);

        return new OkObjectResult(responseMessage);
    }
}
}

这是我的带有Kafka绑定的使用Azure函数

代码语言:javascript
复制
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Extensions.Logging;

namespace EY.Disruptor.AzureFunctionsWithKafka
{
public static class Consumer
{
    [FunctionName("FunctionKafkaConsumer")]
    public static void Run(
        [KafkaTrigger("BootstrapServer",
        "topic.event",
        Username = "ConfluentCloudUsername",
        Password = "ConfluentCloudPassword",
        EventHubConnectionString = "ConfluentCloudPassword",
        AuthenticationMode = BrokerAuthenticationMode.Plain,
        Protocol = BrokerProtocol.SaslSsl,
        ConsumerGroup = "Group1")] KafkaEventData<string>[] kafkaEvents,
        ILogger logger)
    {
        foreach (var kafkaEvent in kafkaEvents)
        {
            logger.LogInformation(kafkaEvent.Value);
        }
    }
}
}

这是我的local.settings.json

代码语言:javascript
复制
{
  "IsEncrypted": false,
  "Values": {
     "AzureWebJobsStorage": "UseDevelopmentStorage=true",
     "FUNCTIONS_WORKER_RUNTIME": "dotnet",
     "BootstrapServer": "zyxabc.servicebus.windows.net:9093",
     "ConfluentCloudUsername": "$ConnectionString",
     "ConfluentCloudPassword": "Endpoint=sb://zyxabc.servicebus.windows.net/;SharedAccessKeyName=TestSvc;SharedAccessKey=YAr/="
   }
}

当然还有Startup.cs中的初始化

代码语言:javascript
复制
public void Configure(IWebJobsBuilder builder)
{
   builder.AddKafka();
}

我希望这个建议能帮助其他人:)

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61503427

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档