首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Azure函数工作人员隔离进程.NET 6-使用servicebus和servicebus触发器工作

Azure函数工作人员隔离进程.NET 6-使用servicebus和servicebus触发器工作
EN

Stack Overflow用户
提问于 2022-06-28 07:05:03
回答 1查看 749关注 0票数 0

我正在使用Azure函数隔离进程.net 6。

我有一个问题要处理服务总线和服务总线触发器。

如果在过程中,我会像这样声明

代码语言:javascript
复制
public async Task<ActionResult> Run([HttpTrigger(AuthorizationLevel.Function, "post", Route = null)]HttpRequest req, ILogger log, ExecutionContext context,
    [ServiceBus("mail-sender", Connection = "ServiceBusConnection")] IAsyncCollector<dynamic> outgoingMessage)
{
    ... outgoingMessage.Send(mymessage);...
}

然后,我将有另一个服务总线触发器azure函数来处理这样的消息

代码语言:javascript
复制
   public void Run([ServiceBusTrigger("mail-sender", Connection = "ServiceBusConnection")]string myQueueItem, ILogger log)
        {
            try
            {
                var mailHost = Environment.GetEnvironmentVariable("MAIL_HOST") ?? "smtp.sendgrid.net";
                var mailPort = Convert.ToInt32(Environment.GetEnvironmentVariable("MAIL_PORT") ?? "587");
                var mailUsername = Environment.GetEnvironmentVariable("MAIL_USERNAME") ?? "apikey";
                var mailPassword = Environment.GetEnvironmentVariable("MAIL_PASSWORD") ?? "8755faf7-78c9-4389-b3a5-f1578953bc00";
                var ssl = Convert.ToBoolean(Environment.GetEnvironmentVariable("MAIL_SSL") ?? "false");

                using (var mailHelpers = new MailHelpers(mailHost, mailPort, mailUsername, mailPassword, ssl))
                {
                    var mail = JsonConvert.DeserializeObject<MailViewModel>(myQueueItem);
                    mailHelpers.Send(mail);
                }
            }
            catch (Exception ex)
            {
                log.LogError(ex, "Error during sending email.");
            }
        }

如何在蔚蓝功能隔离过程中实现这一点?

请帮助我给出详细的例子和包的依赖关系,如果有的话。

非常感谢

===========================

顺便说一下,我已经声明了TimerTrigger azure函数,它使用https://www.nuget.org/packages/Microsoft.Azure.Functions.Worker.Extensions.Timer它可以触发任务运行,但我不能调试它?我不知道为什么?

代码语言:javascript
复制
        public async Task<DispatchedMessages> Run([TimerTrigger("* * * * * *")] MyInfo myTimer)
        {
            try
            {...}
        }
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-07-04 08:11:16

我理解你的问题,流程看起来就像这样:

  1. post到Http触发器
  2. HttpTrigger将在某些服务总线队列中对消息进行排队(有关如何向队列发送消息的详细信息,请参阅文献资料),下面是如何向服务总线队列发送消息:
代码语言:javascript
复制
using Azure.Messaging.ServiceBus;

public class ServiceBusAdapter
    {
        private readonly ServiceBusClient _client;

        public ServiceBusAdapter(ServiceBusClient client)
        {
            _client = client;
        }

        public async Task SendMessage(string queueName, BinaryData body, string? messageId = null)
        {
            ServiceBusMessage sbMessage = CreateServiceBusMessage(body, messageId);
            await using ServiceBusSender sender = _client.CreateSender(queueName);

            try
            {
                await sender.SendMessageAsync(CreateServiceBusMessage(body, messageId));
            }
            catch (ServiceBusException e) when (e.IsTransient)
            {
                throw new SomeCustomException(e.Message, e);
            }
        }

        public async Task SendMessages(string queueName, IEnumerable<Message> messages)
        {
            Queue<ServiceBusMessage> sbMessages = new(messages.Select(m => CreateServiceBusMessage(m.Body, m.MessageId)));
            await using ServiceBusSender sender = _client.CreateSender(queueName);

            // While all messages are not sent to the Service Bus queue
            while (sbMessages.Count > 0)
            {
                // Start a new batch
                using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();

                // Add the first message to the batch
                if (messageBatch.TryAddMessage(sbMessages.Peek()))
                {
                    // Dequeue the message from the .NET queue once the message is added to the batch
                    sbMessages.Dequeue();
                }
                else
                {
                    // If the first message can't fit, then it is too large for the batch.
                    // Try to send it anyway so that we get a proper service bus exception.
                    await sender.SendMessageAsync(sbMessages.Dequeue());
                    continue;
                }

                // Add as many messages as possible to the current batch
                while (sbMessages.Count > 0 && messageBatch.TryAddMessage(sbMessages.Peek()))
                {
                    // Dequeue the message from the .NET queue as it has been added to the batch
                    sbMessages.Dequeue();
                }

                try
                {
                    // Now, send the batch
                    await sender.SendMessagesAsync(messageBatch);
                }
                catch (ServiceBusException e) when (e.IsTransient)
                {
                    throw new SomeCustomException(e.Message, e);
                }

                // If there are any remaining messages in the .NET queue, the while loop repeats
            }
        }

        private static ServiceBusMessage CreateServiceBusMessage(BinaryData body, string? messageId)
        {
            var message = new ServiceBusMessage(body);

            if (messageId is not null)
            {
                // Service Bus (Standard and Premium SKU but not Basic) finds
                // duplicates by this MessageId
                message.MessageId = messageId;
            }

            return message;
        }
    }
  1. 消息排队后,服务总线将通知配置的ServiceBusTrigger并侦听该队列。

下面是一个ServiceBusTrigger在.net 6隔离函数中的示例:

代码语言:javascript
复制
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

namespace Functions.Sample
{
    public class SampleServiceBusTrigger
    {
        private readonly ILogger _logger;
        private const string QueueName = "My.Sample.Queue.Name";

        private readonly ISomeHandler _someHandler;

        public SampleServiceBusTrigger(
            ILoggerFactory loggerFactory,
            ISomeHandler someHandler)
        {
            _logger = loggerFactory.CreateLogger<SampleServiceBusTrigger>();
            _someHandler = someHandler;
        }

        [Function(nameof(SampleServiceBusTrigger))]
        public async Task RunAsync(
            [ServiceBusTrigger(QueueName)] MyObject outMessage)
        {
            _logger.LogInformation("Triggered...");
            await _someHandler.Execute(outMessage);
            _logger.LogInformation("Completed.");
        }
    }
}

为了本地开发,您还需要将这些信任添加到您的local.settings.json中:

代码语言:javascript
复制
"FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
"AzureWebJobsServiceBus": "<YOUR SB CONNECTION STRING>",
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72782038

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档