Archive for November, 2012

How To Obtain ActiveMQ Queue Sizes using Python and Stomp

Wednesday, November 14th, 2012

This post outlines how to use Python and its Stomp client to obtain the queue length of an ActiveMQ queue.

This unit test test cases quickly illustrates how to use ActiveMQ Statistics plugin. The basic use case for doing this is:

  • Enable the ActiveMQ Statistics Plugin on the ActiveMQ instance
    you are interacting with.
  • Send an empty message to ActiveMQ.Statistics.Destination.QueueName with a JMSReplyTo header specifying the queue to return the stats on.
  • Pop the return message off the return Queue to obtain your stats.

To enable the ActiveMQ Statistics Plugin add this to your ActiveMQ configuration.


The Unit Test


import unittest
import stomp
import time

class QueueListener(object):
"""A queue listener for objects."""

def __init__(self, handler):
self.handler = handler

def on_error(self, headers, message):
print "Received an error fetching message: %s" % message

def on_message(self, headers, message):
self.handler.handleMessage( headers, message )
except Exception as e:
import traceback
print e
print traceback.format_exc()

class testActivemqStats(unittest.TestCase):

def setUp(self):

def getConn(self):
conn = stomp.Connection( host_and_ports= [ ( "", 61613) ] )

return conn

def handleMessage(self, headers, msg):
print "Message: %s ; %s" % ( str(headers), str(msg) )

def testSomething(self):
sendConn = self.getConn()
recvConn = self.getConn()

statQueue="/queue/ActiveMQ.Statistics.Destination.%s" % '/uruk/parseQ'

# The ActiveMQ Stats Plugin returns the stats as an ObjectMessage which needs to be transformed
# to JSON or XML to be useful to the Python STOMP client
recvConn.set_listener('MyActiveMQStatListener', QueueListener(self))
recvConn.subscribe(destination=statRecvQueue, ack='auto', transformation='jms-object-json')

# The ActiveMQ Stats Plugin only respondes to messages with a JMSReplyTo header.
print "Sending a request for stats to: %s" % statQueue
sendConn.send( "", destination=statQueue, headers={'reply-to': statRecvQueue, 'JMSReplyTo': statRecvQueue} )

while True:
import time

if __name__ == '__main__':

The output from the Unit Test should look like this:

'expires': '0',
'timestamp': '0',
'destination': '/queue//client/stats',
'priority': '0',
'message-id': 'ID:elric-59664-1352862988577-2:1:0:0:10',
'type': 'Advisory',
'transformation': 'jms-object-json'
}; {
"map": {
"entry": [{
"string": "memoryUsage",
"long": 0
}, {
"string": "dequeueCount",
"long": 0
}, {
"string": "inflightCount",
"long": 0
}, {
"string": "messagesCached",
"long": 0
}, {
"string": "averageEnqueueTime",
"double": 0
}, {
"string": ["destinationName", "queue:\/\/\/uruk\/parseQ"]
}, {
"string": "size",
"long": 0
}, {
"string": "memoryPercentUsage",
"int": 0
}, {
"string": "producerCount",
"long": 0
}, {
"string": "consumerCount",
"long": 0
}, {
"string": "minEnqueueTime",
"double": 0
}, {
"string": "expiredCount",
"long": 0
}, {
"string": "dispatchCount",
"long": 0
}, {
"string": "maxEnqueueTime",
"double": 0
}, {
"string": "enqueueCount",
"long": 0
}, {
"string": "memoryLimit",
"long": 1048576

Happy Queueing