前言

众所周知,celery是一个基于python开发的任务消息队列,轻松实现任务的异步处理,如果对celery不了解,请观阅之前一篇文章Celery

  • 关于celery,它的底层也不是很难,假如,我们在一个需要celery的场景下,例如,我们发送邮件,使用celery是不是有些过重了。这时候,与其调用celery,还不如自己动手去实现一下,这样既轻量,又好用
  • 我们需要用到的东西,第一就是多线程,因为我们要进行异步操作,使用多线程模拟是最为合理的,然后就是redis,当然了,用List也是可以的,但是我们为了能够贴近真实操作,使用redis中的列表模拟

实现celery

  • 要知道,celery的本质,是队列,所以,我们手动写一个队列

    class MyQueue:
    
        def __init__(self,kename:str,**redis_kwargs):
            # 链接redis   decode_response的作用是,字符串不会转换成bytes
            self.__db = redis.Redis(**redis_kwargs,decode_responses=True)
            self.key = kename
            self.queue = []
        # 添加数据
        def push(self,x:str) -> None:
            self.__db.rpush(self.key,x)
        # 删除数据
        def pop(self) -> int:
            return self.queue.pop(0)
        # 获取数据
        def peek(self) -> str:
            return self.__db.lpop(self.key)
        # 判断是否执行完毕
        def empty(self)->bool:
            return self.__db.llen(self.key)
  • 实例化

    task_queue = MyQueue('myqueue')
  • 我们就以模拟发送邮件为生产者

    result = ['1111111@qq.com','22222@qq.com','3333333@qq.com','4444444444@qq.com','55555555555@qq.com']
    for i in result:
        if i[0]:
            task_queue.push(i[0])
  • 然后,我们使用多线程来模拟消费者

    def task():
        # 如果队列空了,证明任务完成
        while task_queue.empty() != 0:
            # 取出任务,模拟消费
            print(task_queue.peek())
            time.sleep(1)
        return '执行完毕'
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=task)
        t2 = threading.Thread(target=task)
        t1.start()
        t2.start()
        t1.join()
        t2.join()

    需要几个消费者,就可以开启多个线程,当然了,也可以通过继承threading.Thread创建新的子类,实例化后调用start方法启动新线程,即它调用了线程的run()方法

    class Thread_test(threading.Thread):
    
        def __init__(self,queue_task):
            threading.Thread.__init__(self)
            self.queue = queue_task
    
        def run(self) -> None:
            print(self.queue)
    
    if __name__ == '__main__':
        # 队列为空,即为停止
        while task_queue.empty() != 0:
            # 创建线程
            for i in range(2):
                a = Thread_test(task_queue.peek())
                a.start()
            time.sleep(1)
  • 如此,我们便简单的实现了celery的底层


总结,任何名字听起来高大上的工具,不要因为它的触不可及而放弃,我们也可以用很简单的方式实现它们该有的功能。