朋友让你做网站如何拒绝,合同备案号查询系统,萌新seo,支付行业的网站怎么做目录
案例需求
代码
结果
解析 案例需求#xff1a; 使用netcat工具向9999端口不断的发送数据#xff0c;通过SparkStreaming读取端口数据并统计不同单词出现的次数 -- 1. Spark从socket中获取数据#xff1a;一行一行的获取 -- 2. Driver程序执行时#xff0c…目录
案例需求
代码
结果
解析 案例需求 使用netcat工具向9999端口不断的发送数据通过SparkStreaming读取端口数据并统计不同单词出现的次数 -- 1. Spark从socket中获取数据一行一行的获取 -- 2. Driver程序执行时streaming处理过程不能结束 -- 3. 采集器在正常情况下启动后就不应该停止除非特殊情况 -- 4. 采集器位于一个executor中是一个线程执行时需要一个核如果设定的总核数为1时那么在运行时因为没有核数所以不会有打印结果所以sparkStreaming使用的核数至少为2个 -- 5. print()方法默认是打印10行结果 -- 6. netcat的指令 在Windows下nc -lp 9999在linux下 nc -lk 9999 代码
package cn.olo.streamimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}object StreamDemo {def main(args: Array[String]): Unit {// 连接SparkStreamingval sparkConf: SparkConf new SparkConf().setMaster(local[*]).setAppName(sparkStreaming)/*1.方法StreamingContext(形参)2.形参形参1conf: SparkConfspark配置对象形参2batchDuration: Duration采集时间*/val ssc new StreamingContext(sparkConf,Seconds(5))// 需求使用netcat工具向9999端口不断的发送数据通过SparkStreaming读取端口数据并统计不同单词出现的次数// 1. 获取netcat工具9999端口的连接并开始接收数据// 从socket中获取数据一行一行的获取val socketDS: ReceiverInputDStream[String] ssc.socketTextStream(localhost,9999)// 2. 数据处理val wordDS: DStream[String] socketDS.flatMap(_.split( ))val wordToSumDS: DStream[(String, Int)] wordDS.map((_,1)).reduceByKey(_ _ )// 3. 打印数据wordToSumDS.print()// 4. Driver程序执行时streaming处理过程不能结束// 采集器在正常情况下启动后就不应该停止除非特殊情况// 启动采集器ssc.start()// 等待采集器的结束ssc.awaitTermination()}}结果 解析 a、采集周期时间之间每一个采集周期生成一个RDD按照时间的顺序依次进行 b、在每一个采集周期内会执行wordcount计算最终得出:统计出每一个采集周期时间的wordcount