cancel
Showing results for 
Search instead for 
Did you mean: 

Getting security session expired while connecting to EMS

Hello All,

I'm using SAP event mesh(EMS) for communication between events and sap JMS library (com.sap.cloud.servicesdk.xbem) for listening to messages from queue and I was using version 2.0.1. But after few days of inactivity I was getting this error:

javax.jms.IllegalStateException : The Session was closed due to an unrecoverable error

But to solve this issue, recently I have upgraded my lib from 2.0.1 to 2.2.5. In addition to this, I have added 1 exception listener in connection. But I don't have any idea how to reconnect in case of any error/exception and also after 1-2 days of inactivity the system goes to idle state and our application is no more able to listen to the EMS queues and because of that the data remain in the queue. And after library upgradation, the error has changed to
Connection ID has failed due to: security session expired (220005) [condition = amqp:unauthorized-access]"

It started working as expected after restarting our application. So, Can anyone please suggest anything to resolve this error?

Accepted Solutions (1)

Accepted Solutions (1)

former_member349596
Discoverer

Hi,

The following summarizes the best practices. In general there are some todos by the application developer:

  • Configure the failover settings to the needs of your application
  • Implement a resilient reconnect mechanism for the initial connection
  • Implement a resilient reconnect mechanism for the ExceptionListener

Apache QPID JMS Settings

The amqpIdleTimeout must be set to -1 to avoid idle timeouts forced by the Apache QPID JMS client when it is used with the SAP Event Mesh Service.

Connections

Even in case of an error it is necessary to perform a connection.close() in order to free up occupied resources.

Connection Limits The connection limit of the related service instance MUST be respected. In rare cases it might seem like the application is able to create more connections than actual allowed. In this case there was an issue with a different connection and APACHE QPID JMS was able to create a new connection while another (cached) connect was in the middle of its reconnect behavior. Those cached connections will "fight" for the resources (connection limit) and might end up in an endless loop of the APACHE QPID JMS internal reconnect (failover configuration). Be aware, that this issue gets worse if setFailoverMaxReconnectAttempts(..) was set to unlimited (-1) as the ExceptionListener will never be called.

Exception handling

Apache QPID JMS provides a reconnection mechanism and is reacting to Exceptions. The Apache QPID JMS library is using a failover mechanism to reconnect to a given URL. The library also lets you configure the failover behavior such as the amount of attempts to reconnect, or the reconnection delay. Most of the configuration provided by the Apache QPID JMS Client is exposed by the EMJAPI Connection Factory.

While the reconnection mechanism of the Apache QPID JMS client covers most of the exceptions such as connection losses or idle timeouts there might be the case that the lib cannot handle a certain issue. An example is the token expires, and the connection must be closed. A security issue is not handled by the Apache QPID JMS lib and must be handled within a try(){...}catch(){..} - block or the JMSExceptionListener of the application.

JMS Exception Listener

The ExceptionListener is used for catching and handling Exceptions at runtime. It is called if the APACHE QPID JMS client cannot handle the exception or if the set amount of failover reconnect attempts did not recover the connection. So if an Exception of a connection should be handled do not use -1 (unlimited) for setFailoverMaxReconnectAttempts.

For creating an JMS Exception listener the interface ExceptionListener has to be implemented. The ExceptionListener interface only contains one method:

public interface ExceptionListener {

    /** Notifies user of a JMS exception.
     *
     * @param exception the JMS exception
     */

    void
    onException(JMSException exception);
}

The method onException(...) is called whenever an exception is thrown during runtime. The application must decide (based on its design) how to handle this exception. If the application is an "always-on" scenario (listening to a queue and processing events constantly) it is recommended to implement a reconnection mechanism in order to achieve a resilient application design. That means the ExceptionListener must have some knowledge on how to reconnect by passing meta-data like the tenantId, the connection itself or simply using an object which handles the JMS behavior.

Please, keep in mind the JMS design hardly depends on the application design, so the following is not a copy/paste example. The example should rather give an idea of what has to be done.

public class App {

  private static final String CLIENT_ID = "";
  private static final String CLIENT_SECRET = "";
  private static final String TOKEN_ENDPOINT = "";
  private static final String SERVICE_URL = "";

  private static final int MAX_RECONNECTS = 3;
  private static final long WAITING_TIME_IN_MS = 2000L; // wait for 2s in case of a JMS error

  public static void main(String[] args) {
    ConnectionHandler.registerConsumer(MAX_RECONNECTS, WAITING_TIME_IN_MS);
  }

