百度收录网站方法,我想学编程,微信公众号推广赚钱,中国城乡住房建设厅网站首页这次只是一篇简短的文章#xff0c;因为我仍在尝试这种东西。 关于反应式编程有很多话题。 在Java 8中#xff0c;我们有Stream API#xff0c;我们有rxJava我们有ratpack #xff0c;Akka有akka-streams 。 这些实现的主要问题是它们不兼容。 您不能将一个实现的订阅者连… 这次只是一篇简短的文章因为我仍在尝试这种东西。 关于反应式编程有很多话题。 在Java 8中我们有Stream API我们有rxJava我们有ratpack Akka有akka-streams 。 这些实现的主要问题是它们不兼容。 您不能将一个实现的订阅者连接到另一个实现的发布者。 幸运的是一项倡议已经开始提供一种方法使这些不同的实现可以协同工作 “本规范旨在允许创建许多符合标准的实现这些实现将通过遵守规则将能够平滑地互操作并在流应用程序的整个处理图中保留上述好处和特征。” 来自– http://www.reactive-streams.org/ 这是如何运作的 现在我们该怎么做 让我们看一下基于akka-stream提供的示例的快速示例从此处开始 。 在下面的清单中 package sample.streamimport akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{SubscriberSink, PublisherSource, Source}
import com.google.common.collect.{DiscreteDomain, ContiguousSet}
import rx.RxReactiveStreams
import rx.Observable;
import scala.collection.JavaConverters._object BasicTransformation {def main(args: Array[String]): Unit {// define an implicit actorsystem and import the implicit dispatcherimplicit val system ActorSystem(Sys)import system.dispatcher// flow materializer determines how the stream is realized.// this time as a flow between actors.implicit val materializer FlowMaterializer()// input text for the stream.val text |Lorem Ipsum is simply dummy text of the printing and typesetting industry.|Lorem Ipsum has been the industrys standard dummy text ever since the 1500s, |when an unknown printer took a galley of type and scrambled it to make a type |specimen book..stripMargin// create an observable from a simple list (this is in rxjava style)val first Observable.from(text.split(\\s).toList.asJava);// convert the rxJava observable to a publisherval publisher RxReactiveStreams.toPublisher(first);// based on the publisher create an akka sourceval source PublisherSource(publisher);// now use the akka style syntax to stream the data from the source// to the sink (in this case this is println)source.map(_.toUpperCase). // executed as actorsfilter(_.length 3).foreach { el // the sink/consumerprintln(el)}.onComplete(_ system.shutdown()) // lifecycle event}
} 此示例中的代码注释几乎解释了正在发生的事情。 我们在这里所做的是创建一个基于rxJava的Observable。 将此Observable转换为“反应流”发布者并使用此发布者创建akka-streams源。 对于其余的代码我们可以使用akka-stream样式流API对流进行建模。 在这种情况下我们只需要进行一些过滤并打印出结果即可。 翻译自: https://www.javacodegeeks.com/2014/11/use-reactive-streams-api-to-combine-akka-streams-with-rxjava.html