当前位置: 首页 > news >正文

太原广告公司网站建设wordpress让超链接不显示蓝字

太原广告公司网站建设,wordpress让超链接不显示蓝字,英文网站的外部链接 建设,工业贸易企业 营销型网站简介#xff1a;本文介绍了如何使用阿里云的Confluent Cloud和Databricks构建数据流和LakeHouse#xff0c;并介绍了如何使用Databricks提供的能力来挖掘数据价值#xff0c;使用Spark MLlib构建您的机器学习模型。 前提条件 已注册阿里云账号#xff0c;详情请参见阿里云…简介本文介绍了如何使用阿里云的Confluent Cloud和Databricks构建数据流和LakeHouse并介绍了如何使用Databricks提供的能力来挖掘数据价值使用Spark MLlib构建您的机器学习模型。 前提条件 已注册阿里云账号详情请参见阿里云账号注册流程已开通 Databricks 数据洞察服务已开通 OSS 对象存储服务已开通 Confluent 流数据服务 创建Databricks集群 Confluent集群 登录Confluent管理控制台创建Confluent集群并开启公网服务登录Databricks管理控制台创建Databricks集群 Databricks Worker节点公网访问 Databricks的worker节点暂时不支持公网访问为了能访问Confluent的公网地址请联系Databricks的开发人员添加NAT网关。 案例出租车数据入湖及分析 出租车和网约车在每天的运行中持续产生行驶轨迹和交易数据这些数据对于车辆调度流量预测安全监控等场景有着极大的价值。 本案例中我们使用纽约市的出租车数据来模拟网约车数据从产生发布到流数据服务Confluent通过Databricks Structured Streaming进行实时数据处理并存储到LakeHouse的整个流程。数据存储到LakeHouse后我们使用spark和spark sql对数据进行分析并使用Spark的MLlib进行机器学习训练。 前置准备 创建topic 登录Confluent的control center在左侧选中Topics点击Add a topic按钮创建一个名为nyc_taxi_data的topic将partition设置为3其他配置保持默认。 创建OSS bucket 在和Databricks同一Region的OSS中创建bucketbucket命名为databricks-confluent-integration 进入到Bucket列表页点击创建bucket按钮 创建好bucket之后在该bucket创建目录checkpoint_dir和data/nyc_taxi_data两个目录收集url用户名密码路径等以便后续使用a。confluent集群ID在csp的管控界面集群详情页获取 Confluent Control Center的用户名和密码路径 Databricks Structured Streaming的checkpoint存储目录采集的数据的存储目录 下面是我们后续会使用到的一些变量 # 集群管控界面获取 confluent_cluster_id your_confluent_cluster_id # 使用confluent集群ID拼接得到 confluent_server rb-{confluent_cluster_id}.csp.aliyuncs.com:9092 control_center_username your_confluent_control_center_username control_center_password your_confluent_control_center_passwordtopic nyc_taxi_datacheckpoint_location oss://databricks-confluent-integration/checkpoint_dir taxi_data_delta_lake oss://databricks-confluent-integration/data/nyc_taxi_data 数据的产生 在本案例中我们使用Kaggle上的NYC出租车数据集来模拟数据产生。 我们先安装confluent的python客户端其他语言的客户端参考confluent官网 pip install confluent_kafka 构造用于创建Kafka Producer的基础信息如bootstrap-servercontrol center的usernamepassword等 conf {bootstrap.servers: confluent_server,key.serializer: StringSerializer(utf_8),value.serializer: StringSerializer(utf_8),client.id: socket.gethostname(),security.protocol: SASL_SSL,sasl.mechanism: PLAIN,sasl.username: control_center_username,sasl.password: control_center_password } 创建Producer producer Producer(conf) 向Kafka中发送消息模拟数据的产生 with open(/Path/To/train.csv, rt) as f:float_field [fare_amount, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude]for row in reader:i 1try:for field in float_field:row[field] float(row[field])row[passenger_count] int(row[passenger_count])producer.produce(topictopic, valuejson.dumps(row))if i % 1000 0:producer.flush()if i 200000:breakexcept ValueError: # discard null/NAN datacontinue Kafka中的partition和offset 在使用spark读取Kafka中的数据之前我们回顾一下Kafka中的概念partition和offset partitionkafka为了能并行进行数据的写入将每个topic的数据分为多个partition每个partition由一个Broker负责向partition写入数据时负责该partition的Broker将消息复制给它的followeroffsetKafka会为每条写入partition里的消息进行编号消息的编号即为offset我们在读取Kafka中的数据时需要指定我们想要读取的数据该指定需要从两个维度partition的维度 offset的维度。 Earliest从每个partition的offset 0开始读取和加载Latest从每个partition最新的数据开始读取自定义指定每个partition的开始offset和结束offset 读取topic1 partition 0 offset 23和partition 0 offset -2之后的数据{topic1:{0:23,1:-2}} 除了指定start offset我们还可以通过endingOffsets参数指定读取到什么位置为止。 将数据存储到LakeHouseSpark集成Confluent 理解上述概念后Databricks和Confluent的集成非常简单只需要对spark session的readStream参数进行简单的设置就可以将Kafka中的实时流数据转换为Spark中的Dataframe lines (spark.readStream# 指定数据源: kafka.format(kafka)# 指定kafka bootstrap server的URL.option(kafka.bootstrap.servers, confluent_server)# 指定订阅的topic.option(subscribe, topic)# 指定想要读取的数据的offsetearliest表示从每个partition的起始点开始读取.option(startingOffsets, earliest)# 指定认证协议.option(kafka.security.protocol, SASL_SSL).option(kafka.sasl.mechanism, PLAIN)# 指定confluent的用户名和密码.option(kafka.sasl.jaas.config,forg.apache.kafka.common.security.plain.PlainLoginModule required username{control_center_username} password{control_center_password};).load()) 从kafka中读取的数据格式如下 root|-- key: binary (nullable true)|-- value: binary (nullable true)|-- topic: string (nullable true)|-- partition: integer (nullable true)|-- offset: long (nullable true)|-- timestamp: timestamp (nullable true)|-- timestampType: integer (nullable true) 由于key和value都是binary格式的我们需要将valuejson由binary转换为string格式并定义schema提取出Json中的数据并转换为对应的格式 schema (StructType().add(key, TimestampType()).add(fare_amount, FloatType()).add(pickup_datetime, TimestampType()).add(pickup_longitude, FloatType()).add(pickup_latitude, FloatType()).add(dropoff_longitude, FloatType()).add(dropoff_latitude, FloatType()).add(passenger_count, IntegerType()))# 将json中的列提取出来 lines (lines.withColumn(data, from_json(col(value).cast(string), # binary 转 stringschema)) # 解析为schema.select(col(data.*))) # select value中的所有列 过滤掉错误为空NaN的数据 lines (lines.filter(col(pickup_longitude) ! 0).filter(col(pickup_latitude) ! 0).filter(col(dropoff_longitude) ! 0).filter(col(dropoff_latitude) ! 0).filter(col(fare_amount) ! 0).filter(col(passenger_count) ! 0)) 最后我们将解析出来的数据输出到LakeHouse中以进行后续的分析和机器学习模型训练 # lakehouse 的存储格式为 delta query (lines.writeStream.format(delta).option(checkpointLocation, checkpoint_location).option(path, taxi_data_delta_lake).start()) # 执行job直到出现异常如果只想执行该Job一段时间可以指定timeout参数 query.awaitTermination() 数据分析 我们先将LakeHouse中的数据使用Spark加载进来 然后我们对该Dataframe创建一个Table View并探索fare_amount的分布 可以看到fare_amount的最小值是负数这显然是一条错误的数据我们将这些错误的数据过滤并探索fare_amount的分布 然后我们探索价格和年份月份星期打车时间的关系 从上面可以看出两点 出租车的价格和年份有很大关系从09年到15年呈不断增长的态势在中午和凌晨打车比上午和下午打车更贵一些。 我们再进一步探索价格和乘客数量的关系 此外出租车价格的另一个影响因素就是距离这里我们借助python的geopy包和Spark的UDF来计算给定两个位置的距离然后再分析费用和距离的关系。 经纬度的范围为[-90, 90]因此我们第一步是清除错误的数据 然后我们增加一列数据出租车行驶的距离并将距离离散化进行后续的分析 统计打车距离的分布 从上图可以看出打车距离分布在区间[0, 15]miles内我们继续统计在该区间内打车价格和打车距离的关系 如上图所示打车价格和打车距离呈现出线性增长的趋势。 机器学习建模 在上一小节的数据分析中我们已经提取了和出租车相关联的一些特征根据这些特征我们建立一个简单的线性回归模型 打车费用 ~ (年份打车时间乘客数距离) 先将特征和目标值提取出来 对特征做归一化 分割训练集和测试集 建立线性回归模型进行训练 训练结果统计 使用Evaluator对模型进行评价 总结 我们在本文中介绍了如何使用阿里云的Confluent Cloud和Databricks来构建您的数据流和LakeHouse并介绍了如何使用Databricks提供的能力来挖掘数据价值使用Spark MLlib构建您的机器学习模型。有了Confluent Cloud和Databricks您可以轻松实现数据入湖及时在最新的数据上进行探索挖掘您的数据价值。欢迎您试用阿里云Confluent和Databricks。 原文链接 本文为阿里云原创内容未经允许不得转载。
http://www.sadfv.cn/news/41788/

