18 de noviembre de 2017

Colas (RabbitMQ) - Publicando mensajes

Siguiendo con la interacción entre RabbitMQ y python usando la librería puka, en esta entrada vamos a ver como publicar mensajes.

En la anterior entrada vimos como crear un pool de threads que se conectaba a un broker de RabbitMQ, consumía mensajes y ejecutaba un workflow para cada mensaje consumido. En esta entrada veremos como crear un pool de threads que van a publicar mensajes.

En este caso lo que vamos a publicar son strings aleatorios de cierta longitud que vamos a generar con la función genrandom que acepta como parámetro la longitud con la que queramos generar los mensajes.

Estableceremos un tiempo de espera entre la publicación de mensaje que vendrá determinado por el valor de PUBLISHER_SLEEP_TIME en segundos. Este parámetro junto con el número de threads WORKER_NUMBER van a determinar el rate de publicación.

Igual que en el ejemplo de los consumidores, en caso de problemas con la conexión al broker se establece una política de reconexión con reintentos de conexión cada BROKER_RETRY_CONNECTION segundos.

De este modo el código quedaría del siguiente modo:

#!/usr/bin/env python

import puka
import time
import threading
import random
import string

BROKER_HOST = '192.168.0.169'
QUEUE = 'QUEUE-1'
BROKER_RETRY_CONNECTION = 10
WORKER_NUMBER = 4
BROKER_CONNECTION_DEFINITION = 'amqp://' + BROKER_HOST + '/'
PUBLISHER_SLEEP_TIME = 0.2

def genrandom(length):
    chain = ''
    for i in range(length+1):
        chain = chain + random.choice(string.ascii_uppercase)
    return chain

class Threaded_worker(threading.Thread):
    BROKER_CONNECTION = 0

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        threadName = threading.currentThread().getName()
        while True:
            if self.BROKER_CONNECTION == 0:
                try:
                    self.client = puka.Client(BROKER_CONNECTION_DEFINITION)
                    self.client.wait(self.client.connect())
                    self.client.wait(self.client.queue_declare(QUEUE))
                    print '[%s] Succesfully Connected at broker %s' % (threadName, BROKER_HOST)
                    self.BROKER_CONNECTION = 1
                except Exception as error:
                    print '[%s] Error at broker connection: %s.....retrying after %d seconds' % (
                    threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
            else:
                try:
                    msgToPublish = genrandom(40)
                    self.client.wait(self.client.basic_publish(exchange ='', routing_key=QUEUE, body=msgToPublish))
                    time.sleep(PUBLISHER_SLEEP_TIME)
                    print '[%s] Message published at queue: %s' % (threadName, msgToPublish)
                except Exception as error:
                    print '[%s] Error at publisher: %s.....retrying connection after %d seconds' % (
                    threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)


print 'Starting %d publisher threads...' % (WORKER_NUMBER)
for i in range(WORKER_NUMBER):
    td = Threaded_worker()
    td.start()


Al ejecutarlo obtendremos lo siguiente:

Starting 4 publisher threads...
[Thread-1] Succesfully Connected at broker 192.168.0.169
[Thread-3] Succesfully Connected at broker 192.168.0.169
[Thread-2] Succesfully Connected at broker 192.168.0.169
[Thread-4] Succesfully Connected at broker 192.168.0.169
[Thread-1] Message publish at queue: YKHYRBRJWQLOLSREOGONKZICBVOQBXHUALGXCWSIT
[Thread-3] Message publish at queue: AXCLEQMGONKZUPNNOIFFZTLZBMAPRRNCELSWVORAU
[Thread-2] Message publish at queue: HAXNHDXBVJVUVOLTSYRTGEZMXGSWLOTIAMDWIQCQZ
[Thread-4] Message publish at queue: MDRJVZBHFHHCIPNXHFATUHEQWBQSJQWVOVTVIAEMA
[Thread-1] Message publish at queue: SBIRZMHIOGQDTQVSZKGGZICXJIEEXVLCCGFHONQIE
[Thread-3] Message publish at queue: WPLUWHCJWEBYETTDWFIYGBEERADWFVGPQTHPRJIKR
[Thread-2] Message publish at queue: JXGJBYEIUQRZRLFKUSKRPAMKSTZDZGOJIBFGKVLEM
[Thread-4] Message publish at queue: OELBGLBPYFTEBZVKGCGUTBCLQDXRCQLPLPDJDRRRM
........
Con el valor PUBLISHER_SLEEP_TIME = 0.2 deberíamos tener un rate de publicación de 5 mensajes por cada hilo, de forma que si fijamos WORKER_NUMBER = 4, al ejecutarse el código deberían publicarse 20 mensajes por segundo.

Efectivamente, si lo lanzamos y vamos a la web de administración de RabbitMQ vemos que el rate de publicación es el esperado:


No hay comentarios:

Publicar un comentario