728x90
반응형
RabbitMQ producer, consumer 구현
RabbitMQ process
"""Message Queue."""
import logging
import pika
class MessageQueue:
    """
    Thin blocking wrapper around one RabbitMQ queue.

    ``size``, ``put`` and ``get`` reconnect when the channel is closed and
    close the connection again before returning, so an instance does not
    keep a connection open between calls.
    """

    LOGGER = logging.getLogger(__name__)

    def __init__(self, name: str, url: str) -> None:
        """
        Store the queue settings and open the first connection.

        :param name: queue name; also used as the routing key when publishing
        :param url: RabbitMQ URL including id and password, e.g.
            "amqp://relay:relay1234@localhost/vh1"
        """
        self.key = name
        self.url = url
        self.conn = None
        self.channel = None
        self.queue_declare = None
        self.connect()

    def connect(self) -> None:
        """Connect to RabbitMQ, open a channel and declare the queue."""
        parameters = pika.URLParameters(self.url)
        self.conn = pika.BlockingConnection(parameters)
        self.channel = self.conn.channel()
        # The declare reply is kept because size() reads message_count from it.
        self.queue_declare = self.channel.queue_declare(queue=self.key)

    def _ensure_channel(self) -> bool:
        """Reconnect when no open channel exists; return True if reconnected."""
        if self.channel is None or self.channel.is_closed:
            self.connect()
            return True
        return False

    def size(self) -> int:
        """
        Check queue size.

        :returns: message count reported by a fresh queue declare
        """
        try:
            if not self._ensure_channel():
                # The channel was still open, so the stored declare reply
                # dates from the last connect(); declare again to refresh it.
                self.queue_declare = self.channel.queue_declare(
                    queue=self.key)
        finally:
            self.close()
        return self.queue_declare.method.message_count

    def is_empty(self) -> bool:
        """
        Check whether the queue is empty.

        :returns: True if size() is 0, else False
        """
        return self.size() == 0

    def put(self, element: str) -> None:
        """
        Put element to queue.

        :param element: input text, published through the default exchange
            with the queue name as routing key
        """
        try:
            self._ensure_channel()
            self.channel.basic_publish(
                exchange="", routing_key=self.key, body=element)
        finally:
            # Release the connection even when publishing fails.
            self.close()

    def get(self) -> "tuple[int, str]":
        """
        Get one message from the queue (auto-acknowledged).

        :returns: ``(queue_count, text)``; ``queue_count`` is the
            ``message_count`` of the get reply, or ``(-1, "")`` when nothing
            was fetched
        """
        # NOTE(review): pika appears to deliver the body as bytes rather than
        # str; it is passed through undecoded here -- confirm with callers.
        try:
            self._ensure_channel()
            rtn = self.channel.basic_get(queue=self.key, auto_ack=True)
        finally:
            self.close()
        self.LOGGER.debug(rtn)
        method, _properties, body = rtn
        if method:
            return method.message_count, body
        return -1, ""

    def close(self) -> None:
        """Close the connection to the queue; safe to call repeatedly."""
        # Closing an already-closed connection raises, so check the state.
        if self.conn is not None and self.conn.is_open:
            self.conn.close()
Test code
"""Test message queue."""
import json
import unittest
from multiprocessing import Process
from utils.message_queue import MessageQueue
class MQUnitTest(unittest.TestCase):
    """Message Queue unit test; needs a reachable RabbitMQ broker."""

    def setUp(self) -> None:
        """Init queue name and broker url used by every test."""
        self.queue_name = "test"
        self.url = "amqp://relay:relay1234@10.211.55.15/vh1"

    def test_01_put(self):
        """Put several large JSON messages and check the queue grew by that many."""
        loop = 8
        message_queue = MessageQueue(self.queue_name, self.url)
        before_size = message_queue.size()
        texts = "<br>".join(
            f"good morning. how have you been?::{i}" for i in range(500))
        for i in range(loop):
            dict_input = {"keyword": texts, "id": str(i)}
            message_queue.put(json.dumps(dict_input))
        after_size = message_queue.size()
        # No explicit close() here: size() already closed the connection.
        self.assertEqual(after_size, before_size + loop)

    def test_02_get(self):
        """Get text from the queue with a few concurrent client processes."""
        loop = 5
        workers = [
            Process(target=self.get_process, args=(i,)) for i in range(loop)]
        for worker in workers:
            worker.start()
        # Wait for every client so a failure in a child is not lost.
        for worker in workers:
            worker.join()
            self.assertEqual(worker.exitcode, 0)

    def get_process(self, i):
        """
        Fetch two messages from the queue and print them.

        :param i: index of the client process, used only in the output
        """
        loop = 2
        message_queue = MessageQueue(self.queue_name, self.url)
        for _ in range(loop):
            count, text = message_queue.get()
            print(f"{i}: c:{count}, t:{text}")
            # get() returns an empty text when the queue had no message.
            if text:
                print(f"{json.loads(text)}")
# Run the test suite when this file is executed directly.
if __name__ == "__main__":
    unittest.main()
728x90
728x90
BIG
'Programming > Python' 카테고리의 다른 글
Array(list)의 원소 개수 카운팅 (0) | 2020.06.27 |
---|---|
string을 원하는 width로 자르고 싶을때 (0) | 2020.06.26 |
python 에서 redis를 메시지 queue로 사용 (0) | 2020.06.19 |
time 명령어를 이용한 간단한 처리 시간 측정 (0) | 2020.06.17 |
파이썬 정규식(regular expression:regex) 사용 - 일부만 추출 (0) | 2020.06.15 |
댓글