Edit me on Github
ZIO Streams and JMS

ZIO Streams and JMS

October 27, 2020
ZIO
ZIO, Streams, JMS

Playing with ZIO Streams using JMS #

In this article I am going to explore a bit of the ZIO streams API and how it can be used to talk to a JMS broker. The ZIO web site and awesome-zio have a lot of very good articles and talks on ZIO streams covering the basics, so I won’t repeat those here.

For this article I have used an embedded ActiveMQ messaging broker, but the code makes no assumptions about the underlying JMS provider. Usually the JMS broker would run externally and we would use a provider specific connection factory to connect to it.

The complete source code used in this article can be found on github

What we want to achieve #

Let’s start by considering what a simple program sending and receiving messages from an ActiveMQ broker would look like. That program should be runnable as normal ZIO app:

64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
  private val program: ZIO[ZEnv with AMQBroker.AMQBroker with Logging, Throwable, Unit] = for {
    si <- ZIOJmsConnectionManager.Service.singleton
    _  <- putStrLn("Starting JMS Broker") *> ZIO.service[BrokerService]
    _  <- (for {
            con <- connect(cf, "sample")
            _   <- reconnect(con, Some(new Exception("Boom"))).schedule(Schedule.duration(10.seconds)).fork
            _   <- for {
                     c <- consumer(con).fork
                     p <- producer(con).fork
                     _ <- c.join
                     _ <- p.join
                   } yield ()
          } yield ())
            .provideSomeLayer[ZEnv with AMQBroker.AMQBroker with Logging](ZIOJmsConnectionManager.Service.live(si))
  } yield ()

We can see from program’s type that besides the ZIO environment ZEnv we will need an ActiveMQ message broker and also the ZIO logging API to execute the program. With those requirements the program is fairly straightforward:

  1. aquire the message broker from the environment
  2. create a connection factory to talk to the broker just started
  3. create a fiber that simply sits in the background for 10 seconds
  4. use the connection factory to establish a JMS connection
  5. use the connection to kick off a sender and receiver
  6. join with the timed fiber to interrupt the sender and receiver
  7. clean up the connection
The sample program has a scheduled reconnect after 10 seconds. This will cause the execution to fail with an exception because the underlying streams terminate with a JMSException. In one of the next articles I will get into stream recovery.

I have specifically decided to perform the connection cleanup myself in this case. The API also has a managedConnection which would automatically call close for the connection. At the moment I am undecided what the better approach might be as in most of my use cases the connection is shared across many services within my applications.

That might mean the connection is going to end up in a layer within the environment.

We will dive into the various steps throughout the remainder of this article to see how ZIO is helping to provide a smooth access to JMS. First, let’s see how we can run the program above:

82
83
84
85
  override def run(args: List[String]): ZIO[ZEnv, Nothing, ExitCode] = program
    .provideCustomLayer(combinedEnv)
    .catchAllCause(c => putStrLn(c.prettyPrint))
    .exitCode

As we can see, we need to provide the environment with all required services, so that the program can actually execute.

Constructing the Environment #

First, we create a layer which consist of the standard ZIO environment enriched with the Slf4j implementation of the Logging service. This layer is required by the ActiveMQ service implementation and also by the program itself.

Next we create the ActiveMQ service using vertical composition with the logEnv and also an instance if a ZIOJmsConnectionManager which we stick into the environment as well.

Finally we can create the combinedEnv using horizontal composition of the logging, the broker layer and the connection manager layer.

The resulting environments contains everything to run our program.

25
26
27
28
29
30
31
32
  private val logEnv: ZLayer[Any, Nothing, ZEnv with Logging] =
    ZEnv.live ++ Slf4jLogger.make((_, message) => message)

  private val brokerEnv: ZLayer[Any, Throwable, AMQBroker.AMQBroker] =
    logEnv >>> AMQBroker.simple("simple")

  private val combinedEnv: ZLayer[ZEnv, Throwable, ZEnv with AMQBroker.AMQBroker with Logging] =
    logEnv ++ brokerEnv

For reference, the Active MQ broker service is here.

Sending messages #

To have some data travelling the message broker, let’s start by creating a plain ZIO stream, which emits a String element every half second or so. Each element is simply the current time formatted using a SimpleDateFormat.

36
37
38
39
40
41
  private val stream: ZStream[ZEnv, Nothing, String] = ZStream
    .fromSchedule(Schedule.spaced(500.millis).jittered)
    .mapM(_ =>
      currentTime(TimeUnit.MILLISECONDS)
        .map(sdf.format)
    )

Now we want to send all these Strings to JMS and later on receive them.

49
50
  private def producer(con: JmsConnection): ZIO[ZEnv with Logging, Throwable, Unit] =
    createSession(con).use(session => createProducer(session).use(prod => stream.run(jmsSink(prod, testDest))))

The send is broken down into these steps:

  1. create the JMS session
  2. create the JMS MessageProducer
  3. use the producer to create a ZIO Sink
  4. run the stream with the sink, so that the generated messages aresent to JMS

Within the JMS API we use a number of case classes that are simple wrappers around the underlying JMS classes. Essentially these case classes some additional information besides the original object. For one, we kaap a reference of the instance that was used as a factory. I.e. the JmsSession has a reference to the connection it belongs to and a JmsConsumer a reference to the JmsSession it was created for.

Besides that all classes contain a human readable identifier, which is mainly used to produce a more readable log.

