October 27, 2020
Playing with ZIO Streams using JMS #
In this article I am going to explore a bit of the ZIO streams API and how it can be used to talk to a JMS broker. The ZIO web site and awesome-zio have a lot of very good articles and talks on ZIO streams covering the basics, so I won’t repeat those here.
For this article I have used an embedded ActiveMQ messaging broker, but the code makes no assumptions about the underlying JMS provider. Usually the JMS broker would run externally and we would use a provider-specific connection factory to connect to it.
The complete source code used in this article can be found on GitHub.
What we want to achieve #
Let’s start by considering what a simple program sending and receiving messages from an ActiveMQ broker would look like. That program should be runnable as a normal ZIO app:
We can see from program’s type that, besides the ZIO environment ZEnv, we will need an ActiveMQ message broker and also the ZIO logging API to execute the program. With those requirements the program is fairly straightforward:
- acquire the message broker from the environment
- create a connection factory to talk to the broker just started
- create a fiber that simply sits in the background for 10 seconds
- use the connection factory to establish a JMS connection
- use the connection to kick off a sender and receiver
- join with the timed fiber to interrupt the sender and receiver
- clean up the connection
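The timing part of these steps can be tried out without any JMS at all. The following is only a minimal sketch under stated assumptions, not the program from the repository: runFor and demo are hypothetical names, and a never-ending effect stands in for the sender and receiver.

```scala
import zio._
import zio.clock.Clock
import zio.duration._

object TimedRunSketch {

  // Hypothetical helper: run `work` in the background and interrupt it
  // once a timer fiber has completed.
  def runFor[R, E](duration: Duration)(work: ZIO[R, E, Any]): ZIO[R with Clock, Nothing, Unit] =
    for {
      timer  <- ZIO.sleep(duration).fork // fiber that just sits in the background
      worker <- work.fork                // stands in for the sender and receiver
      _      <- timer.join               // wait until the time is up
      _      <- worker.interrupt         // stop the background work
    } yield ()

  // Runs a never-ending effect for 200 ms and reports whether it was interrupted.
  val demo: URIO[Clock, Boolean] =
    for {
      interrupted <- Ref.make(false)
      _           <- runFor(200.millis)(ZIO.never.onInterrupt(interrupted.set(true)))
      result      <- interrupted.get
    } yield result
}
```

Running TimedRunSketch.demo with the default runtime should report that the background work was interrupted once the timer fiber had finished.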
The sample program has a scheduled reconnect after 10 seconds. This will cause the execution to fail with an exception because the underlying streams terminate with a JMSException. In one of the next articles I will get into stream recovery.
I have specifically decided to perform the connection cleanup myself in this case. The API also has a managedConnection, which would automatically call close for the connection. At the moment I am undecided what the better approach might be, as in most of my use cases the connection is shared across many services within my applications. That might mean the connection is going to end up in a layer within the environment.
We will dive into the various steps throughout the remainder of this article to see how ZIO helps to provide smooth access to JMS. First, let’s see how we can run the program above:
As we can see, we need to provide the environment with all required services, so that the program can actually execute.
Constructing the Environment #
First, we create a layer which consists of the standard ZIO environment enriched with the Slf4j implementation of the Logging service. This layer is required by the ActiveMQ service implementation and also by the program itself.
Next we create the ActiveMQ service using vertical composition with the logEnv, and also an instance of a ZIOJmsConnectionManager, which we stick into the environment as well.
Finally we can create the combinedEnv using horizontal composition of the logging layer, the broker layer and the connection manager layer.
The resulting environment contains everything to run our program.
For reference, the ActiveMQ broker service is here.
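To make the two kinds of composition concrete, here is a minimal sketch in ZIO 1.x. Logger and Broker are hypothetical stand-ins for the article’s logging and broker services, so the names and shapes are assumptions and not the actual API of the repository.

```scala
import zio._

object LayerSketch {

  // Hypothetical services, only used to illustrate layer composition
  final case class Logger(prefix: String) {
    def format(msg: String): String = s"[$prefix] $msg"
  }
  final case class Broker(name: String, logger: Logger)

  // A layer without any requirements
  val loggerLayer: ULayer[Has[Logger]] =
    ZLayer.succeed(Logger("demo"))

  // Vertical composition (>>>): the broker is built from the logger layer's output
  val brokerLayer: ULayer[Has[Broker]] =
    loggerLayer >>> ZLayer.fromService[Logger, Broker](l => Broker("embedded", l))

  // Horizontal composition (++): both services side by side in one environment
  val combinedEnv: ULayer[Has[Logger] with Has[Broker]] =
    loggerLayer ++ brokerLayer

  // A program that requires both services
  val program: URIO[Has[Logger] with Has[Broker], String] =
    for {
      logger <- ZIO.service[Logger]
      broker <- ZIO.service[Broker]
    } yield logger.format(s"broker ${broker.name} is available")

  // Providing the combined layer eliminates all requirements
  val runnable: UIO[String] = program.provideLayer(combinedEnv)
}
```

If a layer is missing from combinedEnv, the call to provideLayer no longer compiles, and the compiler error names the service that is still required.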
Sending messages #
To have some data travelling through the message broker, let’s start by creating a plain ZIO stream which emits a String element every half second or so. Each element is simply the current time, formatted as a String.
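A stream of this kind could look roughly as follows. This is a sketch only: the time pattern and the names fmt, now and timestamps are assumptions made for illustration.

```scala
import java.time.LocalTime
import java.time.format.DateTimeFormatter

import zio._
import zio.clock.Clock
import zio.duration._
import zio.stream._

object TimestampStreamSketch {

  // Assumed pattern, chosen only for this example
  private val fmt = DateTimeFormatter.ofPattern("HH:mm:ss.SSS")

  // Effect producing the current time as a formatted String
  private val now: UIO[String] =
    UIO.effectTotal(LocalTime.now().format(fmt))

  // Repeat the effect forever, spacing the elements by half a second
  val timestamps: ZStream[Clock, Nothing, String] =
    ZStream.repeatEffect(now).schedule(Schedule.spaced(500.millis))
}
```

Since the stream is infinite, any experiment should bound it, for example with timestamps.take(2).runCollect.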
Now we want to send all these Strings to JMS and later on receive them.
The send is broken down into these steps:
- create the JMS session
- create the JMS MessageProducer
- use the producer to create a ZIO Sink
- run the stream with the sink, so that the generated messages are sent to JMS
Within the JMS API we use a number of case classes that are simple wrappers around the underlying JMS classes. Essentially, these case classes carry some additional information besides the original object. For one, we keep a reference to the instance that was used as a factory, i.e. the JmsSession has a reference to the connection it belongs to and a JmsConsumer a reference to the JmsSession it was created for. Besides that, all classes contain a human-readable identifier, which is mainly used to produce a more readable log.
Having this in mind, we create a named JMS session as a ZManaged, so that ZIO takes care of closing the session after it has been used. Within the session we create a JmsProducer, which is again a
Note that the producer does not actually produce the messages in the sense of JMS; it just has all the information to do so.
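The way a ZManaged guarantees that close is called can be seen in a small sketch. FakeSession is a hypothetical stand-in for the session wrapper, and a Ref records the order of events instead of a real JMS session being opened.

```scala
import zio._

object ManagedSessionSketch {

  // Hypothetical stand-in for a named session wrapper
  final case class FakeSession(name: String)

  // Acquire and release are paired; release runs once `use` has finished
  def managedSession(name: String, log: Ref[Vector[String]]): ZManaged[Any, Nothing, FakeSession] =
    ZManaged.make(
      log.update(_ :+ s"open $name").as(FakeSession(name))
    )(session => log.update(_ :+ s"close ${session.name}"))

  // Uses the session once and returns the recorded events
  val demo: UIO[Vector[String]] =
    for {
      log <- Ref.make(Vector.empty[String])
      _   <- managedSession("sender", log).use(s => log.update(_ :+ s"use ${s.name}"))
      out <- log.get
    } yield out
}
```

The recorded events come out in the order open, use, close, and the release step also runs when the effect inside use fails or is interrupted.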
So, let’s have a look at how we can define an effect using the producer to actually send a message:
- use the producer’s session to create the JMS
- use the producer’s session to create the JMS
- perform the JMS send
- record the send in the log
Now that we have the effect sending a single message, we can easily create a sink. The ZSink object in the ZIO streams API has a foreach method, which allows us to create a sink from an effect:
// in ZSink: def foreach[R, E, I](f: I => ZIO[R, E, Any]): ZSink[R, E, I, I, Unit]
Let’s take a moment to digest the signature:
E is the usual type magic within ZIO to describe the environment required for the sink and the errors it may produce.
Unit in that case means that the final result after running a Stream with this Sink is Unit. In other words, the Stream is just run for the effect passed in as a parameter.
The function provided needs to create an effect for each element of type
In our case we already have the effect, which is the send method above, so we can define our sink as
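As a minimal, self-contained sketch of the same idea, the example below builds a sink with ZSink.foreach from an effect. The send used here is a hypothetical stand-in that records each message in a Ref rather than talking to JMS.

```scala
import zio._
import zio.stream._

object SinkSketch {

  // Hypothetical stand-in for the JMS send: records the message instead of sending it
  def send(sent: Ref[Vector[String]])(msg: String): UIO[Unit] =
    sent.update(_ :+ msg)

  // Runs a small stream into a sink built from the send effect
  val demo: UIO[Vector[String]] =
    for {
      sent <- Ref.make(Vector.empty[String])
      sink  = ZSink.foreach((msg: String) => send(sent)(msg)) // one effect per element
      _    <- ZStream("a", "b", "c").run(sink)                // the result of the run is Unit
      out  <- sent.get
    } yield out
}
```

Running the stream with the sink yields Unit; the only observable result is the side effect performed for each element, here the recorded messages.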
Receiving messages #
Now let’s understand the consumer of our program:
Again, we need to create a JmsSession, but this time we use it to create a JmsConsumer for the given
We then use the created consumer to create a ZStream[R, E, Message], in other words a plain ZStream of JMS Message objects. From that stream we collect all TextMessage instances, get the String body of those and print that to the console.
Creating the stream is again amazingly simple with the ZIO streams API:
We repeat an effect producing optional Message objects (optional because the underlying receive yields None if no message is available for the consumer). As we are only interested in the actual messages, we collect only the results that are actually defined.
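The repeat-and-collect pattern can be sketched without a broker. In the example below a zio.Queue is a hypothetical stand-in for the JMS consumer, since its poll also yields None when nothing is available; the names messages and demo are made up for illustration.

```scala
import zio._
import zio.stream._

object ReceiveSketch {

  // Repeat an effect yielding Option[String] and keep only the defined results
  def messages(queue: Queue[String]): ZStream[Any, Nothing, String] =
    ZStream
      .repeatEffect(queue.poll)           // UIO[Option[String]], None if the queue is empty
      .collect { case Some(msg) => msg }  // drop the empty polls

  // Fills the queue and reads the three messages back through the stream
  val demo: UIO[List[String]] =
    for {
      queue <- Queue.unbounded[String]
      _     <- queue.offerAll(List("one", "two", "three"))
      out   <- messages(queue).take(3).runCollect
    } yield out.toList
}
```

Note that this sketch polls the queue in a tight loop. A receive against a real broker would normally wait or use a timeout, so the loop would not spin like this.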
The actual consume is simply a wrapper around the JMS API with some logging:
Next steps #
I have not elaborated too much on the SingleConnectionFactory. This is just a wrapper around a plain JMS ConnectionFactory and for the moment exposes the methods of the connection factory wrapped as effects. Beyond that, the SingleConnectionFactory caches a connection once it has been established and has an empty implementation for close. To really close the JMS connection, we must use shutdown as we have done in the program above.
Next I will explore how to enhance the SingleConnectionFactory with some resilience. The idea here is to create self-healing streams that would automatically reconnect after a connection has been lost.
For now I have just used Strings as message objects; this will be enhanced to more flexible and useful messages.
With ZIO it is very straightforward to break down a given problem into smaller pieces and then use those as building blocks for the solution. The challenge for a developer like me is to get my head around the signatures and how all the types play together.
For example, building the environment took me a couple of hours to get right. One has to take the time to read and understand the compiler errors. In the end I found that these errors pretty much tell me what I need to put together in terms of horizontal and vertical composition.
The first steps in some areas of ZIO require quite a bit of code study. Given that ZIO is just beyond its first official release, the documentation and other resources are plentiful, but scattered around talks and blogs. The upcoming book by John De Goes and Adam Fraser addresses a lot of that and already has a lot of content in its alpha version.
Even if it means stating the obvious: even when you work with a more sophisticated API in ZIO, in the inner layers you are going to find ZIO effects which you then combine into something else. In our case we create a ZSink from plain ZIO effects and then have the entire ZStream magic at our fingertips to manipulate JMS streams.