当前位置: 首页 > news >正文

wordpress网仿站网站前台用什么做

wordpress网仿站,网站前台用什么做,agile WordPress,seo系统如今#xff0c;流数据是一个热门话题#xff0c;而Apache Spark是出色的流框架。 在此博客文章中#xff0c;我将向您展示如何将自定义数据源集成到Spark中。 Spark Streaming使我们能够从各种来源进行流传输#xff0c;同时使用相同的简洁API访问数据流#xff0c;执行… 如今流数据是一个热门话题而Apache Spark是出色的流框架。 在此博客文章中我将向您展示如何将自定义数据源集成到Spark中。 Spark Streaming使我们能够从各种来源进行流传输同时使用相同的简洁API访问数据流执行SQL查询或创建机器学习算法。 这些功能使Spark成为流式或任何类型的工作流应用程序的首选框架因为我们可以使用框架的所有方面。 面临的挑战是弄清楚如何将自定义数据源集成到Spark中以便我们能够利用其强大功能而无需更改为更多标准源。 更改似乎是合乎逻辑的但是在某些情况下这样做是不可能或不方便的。 流式自定义接收器 Spark提供了不同的扩展点正如我们在此处扩展Data Source API以便将自定义数据存储集成到Spark SQL中所看到的那样。 在此示例中我们将做同样的事情但是我们还将扩展流API以便我们可以从任何地方流。 为了实现我们的自定义接收器我们需要扩展Receiver [A]类。 请注意它具有类型注释因此我们可以从流客户端的角度对DStream实施类型安全。 我们将使用此自定义接收器来流式传输我们的应用程序之一通过套接字发送的订单。 通过网络传输的数据的结构如下所示 1 5 1 1 2 2 1 1 2 1 1 4 1 1 2 2 1 2 2 我们首先接收订单ID和订单总金额然后接收订单的行项目。 第一个值是商品ID第二个是订单ID与订单ID值匹配然后是商品成本。 在此示例中我们有两个订单。 第一个只有四个项目第二个只有一个项目。 这个想法是将所有这些隐藏在我们的Spark应用程序中因此它在DStream上收到的是在流上定义的完整顺序如下所示 val orderStream: DStream[Order] .....val orderStream: DStream[Order] ..... 同时我们还使用接收器来流式传输我们的自定义流式源。 即使它通过套接字发送数据使用来自Spark的标准套接字流也将非常复杂因为我们将无法控制数据的输入方式并且会遇到在应用程序上遵循顺序的问题本身。 这可能非常复杂因为一旦进入应用程序空间我们便会并行运行并且很难同步所有这些传入数据。 但是在接收方空间中很容易从原始输入文本创建订单。 让我们看看我们的初始实现是什么样的。 case class Order(id: Int, total: Int, items: List[Item] null) case class Item(id: Int, cost: Int)class OrderReceiver(host: String, port: Int) extends Receiver[Order](StorageLevel.MEMORY_ONLY) {override def onStart(): Unit {println(starting...)val thread new Thread(Receiver) {override def run() {receive() }}thread.start()}override def onStop(): Unit stop(I am done)def receive() .... }case class Order(id: Int, total: Int, items: List[Item] null) case class Item(id: Int, cost: Int)class OrderReceiver(host: String, port: Int) extends Receiver[Order](StorageLevel.MEMORY_ONLY) {override def onStart(): Unit {println(starting...)val thread new Thread(Receiver) {override def run() {receive() }}thread.start()}override def onStop(): Unit stop(I am done)def receive() .... } 我们的OrderReceiver扩展了Receiver [Order]它使我们可以在Spark内部存储Order带注释的类型。 我们还需要实现onStart和onStop方法。 请注意onStart创建一个线程因此它是非阻塞的这对于正确的行为非常重要。 现在让我们看一下接收方法真正发生魔术的地方。 def receive() {val socket new Socket(host, port)var currentOrder: Order nullvar currentItems: List[Item] nullval reader new BufferedReader(new InputStreamReader (socket.getInputStream(), UTF-8))while (!isStopped()) {var userInput reader.readLine()if (userInput null) stop(Stream has ended)else {val parts userInput.split( )if (parts.length 2) {if (currentOrder ! null) {store(Order(currentOrder.id, currentOrder.total, currentItems))}currentOrder Order(parts(0).toInt, parts(1).toInt)currentItems List[Item]()}else {currentItems Item(parts(0).toInt, parts(1).toInt) :: currentItems}}}}def receive() {val socket new Socket(host, port)var currentOrder: Order nullvar currentItems: List[Item] nullval reader new BufferedReader(new InputStreamReader (socket.getInputStream(), UTF-8))while (!isStopped()) {var userInput reader.readLine()if (userInput null) stop(Stream has ended)else {val parts userInput.split( )if (parts.length 2) {if (currentOrder ! null) {store(Order(currentOrder.id, currentOrder.total, currentItems))}currentOrder Order(parts(0).toInt, parts(1).toInt)currentItems List[Item]()}else {currentItems Item(parts(0).toInt, parts(1).toInt) :: currentItems}}}} 在这里我们创建一个套接字并将其指向源然后我们就可以简单地开始读取它直到调度了stop命令或者套接字上没有更多数据为止。 请注意我们正在读取与之前定义的结构相同的结构如何发送数据。 完全阅读订单后我们将调用store…以便将其保存到Spark中。 除了在我们的应用程序中使用我们的接收器外这里别无所要做 val config new SparkConf().setAppName(streaming) val sc new SparkContext(config) val ssc new StreamingContext(sc, Seconds(5))val stream: DStream[Order] ssc.receiverStream(new OrderReceiver(port))val config new SparkConf().setAppName(streaming) val sc new SparkContext(config) val ssc new StreamingContext(sc, Seconds(5))val stream: DStream[Order] ssc.receiverStream(new OrderReceiver(port)) 请注意我们是如何使用自定义OrderReceiver创建流的仅为了清楚起见对val流进行了注释但这不是必需的。 从现在开始我们将流DString [Order]用作我们在任何其他应用程序中使用的任何其他流。 stream.foreachRDD { rdd rdd.foreach(order {println(order.id)) order.items.foreach(println)}}stream.foreachRDD { rdd rdd.foreach(order {println(order.id)) order.items.foreach(println)}}摘要 当处理生成无尽数据的源时Spark Streaming非常方便。 您可以使用与Spark SQL和系统中其他组件相同的API但它也足够灵活可以扩展以满足您的特定需求。 翻译自: https://www.javacodegeeks.com/2016/05/integrate-custom-data-sources-apache-spark.html
http://www.sadfv.cn/news/69511/

