본문 바로가기
Programming/Python

Python에서 Redis를 메시지 큐(message queue)로 사용

by Chan_찬 2020. 6. 19.
728x90
반응형

파이썬에서 redis를 message queue로 구현

redis class - redis_queue.py

"""RedisQueue class"""
import redis


class RedisQueue(object):
    """First-in-first-out queue stored in a single Redis list.

    New elements are pushed onto the head (left) of the list and consumed
    from the tail (right), so the oldest element is always returned first.
    Redis documents the maximum length of a list as 4,294,967,295 elements.
    """

    def __init__(self, name, max_size, **redis_kwargs):
        """Create a queue bound to the Redis list stored under ``name``.

        Args:
            name: Redis key of the list that holds the queue.
            max_size: Maximum number of elements kept by ``put_and_trim``.
                Plain ``put`` does not enforce it.
            **redis_kwargs: Forwarded unchanged to ``redis.Redis`` (for
                example ``host``, ``port``, ``db``).
        """
        self.key = name
        self.max_size = max_size
        self.rq = redis.Redis(**redis_kwargs)

    def size(self):
        """Return the current number of elements in the queue."""
        return self.rq.llen(self.key)

    def is_empty(self):
        """Return True when the queue currently holds no elements."""
        return self.size() == 0

    def put(self, element):
        """Push ``element`` onto the head of the queue.

        Returns:
            The value returned by LPUSH (the list length after the push).
        """
        return self.rq.lpush(self.key, element)

    def put_and_trim(self, element):
        """Push ``element`` and then cap the list at ``max_size`` elements.

        The trim keeps indexes ``0 .. max_size - 1``; because new elements
        enter at index 0, anything cut off comes from the tail, i.e. the
        oldest, not-yet-consumed elements.

        Returns:
            The length reported by LPUSH, measured *before* the trim, so it
            can be ``max_size + 1`` when the push overflowed the limit.
        """
        # NOTE: LPUSH and LTRIM are issued as two separate commands, so
        # another client may observe the list between them.
        queue_count = self.rq.lpush(self.key, element)
        self.rq.ltrim(self.key, 0, self.max_size - 1)
        return queue_count

    def get(self, is_blocking=False, timeout=None):
        """Pop and return the oldest element, or None if there is none.

        Args:
            is_blocking: When true, wait for an element with BRPOP instead
                of returning immediately.
            timeout: Passed to ``brpop`` as its ``timeout`` argument; only
                used when ``is_blocking`` is true. (redis-py is expected to
                treat ``None`` as "wait indefinitely" -- confirm against the
                installed client version.)

        Returns:
            The popped element, or None when the queue is empty
            (non-blocking) or the blocking wait timed out.
        """
        if not is_blocking:
            return self.rq.rpop(self.key)

        popped = self.rq.brpop(self.key, timeout=timeout)
        if popped is None:
            # BRPOP timed out; indexing the missing result would raise
            # TypeError, so report "no element" the same way RPOP does.
            return None
        # BRPOP answers with a (key, value) pair; callers only want the value.
        return popped[1]

    def get_without_pop(self):
        """Return the element that ``get`` would pop next, without removing it.

        Returns:
            The element at the tail of the list, or None when the list is
            empty.
        """
        # LINDEX already yields None for an empty or missing list, so a
        # separate emptiness check would only add a round trip and a race.
        return self.rq.lindex(self.key, -1)

redis process - redis_process.py

"""redis process"""
import json
import time

from redis_queue import RedisQueue

# Shared settings for every RedisQueue created in this module.
# max_data is handed to RedisQueue as its max_size argument.
max_data = 100000
# Address of the Redis server, forwarded to RedisQueue as keyword arguments.
host = 'localhost'
port = 6379


def put_raw_data_to_queue(_data):
    """Push one item onto the 'raw-data' queue.

    Args:
        _data: The item to enqueue. A dict (or dict subclass) is serialised
            to a JSON string first; any other value is pushed unchanged.

    Returns:
        Whatever ``RedisQueue.put`` returns for the push.
    """
    # isinstance also accepts dict subclasses (OrderedDict, defaultdict, ...),
    # which an exact type comparison would have pushed unserialised.
    raw_data = json.dumps(_data) if isinstance(_data, dict) else _data
    q = RedisQueue('raw-data', max_data, host=host, port=port, db=0)
    # NOTE: put() does not enforce max_data; only put_and_trim() caps the list.
    return q.put(raw_data)


def get_translated_data_from_queue():
    """Poll the queue until an item arrives or the polling budget runs out.

    The queue is polled up to 150 times with a 0.2 second pause after each
    empty attempt (about 30 seconds in total).

    Returns:
        The popped item, or None when no item arrived within the budget.
    """
    # NOTE(review): an earlier, commented-out variant read from a
    # 'translated-data' queue; this function currently consumes 'raw-data'.
    # Confirm which queue is intended.
    q = RedisQueue('raw-data', max_data, host=host, port=port, db=0)
    terminate_count = int(0.5 * 60 * 5)
    for _ in range(terminate_count):
        # Pop directly instead of checking emptiness first: with a separate
        # check another consumer could drain the queue in between, making
        # the pop return None and ending the wait prematurely.
        translated_data = q.get()
        if translated_data is not None:
            return translated_data
        time.sleep(0.2)
    return None


def get_json_data_from_queue():
    """Fetch the next queue item and decode it from JSON when it is bytes.

    Returns:
        The object produced by ``json.loads`` when the fetched item is a
        bytes value; otherwise the item unchanged, which includes the None
        returned when nothing arrived before the polling budget ran out.

    Raises:
        json.JSONDecodeError: If the bytes payload is not valid JSON.
    """
    translated_data = get_translated_data_from_queue()
    # isinstance rather than an exact type comparison, so bytes subclasses
    # are decoded as well.
    if isinstance(translated_data, bytes):
        return json.loads(translated_data)
    return translated_data

test code - redis_tests.py

"""test redis queue"""
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import unittest

import redis_process


class RedisUnitTest(unittest.TestCase):
    """Manual smoke tests for the redis_process helpers.

    The methods only print what the helpers return; they make no
    assertions. They go through redis_process, so they presumably need the
    Redis server configured there to be reachable.
    """

    def setUp(self) -> None:
        """Prepare the sample payload and connection settings."""
        # The three connection attributes are not read by the test methods
        # in this class; they are kept as fixture data.
        self.max_data, self.host, self.port = 100000, 'localhost', 6379
        # NOTE(review): '\t' inside the keywords value is a tab escape,
        # so the stored text is 'hahaha<TAB>est' -- confirm that is intended.
        self.data = dict(
            id=12340,
            url='google.com',
            lang='ko',
            keywords='hahaha\test',
        )

    def test_01_put_dict_to_queue(self):
        # Enqueue the sample payload twice and show each push result.
        for _attempt in range(2):
            push_result = redis_process.put_raw_data_to_queue(self.data)
            print(push_result)

    def test_02_get_dict_from_queue(self):
        # Read three times: one more read than test_01 performed pushes.
        for _attempt in range(3):
            payload = redis_process.get_json_data_from_queue()
            print(payload)
            print(type(payload))
            print(type(result))
728x90
728x90
BIG
Buy me a coffeeBuy me a coffee

댓글