Keep alive for JMS connections

November 5, 2020
ZIO, Streams, JMS

Why do we need a keep alive #

In our existing application we have noticed that under some circumstances a connection that still exists and is reported as connected by the monitoring tools does not work as expected. With JMS this typically happens when a connection is used only to receive messages: it may appear connected and still have associated JMS sessions and consumers, but it will no longer receive any messages from the JMS broker.

Many JMS providers have implemented a keep alive mechanism, but in practice these have proven somewhat problematic:

  • They do not work in some edge cases, such as restarting network switches on WAN connections.
  • They are not part of the JMS specification, so they differ from vendor to vendor and are not mandatory.
  • They usually require vendor-specific code to be configured (if not configured via properties over JNDI).

A connection which is also used to produce messages normally does not require a keep alive monitor: a send attempted over a stale connection fails with a JMSException, so the connection can be recovered during normal error handling.
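
To make this concrete, here is a minimal sketch (plain javax.jms plus ZIO 1.x; the helper name send is made up for illustration) of how a failing send surfaces as an error value:

import javax.jms.{ JMSException, Message, MessageProducer }

import zio._

// A send over a stale connection fails with a JMSException, so the problem
// becomes an error value that ordinary recovery logic can act upon.
def send(prod: MessageProducer, msg: Message): IO[JMSException, Unit] =
  ZIO.effect(prod.send(msg)).refineToOrDie[JMSException]

A connection that only consumes never executes such a call, which is why a stale consumer-only connection stays silent instead of failing.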

In this article I will pick up where the last article left off, in which I investigated how we can leverage the ZIO API to automatically recover from an exception in the JMS layer without terminating the ZIO stream.

I will show a simple keep alive monitor that knows nothing about JMS or streams at all. Then we will create an instance of that monitor to watch a newly created JMS connection, issuing a reconnect once the maximum number of missed keep alive events has been reached.

The complete source code used in this article can be found on GitHub.

A simple Keep Alive monitor #

A keep alive monitor is more or less a counter that is increased at regular intervals and can be reset by sending it alive signals. Whenever the counter reaches a defined value n, the monitor has not received a signal for n * interval - for example, with an interval of 1 second and n = 3 it fires after 3 seconds of silence. The monitor as such does not need to know anything about the entity it monitors - in our case the JMS connection.

import zio._
import zio.clock.Clock
import zio.duration.Duration
import zio.logging.Logging

trait KeepAliveMonitor {

  /**
   * An id to identify the monitor in the logs and metrics.
   */
  val id: String

  /**
   * The maximum number of allowed missed keep alive signals.
   */
  val allowed: Int

  /**
   * Signal that the monitored entity is still alive. This will reset the missed
   * counter to 0.
   */
  def alive: ZIO[Logging, Nothing, Unit]

  /**
   * Start the monitor with a given interval. At every interval tick the counter
   * for missed keep alives is incremented. Once the counter reaches the maximum
   * allowed missed keep alives, run terminates.
   */
  def run(interval: Duration): ZIO[Clock with Logging, Nothing, Unit]

  /**
   * Return the current count of missed keep alives
   */
  def current: ZIO[Any, Nothing, Int]
}

The implementation of this interface is fairly straightforward. Within the run method we execute a step effect that simply increases the internal counter. If the counter has reached the given maximum, the step function terminates; otherwise the next step is scheduled after the given interval.

Overall, run terminates once the given maximum count has been reached. Therefore it is always a good idea for users of the monitor to execute monitor.run in its own fiber.

  private[streams] def run(interval: Duration): ZIO[Clock with Logging, Nothing, Unit] = {

    // Increase the missed counter. If the allowed maximum has been reached,
    // terminate; otherwise schedule the next step after the given interval.
    def go: ZIO[Clock, Nothing, Unit] = ZIO.ifM(missed.updateAndGet(_ + 1).map(_ == allowedKeepAlives))(
      ZIO.unit,
      go.schedule(Schedule.duration(interval)).unit
    )

    for {
      _ <- log.trace(s"Starting KeepAliveMonitor [$name]")
      _ <- go.schedule(Schedule.duration(interval))
      c <- missed.get
      _ <- log.trace(s"KeepAliveMonitor [$name] finished with maximum keep alives of [$c]")
    } yield ()
  }
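
As a quick usage sketch (hypothetical values; DefaultKeepAliveMonitor.make is the constructor used further below, assumed to require no environment beyond the one shown), forking run in its own fiber looks like this:

import zio._
import zio.clock.Clock
import zio.duration._
import zio.logging.Logging

// With interval = 1.second and allowed = 3, the join returns after roughly
// 3 seconds without an alive signal.
val demo: ZIO[Clock with Logging, Nothing, Int] = for {
  mon <- DefaultKeepAliveMonitor.make("demo", 3)
  fib <- mon.run(1.second).fork // run only terminates once the limit is hit
  _   <- mon.alive              // any sign of life resets the missed counter
  _   <- fib.join
  c   <- mon.current            // here: the allowed maximum, i.e. 3
} yield c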

Using the Keep alive monitor with JMS connections #

With the JMS based streams explained here and the general keep alive monitor, we can now build a monitor that determines whether a given JMS connection is healthy. We do that by regularly sending and receiving messages over the connection being monitored. Whenever a message is received, we execute alive on the underlying monitor - effectively resetting the counter.

Defining a Keep alive Sink #

From the API perspective we want to create a stream that regularly produces messages and run it with a JMS sink - effectively sending the messages to the JMS broker.

  private def startKeepAliveSender(
    con: JmsConnection,
    dest: JmsDestination,
    interval: Duration
  ): ZIO[ZEnv with Logging, JMSException, Unit] = {

    // Emit a ping message at every interval tick (sdf is a date format defined elsewhere in the sample)
    val stream: ZStream[ZEnv, Nothing, String] = ZStream
      .fromSchedule(Schedule.spaced(interval))
      .mapM(_ =>
        currentTime(TimeUnit.MILLISECONDS)
          .map(t => s"KeepAlive ($con) : ${sdf.format(t)}")
      )

    // Send the pings to the given destination via a dedicated session and producer
    createSession(con).use(jmsSess => createProducer(jmsSess).use(prod => stream.run(jmsSink(prod, dest))))
  }

Keep in mind that the library has prototyping character for now, so some elements, such as the ping message format, are hard coded for the time being and will need to be fleshed out later on.

Defining a Keep alive Stream #

Now we need to define a consumer - in other words a ZIO stream - and for each message received we want to execute alive on a given KeepAliveMonitor.

  private def startKeepAliveReceiver(
    con: JmsConnection,
    dest: JmsDestination,
    kam: KeepAliveMonitor
  ): ZIO[ZEnv with Logging, JMSException, Unit] = createSession(con).use { jmsSess =>
    // Every received message counts as a sign of life and resets the monitor
    createConsumer(jmsSess, dest).use(cons => jmsStream(cons).foreach(_ => kam.alive))
  }

Create the JMS Keep Alive Monitor #

The JMS keep alive monitor is created whenever a connection configured with monitoring has been established. We also need a destination to be used for sending and receiving the keep alive messages, an interval, and the maximum number of allowed missed keep alives.

With those parameters the JMS keep alive monitor is straight forward:

  1. Create an instance of a general KeepAliveMonitor
  2. Start the sink for keep alives
  3. Start the stream to receive keep alives
  4. Fork the run method of the just created monitor
  5. Once run terminates, interrupt the stream and sink
  6. Terminate with the current count of the underlying monitor
  def run(
    con: JmsConnection,
    dest: JmsDestination,
    interval: Duration,
    allowed: Int
  ): ZIO[ZEnv with Logging with ZIOJmsConnectionManager, JMSException, Int] = for {
    kam  <- DefaultKeepAliveMonitor.make(s"${con.id}-KeepAlive", allowed) // (1)
    send <- startKeepAliveSender(con, dest, interval).fork                // (2)
    rec  <- startKeepAliveReceiver(con, dest, kam).fork                   // (3)
    f    <- kam.run(interval).fork                                        // (4)
    _    <- f.join
    _    <- send.interrupt *> rec.interrupt                               // (5)
    c    <- kam.current                                                   // (6)
  } yield c

Instrumenting a JMS connection with a keep alive monitor #

The API exposes a case class for a JMS connection factory:

final case class JmsConnectionFactory(
  override val id: String,
  factory: ConnectionFactory,
  reconnectInterval: Duration,
  onConnect: JmsConnection => ZIO[ZEnv with Logging, Nothing, Unit] = _ => ZIO.unit
) extends JmsApiObject {
  def connId(clientId: String): String = s"$id-$clientId"
}

Here we see that the case class also contains an effect that is executed every time a physical connection to the JMS broker has been established. This effect takes the JmsConnection as a parameter. We can now use onConnect to set up the keep alive monitor.
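
For illustration, a minimal (hypothetical) factory that merely logs each successful connect might look like this - the keep alive setup below follows the same pattern:

import org.apache.activemq.ActiveMQConnectionFactory

import zio._
import zio.duration._
import zio.logging._

// Any effect requiring no more than ZEnv with Logging can be hooked in here.
val loggingCF = JmsConnectionFactory(
  id = "demo:cf",
  factory = new ActiveMQConnectionFactory("vm://simple?create=false"),
  reconnectInterval = 3.seconds,
  onConnect = c => log.info(s"connected [$c]")
)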

  private def keepAlive(
    conMgr: SingletonService[ZIOJmsConnectionManager.Service]
  ): JmsConnection => ZIO[ZEnv with Logging, Nothing, Unit] = { c =>
    // Run the keep alive monitor; if it terminated because the limit was
    // reached (rather than because the application shut down), force a reconnect
    val ka = ZIO
      .ifM(
        JmsKeepAliveMonitor
          .run(c, JmsQueue("keepAlive"), 1.second, allowedKeepAlives)
          .map(_ == allowedKeepAlives)
      )(
        reconnect(c, Some(new KeepAliveException(c.id, allowedKeepAlives))),
        ZIO.unit
      )

    val monitor: ZIO[ZEnv with Logging with ZIOJmsConnectionManager.ZIOJmsConnectionManager, JMSException, Unit] = for {
      f <- ka.fork
      _ <- log.trace(s"Waiting for keep alive monitor for [$c]")
      _ <- f.join
    } yield ()

    val effect = for {
      // provideSomeLayer requires the left over environment as its type parameter
      _ <- monitor.provideSomeLayer[ZEnv with Logging](ZIOJmsConnectionManager.Service.live(conMgr)).forkDaemon
    } yield ()

    effect
  }

Let’s break this down a bit:

  • First we run an instance of the JmsKeepAliveMonitor, which terminates either when the maximum number of missed keep alives has been reached or when the application terminates. In the first case we need to issue a reconnect on the JMS connection using an underlying connection manager.

  • Then we wrap running the monitor in its own fiber and wait for it to terminate.

  • The final step is perhaps a bit confusing: the case class defines onConnect to require an environment of ZEnv with Logging, but using reconnect also requires a ZIOJmsConnectionManager to be present in the environment. Therefore we pass a SingletonService[ZIOJmsConnectionManager.Service] as a parameter to the keepAlive method.

    We can then use provideSomeLayer to provide the connection manager to the JmsKeepAliveMonitor, leaving an effect that requires only ZEnv with Logging, which is what we need to fulfill the API (see the type sketch below).
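
In types, the narrowing works roughly like this (a sketch only; ZIOJmsConnectionManager stands for the library's usual Has[Service] type alias, imported from the project):

import javax.jms.JMSException

import zio._
import zio.logging.Logging

// provideSomeLayer eliminates exactly one part of the environment; the type
// parameter names what is left over for the caller to provide.
def narrow(
  monitor: ZIO[ZEnv with Logging with ZIOJmsConnectionManager, JMSException, Unit],
  layer: ZLayer[Any, Nothing, ZIOJmsConnectionManager]
): ZIO[ZEnv with Logging, JMSException, Unit] =
  monitor.provideSomeLayer[ZEnv with Logging](layer)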

With the keepAlive method in place we can now create the connection factory. Again, we see an instance of the connection manager service being passed through.

  private def amqCF(si: SingletonService[ZIOJmsConnectionManager.Service]): JmsConnectionFactory = JmsConnectionFactory(
    "amq:amq",
    new ActiveMQConnectionFactory("vm://simple?create=false"),
    3.seconds,
    keepAlive(si)
  )

The program to be run #

First of all we need to create an instance of a ZIOJmsConnectionManager.Service. This instance is passed to the actual logic, so that the keep alive monitor can inject it into the environment of JmsKeepAliveMonitor.run. The instance is also required by the logic effect itself; otherwise no connections could be established to create the streams and sinks.

  private val program: ZIO[ZEnv with AMQBroker.AMQBroker with Logging, Throwable, Unit] = {

    def logic(si: SingletonService[ZIOJmsConnectionManager.Service]) = for {
      _         <- putStrLn("Starting JMS Broker") *> ZIO.service[BrokerService]
      f         <- ZIO.unit.schedule(Schedule.duration(30.seconds)).fork // timer fiber limiting the demo runtime
      jmsStream <- recoveringJmsStream(amqCF(si), clientId, testDest, 2.seconds)
      jmsSink   <- recoveringJmsSink(amqCF(si), clientId, testDest, 1.second)
      consumer  <- jmsStream.foreach(s => putStrLn(s)).fork
      producer  <- stream.run(jmsSink).fork // stream: the demo message stream defined elsewhere in the sample
      _         <- f.join *> consumer.interrupt *> producer.interrupt
    } yield ()

    for {
      si <- ZIOJmsConnectionManager.Service.singleton
      _  <-
        logic(si).provideSomeLayer[ZEnv with AMQBroker.AMQBroker with Logging](ZIOJmsConnectionManager.Service.live(si))
    } yield ()
  }

The SingletonService here simply encapsulates a reference to an Option[A] and an effect to create an A upon the first retrieval. Subsequent retrievals will simply reuse the instance created before.

Most likely I will move the SingletonService as a helper class to blended-zio-core, as there will be many more use cases for having only a single instance of a service while executing an application - for example a metrics collector for named metrics or a JMX service to expose arbitrary MBeans.
package blended.zio.streams

import zio._

private[streams] trait SingletonService[A] {

  private[streams] val instance: Ref[Option[A]]
  private[streams] def makeService: ZIO[Any, Nothing, A]

  // Return the memoized instance, creating it via makeService on first access
  private[streams] def service: ZIO[Any, Nothing, A] = for {
    sr  <- instance.get
    svc <- sr match {
             case Some(s) => ZIO.succeed(s)
             case None    =>
               for {
                 s <- makeService
                 _ <- instance.set(Some(s))
               } yield s
           }
  } yield (svc)
}

object SingletonService {
  def fromEffect[A](e: ZIO[Any, Nothing, A]): ZIO[Any, Nothing, SingletonService[A]] = for {
    inst <- Ref.make[Option[A]](None)
    svc   = new SingletonService[A] {
              override private[streams] val instance    = inst
              override private[streams] def makeService = e
            }
  } yield (svc)
}

The ZIOJmsConnectionManager uses the SingletonService as follows:

    val singleton: ZIO[Any, Nothing, SingletonService[Service]] =
      SingletonService.fromEffect(createLive)

    def live(s: SingletonService[Service]): ZLayer[Any, Nothing, ZIOJmsConnectionManager] = ZLayer.fromEffect(s.service)

Conclusion #

We have added a simple keep alive mechanism to the JMS connections we have discussed in the previous articles. The keep alive builds on the streams with auto recovery and triggers a reconnect once the limit of maximum missed keep alives has been reached. The reconnect then triggers the recovery and reconnect behavior as defined for auto recovery.

For the users of the stream, the keep alive and potential reconnect are completely transparent. Further, the onConnect effect would allow us to instrument the connection factory with other effects - for example to collect metrics or publish JMX information.

Next steps #

Apart from finalizing the API there are more areas to explore:

  • Use defined types rather than only String as message payloads.
  • Support arbitrary message properties.
  • Look into defining message flows on top of streams with error handling and acknowledgements.
  • Explore zio-zmx to visualize the current state of all fibers within a running application (primarily for learning about threads).
  • Build a sample application that represents a JMS bridge:
    • Consume messages from provider A.
    • Send the messages to provider B.
    • Acknowledge a message if and only if the send was successful; otherwise pass the message to a retry handler.
    • Replay messages from the retry handler up to a given retry count until the send was successful. If the maximum number of retries has been reached or a given amount of time has passed before the send was successful, the message shall be processed by an error handler.
    • Messages in the retry handler shall be persisted and survive an application restart.