cancel
Showing results for 
Search instead for 
Did you mean: 

How to reject messages from Event Mesh in Node.js CAP application

carlonnheim
Participant
0 Kudos

Hi,

I am trying to get my CAP application working with Event Mesh and Dead Message Queues, but I cannot find the appropriate way to reject an inflight message so it gets back onto the queue and eventually moves to the DMQ.

I have this in my cds config:

    "cds": {
        "requires": {
            "messaging": {
                "kind": "enterprise-messaging-shared",
                "queue": {
                    "name": "default/my/queue",
                    "maxDeliveredUnackedMsgsPerFlow": 5,
                    "maxRedeliveryCount": 10,
                    "deadMsgQueue": "default/my/queue/dmq",
                    "maxTtl": 0,
                    "respectTtl": true
                }
            },
...

and sample code like this:

const cds = require('@sap/cds');
module.exports = async srv => {
    const messaging = await cds.connect.to("messaging");
    messaging.on("default/my/topic", (msg) => {
        return new Promise((resolve, reject) => {
            setTimeout(() => {
                reject();
            }, 1000);
        })
    });
}

I.e it returns a promise which gets rejected after a second. I was expecting that the failing the message would cause the error to propagate back to the queue, get retried until the TTL or Max Count expires and then move to the DMQ. But the actual result is that the message simply gets consumed.

If I throw an error from within the handler instead, the message fails and gets retried indefinately.

const cds = require('@sap/cds');
module.exports = async srv => {
    const messaging = await cds.connect.to("messaging");
    messaging.on("default/my/topic", (msg) => {
        throw new Error("!");
    });
}

Can somebody shed some light on how this should be implemented?

Thanks!

//Carl

carlonnheim
Participant
0 Kudos

Hi Again,

So far we have found that the rejection of a message requires a value. I.e. this works to properly put the message back to the queue:

const cds = require('@sap/cds');
module.exports = async srv => {
    const messaging = await cds.connect.to("messaging");
    messaging.on("default/my/topic", (msg) => {
        return new Promise((resolve, reject) => {
            setTimeout(() => {
                reject("Some Reason"); // <------------- a reason here makes it reject properly
            }, 1000);
        })
    });
}

But with that sorted, we still have a couple of issues:

  1. The "Max Redelivery Count" is not taking effect. If we have this set to 3 for example and TTL is zero, the retries keep going indefinitely. If we set the TTL to 10 seconds it stops after 10 seconds.
  2. Failed messages are being put back to the front of the queue. The same message gets tried repeatedly until the TTL expires and then moves to the DMQ. But at this time, all other messages (which entered the queue later) will also have their TTL expired. This causes congestion and once the TTL expires for the first message it has expired for the ones after it as well - the full queue goes to the DMQ.

Is this something wrong with the Event Mesh, with CAP or are we simply using it wrong?

Thanks in advance!

//Carl

david_kunz2
Advisor
Advisor
0 Kudos

Hi tobias.griebe ,

Can you comment on that?
The stakeholder is using the AMQP protocol.

Thanks and best regards,
David

david_kunz2
Advisor
Advisor
0 Kudos

Hi tobias.griebe ,

That's also how CAP is doing it. The queue configuration is just propagated to Event Mesh.
Are there differences between SAP Event Mesh plans?

onnheimc , which plan of SAP Event Mesh are you using?

david_kunz2
Advisor
Advisor
0 Kudos

In addition to that, make sure you use the newest version of @sap/cds: 5.8.3.

Here, if a handler rejects, we will execute

message.failed(new Error('processing failed'))

as written by tobias.griebe .

Accepted Solutions (1)

Accepted Solutions (1)

Tobias_Griebe
Advisor
Advisor
0 Kudos

Hi onnheimc and david.kunz2,

I just did a quick test with the plain node.js AMQP lib. I just slightly changed the included consumer example:

This is working as expected. I get the message a total of 3 times, as I have configured 2 redeliveries on my queue settings.

Not sure how this translates to the CAP implementation.

carlonnheim
Participant
0 Kudos

Hi tobias.griebe , david.kunz2 ,

This turned out to be caused by the way failed messages were rejected by CAP into the Event Mesh. Maybe that is what David meant in the last comment above. It used to be returned with a "failed" but was corrected to be returned with a "rejected" in the february release of CAP I think.

So this works fine now. I still have some issues with processing the DMQ however. In particular I am struggling to find the right strategy to handle the dead messages once the root cause of the error has been resolved. Our scenario is such that all messages must be processed (or actively discarded). It does not need to happen in order. So when a message fails, we move it to the dmq and operators monitor that, solve the root case (usually bad master data) and then needs to reprocess the messages. I am not finding a smooth way of doing this however. We sometimes get surges of failures (hundreds of thousands with single root causes).

Currently we are using a batch job which I have placed inside the CAP application to move the messages back. We have the job scheduler call on it, and then it consumes from the dmq, posts back to the original topic and finally confirms the consumption - using the REST protocol. It works, but has a number of flaws. The approach seems wrong but I do not see good alternatives. In the end I want CAP's "on" handler to run, but I cannot see a way to consume the DMQ straight into it. I also do not see a way to shovel blocks of DMQ messages back onto the original queue.

Would love to hear your thoughts on how to approach that scenario. It is a bit off topic for this question though, maybe this one instead.

Thanks!

//Carl

david_kunz2
Advisor
Advisor
0 Kudos

Hi onnheimc ,

Exactly, there ware some issues in the underlying message infrastructure which prevented the counter to work when failed() was called without an error object, that has been changed in CAP release.

Regarding consumption of messages in the DMQ, this is unfortunately not supported in CAP at the moment.

In principle, you could also use the AMQP library directly: https://www.npmjs.com/package/@sap/xb-msg-amqp-v100


Best regards,
David

former_member695224
Discoverer
0 Kudos

Hello,

Do we have option to handle error code at the time of message.failed() ?

Regards,

Niranjan

Answers (0)