Archive for the 'Python' Category

How To Obtain ActiveMQ Queue Sizes using Python and Stomp

Wednesday, November 14th, 2012

This post outlines how to use Python and its Stomp client to obtain the queue length of an ActiveMQ queue.

This unit test quickly illustrates how to use the ActiveMQ Statistics plugin. The basic procedure is:

  • Enable the ActiveMQ Statistics Plugin on the ActiveMQ instance
    you are interacting with.
  • Send an empty message to ActiveMQ.Statistics.Destination.QueueName with a JMSReplyTo header specifying the queue to return the stats on.
  • Pop the return message off the return Queue to obtain your stats.

To enable the ActiveMQ Statistics Plugin, add this to your ActiveMQ configuration:

<broker>
<plugins>
<statisticsBrokerPlugin/>
</plugins>
</broker>

The Unit Test

#!/usr/bin/python

import unittest
import stomp
import time

class QueueListener(object):
    """Connection listener that forwards received messages to a handler.

    The handler is any object that provides a
    ``handleMessage(headers, message)`` method.
    """

    def __init__(self, handler):
        """Store the handler that incoming messages are forwarded to."""
        self.handler = handler

    def on_error(self, headers, message):
        """Print the body of an error frame; ``headers`` is not used."""
        print("Received an error fetching message: %s" % message)

    def on_message(self, headers, message):
        """Forward a received message to ``self.handler.handleMessage``.

        Any exception raised by the handler is printed together with its
        traceback and then swallowed, so this method never raises.
        """
        try:
            self.handler.handleMessage(headers, message)
        except Exception as e:
            # Deliberate best-effort: report the failure and carry on.
            # NOTE(review): presumably this keeps a failing handler from
            # propagating into the caller that delivers messages - confirm.
            import traceback
            print(e)
            print(traceback.format_exc())

class testActivemqStats(unittest.TestCase):
    """Request queue statistics from the ActiveMQ Statistics plugin via STOMP.

    The test sends an empty message to the plugin's statistics destination
    and waits for the reply to arrive on a separate reply queue.
    """

    # Upper bound, in seconds, on how long the test waits for the reply.
    REPLY_TIMEOUT = 10
    # Interval, in seconds, between checks for a received reply.
    POLL_INTERVAL = 0.1

    def setUp(self):
        """Reset the per-test bookkeeping."""
        self.connections = []  # every connection opened through getConn()
        self.received = []     # (headers, msg) pairs seen by handleMessage()

    def tearDown(self):
        """Disconnect every connection opened during the test."""
        for conn in self.connections:
            try:
                conn.disconnect()
            except Exception as e:
                # Best-effort cleanup: a failed disconnect must not hide the
                # outcome of the test itself.
                print("Error disconnecting: %s" % e)

    def getConn(self):
        """Open, start and connect a STOMP connection to 127.0.0.1:61613.

        Returns:
            The connected ``stomp.Connection``; it is also recorded so that
            ``tearDown`` can disconnect it.
        """
        # NOTE(review): start() and the send() call signature used below
        # follow the stomp.py release this script was written against -
        # confirm against the installed version.
        conn = stomp.Connection(host_and_ports=[("127.0.0.1", 61613)])
        conn.start()
        conn.connect()
        self.connections.append(conn)

        return conn

    def handleMessage(self, headers, msg):
        """Print a received message and record it for the waiting test."""
        print("Message: %s ; %s" % (str(headers), str(msg)))
        self.received.append((headers, msg))

    def testSomething(self):
        """Send a statistics request and wait for the reply.

        Fails if no reply has been received within ``REPLY_TIMEOUT`` seconds.
        """
        sendConn = self.getConn()
        recvConn = self.getConn()

        statQueue = "/queue/ActiveMQ.Statistics.Destination.%s" % '/uruk/parseQ'
        statRecvQueue = "/client/stats"

        # The ActiveMQ Stats Plugin returns the stats as an ObjectMessage which
        # needs to be transformed to JSON or XML to be useful to the Python
        # STOMP client.
        recvConn.set_listener('MyActiveMQStatListener', QueueListener(self))
        recvConn.subscribe(destination=statRecvQueue, ack='auto',
                           transformation='jms-object-json')

        # The ActiveMQ Stats Plugin only responds to messages with a
        # JMSReplyTo header.
        print("Sending a request for stats to: %s" % statQueue)
        sendConn.send("", destination=statQueue,
                      headers={'reply-to': statRecvQueue,
                               'JMSReplyTo': statRecvQueue})

        # Wait a bounded time for the reply instead of sleeping forever, so
        # the test terminates and reports a result.
        deadline = time.time() + self.REPLY_TIMEOUT
        while not self.received and time.time() < deadline:
            time.sleep(self.POLL_INTERVAL)

        self.assertTrue(
            self.received,
            "No statistics reply received within %d seconds" % self.REPLY_TIMEOUT)

# Run the test case only when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()

The output from the Unit Test should look like this:

{
'expires': '0',
'timestamp': '0',
'destination': '/queue//client/stats',
'priority': '0',
'message-id': 'ID:elric-59664-1352862988577-2:1:0:0:10',
'type': 'Advisory',
'transformation': 'jms-object-json'
}; {
"map": {
"entry": [{
"string": "memoryUsage",
"long": 0
}, {
"string": "dequeueCount",
"long": 0
}, {
"string": "inflightCount",
"long": 0
}, {
"string": "messagesCached",
"long": 0
}, {
"string": "averageEnqueueTime",
"double": 0
}, {
"string": ["destinationName", "queue:\/\/\/uruk\/parseQ"]
}, {
"string": "size",
"long": 0
}, {
"string": "memoryPercentUsage",
"int": 0
}, {
"string": "producerCount",
"long": 0
}, {
"string": "consumerCount",
"long": 0
}, {
"string": "minEnqueueTime",
"double": 0
}, {
"string": "expiredCount",
"long": 0
}, {
"string": "dispatchCount",
"long": 0
}, {
"string": "maxEnqueueTime",
"double": 0
}, {
"string": "enqueueCount",
"long": 0
}, {
"string": "memoryLimit",
"long": 1048576
}]
}
}

Happy Queueing