Python 实现事件监听
缘起
最近的项目需要用到事件监听机制,Google 了下,Python 似乎没有专门用于这方面的包。参考大牛发的 Snippet 写了个事件监听类,方便以后调用。
事件类
首先需要有自己的事件类,定义如下两个 Demo:
class Event1:
"""
Event 1
"""
value = "this is a event, <Event 1>"
class Event2:
"""
Event 2
"""
value = "this is a event, <Event 2>"
事件管理、分发
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""
Event Manager,负责任务的分发
"""
from queue import Queue, Empty
from multiprocessing.dummy import Pool
class EventManager:
"""
事件分发
"""
def __init__(self):
"""
初始化
"""
self.__event_handler = {}
self.__pool = Pool(5)
def add_event_handler(self, event_type, handler):
"""
注册 event handler
:param event_type: 事件类型
:type event_type: object
:param handler: Event Handler
:type handler: function
"""
self.__event_handler.setdefault(event_type, []).append(handler)
def dispatch_event(self, event):
"""
分发事件
:param event: 事件
:type event: Event
:return: 是否分发成功
:rtype: bool
"""
try:
handlers = self.__event_handler.get(event.__class__, [])
for handler in handlers:
self.__pool.apply_async(
func=handler,
args=(event,),
callback=self.__handler_callback
)
return True
except:
return False
def __handler_callback(self, result):
"""
进程回调
:param result: 返回值
:type result: str
"""
print(result)
由上而下,类中共有 4 个方法。
init
self.__event_handler
: 用于注册 handlerself.__pool
: 以线程池的形式进行任务调度
add_event_handler
注册 event-handler,将 handler 保存至 self.__event_handler
中,以供后期分发事件。
dispatch_event
分发事件,做法是根据事件类型(event.__class__
)从 self.__event_handler
中获取其 handler
,依次将 handler(event)
添加至线程池中,callback
用于处理 handler
的返回结果,如不需要可以自行去掉。
__handler_callback
用于处理 handler
的返回结果,如不需要可以自行去掉。
测试
def handler1(event):
"""
处理 Event1 的 handler
"""
return "I'm handler1, monitored an event. Its value: {value}".format(value=event.value)
def handler2(event):
"""
处理 Event2 的 handler
"""
return "I'm handler2, monitored an event. Its value: {value}".format(value=event.value)
# 实例化 EventManager
em = EventManager()
# 注册 handler
em.add_event_handler(Event1, handler1)
em.add_event_handler(Event2, handler2)
# 分发事件
e1 = Event1()
e2 = Event2()
em.dispatch_event(e1)
em.dispatch_event(e2)