  /**
   * The ConnectionHandler is a very simple and basic caching mechanism to
   * hold a connection. The class opens a consumer and closes the connection
   * gracefully if needed.
   */
  static final class ConnectionHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ConnectionHandler.class);

    private static Connection connection; // caching the connection
    private static MessagingServiceJmsConnectionFactory connectionFactory;

    private ConnectionHandler() {
    }

    static {
      if (connectionFactory == null) {
        try {
          connectionFactory = createConnectionFactory();
        } catch (MessagingException e) {
          LOG.error("Unable to create connection factory", e);
        }
      }
    }

    /**
     * Creates a connection and caches the connection.
     *
     * @param maxRetries in case the connection cannot be created, it will be
     *                   re-tried according to the maxRetries before giving up.
     * @param waitingTimeInMS   wait time after connection error in ms
     */
    public static void registerConsumer(int maxRetries, long waitingTimeInMS) {
      // TODO decide how to handle that in your application - e.g. maxRetries, waitInMS
      while (maxRetries > 0) {
        /*
         * The maxRetries parameter has nothing to do with QPIDs internal reconnect (failover) mechanism. In this code it is used for the
         * initial reconnect only.
         */
        try {
          createConsumer();
          break;
        } catch (JMSException e) { // initial connection exception
          // try to reconnect more often - it could be a temporary issue
          maxRetries--;

          if (maxRetries > 0) {
            // JMSException occurred - try to reconnect
            LOG.warn("Unable to initialize JMS connection. Reconnect attempt: {}", maxRetries, e);
          } else {
            /*
             * Reconnect did not work. The Exception has to be handled
             * properly. - Re-Throw the exception or throw an own Exception
             * - Consider an alerting so operational tasks can be performed
             */
            LOG.error("Unable to initialize JMS connection", e);

            /* circuit breaker here - initialization failed. depending on application this can be fine tuned for app
             * requirements e.g. is messaging really a hard dependency or can there be a lazy re-init procedure.
             *
             * Central question is messaging required on application startup. Recommendation is not to add a hard dependency.
             */
            throw new IllegalStateException("Failed to initialize JMS connection");
          }
        }

        /*
         * wait in order to not immediately fail for the same reason - depending on your app requirements this can be static
         * or can even grow in exponential e.g. do first retry immediately and then increase the waiting e.g. exponentially.
         */
        if (waitingTimeInMS > 0) {
          try {
            Thread.sleep(waitingInMS);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Failed to initialize JMS connection", e);
          }
        }
      }
    }

    private static void createConsumer() throws JMSException {
      connection = connectionFactory.createConnection();
      Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);

      /*
       * An exception listener which performs a reconnect when an exception is
       * thrown during runtime. The exception listener uses a static class for
       * recreating the connection and closes it gracefully before reconnecting.
       * Of course, this is only one way of doing so and there are multiple
       * options of performing a reconnect.
       */
      connection.setExceptionListener(new ExceptionListener() {
        private final Logger LOG = LoggerFactory.getLogger("ExceptionListener");

        @Override
        public void onException(JMSException exception) {
          // TODO decide how to handle that in your application
          LOG.error("Connection threw an exception at runtime. Closing connection and re-initializing.", exception);
          ConnectionHandler.closeConnection();
          ConnectionHandler.registerConsumer(MAX_RECONNECTS, WAITING_TIME_IN_MS);
        }
      });

      // TODO avoid unnecessary retries in your application if e.g. createQueue, createConsumer fails
      // the sample here just illustrates a simplified retry mechanism
      Queue queue = session.createQueue("queue:a/b/c/d");
      MessageConsumer consumer = session.createConsumer(queue);

      consumer.setMessageListener(new MessageListener() {

        @Override
        public void onMessage(Message message) {
          // process the message
          // don't forget to acknowledge the message depending on client mode
        }
      });
      connection.start(); // everything is setup, start the connection
    }

    public static void closeConnection() {
      if (connection != null) {
        try {
          connection.close();
        } catch (Exception e) {
          LOG.error("Unable to close connection", e);
        } finally {
          // remove the connection reference
          connection = null;
        }
      }
    }

    /*
     * Create a connection factory. The MessagingServiceJmsConnectionFactory
     * is simply a JMS connection factory and only a delegator of the Apache
     * QPID JMS JmsConnectionFactory.
     */
    private static MessagingServiceJmsConnectionFactory createConnectionFactory() throws MessagingException {
      MessagingService messagingService = messagingServiceCredentials();

      // configure Apache QPID JMS
      MessagingServiceJmsSettings settings = new MessagingServiceJmsSettings();
      settings.setFailoverMaxReconnectAttempts(5);
      settings.setFailoverInitialReconnectDelay(3000);
      settings.setFailoverReconnectDelay(3000);
      settings.setJmsRequestTimeout(30000);
      settings.setAmqpIdleTimeout(-1);

      MessagingServiceFactory messagingServiceFactory = MessagingServiceFactoryCreator.createFactory(messagingService);
      return messagingServiceFactory.createConnectionFactory(MessagingServiceJmsConnectionFactory.class, settings);
    }

    /*
     * Retrieve the credentials of the service instance binding. Preferable
     * this should be done by parsing the binding with e.g. cfEnv
     */
    private static MessagingService messagingServiceCredentials() {
      MessagingService messagingService = new MessagingService();
      messagingService.setClientId(CLIENT_ID);
      messagingService.setClientSecret(CLIENT_SECRET);
      messagingService.setOAuthTokenEndpoint(TOKEN_ENDPOINT);
      messagingService.setServiceUrl(SERVICE_URL);
      return messagingService;
    }
  }
}

morten Thanks for you answer! It helped.

Answers (0)