Python - 反应式编程
反应式编程是一种处理数据流和更改传播的编程范例。这意味着当一个组件发出数据流时,更改将通过响应式编程库传播到其他组件。更改的传播将继续,直到到达最终接收器。事件驱动编程和反应式编程之间的区别在于,事件驱动编程围绕事件展开,而反应式编程围绕数据展开。
ReactiveX 或 RX 用于反应式编程
ReactiveX 或 Raective Extension 是最著名的反应式编程实现。ReactiveX 的工作取决于以下两个类 -
Observable 类
此类是数据流或事件的源,它打包传入数据,以便数据可以从一个线程传递到另一个线程。在一些观察者订阅之前,它不会提供数据。
观察者类
此类使用 observable 发出的数据流。可以使用 observable 有多个观察者,每个观察者将接收发出的每个数据项。观察者可以通过订阅 observable 来接收三种类型的事件 -
- on_next() 事件 − 它意味着数据流中有一个元素。
- on_completed() 事件 − 这意味着 emission 结束,并且不会有更多项目出现。
- on_error() 事件 − 它也意味着 emission 的结束,但当 observable 抛出错误时。
RxPY – 用于反应式编程的 Python 模块
RxPY 是一个 Python 模块,可用于反应式编程。我们需要确保已安装该模块。以下命令可用于安装 RxPY 模块 -
例
下面是一个 Python 脚本,它使用 RxPY 模块及其类 Observable 和 Observe 进行反应式编程。基本上有两类 -
- get_strings() − 用于从 observer 获取字符串。
- PrintObserver() − 用于从 observer 打印字符串。它使用 observer 类的所有三个事件。它还使用 subscribe() 类。
from rx import Observable, Observer
def get_strings(observer):
observer.on_next("Ram")
observer.on_next("Mohan")
observer.on_next("Shyam")
observer.on_completed()
class PrintObserver(Observer):
def on_next(self, value):
print("Received {0}".format(value))
def on_completed(self):
print("Finished")
def on_error(self, error):
print("Error: {0}".format(error))
source = Observable.create(get_strings)
source.subscribe(PrintObserver())
输出
Received Mohan
Received Shyam
Finished
用于反应式编程的 PyFunctional 库
PyFunctional是另一个可用于反应式编程的 Python 库。它使我们能够使用 Python 编程语言创建功能式程序。它很有用,因为它允许我们使用链式函数运算符创建数据管道。
RxPY 和 PyFunctional 之间的区别
这两个库都用于反应式编程并以类似的方式处理流,但它们之间的主要区别取决于数据的处理。RxPY 处理系统中的数据和事件,而 PyFunctional 专注于使用函数式编程范式转换数据。
安装 PyFunctional 模块
我们需要在使用之前安装此模块。可以借助 pip 命令进行安装,如下所示 -
例
以下示例使用 PyFunctional 模块及其 seq 类,它们充当我们可以迭代和操作的流对象。在这个程序中,它通过使用 lamda 函数来映射序列,该函数将每个值加倍,然后过滤 x 大于 4 的值,最后将序列减少为所有剩余值的总和。
from functional import seq
result = seq(1,2,3).map(lambda x: x*2).filter(lambda x: x > 4).reduce(lambda x, y: x + y)
print ("Result: {}".format(result))
输出