是否可以使用SharedAccessKey连接到代理(EventHubs)?
我无法连接到我的Azure EventHubs。
我们使用SharedAccessKey而不是SSL来连接,我有这样的配置。
"EventBusConfig": {
"BootstrapServers": "anyname.servicebus.windows.net:9093",
"SecurityProtocol": "SaslSsl",
"SaslMechanism": "Plain",
"SaslUsername": "$ConnectionString",
"SaslPassword":
"Endpoint=sb://anyname.servicebus.windows.net/;SharedAccessKeyName=anyname.;SharedAccessKey=CtDbJ/Kfjs749
8s--anypassword--SkSk749/z2Z5Fr9///33/qQ+R6Cyg=",
"SocketTimeoutMs": "60000",
"SessionTimeoutMs": "30000",
"GroupId": "NameOfTheGroup",
"AutoOffsetReset": "Earliest",
"BrokerVersionFallback": "1.0.0",
"Debug": "cgrp"
}但是我似乎需要认证路径( pem文件)。
我想生成一条简单的消息,如下所示

我正在使用https://github.com/Azure/azure-functions-kafka-extension,但我不知道这个测试库是否能处理SharedAccessKey。
我在尝试连接时遇到以下错误:

任何帮助我们都将不胜感激。
发布于 2020-04-30 07:11:01
我能够使用扩展"https://github.com/Azure/azure-functions-kafka-extension“来生成和使用消息。
使用消息很容易,因为"EventHubConnectionString“属性非常直观。
要生成消息,您需要配置CA证书,我认为我需要来自Azure的证书,但我错了,我只是遵循这些说明来使其工作。
下载并设置CA证书位置。正如Confluent文档中所述,.NET库不能访问根CA证书。缺少此步骤将导致您的函数引发错误"sasl_ssl://xyz-xyzxzy.westeurope.azure.confluent.cloud:9092/bootstrap: Failed to verify broker certificate: unable to get local issuer certificate (在135ms in state CONNECT之后)“
为了克服这个问题,我们需要:
为避免与项目中触发器文件的一部分的现有Kafka证书发生冲突,请设置"copy to extension.
这是我的生产者Azure函数,带有Kafka绑定
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
namespace EY.Disruptor.AzureFunctionsWithKafka
{
public static class Function
{
[FunctionName("Producer")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
[Kafka("BootstrapServer",
"topic.event",
Username = "ConfluentCloudUsername",
Password = "ConfluentCloudPassword",
SslCaLocation = "confluent_cloud_cacert.pem",
AuthenticationMode = BrokerAuthenticationMode.Plain,
Protocol = BrokerProtocol.SaslSsl
)] IAsyncCollector<KafkaEventData<string>> events,
ILogger log)
{
log.LogInformation("C# HTTP trigger function processed a request.");
string name = req.Query["name"];
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
dynamic data = JsonConvert.DeserializeObject(requestBody);
name ??= data?.name;
string responseMessage = string.IsNullOrEmpty(name)
? "This HTTP triggered function executed successfully. Pass a name in the query string or in the request body for a personalized response."
: $"Hello, {name}. This HTTP triggered function executed successfully.";
var kafkaEvent = new KafkaEventData<string>()
{
Value = name
};
await events.AddAsync(kafkaEvent);
return new OkObjectResult(responseMessage);
}
}
}这是我的带有Kafka绑定的使用Azure函数
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Extensions.Logging;
namespace EY.Disruptor.AzureFunctionsWithKafka
{
public static class Consumer
{
[FunctionName("FunctionKafkaConsumer")]
public static void Run(
[KafkaTrigger("BootstrapServer",
"topic.event",
Username = "ConfluentCloudUsername",
Password = "ConfluentCloudPassword",
EventHubConnectionString = "ConfluentCloudPassword",
AuthenticationMode = BrokerAuthenticationMode.Plain,
Protocol = BrokerProtocol.SaslSsl,
ConsumerGroup = "Group1")] KafkaEventData<string>[] kafkaEvents,
ILogger logger)
{
foreach (var kafkaEvent in kafkaEvents)
{
logger.LogInformation(kafkaEvent.Value);
}
}
}
}这是我的local.settings.json
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"BootstrapServer": "zyxabc.servicebus.windows.net:9093",
"ConfluentCloudUsername": "$ConnectionString",
"ConfluentCloudPassword": "Endpoint=sb://zyxabc.servicebus.windows.net/;SharedAccessKeyName=TestSvc;SharedAccessKey=YAr/="
}
}当然还有Startup.cs中的初始化
public void Configure(IWebJobsBuilder builder)
{
builder.AddKafka();
}我希望这个建议能帮助其他人:)
https://stackoverflow.com/questions/61503427
复制相似问题