相关文章:

  • 搭建网站的网站电商设计平台
  • 四川网站设计查流量网站
  • 阿里云服务器可以做网站吗建设手机银行注销网站
  • 昆明猫咪科技网站建设公司深圳网站制作企业邮箱
  • 网站设计优化龙岩市官网
  • 如何做网站seo排名优化全国物流平台货找车
  • 撤销网站备案表填写后设计公司室内设计
  • wordpress建站教程交友福田住房和建设局网站官网
  • 建设事业单位网站多少钱百度总部
  • 如何建立一个网站共享网站被黑能查到是谁做的吗
  • 做物流网站多少钱天津做网站的公
  • 免费网站开发软件8090在线观看免费观看
  • 做网站没有做退钱济南迅翔网站建设
  • 高质量的常州网站建设北京建设注册中心网站
  • 忻府网站建设排名点击图片跳转到网站怎么做链接
  • 购物网站建设开题报告中国航发网上商城官网
  • 新建门户网站的建设自查做兼职网站
  • 免费的查企业的网站在线域名注册
  • 建设企业网站成本多少钱网站建设布局
  • 游戏资讯网站怎么做安徽徐州网站建设公司
  • 在线网站优化中国风网站配色方案
  • 江苏盐城有做淘宝网站的吗discuz wordpress主题
  • 网站建设项目分析报告做床上用品网站
  • 保定建站服务网站想做个链接怎么做的
  • 网站建网站建设企业电话wordpress dome.php
  • 万网怎么创建网站吗外国做家具的网站
  • 盐城网站建设制作方案锡林郭勒盟建设厅官方网站
  • 网站建设投标书怎么制作北京网站备案负责人变更
  • 深圳安嘉建设有限公司网站做网站收获了什么
  • 中国网站建设市场规模开发工具包