Having this in mind, we create a named JMS session as a ZManaged, so that ZIO takes care of closing the session after it has been used. Within the session we create a JmsProducer, which is again a ZManaged.

Note, that the producer does not actually produce the messages in the sense of JMS - it just has all the information to do so.

So, let’s have a look how we can define an effect using the poducer to actually send a message:

 95
 96
 97
 98
 99
100
101
102
103
104
105
106
  def send(
    text: String,
    prod: JmsProducer,
    dest: JmsDestination
  ): ZIO[ZEnv with Logging, JMSException, Unit] = (for {
    msg <- effectBlocking(prod.session.session.createTextMessage(text))
    d   <- dest.create(prod.session)
    _   <- effectBlocking(prod.producer.send(d, msg))
    _   <- log.debug(s"Message [$text] sent successfully with [$prod] to [${dest.asString}]")
  } yield ()).flatMapError { t =>
    log.warn(s"Error sending message with [$prod] to [$dest]: [${t.getMessage()}]") *> ZIO.succeed(t)
  }.refineOrDie { case t: JMSException => t }
  1. use the producer’s session to create the JMS Message object
  2. use the producer’s session to create the JMS Destination object
  3. perform the JMS send
  4. record the send in the log

Now that we have the effect sending a single message, we can easily create a sink. The ZSink object in the ZIO streams API has a foreach method, which allows us to create a sink from an effect:

// in ZSink:
def foreach[R, E, I](f: I => ZIO[R, E, Any]): ZSink[R, E, I, I, Unit]

Let’s take a moment to digest the signature: R and E is the usual type magic within ZIO to describe the environment required for the sink and the errors it may produce. Unit in that case means that the final result after running a Stream with this Sink is Unit. In other words, the Stream is just run for the effect passed in as a parameter.

The function provided needs to create an effect for each element of type I.

In our case we already have the effect, which is the send method above, so we can define our sink as

134
135
136
137
138
  def jmsSink(
    prod: JmsProducer,
    dest: JmsDestination
  ): ZSink[ZEnv with Logging, JMSException, String, String, Unit] =
    ZSink.foreach[ZEnv with Logging, JMSException, String](s => send(s, prod, dest))

Receiving messages #

Now let’s understand the consumer of our program:

54
55
56
57
58
59
60
  private def consumer(con: JmsConnection): ZIO[ZEnv with Logging, Throwable, Unit] =
    createSession(con).use { session =>
      createConsumer(session, testDest).use { cons =>
        jmsStream(cons).collect { case m: TextMessage => m.getText() }
          .foreach(s => putStrLn(s))
      }
    }

Again, we need to create a JmsSession, but this time we use it to create a JmsConsumer for the given JmsDestination. We then use the created consumer to create a ZStream[R, E, Message], in other words a plain ZStream of JMS Messageobjects. From that stream we collect all TextMessage instances, get the String body of those and print that to the console.

Creating the stream is again amazingly simple with the ZIO streams API:

119
120
  def jmsStream(cons: JmsConsumer): ZStream[ZEnv with Logging, JMSException, Message] =
    ZStream.repeatEffect(receive(cons)).collect { case Some(s) => s }

We repeat an effect producing optional Message objects (optional because the underlying receive yields None if no message is available for the consumer). As we are only interested in the actual messages, we collect only the results actually having a Message.

The actual consume is simply a wrapper around the JMS API with some logging:

110
111
112
113
114
115
  def receive(cons: JmsConsumer): ZIO[ZEnv with Logging, JMSException, Option[Message]] = (for {
    msg <- effectBlocking(Option(cons.consumer.receiveNoWait()))
    _   <- if (msg.isDefined) log.debug(s"Received [$msg] with [$cons]") else ZIO.unit
  } yield msg).flatMapError { t =>
    log.warn(s"Error receiving message with [$cons] : [${t.getMessage()}]") *> ZIO.succeed(t)
  }.refineOrDie { case t: JMSException => t }

Next steps #

I have not elaborated too much on the SingleConnectionFactory. This is just a wrapper around a plain JMS ConnectionFactory and for the moment exposes the methods of the connection factory wrapped as effects. Other than the ConnectionFactory the SingleConnectionFactory caches a connection once it has been established and has an empty implementation for close. To really close the JMS connection, we must use shutdown as we have done in the program above.

Next I will explore how to enhance the SingleConnectionFactory with some resilience. The idea here is to create self healing streams that would automatically reconnect after a connection has been lost.

For now I have just used Strings as message objects; this will be enhanced to more flexible and useful messages.

Conclusion #

With ZIO it is very straight forward to break down a given problem into smaller pieces and then use those as building blocks for the solution. The challenge for developer like me is to get the head around the signatures and how all the types play together.

For example, building the environment took me a couple of hours to get it right. One has to take the time to read and understand the compiler errors. In the end I found that these errors pretty much tell me what I need to put together in terms of hirzontal and vertical composition to get it right.

The first steps in some areas of ZIO require quite a bit of code study. Given that ZIO is just beyond it’s first official release, the documentation and other resources are plenty, but scattered araound talks and blogs. The upcoming book by John De Goes and Adam Fraser adresses a lot of that and already has a lot of content in it’s alpha version.

Even if it means stating the obvious: Also when you work with a more sophisticated API in ZIO in the inner layers you are going to find ZIO effects which you then combine into something else - in our case create ZStream and ZSink from plain ZIO’s and then have the entire ZStream magic at the tip of your fingers to manipulate JMS streams.