How To Obtain ActiveMQ Queue Sizes using Python and Stomp
Wednesday, November 14th, 2012This post outlines how to use Python and its Stomp client to obtain the queue length of an ActiveMQ queue.
This unit test test cases quickly illustrates how to use ActiveMQ Statistics plugin. The basic use case for doing this is:
- Enable the ActiveMQ Statistics Plugin on the ActiveMQ instance
you are interacting with. - Send an empty message to ActiveMQ.Statistics.Destination.QueueName with a JMSReplyTo header specifying the queue to return the stats on.
- Pop the return message off the return Queue to obtain your stats.
To enable the ActiveMQ Statistics Plugin add this to your ActiveMQ configuration.
<broker>
<plugins>
<statisticsBrokerPlugin/>
</plugins>
</broker>
The Unit Test
#!/usr/bin/python
import unittest
import stomp
import time
class QueueListener(object):
"""A queue listener for objects."""
def __init__(self, handler):
self.handler = handler
def on_error(self, headers, message):
print "Received an error fetching message: %s" % message
def on_message(self, headers, message):
try:
self.handler.handleMessage( headers, message )
except Exception as e:
import traceback
print e
print traceback.format_exc()
pass
class testActivemqStats(unittest.TestCase):
def setUp(self):
pass
def getConn(self):
conn = stomp.Connection( host_and_ports= [ ( "127.0.0.1", 61613) ] )
conn.start()
conn.connect()
return conn
def handleMessage(self, headers, msg):
print "Message: %s ; %s" % ( str(headers), str(msg) )
def testSomething(self):
sendConn = self.getConn()
recvConn = self.getConn()
statQueue="/queue/ActiveMQ.Statistics.Destination.%s" % '/uruk/parseQ'
statRecvQueue="/client/stats"
# The ActiveMQ Stats Plugin returns the stats as an ObjectMessage which needs to be transformed
# to JSON or XML to be useful to the Python STOMP client
recvConn.set_listener('MyActiveMQStatListener', QueueListener(self))
recvConn.subscribe(destination=statRecvQueue, ack='auto', transformation='jms-object-json')
# The ActiveMQ Stats Plugin only respondes to messages with a JMSReplyTo header.
print "Sending a request for stats to: %s" % statQueue
sendConn.send( "", destination=statQueue, headers={'reply-to': statRecvQueue, 'JMSReplyTo': statRecvQueue} )
while True:
import time
time.sleep(60)
if __name__ == '__main__':
unittest.main()
The output from the Unit Test should look like this:
{
'expires': '0',
'timestamp': '0',
'destination': '/queue//client/stats',
'priority': '0',
'message-id': 'ID:elric-59664-1352862988577-2:1:0:0:10',
'type': 'Advisory',
'transformation': 'jms-object-json'
}; {
"map": {
"entry": [{
"string": "memoryUsage",
"long": 0
}, {
"string": "dequeueCount",
"long": 0
}, {
"string": "inflightCount",
"long": 0
}, {
"string": "messagesCached",
"long": 0
}, {
"string": "averageEnqueueTime",
"double": 0
}, {
"string": ["destinationName", "queue:\/\/\/uruk\/parseQ"]
}, {
"string": "size",
"long": 0
}, {
"string": "memoryPercentUsage",
"int": 0
}, {
"string": "producerCount",
"long": 0
}, {
"string": "consumerCount",
"long": 0
}, {
"string": "minEnqueueTime",
"double": 0
}, {
"string": "expiredCount",
"long": 0
}, {
"string": "dispatchCount",
"long": 0
}, {
"string": "maxEnqueueTime",
"double": 0
}, {
"string": "enqueueCount",
"long": 0
}, {
"string": "memoryLimit",
"long": 1048576
}]
}
}
Happy Queueing