Fork me on Github Edit me on Github
M Bean Publisher

MBean Publisher #

The MBean publisher is used to publish arbitrary case classes as DynamicMBeans via JMX. A generic mapper will examine the structure of the given case class instance and recursively map all attributes to corresponding attributes within th MBean.

As such the interface definition for the publishing service is:

77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
  trait Service {

    /**
     * Retrieve the list of object names that are currently registered by this service.
     */
    def managedNames: ZIO[Any, Nothing, List[String]]

    /**
     * Create or update the MBean within JMX with the <code>DynamicMBean</code> representation of the given case class instance.
     */
    def updateMBean[T <: Product](v: T)(implicit f: T => Nameable[T]): ZIO[Any, MBeanPublishException, Unit]

    /**
     * Remove the registration from JMX for the object name derived from the given case class.
     */
    def removeMBean[T <: Product](v: T)(implicit f: T => Nameable[T]): ZIO[Any, MBeanPublishException, Unit]
  }

Using the MBean publisher #

The easiest way to use the MBeanPublisher is to make it available through a layer like it is done within the tests:

20
21
22
23
24
  private val logSlf4j = Slf4jLogger.make((_, message) => message)

  private val jmxLayer
    : ZLayer[Any, Nothing, ProductMBeanPublisher.ProductMBeanPublisher with MBeanServerFacade.MBeanServerFacade] =
    (logSlf4j >>> ProductMBeanPublisher.live) ++ (logSlf4j >>> MBeanServerFacade.live)

Then the MBeanPublisher can be used by simply passing a case class to updateMBean. For now the case class also needs to implement Nameable so that the proper ObjectName can be calculated.

37
38
39
40
41
42
43
44
45
46
47
48
  private val simplePublish = testM("publish a simple case class")(for {
    pub  <- ZIO.service[ProductMBeanPublisher.Service]
    fac  <- ZIO.service[MBeanServerFacade.Service]
    cc   <- ZIO.succeed(Simple("test1", 0, "Hello Jmx"))
    info <- pub.updateMBean(cc) >>> fac.mbeanInfo(objectName(cc))
  } yield {
    val keys = info.attributes.value.keys.toList
    assert(keys)(contains("counter")) &&
    assert(keys)(contains("message")) &&
    assert(keys)(hasSize(equalTo(3))) &&
    assert(info.attributes.value("counter").value.asInstanceOf[Int])(equalTo(0))
  })

In the test case we are also using the MBeanServerFacade to verify that the MBean has been published correctly and has the correct values.

The implementation keeps track of all instances that have been published. Only the first call to publish will actually register the MBean while subsequent calls will only update the underlying value. The Service makes sure that updates to MBeans are only allowed for MBeans that are based on the same Class.

Implementation details #

The service implementation keeps track of the published values in a TMap with the object name as key and the DynamicMBean wrapper around the case class.

To manipulate the TMap we use some helper methods to either create or update an entry within the TMap:

170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
  private def updateMBean[T <: Product](old: OpenProductMBean, bean: T): STM[Throwable, Unit] =
    STM.ifM(STM.succeed(old.beanClass.equals(bean.getClass)))(
      STM.fromTry(Try {
        val mapped = mapper.mapProduct(bean)
        old.update(bean.getClass, mapped).get
      }),
      STM.fail(new IncompatibleJmxUpdateException(bean.getClass, old.beanClass))
    )

  private def createMBean[T <: Product](bean: T)(implicit f: T => Nameable[T]): STM[Throwable, Unit] =
    STM
      .fromTry(Try {
        val on: ObjectName = new ObjectName(objectName(bean).objectName)

        try svr.unregisterMBean(on)
        catch {
          case _: InstanceNotFoundException => // swallow that exception
        }

        val mapped = mapper.mapProduct(bean)
        val b      = new OpenProductMBean(bean.getClass, mapped)
        svr.registerMBean(b, new ObjectName(objectName(bean).objectName))
        b
      })
      .flatMap { b =>
        self.beans.put(objectName(bean).objectName, b)
      }

  private def createOrUpdate[T <: Product](bean: T)(implicit f: T => Nameable[T]): STM[Throwable, Unit] =
    self.beans.get(objectName(bean).objectName).flatMap {
      case Some(e) => updateMBean(e, bean)
      case None    => createMBean(bean)
    }

The implementation uses STM under the covers. It is important to note that STM code should not include side effecting code that might not be idempotent (such as appending to a file or as in our case register an MBean). The reason for that is that the STM code will be retried if any of the STM-values that are being touched by the operation is changed from another fiber.

In our case the tests were failing when run in parallel because the registration in JMX might have executed multiple times, which in turn caused a JMX exception.

For now we are simply ignoring that specific JMX exception, but a better solution might be looking at another mechanism than TMap to handle that scenario.

(Also see the discussion on Discord)

With the helper methods in place, actual service implementation methods is fairly straightforward:

142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
  def updateMBean[T <: Product](v: T)(implicit f: T => Nameable[T]): ZIO[Logging, MBeanPublishException, Unit] =
    createOrUpdate(v).commit.mapError {
      case mbe: MBeanPublishException                         => mbe
      case _: javax.management.InstanceAlreadyExistsException => new InstanceAlreadyExistsException(v)
      case t                                                  => new JmxException(t)
    } <* log.debug(s"updated MBean with name [${objectName(v)}}] to [$v]")

  def removeMBean[T <: Product](v: T)(implicit f: T => Nameable[T]): ZIO[Logging, MBeanPublishException, Unit] =
    self.beans
      .get(objectName(v).objectName)
      .flatMap {
        case Some(_) =>
          STM.fromTry(Try {
            try svr.unregisterMBean(new ObjectName(objectName(v).objectName))
            catch {
              case _: InstanceNotFoundException => // swallow that exception as it may occur in STM retries
            }
          }) >>> self.beans.delete(objectName(v).objectName)
        case None    => STM.unit
      }
      .commit
      .mapError {
        case mbe: MBeanPublishException => mbe
        case t                          => new JmxException(t)
      } <* log.debug(s"Removed MBean with name [${objectName(v)}]")