joomla网站建设,河北沙河市规划局或建设局网站,网站改版降权,做营销网站建设挣钱吗目录
一、目的与要求
二、实验内容
三、实验步骤
1、pyspark交互式编程
2、编写独立应用程序实现数据去重
3、编写独立应用程序实现求平均值问题
4、三个综合实例
四、结果分析与实验体会 一、目的与要求
1、熟悉Spark的RDD基本操作及键值对操作#xff1b; 2、熟悉使…目录
一、目的与要求
二、实验内容
三、实验步骤
1、pyspark交互式编程
2、编写独立应用程序实现数据去重
3、编写独立应用程序实现求平均值问题
4、三个综合实例
四、结果分析与实验体会 一、目的与要求
1、熟悉Spark的RDD基本操作及键值对操作 2、熟悉使用RDD编程解决实际具体问题的方法。
二、实验内容
1、pyspark交互式编程
给定数据集 data1.txt包含了某大学计算机系的成绩数据格式如下所示 Tom,DataBase,80 Tom,Algorithm,50 Tom,DataStructure,60 Jim,DataBase,90 Jim,Algorithm,60 Jim,DataStructure,80 …… 请根据给定的实验数据在pyspark中通过编程来计算以下内容
1该系总共有多少学生 2该系共开设了多少门课程 3Tom同学的总成绩平均分是多少 4求每名同学的选修的课程门数 5该系DataBase课程共有多少人选修 6各门课程的平均分是多少 7使用累加器计算共有多少人选了DataBase这门课。
2、编写独立应用程序实现数据去重 对于两个输入文件A和B编写Spark独立应用程序对两个文件进行合并并剔除其中重复的内容得到一个新文件C。下面是输入文件和输出文件的一个样例供参考。 输入文件A的样例如下 20170101 x 20170102 y 20170103 x 20170104 y 20170105 z 20170106 z 输入文件B的样例如下 20170101 y 20170102 y 20170103 x 20170104 z 20170105 y 根据输入的文件A和B合并得到的输出文件C的样例如下 20170101 x 20170101 y 20170102 y 20170103 x 20170104 y 20170104 z 20170105 y 20170105 z 20170106 z
3、编写独立应用程序实现求平均值问题 每个输入文件表示班级学生某个学科的成绩每行内容由两个字段组成第一个是学生名字第二个是学生的成绩编写Spark独立应用程序求出所有学生的平均成绩并输出到一个新文件中。下面是输入文件和输出文件的一个样例供参考。 Algorithm成绩 小明 92 小红 87 小新 82 小丽 90 Database成绩 小明 95 小红 81 小新 89 小丽 85 Python成绩 小明 82 小红 83 小新 94 小丽 91 平均成绩如下 (小红,83.67) (小新,88.33) (小明,89.67) (小丽,88.67)
4、三个综合实例 题目详情可查看实验步骤。
三、实验步骤
1、pyspark交互式编程
先在终端启动pyspark
[rootbigdata zhc]# pyspark
1该系总共有多少学生 lines sc.textFile(file:///home/zhc/datasets/data1.txt)res lines.map(lambda x:x.split(,)).map(lambda x: x[0]) # 获取每行数据的第1列distinct_res res.distinct() # 去重操作distinct_res.count() # 取元素总个数执行结果 2该系共开设了多少门课程 lines sc.textFile(file:///home/zhc/datasets/data1.txt)res lines.map(lambda x:x.split(,)).map(lambda x:x[1]) # 获取每行数据的第2列distinct_res res.distinct() # 去重操作distinct_res.count() # 取元素总个数执行结果 3Tom同学的总成绩平均分是多少 lines sc.textFile(file:///home/zhc/datasets/data1.txt)res lines.map(lambda x:x.split(,)).filter(lambda x:x[0]Tom) # 筛选Tom同学的成绩信息res.foreach(print) score res.map(lambda x:int(x[2])) # 提取Tom同学的每门成绩并转换为int类型num res.count() # Tom同学选课门数sum_score score.reduce(lambda x,y:xy) # Tom同学的总成绩avg sum_score/num # 总成绩/门数平均分print(avg)
执行结果 4求每名同学的选修的课程门数 lines sc.textFile(file:///home/zhc/datasets/data1.txt)res lines.map(lambda x:x.split(,)).map(lambda x:(x[0],1)) # 学生每门课程都对应(学生姓名,1)学生有n门课程则有n个(学生姓名,1)each_res res.reduceByKey(lambda x,y: xy) # 按学生姓名获取每个学生的选课总数each_res.foreach(print)
执行结果
......
5该系DataBase课程共有多少人选修 lines sc.textFile(file:///home/zhc/datasets/data1.txt)res lines.map(lambda x:x.split(,)).filter(lambda x:x[1]DataBase)res.count()
执行结果 6各门课程的平均分是多少 lines sc.textFile(file:///home/zhc/datasets/data1.txt)res lines.map(lambda x:x.split(,)).map(lambda x:(x[1],(int(x[2]),1))) # 为每门课程的分数后面新增一列1表示1个学生选择了该课程。格式如(ComputerNetwork, (44, 1))temp res.reduceByKey(lambda x,y:(x[0]y[0],x[1]y[1])) # 按课程名聚合课程总分和选课人数。格式如(ComputerNetwork, (7370, 142))avg temp.map(lambda x:(x[0], round(x[1][0]/x[1][1],2))) # 课程总分/选课人数 平均分并利用round(x,2)保留两位小数avg.foreach(print) 执行结果 7使用累加器计算共有多少人选了DataBase这门课。 lines sc.textFile(file:///home/zhc/datasets/data1.txt)res lines.map(lambda x:x.split(,)).filter(lambda x:x[1]DataBase) # 筛选出选了DataBase课程的数据accum sc.accumulator(0) # 定义一个从0开始的累加器accumres.foreach(lambda x:accum.add(1)) # 遍历res每扫描一条数据累加器加1accum.value # 输出累加器的最终值
执行结果 2、编写独立应用程序实现数据去重 对于两个输入文件A和B编写Spark独立应用程序对两个文件进行合并并剔除其中重复的内容得到一个新文件C。下面是输入文件和输出文件的一个样例供参考。 输入文件A的样例如下 20170101 x 20170102 y 20170103 x 20170104 y 20170105 z 20170106 z 输入文件B的样例如下 20170101 y 20170102 y 20170103 x 20170104 z 20170105 y 根据输入的文件A和B合并得到的输出文件C的样例如下 20170101 x 20170101 y 20170102 y 20170103 x 20170104 y 20170104 z 20170105 y 20170105 z 20170106 z
在“/home/zhc/mycode/remdup”目录下新建代码文件remdup.py
# /home/zhc/mycode/remdup/remdup.py
from pyspark import SparkContext
#初始化SparkContext
sc SparkContext(local,remdup)
#加载两个文件A和B
lines1 sc.textFile(file:///home/zhc/mycode/remdup/A.txt)
lines2 sc.textFile(file:///home/zhc/mycode/remdup/B.txt)
#合并两个文件的内容
lines lines1.union(lines2)
#去重操作
distinct_lines lines.distinct()
#排序操作
res distinct_lines.sortBy(lambda x:x)
#将结果写入result文件中repartition(1)的作用是让结果合并到一个文件中不加的话会结果写入到两个文件
res.repartition(1).saveAsTextFile(file:///home/zhc/mycode/remdup/result)
在目录“/home/zhc/mycode/remdup”下执行下面命令执行程序注意执行程序时请先退出pyspark shell否则会出现“地址已在使用”的警告。
[rootbigdata remdup]# python3 remdup.py
在目录“/home/zhc/mycode/remdup/result”下即可得到结果文件part-00000。
[rootbigdata remdup]# cd result
[rootbigdata result]# cat part-00000 3、编写独立应用程序实现求平均值问题 每个输入文件表示班级学生某个学科的成绩每行内容由两个字段组成第一个是学生名字第二个是学生的成绩编写Spark独立应用程序求出所有学生的平均成绩并输出到一个新文件中。下面是输入文件和输出文件的一个样例供参考。 Algorithm成绩 小明 92 小红 87 小新 82 小丽 90 Database成绩 小明 95 小红 81 小新 89 小丽 85 Python成绩 小明 82 小红 83 小新 94 小丽 91 平均成绩如下 (小红,83.67) (小新,88.33) (小明,89.67) (小丽,88.67)
在“/home/zhc/mycode/avgscore”目录下新建代码文件avgscore.txt
# /home/zhc/mycode/avgscore/avgscore.txt
from pyspark import SparkContext
#初始化SparkContext
sc SparkContext(local, avgscore)
#加载三个文件Algorithm.txt、Database.txt和Python.txt
lines1 sc.textFile(file:///home/zhc/mycode/avgscore/Algorithm.txt)
lines2 sc.textFile(file:///home/zhc/mycode/avgscore/Database.txt)
lines3 sc.textFile(file:///home/zhc/mycode/avgscore/Python.txt)
#合并三个文件的内容
lines lines1.union(lines2).union(lines3)
#为每行数据新增一列1方便后续统计每个学生选修的课程数目。data的数据格式为(小明, (92, 1))
data lines.map(lambda x:x.split( )).map(lambda x:(x[0],(int(x[1]),1)))
#根据key也就是学生姓名合计每门课程的成绩以及选修的课程数目。res的数据格式为(小明, (269, 3))
res data.reduceByKey(lambda x,y:(x[0]y[0],x[1]y[1]))
#利用总成绩除以选修的课程数来计算每个学生的每门课程的平均分并利用round(x,2)保留两位小数
result res.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))
#将结果写入result文件中repartition(1)的作用是让结果合并到一个文件中不加的话会结果写入到三个文件
result.repartition(1).saveAsTextFile(file:///home/zhc/mycode/avgscore/result)在目录“/home/zhc/mycode/avgscore”下执行下面命令执行程序注意执行程序时请先退出pyspark shell否则会出现“地址已在使用”的警告。
[rootbigdata avgscore]# python3 avgscore.py
在目录“/home/zhc/mycode/avgscore/result”下即可得到结果文件part-00000。
[rootbigdata avgscore]# cd result
[rootbigdata result]# cat part-00000 4、三个综合实例
案例一求Top值
任务描述某个目录下有若干个文本文件每个文件里包含了很多数据每行数据由4个字段的值构成不同字段之间用逗号隔开4个字段分别为orderiduseridpayment和productid要求求出Top N个payment值。
file01.txt 1,1768,50,155 2,1218, 600,211 3,2239,788,242 4,3101,28,599 5,4899,290,129 6,3110,54,1201 7,4436,259,877 8,2369,7890,27 file02.txt 100,4287,226,233 101,6562,489,124 102,1124,33,17 103,3267,159,179 104,4569,57,125 105,1438,37,116 [rootbigdata zhc]# cd /mycode/RDD
[rootbigdata RDD]# vi file0.txt
[rootbigdata RDD]# vi TopN.py
[rootbigdata RDD]# vi file0.txt
[rootbigdata RDD]# spark-submit TopN.py
使用vim编辑器编辑“/home/zhc/mycode/RDD/file0.txt”文件
我这里将file01.txt和file02.txt合并为一个文件了——file0.txt 1,1768,50,155 2,1218,600,211 3,2239,788,242 4,3101,28,599 5,4899,290,129 6,3110,54,1201 7,4436,259,877 8,2369,7890,27 100,4287,226,233 101,6562,489,124 102,1124,33,17 103,3267,159,179 104,4569,57,125 105,1438,37,116 使用vim编辑器编辑“/home/zhc/mycode/RDD/TopN.py”代码文件
#/home/zhc/mycode/RDD/TopN.py
from pyspark import SparkConf, SparkContext
# 创建SparkConf对象设置应用程序名称和部署模式
conf SparkConf().setMaster(local).setAppName(ReadHBase)
# 创建SparkContext对象
sc SparkContext(conf conf)
# 从本地文件系统读取数据
lines sc.textFile(file:///home/zhc/mycode/RDD/file0.txt)
# 过滤出长度不为0且包含4个逗号的行
result1 lines.filter(lambda line:(len(line.strip()) 0) and (len(line.split(,)) 4))
# 提取第三列数据
result2result1.map(lambda x:x.split(,)[2])
# 将第三列数据转换成键值对key为数字value为空串
result3result2.map(lambda x:(int(x),))
# 对数据进行重新分区分区数为1
result4result3.repartition(1)
# 按照键降序排序
result5result4.sortByKey(False)
# 取出前5个键
result6result5.map(lambda x:x[0])
result7result6.take(5)
# 打印前5个键
for a in result7: print(a)使用spark-submit提交TopN.py文件得到结果如下。 案例二文件排序
任务描述有多个输入文件每个文件中的每一行内容均为一个整数。要求读取所有文件中的整数进行排序后输出到一个新的文件中输出的内容个数为每行两个整数第一个整数为第二个整数的排序位次第二个整数为原待排序的整数。
输入文件
file1.txt 33 37 12 40 file2.txt 4 16 39 5 file3.txt 1 45 25 [rootbigdata RDD]# mkdir filesort
[rootbigdata RDD]# cd filesort
[rootbigdata filesort]# vi file1.txt
[rootbigdata filesort]# vi file2.txt
[rootbigdata filesort]# vi file3.txt
[rootbigdata filesort]# cd ..
[rootbigdata RDD]# vi FileSort.py
[rootbigdata RDD]# spark-submit FileSort.py 在“/home/zhc/mycode/RDD/filesort”路径下使用vim编辑器将上面三个文件内容输入。
使用vim编辑器编辑“/home/zhc/mycode/RDD/FileSort.py”文件
#/home/zhc/mycode/RDD/FileSort.py
from pyspark import SparkConf, SparkContext
# 定义一个全局变量index用于记录索引值
index0
# 自定义函数getindex每调用一次将index加1并返回新的index值
def getindex():global indexindex1return index
def main():# 创建SparkConf对象设置应用程序名称和部署模式本地1核运行conf SparkConf().setMaster(local[1]).setAppName(FileSort) sc SparkContext(conf conf)lines sc.textFile(file:///home/zhc/mycode/RDD/filesort/file*.txt) index 0# 过滤出长度不为0的行result1 lines.filter(lambda line:(len(line.strip()) 0))# 将每行数据转换成整型键值对 result2result1.map(lambda x:(int(x.strip()),)) # 对数据进行重新分区分区数为1result3result2.repartition(1)# 按照键升序排序 result4result3.sortByKey(True)# 只保留键 result5result4.map(lambda x:x[0])# 将数据映射为(index, value)的形式result6result5.map(lambda x:(getindex(),x)) result6.foreach(print)# 将结果保存到本地文件系统result6.saveAsTextFile(file:///home/zhc/mycode/RDD/filesort/sortresult)
if __name__ __main__:main()使用spark-submit提交FileSort.py文件得到结果如下。 可以到“/home/zhc/mycode/RDD/filesort/sortresult”目录下查看结果文件part-00000。
[rootbigdata RDD]# cd ./filesort/sortresult
[rootbigdata sortresult]# cat part-00000 案例三二次排序
任务描述 对于一个给定的文件数据如file4.txt所示请对数据进行排序首先根据第1列数据降序排序如果第1列数据相等则根据第2列数据降序排序。
输入文件 file4.txt 5 3 1 6 4 9 8 3 4 7 5 6 3 2 [rootbigdata RDD]# vi file4.txt
[rootbigdata RDD]# vi SecondarySortApp.py
[rootbigdata RDD]# spark-submit SecondarySortApp.py
在“/home/zhc/mycode/RDD”路径下使用vim编辑器将上面file4.txt文件内容输入。
使用vim编辑器编辑“/home/zhc/mycode/RDD/SecondarySortApp.py”文件
#/home/zhc/mycode/RDD/SecondarySortApp.py
# 导入gt函数用于比较大小
from operator import gt
from pyspark import SparkContext, SparkConf
# 定义SecondarySortKey类
class SecondarySortKey():def __init__(self, k):self.column1 k[0]self.column2 k[1]# 定义__gt__方法用于比较大小def __gt__(self, other):if other.column1 self.column1:return gt(self.column2,other.column2)else:return gt(self.column1, other.column1)def main():# 创建SparkConf对象设置应用程序名称和部署模式本地1核运行conf SparkConf().setAppName(spark_sort).setMaster(local[1])sc SparkContext(confconf)filefile:///home/zhc/mycode/RDD/file4.txtrdd1 sc.textFile(file)# 过滤出长度不为0的行rdd2rdd1.filter(lambda x:(len(x.strip()) 0))# 将每行数据转换成带有键值对的元组键为元组类型rdd3rdd2.map(lambda x:((int(x.split( )[0]),int(x.split( )[1])),x))# 将数据中的键转换成SecondarySortKey类型rdd4rdd3.map(lambda x: (SecondarySortKey(x[0]),x[1]))# 对数据进行按键排序rdd5rdd4.sortByKey(False)# 只保留值rdd6rdd5.map(lambda x:x[1])rdd6.foreach(print)if __name__ __main__:main() 使用spark-submit提交SecondarySortApp.py文件得到结果如下。 四、结果分析与实验体会 在进行RDD编程实验之前需要掌握Spark的基本概念和RDD的特性例如惰性计算、分区、依赖关系等。同时需要了解Python等语言的基础知识。在实验过程中可以通过以下步骤来完成 1创建SparkContext对象用于连接Spark集群和创建RDD2通过textFile函数读取文件数据并利用filter等函数进行数据清洗和处理3将数据转换成键值对的形式再利用map、reduceByKey等函数进行计算和处理4利用sortByKey等函数进行排序操作5最后通过foreach等函数将结果输出。 在实验过程中需要注意以下几点1选择合适的算子例如filter、map、reduceByKey、sortByKey等以及合适的lambda表达式来进行数据处理和计算。2对于大规模数据的处理需要考虑分区和并行计算以提高计算效率。3需要注意数据类型和格式确保数据的正确性和一致性。4在进行排序操作时需要利用自定义类来实现二次排序等功能。 总之通过实验可以更加深入地理解Spark的原理和机制提高数据处理和计算的效率和准确性。同时也能够培养代码编写和调试的能力提高编程水平。