复制步骤
向队列中添加消息(大约500条消息)。添加一个队列触发函数应用程序,它将处理队列消息。
期望行为
队列中的所有消息都将被处理。如果在处理消息时抛出异常,则将消息发送到毒队列。
实际行为
消息开始被处理。经过一段时间(大约30-60秒),所有留在队列中的消息都会被移动到毒队列中。在该时刻处理的消息不会引发任何异常。
我们检查了队列消息生存期是默认消息:7天。我们尝试将WindowsAzure.Storage包降级为7.2.1,但没有成功。
我们使用的WindowsAzure.Storage版本是8.5.0。
函数应用程序会崩溃,这会导致所有消息被移动到毒队列中吗?
编辑
以下是将消息添加到队列中的方式:
public async Task AddMessage()
{
var queueModel = new QueueModel
{
// queue model properties
};
await AddMessageToQueueAsync(queueModel);
}
public async Task AddMessageToQueueAsync(T messageObject, TimeSpan? initialVisibilityDelay = null)
{
var queue = GetQueue();
var jsonMessage = JsonConvert.SerializeObject(messageObject);
var message = new CloudQueueMessage(jsonMessage);
await queue.AddMessageAsync(message, TimeSpan.FromDays(7), initialVisibilityDelay, new QueueRequestOptions(), new OperationContext());
}
private CloudQueue GetQueue()
{
var queueClient = _storageAccount.CreateCloudQueueClient();
var queue = queueClient.GetQueueReference(_queueName);
queue.CreateIfNotExists();
return queue;
}这是处理消息的函数应用程序:
[FunctionName("ProcessQueue")]
public static async Task Run([QueueTrigger("queue-name", Connection = "AzureWebJobsStorage")]string queueItem, TraceWriter log)
{
if (String.IsNullOrEmpty(queueItem))
{
return;
}
var queueModel = JsonConvert.DeserializeObject<QueueModel>(queueItem);
if (queueModel == null)
{
return;
}
try
{
// process the message
}
catch (Exception ex)
{
// message is moved to the poison-queue but no exception is thrown
log.Error(ex.Message, ex);
}
}发布于 2017-11-03 12:27:30
发现了问题。问题是一个剩余的函数应用程序,它监听同一个队列,并与当前的函数应用程序并行处理消息。因为两个函数应用程序都处理来自同一个队列的消息,因此在某个时候,队列中留下的消息都被移动到了毒队列中。(从队列读取消息时可能会出现一些并发问题)。
https://stackoverflow.com/questions/47080307
复制相似问题