728x90
반응형
파이썬에서 redis를 message queue로 구현
redis class - redis_queue.py
"""RedisQueue class"""
import redis
class RedisQueue(object):
    """First-in, first-out message queue stored in a single Redis list.

    New elements are pushed onto the head (left) of the list and consumers
    pop from the tail (right), so the oldest element is delivered first.
    The max length of a Redis list is 4,294,967,295.
    """

    def __init__(self, name, max_size, **redis_kwargs):
        """Bind the queue to the Redis list stored at key ``name``.

        Args:
            name: Redis key of the list that backs this queue.
            max_size: Number of elements ``put_and_trim`` keeps in the list.
            **redis_kwargs: Forwarded unchanged to ``redis.Redis``
                (for example ``host``, ``port``, ``db``).
        """
        self.key = name
        self.max_size = max_size
        self.rq = redis.Redis(**redis_kwargs)

    def size(self):
        """Return the current number of elements in the queue."""
        return self.rq.llen(self.key)

    def is_empty(self):
        """Return True when the queue currently holds no elements."""
        return self.size() == 0

    def put(self, element):
        """Push ``element`` onto the head of the queue.

        Returns:
            Whatever ``LPUSH`` reports (the list length after the push).
        """
        return self.rq.lpush(self.key, element)

    def put_and_trim(self, element):
        """Push ``element`` and then cap the list at ``max_size`` elements.

        The trim keeps indexes ``0 .. max_size - 1``, i.e. the most recently
        pushed elements; anything beyond that at the tail (the oldest,
        not-yet-consumed elements) is discarded.

        Returns:
            The list length reported by ``LPUSH``, measured before the trim,
            so it can be ``max_size + 1``.
        """
        queue_count = self.rq.lpush(self.key, element)
        self.rq.ltrim(self.key, 0, self.max_size - 1)
        return queue_count

    def get(self, is_blocking=False, timeout=None):
        """Pop and return the oldest element from the tail of the queue.

        Args:
            is_blocking: When true, wait for an element with ``BRPOP``
                instead of returning immediately.
            timeout: Passed to ``brpop`` as its timeout; only used when
                ``is_blocking`` is true.

        Returns:
            The popped element, or ``None`` when the queue is empty
            (non-blocking) or the blocking wait timed out.
        """
        if not is_blocking:
            return self.rq.rpop(self.key)

        popped = self.rq.brpop(self.key, timeout=timeout)
        if popped is None:
            # brpop yields None on timeout; indexing it would raise TypeError.
            return None
        # brpop answers with a (key, value) pair; callers only want the value.
        return popped[1]

    def get_without_pop(self):
        """Return the element that ``get`` would pop next, without removing it.

        Returns:
            The element at the tail of the list, or ``None`` when the queue
            is empty.
        """
        # LINDEX already returns None for an empty or missing list, so no
        # separate emptiness check (and extra round trip) is needed.
        return self.rq.lindex(self.key, -1)
redis process - redis_process.py
"""redis process"""
import json
import time
from redis_queue import RedisQueue
# Connection settings for the Redis server used by the queue helpers below.
host = 'localhost'
port = 6379
# Size limit handed to each RedisQueue as its max_size argument.
max_data = 100_000
def put_raw_data_to_queue(_data):
    """Push one item onto the 'raw-data' queue.

    Args:
        _data: The payload. A dict (or dict subclass) is serialized to a
            JSON string first; any other value is pushed unchanged.

    Returns:
        The value returned by ``RedisQueue.put`` for this push.
    """
    # isinstance (rather than an exact type check) also covers dict
    # subclasses such as OrderedDict, which json.dumps handles the same way.
    raw_data = json.dumps(_data) if isinstance(_data, dict) else _data
    q = RedisQueue('raw-data', max_data, host=host, port=port, db=0)
    return q.put(raw_data)
def get_translated_data_from_queue():
    """Poll the queue until an item arrives or the attempts run out.

    The queue is polled every 0.2 seconds for at most 150 attempts
    (roughly 30 seconds in total).

    Returns:
        The popped item, or ``None`` if nothing arrived in time.
    """
    # NOTE: this reads from 'raw-data'; the original also carried a
    # commented-out variant reading from a 'translated-data' queue.
    q = RedisQueue('raw-data', max_data, host=host, port=port, db=0)
    max_attempts = int(0.5 * 60 * 5)
    poll_interval = 0.2

    for _ in range(max_attempts):
        # Pop directly instead of checking is_empty() first: another consumer
        # could drain the queue between the check and the pop, which would
        # make this function return None early instead of continuing to poll.
        item = q.get()
        if item is not None:
            return item
        time.sleep(poll_interval)
    return None
def get_json_data_from_queue():
    """Fetch the next queued item and decode it from JSON when it is bytes.

    Returns:
        The parsed JSON value when the queue yielded bytes; otherwise the
        value exactly as returned by ``get_translated_data_from_queue``
        (``None`` when nothing arrived in time).

    Raises:
        json.JSONDecodeError: If the bytes payload is not valid JSON.
    """
    translated_data = get_translated_data_from_queue()
    if isinstance(translated_data, bytes):
        return json.loads(translated_data)
    return translated_data
test code - redis_tests.py
"""test redis queue"""
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import unittest
import redis_process
class RedisUnitTest(unittest.TestCase):
    """Smoke tests that push to and read from the queue via redis_process.

    The tests only print what they get back; they make no assertions.
    """

    def setUp(self) -> None:
        """Prepare the sample payload and connection values for each test."""
        # max_data, host and port are stored here but not read by the tests
        # in this class.
        self.max_data = 100000
        self.host = 'localhost'
        self.port = 6379
        # NOTE(review): '\t' inside 'hahaha\test' is a tab escape - confirm
        # that is intended.
        self.data = {
            'id': 12340,
            'url': 'google.com',
            'lang': 'ko',
            'keywords': 'hahaha\test',
        }

    def test_01_put_dict_to_queue(self):
        """Push the sample dict twice and print the result of each push."""
        for _attempt in range(2):
            pushed = redis_process.put_raw_data_to_queue(self.data)
            print(pushed)

    def test_02_get_dict_from_queue(self):
        """Read three items back and print each one with its type.

        This reads one more item than test_01 writes.
        """
        for _attempt in range(3):
            fetched = redis_process.get_json_data_from_queue()
            print(fetched)
            print(type(fetched))
728x90
728x90
BIG
'Programming > Python' 카테고리의 다른 글
string을 원하는 width로 자르고 싶을때 (0) | 2020.06.26 |
---|---|
RabbitMQ - python (0) | 2020.06.25 |
time 명령어를 이용한 간단한 처리 시간 측정 (0) | 2020.06.17 |
파이썬 정규식(regular expression:regex) 사용 - 일부만 추출 (0) | 2020.06.15 |
python code reformatter black + pycharm (0) | 2020.06.13 |
댓글