Auto-Recovery for (JMS) Streams

October 30, 2020
ZIO
ZIO, Streams, JMS

Automatically recover (JMS) streams

In my last article I showed how the ZIO stream API lets us easily create streams for sending or receiving messages via JMS. In the sample program we saw that the streams terminate with an exception whenever the underlying JMS API encounters an error.

One of the most common errors is a connection lost due to a network error. For long-running applications we would like to initiate an automatic reconnect and either create a new stream or recover the existing one. The advantage of recovering the existing stream is that we do not have to rewire its users: any effect using the stream is suspended until the reconnect has happened and then continues.

In this article I will explore how we can use the ZIO API to achieve such a transparent reconnect.

The complete source code used in this article can be found on GitHub.

What we want to achieve

Like in the last article, let’s start by looking at a sample program we would like to run:

  private val program: ZIO[ZEnv with AMQBroker.AMQBroker with Logging, Throwable, Unit] = {
    val logic = for {
      mgr       <- ZIO.service[ZIOJmsConnectionManager.Service]
      _         <- putStrLn("Starting JMS Broker") *> ZIO.service[BrokerService]
      // Timer fiber: completes after 30 seconds and ends the demo
      f         <- ZIO.unit.schedule(Schedule.duration(30.seconds)).fork
      // Force a reconnect 10 seconds in to simulate a lost connection
      _         <-
        mgr.reconnect(amqCF.connId(clientId), Some(new Exception("Boom"))).schedule(Schedule.duration(10.seconds)).fork
      jmsStream <- recoveringJmsStream(amqCF, clientId, testDest, 2.seconds)
      jmsSink   <- recoveringJmsSink(amqCF, clientId, testDest, 1.second)
      consumer  <- jmsStream.foreach(s => putStrLn(s)).fork
      producer  <- stream.run(jmsSink).fork
      // Wait for the timer, then stop both fibers
      _         <- f.join *> consumer.interrupt *> producer.interrupt
    } yield ()

    for {
      si <- ZIOJmsConnectionManager.Service.singleton
      _  <- logic.provideSomeLayer[ZEnv with AMQBroker.AMQBroker with Logging](ZIOJmsConnectionManager.Service.live(si))
    } yield ()
  }

There are two important differences compared with the sample application from the last article:

  1. We are issuing a reconnect to the underlying connection factory. This implies that there is some mechanism within the connection factory that controls an automated reconnect.
  2. Rather than creating the JMS stream / sink directly we use an effect that yields a factory linked to the connection factory which can create a stream or sink for a given destination.

When we run this program with the input stream below, we will notice that the output pauses for a couple of seconds when the reconnect is triggered and then continues sending and receiving messages.

  private val stream: ZStream[ZEnv, Nothing, String] = ZStream
    .fromSchedule(Schedule.spaced(1000.millis).jittered)
    .mapM(_ =>
      currentTime(TimeUnit.MILLISECONDS)
        .map(sdf.format)
    )

Here is an excerpt from the console output:

2020-10-30-07.13.52.817
2020-10-30-07.13.54.631
2020-10-30-07.13.54.985  <----- NOTE THE GAP
2020-10-30-07.13.58.952  <----- NOTE THE GAP
2020-10-30-07.13.59.095
2020-10-30-07.13.59.955

A reconnecting wrapper around the JMS Connection factory

Under the covers we use a connection factory that lifts all JMS related calls to ZIO effects. Further, an established connection will be reused as much as possible - effectively limiting the number of connections per connection factory to at most 1.

To achieve this, we guard the connect method with a Semaphore and return the connection if it already exists; otherwise we create a new connection and store it.

      private[jms] def connect(
        cf: JmsConnectionFactory,
        clientId: String
      ): ZIO[ZEnv with Logging, JMSException, JmsConnection] = {

        val cid = cf.connId(clientId)

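        // Reuse the cached connection if there is one; otherwise connect
        // (checkedConnect fails while the factory is in its recovery period)
        sem.withPermit(
          for {
            cr <- getConnection(cid)
            c  <- ZIO.fromOption(cr).orElse(checkedConnect(cf, clientId))
          } yield c
        )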
      }
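
The caching idea can be tried out without a JMS broker. The following is a minimal sketch, assuming ZIO 1.0.x on the classpath; `Conn`, `ConnCache` and its `doConnect` are hypothetical stand-ins for illustration and not the classes from the article's repository.

```scala
import zio._

// Hypothetical stand-in for the article's JmsConnection.
final case class Conn(id: String)

// Caches at most one connection per id; the semaphore serialises connect calls.
final class ConnCache(sem: Semaphore, conns: Ref[Map[String, Conn]], created: Ref[Int]) {

  // Hypothetical stand-in for the real connect; counts how often it runs.
  private def doConnect(id: String): UIO[Conn] =
    created.update(_ + 1).as(Conn(id))

  def connect(id: String): UIO[Conn] =
    sem.withPermit(
      conns.get.flatMap { m =>
        m.get(id) match {
          case Some(c) => ZIO.succeed(c)
          case None    => doConnect(id).tap(c => conns.update(_ + (id -> c)))
        }
      }
    )

  def createdCount: UIO[Int] = created.get
}

object ConnCacheDemo {
  // Ten fibers ask for the same connection in parallel.
  val demo: UIO[Int] = for {
    sem     <- Semaphore.make(1)
    conns   <- Ref.make(Map.empty[String, Conn])
    created <- Ref.make(0)
    cache    = new ConnCache(sem, conns, created)
    _       <- ZIO.foreachPar_(1 to 10)(_ => cache.connect("amq"))
    n       <- cache.createdCount
  } yield n
}
```

Running `ConnCacheDemo.demo` with `zio.Runtime.default.unsafeRun` yields 1: because the lookup and the connect happen under the same permit, only the first caller creates a connection and the others reuse it.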

To recover a connection, we perform a JMS close on the existing connection and enter a recovery period. Within that period any execution of the connect effect will result in a JMSException.

      private def recover(
        c: JmsConnection,
        t: Option[Throwable]
      ): ZIO[ZEnv with Logging, JMSException, Unit] = for {
        _ <- close(c)
        _ <- scheduleRecover(c.factory, c.clientId, t)
      } yield ()

      private def scheduleRecover(
        cf: JmsConnectionFactory,
        clientId: String,
        t: Option[Throwable]
      ): ZIO[ZEnv with Logging, Nothing, Unit] = {

        val cid = cf.connId(clientId)

        ZIO.ifM(getConnection(cid).map(_.isDefined))(
          ZIO.unit,
          for {
            _ <-
              log.debug(
                s"Beginning recovery period for [$cid]" + t.map(c => s" , cause [${c.getMessage}]").getOrElse("")
              )
            _ <- recovering.update(r => cid :: r)
            f <- recovering.update(_.filterNot(_ == cid)).schedule(Schedule.duration(cf.reconnectInterval)).fork
            _ <- f.join
            _ <- log.debug(s"Ending recovery period for [$cid]")
          } yield ()
        )
      }

      private def checkedConnect(
        cf: JmsConnectionFactory,
        clientId: String
      ): ZIO[ZEnv with Logging, JMSException, JmsConnection] = {

        val cid = cf.connId(clientId)

        ZIO.ifM(isRecovering(cid))(
          ZIO.fail(new JMSException(s"Connection factory [$cid] is in recovery")),
          for {
            c <- doConnect(cf, clientId)
          } yield c
        )
      }
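
The excerpt does not show `isRecovering`. The sketch below shows one way the recovery bookkeeping could look in isolation, assuming ZIO 1.0.x and a `Ref[List[String]]` like the `recovering` value used above; `RecoveryTracker`, `beginRecovery` and the simplified `checkedConnect` are hypothetical names for illustration only.

```scala
import zio._
import zio.clock.Clock
import zio.duration._

final class RecoveryTracker(recovering: Ref[List[String]]) {

  def isRecovering(cid: String): UIO[Boolean] =
    recovering.get.map(_.contains(cid))

  // Marks cid as recovering right away; a background fiber clears the mark
  // once the interval has elapsed.
  def beginRecovery(cid: String, interval: Duration): URIO[Clock, Fiber[Nothing, Unit]] =
    recovering.update(cid :: _) *>
      (ZIO.sleep(interval) *> recovering.update(_.filterNot(_ == cid))).fork

  // Fails while cid is in recovery; the String result stands in for a connection.
  def checkedConnect(cid: String): IO[Exception, String] =
    isRecovering(cid).flatMap { rec =>
      if (rec) ZIO.fail(new Exception(s"Connection factory [$cid] is in recovery"))
      else ZIO.succeed(s"connected:$cid")
    }
}

object RecoveryDemo {
  // (connect failed during recovery, connect succeeded afterwards)
  val demo: URIO[Clock, (Boolean, Boolean)] = for {
    ref    <- Ref.make(List.empty[String])
    tracker = new RecoveryTracker(ref)
    fiber  <- tracker.beginRecovery("amq", 200.millis)
    during <- tracker.checkedConnect("amq").either
    _      <- fiber.join
    after  <- tracker.checkedConnect("amq").either
  } yield (during.isLeft, after.isRight)
}
```

The first connect attempt happens while the id is still in the list and fails; after the fiber has cleared the mark, the second attempt succeeds.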

Finally, the reconnect effect simply triggers the recovery if an underlying connection currently exists.

      private[jms] def reconnect(
        con: JmsConnection,
        cause: Option[Throwable]
      ): ZIO[ZEnv with Logging, JMSException, Unit] = for {
        _  <- log.debug(s"Reconnecting JMS connection [$con]")
        cr <- getConnection(con.id)
        _  <- cr match {
                case None    => ZIO.unit
                case Some(c) => recover(c, cause)
              }
      } yield ()

Creating a recoverable Stream (consume messages)

The idea behind the recovering stream is that we connect to the JMS broker with the given connection factory and then start consuming messages until we hit an exception. Whenever we hit an exception, we catch it and enter a recovery phase. After the recovery phase we will try to reconnect and continue to consume messages.

  def stream(
    dest: JmsDestination
  ): ZIO[ZEnv with Logging with ZIOJmsConnectionManager, Nothing, ZStream[ZEnv with Logging, Nothing, String]] = {

    def consumeUntilException(cons: JmsConsumer): ZIO[ZEnv with Logging, JMSException, Unit] = jmsStream(cons).collect {
      case tm: TextMessage => tm.getText()
    }
      .foreach(s => buffer.offer(s))

    def consumeForEver: ZIO[ZEnv with Logging with ZIOJmsConnectionManager, Nothing, Unit] = {
      val part = for {
        _   <- log.debug(s"Trying to recover consumer for [${factory.id}] with destination [$dest]")
        con <- connect(factory, clientId)
        _   <- createSession(con).use(jmsSess => createConsumer(jmsSess, dest).use(c => consumeUntilException(c)))
      } yield ()

      part.catchAll { _ =>
        for {
          f <- consumeForEver.schedule(Schedule.duration(retryInterval)).fork
          _ <- f.join
        } yield ()
      }
    }

    for {
      _ <- consumeForEver.fork
      s <- ZIO.succeed(ZStream.repeatEffect(buffer.take))
    } yield s
  }

The idea is implemented in consumeUntilException and consumeForEver. consumeUntilException uses the stream we have seen in the last article. It puts every received message into a one-element buffer, which we later use to create the final stream visible to the outside world.

consumeForEver simply creates an effect which will create the JMS connection and then delegate to consumeUntilException. Then we apply the catchAll operator to that effect, where we schedule the next iteration of consumeForEver after a recovery period.

The final stream is then created by repeating the take operation of the buffer while consumeForEver is executing in its own fiber.
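
The pattern can be isolated from JMS in a small sketch. It assumes ZIO 1.0.x; `flakySource` is a hypothetical stand-in for consumeUntilException that fails twice before delivering three messages, and the retry delay uses `ZIO.sleep` as a simplification of the schedule call shown above.

```scala
import zio._
import zio.clock.Clock
import zio.duration._
import zio.stream._

object RecoveringStreamSketch {

  // Hypothetical source: fails on the first two attempts, then offers three messages.
  def flakySource(attempts: Ref[Int], buffer: Queue[String]): IO[Exception, Unit] =
    attempts.updateAndGet(_ + 1).flatMap { n =>
      if (n <= 2) ZIO.fail(new Exception(s"attempt $n failed"))
      else ZIO.foreach_(List("a", "b", "c"))(m => buffer.offer(m))
    }

  // Whenever the source fails, wait for retryInterval and start over.
  def consumeForEver(attempts: Ref[Int], buffer: Queue[String], retryInterval: Duration): URIO[Clock, Unit] =
    flakySource(attempts, buffer).catchAll(_ =>
      ZIO.sleep(retryInterval) *> consumeForEver(attempts, buffer, retryInterval)
    )

  val demo: URIO[Clock, List[String]] = for {
    attempts <- Ref.make(0)
    buffer   <- Queue.bounded[String](1) // the one-element buffer
    _        <- consumeForEver(attempts, buffer, 50.millis).fork
    // The stream seen by callers never fails; it just waits on the buffer.
    out      <- ZStream.repeatEffect(buffer.take).take(3).runCollect
  } yield out.toList
}
```

The consumer of the stream only notices a short pause: the two failed attempts are absorbed by the background fiber, and the three messages then arrive in order.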

Why a one-element buffer?

One might wonder why I am using a one-element buffer. We are operating on JMS and want to make sure that no messages are lost. As a result, in a real application we have to acknowledge the message to the message broker once we are done processing it. If we encounter an exception while processing the message, we have several options:

  1. We drop the message by acknowledging it even if we could not process it correctly
  2. We forward the message to another destination such as an error destination or a retry destination and acknowledge it
  3. We deny the message

The last option is not really part of the JMS API, which only offers an acknowledge method on the JMS message. In a real application we would use a session with CLIENT_ACKNOWLEDGE and, to deny a message, close the receiving session. This automatically marks the message as undelivered in the JMS broker, effectively denying it. As this applies to all messages that have been received within the same session and not yet acknowledged, we consume the messages one by one.

We will explore the error handling further in another post.

Creating a recoverable Sink (send messages)

The idea behind the recovering sink is pretty much the same as for the recovering stream. The subtle difference is that we do not use the sink we have seen in the last article, but the single send method.

Apart from that, the pattern to create the recoverable sink is the same as for creating the stream.

  def sink(
    dest: JmsDestination,
    retryInterval: Duration
  ): ZIO[
    ZEnv with Logging with ZIOJmsConnectionManager,
    Nothing,
    ZSink[ZEnv with Logging, Nothing, String, String, Unit]
  ] = {

    def produceOne(p: JmsProducer): ZIO[ZEnv with Logging, JMSException, Unit] = buffer.take.flatMap { s: String =>
      send(s, p, dest)
    }

    def produceForever: ZIO[ZEnv with Logging with ZIOJmsConnectionManager, Nothing, Unit] = {
      val part = for {
        _   <- log.debug(s"Trying to recover producer for [${factory.id}] with destination [$dest]")
        con <- connect(factory, clientId)
        _   <- createSession(con).use { jmsSess =>
                 createProducer(jmsSess).use { p =>
                   for {
                     f <- produceOne(p).forever.fork
                     _ <- f.join
                   } yield ()
                 }
               }
      } yield ()

      part.catchAll { _ =>
        for {
          f <- produceForever.schedule(Schedule.duration(retryInterval)).fork
          _ <- f.join
        } yield ()
      }
    }

    for {
      _ <- produceForever.fork
      s <- ZIO.succeed(ZSink.foreach(msg => buffer.offer(msg)))
    } yield s
  }
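
The buffer-backed sink can also be sketched without JMS. The example below assumes ZIO 1.0.x; the worker fiber that records messages in a `Ref` is a hypothetical stand-in for produceOne with a real JmsProducer, and it leaves out the failure and retry handling.

```scala
import zio._
import zio.stream._

object RecoveringSinkSketch {

  val demo: UIO[List[String]] = for {
    buffer <- Queue.bounded[String](1)
    sent   <- Ref.make(List.empty[String])
    // Hypothetical "producer": takes one message at a time and records it as sent.
    worker <- buffer.take.flatMap(s => sent.update(s :: _)).forever.fork
    // The sink visible to callers only offers into the buffer.
    sink    = ZSink.foreach((msg: String) => buffer.offer(msg))
    _      <- ZStream("m1", "m2", "m3").run(sink)
    // Wait until the worker has drained the buffer, then stop it.
    _      <- sent.get.repeatUntil(_.size == 3)
    _      <- worker.interrupt
    out    <- sent.get
  } yield out.reverse
}
```

Because the buffer holds a single element, `offer` suspends the upstream until the worker has taken the previous message, so callers of the sink are back-pressured while the producer side is unavailable.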

Sample log

Below is a log excerpt from a run of the sample app. Note the disconnect at time 10770, the recovery period of the connection, and the repeated recovery attempts of the consumer and producer streams until the reconnect has finished and everything resumes from 16550 onwards.

--- Entries omitted
2020-10-30-08:11.49.397 |     9458 | DEBUG : Received [Some(ActiveMQTextMessage {commandId = 37, responseRequired = true, messageId = ID:ToonBox-46199-1604041900408-4:1:1:1:16, originalDestination = null, originalTransactionId = null, producerId = ID:ToonBox-46199-1604041900408-4:1:1:1, destination = queue://sample, transactionId = null, expiration = 0, timestamp = 1604041909396, arrival = 0, brokerInTime = 1604041909396, brokerOutTime = 1604041909397, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1070, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = 2020-10-30-08.11.49.396})] with [JmsConsumer((amq:amq)(sampleCon)-(S-1604041900731-1)-(C-1)-sample)]
2020-10-30-08:11.50.005 |    10066 | DEBUG : Message [2020-10-30-08.11.50.004] sent successfully with [JmsProducer((amq:amq)(sampleCon)-(S-1604041900731-2)-P-1)] to [sample]
2020-10-30-08:11.50.005 |    10066 | DEBUG : Received [Some(ActiveMQTextMessage {commandId = 39, responseRequired = true, messageId = ID:ToonBox-46199-1604041900408-4:1:1:1:17, originalDestination = null, originalTransactionId = null, producerId = ID:ToonBox-46199-1604041900408-4:1:1:1, destination = queue://sample, transactionId = null, expiration = 0, timestamp = 1604041910004, arrival = 0, brokerInTime = 1604041910004, brokerOutTime = 1604041910005, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1070, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = 2020-10-30-08.11.50.004})] with [JmsConsumer((amq:amq)(sampleCon)-(S-1604041900731-1)-(C-1)-sample)]
2020-10-30-08:11.50.684 |    10745 | DEBUG : Message [2020-10-30-08.11.50.683] sent successfully with [JmsProducer((amq:amq)(sampleCon)-(S-1604041900731-2)-P-1)] to [sample]
2020-10-30-08:11.50.684 |    10745 | DEBUG : Received [Some(ActiveMQTextMessage {commandId = 41, responseRequired = true, messageId = ID:ToonBox-46199-1604041900408-4:1:1:1:18, originalDestination = null, originalTransactionId = null, producerId = ID:ToonBox-46199-1604041900408-4:1:1:1, destination = queue://sample, transactionId = null, expiration = 0, timestamp = 1604041910683, arrival = 0, brokerInTime = 1604041910683, brokerOutTime = 1604041910684, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1070, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = 2020-10-30-08.11.50.683})] with [JmsConsumer((amq:amq)(sampleCon)-(S-1604041900731-1)-(C-1)-sample)]
2020-10-30-08:11.50.709 |    10770 | INFO  : Connector vm://simple stopped
2020-10-30-08:11.50.710 |    10771 | DEBUG : Closed [((amq:amq)(sampleCon))]
2020-10-30-08:11.50.717 |    10778 | DEBUG : Beginning recovery period for [(amq:amq)] , cause [Boom]
2020-10-30-08:11.50.720 |    10781 | WARN  : Error receiving message with [JmsConsumer((amq:amq)(sampleCon)-(S-1604041900731-1)-(C-1)-sample)] : [The Consumer is closed]
2020-10-30-08:11.50.763 |    10824 | DEBUG : Closing Consumer [JmsConsumer((amq:amq)(sampleCon)-(S-1604041900731-1)-(C-1)-sample)]
2020-10-30-08:11.50.767 |    10828 | DEBUG : Closing JMS Session [((amq:amq)(sampleCon)-(S-1604041900731-1))]
2020-10-30-08:11.51.442 |    11503 | WARN  : Error sending message with [JmsProducer((amq:amq)(sampleCon)-(S-1604041900731-2)-P-1)] to [JmsQueue(sample)]: [The Session is closed]
2020-10-30-08:11.51.451 |    11512 | DEBUG : Closing JMS Producer [JmsProducer((amq:amq)(sampleCon)-(S-1604041900731-2)-P-1)]
2020-10-30-08:11.51.455 |    11516 | DEBUG : Closing JMS Session [((amq:amq)(sampleCon)-(S-1604041900731-2))]
2020-10-30-08:11.52.459 |    12520 | DEBUG : Trying to recover producer for [(amq:amq)] with [JmsQueue(sample)]
2020-10-30-08:11.52.771 |    12832 | DEBUG : Trying to recover consumer for [(amq:amq)] with destination [JmsQueue(sample)]
2020-10-30-08:11.53.464 |    13525 | DEBUG : Trying to recover producer for [(amq:amq)] with [JmsQueue(sample)]
2020-10-30-08:11.54.467 |    14528 | DEBUG : Trying to recover producer for [(amq:amq)] with [JmsQueue(sample)]
2020-10-30-08:11.54.774 |    14835 | DEBUG : Trying to recover consumer for [(amq:amq)] with destination [JmsQueue(sample)]
2020-10-30-08:11.55.470 |    15531 | DEBUG : Trying to recover producer for [(amq:amq)] with [JmsQueue(sample)]
2020-10-30-08:11.55.773 |    15834 | DEBUG : Ending recovery period for [(amq:amq)]
2020-10-30-08:11.56.472 |    16533 | DEBUG : Trying to recover producer for [(amq:amq)] with [JmsQueue(sample)]
2020-10-30-08:11.56.474 |    16535 | DEBUG : Connecting [(amq:amq)] with clientId [sampleCon]
2020-10-30-08:11.56.475 |    16536 | INFO  : Connector vm://simple started
2020-10-30-08:11.56.486 |    16547 | DEBUG : Created [((amq:amq)(sampleCon))]
2020-10-30-08:11.56.489 |    16550 | DEBUG : Created JMS Producer [JmsProducer((amq:amq)(sampleCon)-(S-1604041916487-1)-P-1)]
2020-10-30-08:11.56.492 |    16553 | DEBUG : Message [2020-10-30-08.11.51.895] sent successfully with [JmsProducer((amq:amq)(sampleCon)-(S-1604041916487-1)-P-1)] to [sample]
2020-10-30-08:11.56.493 |    16554 | DEBUG : Message [2020-10-30-08.11.52.295] sent successfully with [JmsProducer((amq:amq)(sampleCon)-(S-1604041916487-1)-P-1)] to [sample]
2020-10-30-08:11.56.776 |    16837 | DEBUG : Trying to recover consumer for [(amq:amq)] with destination [JmsQueue(sample)]
2020-10-30-08:11.56.781 |    16842 | DEBUG : Created JMS Consumer [JmsConsumer((amq:amq)(sampleCon)-(S-1604041916777-2)-(C-1)-sample)]
2020-10-30-08:11.56.782 |    16843 | DEBUG : Received [Some(ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:ToonBox-46199-1604041900408-4:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:ToonBox-46199-1604041900408-4:2:1:1, destination = queue://sample, transactionId = null, expiration = 0, timestamp = 1604041916490, arrival = 0, brokerInTime = 1604041916490, brokerOutTime = 1604041916779, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1070, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = 2020-10-30-08.11.51.895})] with [JmsConsumer((amq:amq)(sampleCon)-(S-1604041916777-2)-(C-1)-sample)]
2020-10-30-08:11.56.783 |    16844 | DEBUG : Received [Some(ActiveMQTextMessage {commandId = 6, responseRequired = true, messageId = ID:ToonBox-46199-1604041900408-4:2:1:1:2, originalDestination = null, originalTransactionId = null, producerId = ID:ToonBox-46199-1604041900408-4:2:1:1, destination = queue://sample, transactionId = null, expiration = 0, timestamp = 1604041916492, arrival = 0, brokerInTime = 1604041916493, brokerOutTime = 1604041916780, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1070, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = 2020-10-30-08.11.52.295})] with [JmsConsumer((amq:amq)(sampleCon)-(S-1604041916777-2)-(C-1)-sample)]
2020-10-30-08:11.57.123 |    17184 | DEBUG : Message [2020-10-30-08.11.57.122] sent successfully with [JmsProducer((amq:amq)(sampleCon)-(S-1604041916487-1)-P-1)] to [sample]
2020-10-30-08:11.57.123 |    17184 | DEBUG : Received [Some(ActiveMQTextMessage {commandId = 11, responseRequired = true, messageId = ID:ToonBox-46199-1604041900408-4:2:1:1:3, originalDestination = null, originalTransactionId = null, producerId = ID:ToonBox-46199-1604041900408-4:2:1:1, destination = queue://sample, transactionId = null, expiration = 0, timestamp = 1604041917122, arrival = 0, brokerInTime = 1604041917122, brokerOutTime = 1604041917123, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1070, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = 2020-10-30-08.11.57.122})] with [JmsConsumer((amq:amq)(sampleCon)-(S-1604041916777-2)-(C-1)-sample)]
--- Entries omitted

Conclusion

With very little code and simple patterns we could create ZIO streams on top of JMS with automatic recovery baked in. Towards the users of the created stream or sink the recovery is completely transparent and from their perspective they are working with normal ZStreams or ZSinks.

Next steps

The next step is to add a keep-alive monitor to an established connection, which will trigger a reconnect once a maximum number of keep-alive messages have been missed.