In queue-triggered Azure Webjobs can an Azure Storage Queue message be modified after webjob function failure but before poisoning?
I've got queue-triggered functions in my Azure WebJobs. The normal behavior, of course, is that when the function fails MaxDequeueCount times, the message is put into the appropriate poison queue. I would like to modify the message after the error but before poison queue insertion. Example:
Initial message:
"Name":"Tom", "Age", 30"
"Name":"Tom", "Age", 30"
And upon failure I want to modify the message as follows and have the modified message be inserted into the poison queue:
"Name":"Tom", "Age", 30", "ErrorMessage":"Unable to find user"
"Name":"Tom", "Age", 30", "ErrorMessage":"Unable to find user"
Can this be done?
2 Answers
According to the Webjobs documentation, messages will get put on the poison queue after 5 failed attempts to process the message:
The SDK will call a function up to 5 times to process a queue message.
If the fifth try fails, the message is moved to a poison queue. The
maximum number of retries is configurable.
Source: https://github.com/Azure/azure-webjobs-sdk/wiki/Queues#poison
This is the automatic behavior. But you can still handle exceptions in your WebJobs Function code (so the exception doesn't leave your function and automatic poison message handling is not triggered) and put a modified message to the poison queue using output bindings.
Another option is to check the dequeueCount property, which indicates how many times the message has been picked up for processing.
You can get the number of times a message has been picked up for
processing by adding an int parameter named dequeueCount to your
function. You can then check the dequeue count in function code and
perform your own poison message handling when the number exceeds a
threshold, as shown in the following example.
public static void CopyBlob(
    [QueueTrigger("copyblobqueue")] string blobName, int dequeueCount,
    [Blob("textblobs/{queueTrigger}", FileAccess.Read)] Stream blobInput,
    [Blob("textblobs/{queueTrigger}-new", FileAccess.Write)] Stream blobOutput,
    TextWriter logger)
{
    if (dequeueCount > 3)
    {
        logger.WriteLine("Failed to copy blob, name=" + blobName);
    }
    else
    {
        blobInput.CopyTo(blobOutput, 4096);
    }
}
(also taken from above link).
Your function signature could look like this
public static void ProcessQueueMessage(
    [QueueTrigger("myqueue")] CloudQueueMessage message,
    [Queue("myqueue-poison")] out CloudQueueMessage poisonMessage,
    TextWriter logger)
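As a rough illustration of combining both ideas (catching the exception yourself and checking dequeueCount), here is a minimal, untested sketch. It assumes the message body is a JSON object, that Json.NET is referenced, and that MaxAttempts is kept in sync with Queues.MaxDequeueCount. The queue names, MaxAttempts and DoWork are hypothetical placeholders, not part of the SDK.

```csharp
using System;
using System.IO;
using Microsoft.Azure.WebJobs;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public static class Functions
{
    // Assumption: kept in sync with Queues.MaxDequeueCount (default 5).
    private const int MaxAttempts = 5;

    public static void ProcessQueueMessage(
        [QueueTrigger("myqueue")] string message,
        int dequeueCount,
        [Queue("myqueue-poison")] ICollector<string> poisonQueue,
        TextWriter logger)
    {
        try
        {
            DoWork(message);
        }
        // Earlier attempts fall through the filter, so the SDK retries as usual.
        catch (Exception ex) when (dequeueCount >= MaxAttempts)
        {
            // Last attempt: add the error to the payload and poison it ourselves.
            JObject payload = JObject.Parse(message);
            payload["ErrorMessage"] = ex.Message;
            poisonQueue.Add(payload.ToString(Formatting.None));
            logger.WriteLine("Moved message to poison queue: " + ex.Message);
        }
    }

    // Hypothetical stand-in for the real processing logic.
    private static void DoWork(string message)
    {
        throw new InvalidOperationException("Unable to find user");
    }
}
```

Because the exception is swallowed on the last attempt, the function completes successfully and the automatic poison handling should not run, so only the modified message should reach the poison queue. Note that it is a new message there, so the original dequeue count and other message metadata are not carried over.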
The example shows queue triggers, but the output goes to a blob. I've added another example function signature using output to a poison queue (just for information, not tested). Which attributes of the message are you interested in? Also, automatic fault handling will result in a new message on the poison queue. But you could extract the attributes you are interested in and put them in the payload of the message going to the poison queue.
– Markus S.
Aug 6 at 7:10
I put the example of which attributes of the message I wanted to add/edit in my question. My problem isn't determining how many times the message has been dequeued or determining when it is ready to go into the poison queue; my problem is I want to edit the message before it goes into the queue. Please see Ivan's answer - SetMessageContent is it.
– Howiecamp
Aug 6 at 16:12
I don't see the benefit you get of editing the original message and then relying on the default-poison-handling vs. putting the faulty message on the toxic queue yourself. You obviously don't need metadata of the message but only need to modify the payload/content of the message. So in this case you would not need CloudQueueMessage as well in the output and putting it on the poison-queue in your code would be a much cleaner / explicit approach to error handling in my opinion.
– Markus S.
Aug 8 at 13:22
The default maximum number of retries is 5. You can also set this value yourself using the Queues.MaxDequeueCount property of the JobHostConfiguration() instance, with code like below:
static void Main(string[] args)
{
    var config = new JobHostConfiguration();
    config.Queues.MaxDequeueCount = 5; // set the maximum number of retries
    var host = new JobHost(config);
    host.RunAndBlock();
}
Then you can update the failed queue message once the maximum number of retries has been reached. You can specify a non-existent blob container to force the retry mechanism. Code like below:
public static void ProcessQueueMessage([QueueTrigger("queue")] CloudQueueMessage message, [Blob("container/{queueTrigger}", FileAccess.Read)] Stream myBlob, ILogger logger)
{
    string yourUpdatedString = "\"ErrorMessage\"" + ":" + "\"Unable to find user\"";
    string str1 = message.AsString;
    if (message.DequeueCount == 5) // here, the maximum number of retries is set to 5
    {
        // Assumes a flat JSON object, so the only "}" is the closing brace.
        message.SetMessageContent(str1.Replace("}", "," + yourUpdatedString + "}")); // modify the failed message here
    }
    logger.LogInformation($"Blob name:{message} \n Size: {myBlob.Length} bytes");
}
When the above is done, you can see the updated queue message in the queue-poison.
UPDATED:
Since CloudQueueMessage is a sealed class, we cannot inherit it.
For your MySpecialPoco message, you can use JsonConvert.SerializeObject(message), code like below:
using Newtonsoft.Json;

static int number = 0; // counts attempts, since DequeueCount is not available on a plain object

public static void ProcessQueueMessage([QueueTrigger("queue")] object message, [Blob("container/{queueTrigger}", FileAccess.Read)] Stream myBlob, ILogger logger)
{
    CloudStorageAccount storageAccount = CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("StorageConnectionString"));
    CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
    CloudQueue queue = queueClient.GetQueueReference("queue-poison"); // get the poison queue
    CloudQueueMessage msg1 = new CloudQueueMessage(JsonConvert.SerializeObject(message));
    number++;
    string yourUpdatedString = "\"ErrorMessage\"" + ":" + "\"Unable to find user\"";
    string str1 = msg1.AsString;
    if (number == 5)
    {
        // Assumes a flat JSON object, so the only "}" is the closing brace.
        msg1.SetMessageContent(str1.Replace("}", "," + yourUpdatedString + "}"));
        queue.AddMessage(msg1);
        number = 0;
    }
    logger.LogInformation($"Blob name:{message} \n Size: {myBlob.Length} bytes");
}
The drawback is that both the original and the updated queue messages are written to the poison queue.
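One possible way to avoid the duplicate, sketched here under assumptions and not tested: bind the trigger to the POCO directly, use the dequeueCount parameter instead of a static counter, and swallow the exception on the last attempt so the SDK does not poison the original as well. The properties of MySpecialPoco, the MaxAttempts constant and FindUser are hypothetical.

```csharp
using System;
using Microsoft.Azure.WebJobs;

// Hypothetical shape of the message; ErrorMessage is only set on failure.
public class MySpecialPoco
{
    public string Name { get; set; }
    public int Age { get; set; }
    public string ErrorMessage { get; set; }
}

public static class PocoFunctions
{
    // Assumption: kept in sync with Queues.MaxDequeueCount.
    private const int MaxAttempts = 5;

    public static void ProcessQueueMessage(
        [QueueTrigger("queue")] MySpecialPoco message,
        int dequeueCount,
        [Queue("queue-poison")] ICollector<MySpecialPoco> poisonQueue)
    {
        try
        {
            FindUser(message);
        }
        // Only handle the failure on the last attempt; earlier ones are retried.
        catch (Exception ex) when (dequeueCount >= MaxAttempts)
        {
            message.ErrorMessage = ex.Message;
            poisonQueue.Add(message); // the SDK serializes the POCO to JSON
        }
    }

    // Hypothetical stand-in for the real processing logic.
    private static void FindUser(MySpecialPoco message)
    {
        throw new InvalidOperationException("Unable to find user");
    }
}
```

Since the function returns normally on the final attempt, the original message is deleted from the source queue rather than moved, which should leave only the modified copy in queue-poison.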
SetMessageContent is what I was looking for. Thank you for pointing it out. The only downside is that SetMessageContent is a method of CloudQueueMessage, and in my function signature I have MySpecialPoco message rather than CloudQueueMessage message, so I'll have to somehow convert. P.S. The blob stuff in your answer isn't directly related to the question. SetMessageContent really is it.
– Howiecamp
Aug 6 at 16:10
If I did want my function signature to be MySpecialPoco message, how would I accomplish setting the message content?
– Howiecamp
Aug 6 at 16:18
I have updated the post, please refer to the UPDATED section. You can use JsonConvert.SerializeObject().
– Ivan Yang
Aug 7 at 10:58
Thanks for your answer. Your example above shows blob-triggers not queue triggers but I see what you're getting at. However, my question is how can I edit the message before poisoning it. I thought if I use output bindings, or rather just manually put the message in the poison queue for that matter, that I lose the dequeue count and other attributes of the message, because it's effectively a new message now.
– Howiecamp
Aug 6 at 0:15