前言
众所周知,celery是一个基于python开发的任务消息队列,轻松实现任务的异步处理,如果对celery不了解,请观阅之前一篇文章Celery
- 关于celery,它的底层也不是很难,假如,我们在一个需要celery的场景下,例如,我们发送邮件,使用celery是不是有些过重了。这时候,与其调用celery,还不如自己动手去实现一下,这样既轻量,又好用
- 我们需要用到的东西,第一就是多线程,因为我们要进行异步操作,使用多线程模拟是最为合理的,然后就是redis,当然了,用List也是可以的,但是我们为了能够贴近真实操作,使用redis中的列表模拟
实现celery
要知道,celery的本质,是队列,所以,我们手动写一个队列
class MyQueue: def __init__(self,kename:str,**redis_kwargs): # 链接redis decode_response的作用是,字符串不会转换成bytes self.__db = redis.Redis(**redis_kwargs,decode_responses=True) self.key = kename self.queue = [] # 添加数据 def push(self,x:str) -> None: self.__db.rpush(self.key,x) # 删除数据 def pop(self) -> int: return self.queue.pop(0) # 获取数据 def peek(self) -> str: return self.__db.lpop(self.key) # 判断是否执行完毕 def empty(self)->bool: return self.__db.llen(self.key)
实例化
task_queue = MyQueue('myqueue')
我们就以模拟发送邮件为生产者
result = ['1111111@qq.com','22222@qq.com','3333333@qq.com','4444444444@qq.com','55555555555@qq.com'] for i in result: if i[0]: task_queue.push(i[0])
然后,我们使用多线程来模拟消费者
def task(): # 如果队列空了,证明任务完成 while task_queue.empty() != 0: # 取出任务,模拟消费 print(task_queue.peek()) time.sleep(1) return '执行完毕' if __name__ == '__main__': t1 = threading.Thread(target=task) t2 = threading.Thread(target=task) t1.start() t2.start() t1.join() t2.join()
需要几个消费者,就可以开启多个线程,当然了,也可以通过继承threading.Thread创建新的子类,实例化后调用start方法启动新线程,即它调用了线程的
run()
方法class Thread_test(threading.Thread): def __init__(self,queue_task): threading.Thread.__init__(self) self.queue = queue_task def run(self) -> None: print(self.queue) if __name__ == '__main__': # 队列为空,即为停止 while task_queue.empty() != 0: # 创建线程 for i in range(2): a = Thread_test(task_queue.peek()) a.start() time.sleep(1)
如此,我们便简单的实现了celery的底层
总结,任何名字听起来高大上的工具,不要因为它的触不可及而放弃,我们也可以用很简单的方式实现它们该有的功能。
- Post link: https://www.godhearing.cn/shou-xie-celery/
- Copyright Notice: All articles in this blog are licensed under unless otherwise stated.