본문 바로가기
Programming/Python

RabbitMQ - python

by Chan_찬 2020. 6. 25.
728x90
반응형

RabbitMQ

RabbitMQ producer, consumer 구현

RabbitMQ process

"""Message Queue."""
import logging
from typing import Tuple, Union

import pika


class MessageQueue:
    """
    Blocking wrapper around a single RabbitMQ queue.

    The constructor connects immediately.  ``size``, ``put`` and ``get``
    reconnect on demand when the channel is closed and always close the
    connection again before returning, even when the broker call raises.
    """

    LOGGER = logging.getLogger(__name__)

    def __init__(self, name: str, url: str) -> None:
        """
        Store the queue settings and open the first connection.

        :param name: queue name; also used as the routing key when publishing
        :param url: RabbitMQ URL including id and password, e.g.
            "amqp://relay:relay1234@localhost/vh1"
        """
        self.key = name
        self.url = url
        self.conn = None
        self.channel = None
        self.queue_declare = None
        self.connect()

    def connect(self) -> None:
        """Connect to RabbitMQ, open a channel and declare the queue."""
        # Release a connection that is still open before replacing it, so a
        # reconnect never leaves the previous socket dangling.
        self.close()
        parameters = pika.URLParameters(self.url)
        self.conn = pika.BlockingConnection(parameters)
        self.channel = self.conn.channel()
        # The declare reply carries the queue's message count (see size()).
        self.queue_declare = self.channel.queue_declare(queue=self.key)

    def _ensure_channel(self) -> None:
        """Reconnect when there is no usable channel."""
        if self.channel is None or self.channel.is_closed:
            self.connect()

    def size(self) -> int:
        """
        Check queue size.

        The queue is declared again on every call so the count comes from a
        fresh broker reply instead of the one cached at connect time.

        :returns: message count reported by the queue declare reply
        """
        try:
            if self.channel is None or self.channel.is_closed:
                # connect() declares the queue, which refreshes the count.
                self.connect()
            else:
                self.queue_declare = self.channel.queue_declare(
                    queue=self.key)
        finally:
            self.close()
        return self.queue_declare.method.message_count

    def is_empty(self) -> bool:
        """
        Check whether the queue is empty.

        :returns: True if ``size()`` is 0, otherwise False
        """
        return self.size() == 0

    def put(self, element: str) -> None:
        """
        Put element to queue.

        Publishes to the default exchange ("") with the queue name as the
        routing key.

        :param element: input text used as the message body
        """
        try:
            self._ensure_channel()
            self.channel.basic_publish(
                exchange="", routing_key=self.key, body=element)
        finally:
            self.close()

    def get(self) -> Tuple[int, Union[str, bytes]]:
        """
        Get one message from the queue (auto-acknowledged).

        :returns: ``(queue_count, text)``.  ``queue_count`` is the
            ``message_count`` of the broker's get reply and ``text`` is the
            message body exactly as pika delivered it (presumably ``bytes``;
            it is not decoded here).  When no message is available the
            result is ``(-1, "")``.
        """
        try:
            self._ensure_channel()
            rtn = self.channel.basic_get(queue=self.key, auto_ack=True)
        finally:
            self.close()
        self.LOGGER.debug(rtn)

        # basic_get yields a (method, properties, body) triple; the method
        # frame is falsy when the queue had nothing to deliver.
        method, _properties, body = rtn
        if not method:
            return -1, ""
        return method.message_count, body

    def close(self) -> None:
        """
        Close connection to queue.

        Safe to call repeatedly: nothing happens when there is no connection
        or when it is already closed.
        """
        if self.conn is not None and self.conn.is_open:
            self.conn.close()

test code

"""Test message queue."""
import json
import unittest
from multiprocessing import Process

from utils.message_queue import MessageQueue


class MQUnitTest(unittest.TestCase):
    """
    Message Queue unit test.

    These tests build real ``MessageQueue`` objects from the URL set in
    ``setUp``, so they need a reachable RabbitMQ broker at that address.
    """

    def setUp(self) -> None:
        """Init queue name and broker url."""
        self.queue_name = "test"
        self.url = "amqp://relay:relay1234@10.211.55.15/vh1"

    def test_01_put(self):
        """Put a batch of large messages and check the queue grew by it."""
        loop = 8
        message_queue = MessageQueue(self.queue_name, self.url)
        before_size = message_queue.size()
        # A long payload, so each message is far bigger than a short string.
        texts = "<br>".join(
            f"good morning.  how have you been?::{i}" for i in range(500))
        for i in range(loop):
            dict_input = {"keyword": texts, "id": str(i)}
            message_queue.put(json.dumps(dict_input))
        after_size = message_queue.size()
        self.assertEqual(after_size, before_size + loop)

    def test_02_get(self):
        """Get text with a few concurrent client processes."""
        loop = 5
        workers = [
            Process(target=self.get_process, args=(i,)) for i in range(loop)]
        for worker in workers:
            worker.start()
        # Wait for every consumer; otherwise the test returns while the
        # children are still running and a crash in one goes unnoticed.
        for worker in workers:
            worker.join()
            self.assertEqual(worker.exitcode, 0)

    def get_process(self, i):
        """
        Consume a couple of messages and print them.

        :param i: index of the consumer process, used to tag the output
        """
        loop = 2
        message_queue = MessageQueue(self.queue_name, self.url)
        for _ in range(loop):
            count, text = message_queue.get()
            print(f"{i}: c:{count}, t:{text}")
            # get() returns an empty text when the queue had no message;
            # json.loads would raise on it.
            if text:
                print(f"{json.loads(text)}")


# Run the test suite only when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
728x90
728x90
BIG
Buy me a coffee

댓글