相关文章:

  • 企业在建设银行网站怎么发工资培训机构白名单
  • 如何查看网站根目录北京seo优化费用
  • 海洋cms怎么做电影网站品牌代理网
  • 登录河北建设厅网站进入不了技术支持广州网站建设
  • 网站ftp织梦微信网站模板
  • 音乐网站用什么语言做河南无限动力做网站怎么样
  • 邢台企业做网站找谁wordpress页面模板获取内容
  • 河西区做网站的公司南京网站设南京网站设计计
  • 做app还是网站wordpress 网页设计
  • 论坛开源网站源码十堰网站建设u2028
  • 大型购物网站建设方案wordpress ping百度
  • 百度网站邀您点评dw网页设计模板制作过程
  • 王欣网站建设与维护深圳小程序开发定制公司
  • 提供有经验的网站建设章丘建网站
  • 中国免费域名申请网站网站建站常见问题
  • 青岛出版集团网站系统开发成本可以分为哪三种
  • 凡科建站官网网站模板做淘客一定要建网站吗
  • php网站模板制作工具外贸营销公司
  • 上鼎工程建设有限公司网站qq注册网页入口
  • 怎样在百度上建立网站做一个网站都需要什么
  • 山东响应式网站建设岑溪网站
  • 徐州建设网站网站开发行业资讯
  • 建设网站服务器的方式有自营方式宁波网站建设怎么做
  • 网站开发公司赚钱么深圳电器公司是国企吗
  • 网站优化是往新闻中心发新闻吗网页广告调词平台多少钱
  • 网站百度收录查询哪些网站可以做淘宝推广
  • 有哪些做ppt的网站最好的网站推广软件
  • 设计有哪些网站饰品企业网站建设
  • 如何做请求队列防止网站高并发网站开发属于什么系统
  • 东莞整站优化公司火速公司单位做网站怎么做