Service Metrics

ZIO-based Service Metrics implementation

The ZIO version starts with an interface that mirrors the operations hidden behind the actor-based implementation of Blended 3. The straightforward approach is to look at the messages the actor currently understands and translate each of them into a corresponding method on the interface.

The actor currently understands 3 messages:

  • start service invocation
  • complete invocation with success
  • complete invocation with error

In addition, we would like to retrieve the list of currently active invocations and the summaries collected so far.

This leads to the following interface definition:

  trait Service {

    /**
     * Start to track a service invocation for a given start event.
     * If a tracker for that id is still active, a [[ServiceMetricsException]] will be returned.
     * Otherwise, an updated ServiceTrackingEntry will be returned.
     * @param start The start event containing the details of the service invocation to be tracked
     */
    def start(start: ServiceInvocationStarted): ZIO[Any, ServiceMetricsException, ServiceTrackingEntry]

    /**
     * Signal the successful completion of a service invocation. The invocation is referenced by
     * the id of a corresponding call to [[start]]. If the invocation cannot be found, a
     * [[ServiceMetricsException]] will be returned.
     * @param s The completion event for the service invocation
     */
    def complete(s: ServiceInvocationCompleted): ZIO[Any, ServiceMetricsException, ServiceTrackingEntry]

    /**
     * Signal the failed completion of a service invocation. The invocation is referenced by
     * the id of a corresponding call to [[start]]. If the invocation cannot be found, a
     * [[ServiceMetricsException]] will be returned.
     * @param f The failure event for the service invocation
     */
    def failed(f: ServiceInvocationFailed): ZIO[Any, ServiceMetricsException, ServiceTrackingEntry]

    /**
     * Get the list of service invocation ids which are currently active.
     */
    def active: ZIO[Any, Nothing, List[String]]

    /**
     * Get the current Map of summaries recorded by the Service tracker.
     */
    def summaries: ZIO[Any, Nothing, Map[String, ServiceTrackingEntry]]
  }

Note that all methods on the interface return ZIO effects.
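Because every method returns an effect, a call only describes work. Calls can be sequenced in a for-comprehension, and a failure on the typed error channel short-circuits the remaining steps. The sketch below shows this with a hypothetical `SimpleTracker` instead of the real Blended types. It uses String ids and String errors and is backed by a ZIO `Ref`. It assumes ZIO 1.0.x, which matches the `testM`/`effectTotal` API used later on this page.

```scala
import zio._

// Hypothetical, heavily simplified stand-in for ServiceMetrics.Service:
// invocations are plain String ids and errors are plain Strings.
trait SimpleTracker {
  def start(id: String): IO[String, Unit]
  def complete(id: String): IO[String, Unit]
  def active: UIO[List[String]]
}

object SimpleTracker {
  // Minimal in-memory implementation backed by a Ref holding the active ids
  def make: UIO[SimpleTracker] = Ref.make(Set.empty[String]).map { ref =>
    new SimpleTracker {
      def start(id: String): IO[String, Unit] =
        ref.modify[Either[String, Unit]] { s =>
          if (s.contains(id)) (Left(s"$id already started"), s) else (Right(()), s + id)
        }.flatMap(r => ZIO.fromEither(r))

      def complete(id: String): IO[String, Unit] =
        ref.modify[Either[String, Unit]] { s =>
          if (s.contains(id)) (Right(()), s - id) else (Left(s"$id not found"), s)
        }.flatMap(r => ZIO.fromEither(r))

      def active: UIO[List[String]] = ref.get.map(_.toList.sorted)
    }
  }
}

object ComposeExample {
  // Nothing runs while these values are built; the calls are sequenced when
  // the effect is executed, and the first failure skips the remaining steps.
  val ok: IO[String, List[String]] = for {
    t   <- SimpleTracker.make
    _   <- t.start("inv-1")
    _   <- t.start("inv-2")
    _   <- t.complete("inv-1")
    ids <- t.active
  } yield ids

  val duplicate: IO[String, List[String]] = for {
    t   <- SimpleTracker.make
    _   <- t.start("inv-1")
    _   <- t.start("inv-1") // fails here, so `active` is never evaluated
    ids <- t.active
  } yield ids
}
```

Running `ComposeExample.ok` yields the single remaining active id. Running `ComposeExample.duplicate.either` surfaces the error of the second `start` as a `Left`.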

Implementation notes

The implementation needs to maintain a list of currently active service invocations, so that we can properly close each of them with a completed or failed event. Furthermore, we need to keep track of the invocation summaries, which hold the statistics for invocations grouped by their summarizeId.
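To make the two pieces of state concrete, here is a minimal plain-Scala model built on immutable maps, without any concurrency. The types `Started`, `Summary` and `TrackerState` and the method `finish` are hypothetical simplifications of `ServiceInvocationStarted`, `ServiceTrackingEntry` and the tracker. The model only illustrates one point. Invocations are keyed by their own id, while summaries are keyed by `summarizeId`, so several invocations share one summary.

```scala
// Hypothetical, simplified stand-ins for ServiceInvocationStarted and ServiceTrackingEntry
final case class Started(id: String, summarizeId: String, timestamp: Long)
final case class Summary(summarizeId: String, inflight: Long = 0L, completed: Long = 0L, failed: Long = 0L)

// Immutable model of the two maps the tracker has to maintain
final case class TrackerState(
  running: Map[String, Started] = Map.empty, // keyed by invocation id
  summary: Map[String, Summary] = Map.empty  // keyed by summarizeId
) {
  // Register a new invocation; an id that is already running is an error
  def start(s: Started): Either[String, TrackerState] =
    if (running.contains(s.id)) Left(s"Invocation ${s.id} is already active")
    else {
      val sum = summary.getOrElse(s.summarizeId, Summary(s.summarizeId))
      Right(
        copy(
          running = running + (s.id -> s),
          summary = summary + (s.summarizeId -> sum.copy(inflight = sum.inflight + 1))
        )
      )
    }

  // Close an invocation (hypothetical name) and update the summary of its group
  def finish(id: String, success: Boolean): Either[String, TrackerState] =
    for {
      s   <- running.get(id).toRight(s"Invocation $id not found")
      sum <- summary.get(s.summarizeId).toRight(s"Summary ${s.summarizeId} not found")
      upd = if (success) sum.copy(inflight = sum.inflight - 1, completed = sum.completed + 1)
            else sum.copy(inflight = sum.inflight - 1, failed = sum.failed + 1)
    } yield copy(running = running - id, summary = summary + (s.summarizeId -> upd))
}

object TrackerStateExample {
  // Two invocations of the same service group, one of which completes
  val result: Either[String, TrackerState] = for {
    s1 <- TrackerState().start(Started("a", "svc", 0L))
    s2 <- s1.start(Started("b", "svc", 5L))
    s3 <- s2.finish("a", success = true)
  } yield s3
}
```

After the three steps, only `b` is still running, and the shared `svc` summary shows one invocation in flight and one completed.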

Inspired by this article about implementing a concurrent LRU cache, I decided to implement a ConcurrentInvocationTracker that uses STM data structures (two TMaps) under the covers:

final class ConcurrentInvocationTracker private (
  val running: TMap[String, ServiceInvocationStarted],
  val summary: TMap[String, ServiceTrackingEntry]
) { self =>

First of all, we need a couple of helpers to manipulate the two maps. Their names are self-explanatory. updStarted, updFailed and updCompleted are plain functions that compute an updated ServiceTrackingEntry. The lookup and insert helpers return STM values, so we can compose them to implement the actual business functions and call commit once at the end to obtain a ZIO effect.

  private val updStarted: ServiceTrackingEntry => ServiceTrackingEntry =
    e => e.copy(inflight = e.inflight + 1)

  private val updFailed: Long => ServiceTrackingEntry => ServiceTrackingEntry = started =>
    orig =>
      orig.copy(
        lastFailed = Some(System.currentTimeMillis()),
        failed = orig.failed.record(System.currentTimeMillis() - started),
        inflight = orig.inflight - 1
      )

  private val updCompleted: Long => ServiceTrackingEntry => ServiceTrackingEntry = started =>
    orig =>
      orig.copy(
        success = orig.success.record(System.currentTimeMillis() - started),
        inflight = orig.inflight - 1
      )

  private def getExistingSummary(sumId: String): STM[ServiceMetricsException, ServiceTrackingEntry] =
    STM.require(new ServiceSummaryNotFoundException(sumId))(self.summary.get(sumId))

  private def getExistingActive(id: String): STM[ServiceMetricsException, ServiceInvocationStarted] =
    STM.require(new ServiceInvocationNotFoundException(id))(self.running.get(id))

  private def addSummary(evt: ServiceInvocationStarted): STM[Nothing, ServiceTrackingEntry] = for {
    entry <- STM.succeed(ServiceTrackingEntry(evt.summarizeId))
    _     <- self.summary.put(evt.summarizeId, entry)
  } yield (entry)

  private def getOrCreateSummary(evt: ServiceInvocationStarted): STM[Nothing, ServiceTrackingEntry] =
    self.summary.get(evt.summarizeId).flatMap {
      case Some(e) => STM.succeed(e)
      case None    => addSummary(evt)
    }
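Keeping the helpers in STM pays off because everything composed before `commit` runs as a single transaction. The following is a minimal sketch that assumes ZIO 1.0.x. It uses `TMap[String, Int]` and a hypothetical `getExisting` helper (modelled on `getExistingSummary`) instead of the real types. The second transaction writes to `running` and then fails its summary lookup, and its write is rolled back together with the failure.

```scala
import zio._
import zio.stm._

object StmAtomicityExample {
  // Hypothetical helper in the spirit of getExistingSummary: look up a key or fail inside STM
  def getExisting(m: TMap[String, Int], key: String): STM[String, Int] =
    STM.require(s"$key not found")(m.get(key))

  // Several STM steps composed into ONE transaction: the put into `running`
  // only becomes visible if the later summary lookup succeeds as well.
  def startTx(running: TMap[String, Int], summary: TMap[String, Int], id: String, sumId: String): STM[String, Int] =
    for {
      _     <- running.put(id, 1)
      count <- getExisting(summary, sumId)
      _     <- summary.put(sumId, count + 1)
    } yield count + 1

  val program: UIO[(Either[String, Int], Either[String, Int], List[String], Option[Int])] =
    for {
      running <- TMap.empty[String, Int].commit
      summary <- TMap.make("svc" -> 0).commit
      ok      <- startTx(running, summary, "a", "svc").commit.either
      failed  <- startTx(running, summary, "b", "unknown").commit.either
      keys    <- running.keys.commit
      svc     <- summary.get("svc").commit
    } yield (ok, failed, keys, svc)
}
```

Only `a` ends up in `running`. The put for `b` happened inside a transaction that failed, so it never becomes visible.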

Within the implementation, the update method is responsible for recording the completion or failure of a given invocation id. Therefore, we need to look up the currently active entry in the running map as well as the existing summary. Note that if everything works as designed, the summary must already exist at this point. However, either lookup may fail with a ServiceMetricsException, which is reflected in the method signature.

Once both entries are found, we apply the required update, remove the invocation from the running map and commit the whole transaction.

  private def update(id: String, success: Boolean): ZIO[Any, ServiceMetricsException, ServiceTrackingEntry] = (for {
    act   <- getExistingActive(id)
    sumId  = act.summarizeId
    entry <- getExistingSummary(sumId)
    upd    = if (success) updCompleted(act.timestamp)(entry) else updFailed(act.timestamp)(entry)
    _     <- self.summary.put(sumId, upd)
    _     <- self.running.delete(id)
  } yield (upd)).commit
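The update functions are plain functions, so the arithmetic can be checked in isolation. The sketch below uses hypothetical stand-ins, `Stats` and `Entry`, for the Blended statistics types. Its `record` only counts and sums durations, while the real one may track more. Unlike the code above, it passes the current time in as a parameter instead of calling `System.currentTimeMillis()`, which keeps the functions pure and deterministic.

```scala
// Hypothetical, simplified stand-ins for the Blended statistics types
final case class Stats(count: Long = 0L, totalMillis: Long = 0L) {
  def record(durationMillis: Long): Stats = Stats(count + 1, totalMillis + durationMillis)
}

final case class Entry(
  id: String,
  inflight: Long = 0L,
  success: Stats = Stats(),
  failed: Stats = Stats(),
  lastFailed: Option[Long] = None
)

object UpdateFunctions {
  // Same shape as updCompleted, with the clock value passed in explicitly
  def updCompleted(started: Long, now: Long)(e: Entry): Entry =
    e.copy(success = e.success.record(now - started), inflight = e.inflight - 1)

  // Same shape as updFailed, with the clock value passed in explicitly
  def updFailed(started: Long, now: Long)(e: Entry): Entry =
    e.copy(lastFailed = Some(now), failed = e.failed.record(now - started), inflight = e.inflight - 1)

  // Mirrors the `if (success) ... else ...` choice made in update
  def update(success: Boolean, started: Long, now: Long)(e: Entry): Entry =
    if (success) updCompleted(started, now)(e) else updFailed(started, now)(e)
}
```

For example, an entry with two invocations in flight drops to one after a completion that started at 1000 and finished at 1250. Its success statistics then hold one call of 250 ms.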

The start method is very similar. We use getExistingActive(evt.id).flip, so that an already existing entry for the given id is treated as an error. In this case we also use getOrCreateSummary(evt) to ensure that the summary map definitely contains an entry.

Finally, we use mapError to create the proper exception indicating that a service with the same id is already active.

  def start(evt: ServiceInvocationStarted): ZIO[Any, ServiceMetricsException, ServiceTrackingEntry] = (for {
    // Using the flip will turn an existing invocation into the error type, so that we can handle that
    // at the end with mapError
    _     <- getExistingActive(evt.id).flip
    _     <- self.running.put(evt.id, evt)
    // make sure we do have a summary entry in our summary map
    entry <- getOrCreateSummary(evt)
    // Record the service start
    upd    = updStarted(entry)
    _     <- self.summary.put(evt.summarizeId, upd)
  } yield (upd)).commit.mapError(_ => new ServiceAlreadyStartedException(evt))
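The `flip` trick can be seen in isolation in the following sketch, which assumes ZIO 1.0.x. Here `running` is a hypothetical `TMap[String, Long]` from invocation id to start timestamp, and `getExisting` stands in for `getExistingActive`. After `flip`, a missing id becomes the success value and an existing entry becomes the error. `mapError` then turns that error into a meaningful message, much as `start` does with `ServiceAlreadyStartedException`.

```scala
import zio._
import zio.stm._

object FlipExample {
  // Hypothetical helper mirroring getExistingActive: fail if the id is unknown
  def getExisting(m: TMap[String, Long], id: String): STM[String, Long] =
    STM.require(s"$id not found")(m.get(id))

  // After flip the error type is the existing entry (its timestamp), so a
  // duplicate id aborts the transaction before the put is reached.
  def startTx(m: TMap[String, Long], id: String, now: Long): STM[Long, Unit] =
    for {
      _ <- getExisting(m, id).flip
      _ <- m.put(id, now)
    } yield ()

  val program: UIO[(Either[String, Unit], Either[String, Unit], Option[Long])] =
    for {
      running <- TMap.make("a" -> 42L).commit
      dup     <- startTx(running, "a", 100L).commit.mapError(ts => s"a already started at $ts").either
      fresh   <- startTx(running, "b", 100L).commit.mapError(ts => s"b already started at $ts").either
      b       <- running.get("b").commit
    } yield (dup, fresh, b)
}
```

Mapping every error this way is safe in `start` for a specific reason. After the flip, the only step that can fail is the duplicate check, because the put and getOrCreateSummary cannot fail.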

Testing

Testing is done with zio-test and is fairly straightforward. The tests provide the live service via a ZLayer, call the service through the interface methods and verify the results with assertions.

For example, the test verifying that a successful service completion is recorded is implemented as follows:

  private val recordComplete = testM("record a service completion correctly") {
    for {
      tracker <- ZIO.service[ServiceMetrics.Service]
      s       <- start
      c       <- completed(s)
      old     <- tracker.summaries
      e       <- ZIO.effectTotal(old.getOrElse(s.summarizeId, ServiceTrackingEntry(s.summarizeId)))
      _       <- tracker.start(s)
      _       <- tracker.complete(c)
      sum     <- tracker.summaries
      // strictly greater: the completion recorded by this test must have been counted
    } yield assert(sum(s.summarizeId).success.count)(isGreaterThan(e.success.count))
  }