当前位置: 首页 > news >正文

化妆品企业网站案例大全平台网站建设报价

化妆品企业网站案例大全,平台网站建设报价,wordpress简单工作室博客,西安广告公司网站建设我正在使用Kafka Consumer API将所有数据从Kafka主题复制到Hive表 . 为此#xff0c;我使用HDFS作为中间步骤 . 我使用唯一的组ID并将偏移重置为“最早”#xff0c;以便从头开始获取所有数据#xff0c;并在执行后忽略提交 . 然后我遍历Kafka主题中的记录#xff0c;并将每…我正在使用Kafka Consumer API将所有数据从Kafka主题复制到Hive表 . 为此我使用HDFS作为中间步骤 . 我使用唯一的组ID并将偏移重置为“最早”以便从头开始获取所有数据并在执行后忽略提交 . 然后我遍历Kafka主题中的记录并将每条记录保存到HDFS中的临时文件中 . 然后我使用Spark从HDFS读取数据然后使用日期作为文件名将其保存到Parquet文件中 . 然后我在Hive表中创建一个带日期的分区最后在Parquet中将文件作为分区加载到Hive中 .正如您在下面的代码中看到的我使用了几个中间步骤这使得我的代码远非最佳 . 这是从Kafka主题复制所有数据的最佳推荐方法吗我做了一些研究到目前为止这是我设法开始工作的变通方法但是随着记录数量每天增加我的执行时间达到了可容忍的极限(从2分钟变为6分钟到6分钟)周) .代码在这里def start( lowerDate: String, upperDate: String ){// Configurations for kafka consumerval conf ConfigFactory.parseResources(properties.conf)val brokersip conf.getString(enrichment.brokers.value)val topics_in conf.getString(enrichment.topics_in.value)// Crea la sesion de Sparkval spark SparkSession.builder().master(yarn).appName(ParaTiUserXY).getOrCreate()spark.sparkContext.setLogLevel(ERROR)import spark.implicits._val properties new Propertiesproperties.put(key.deserializer, classOf[StringDeserializer])properties.put(value.deserializer, classOf[StringDeserializer])properties.put(bootstrap.servers, brokersip)properties.put(auto.offset.reset, earliest)properties.put(group.id, ParaTiUserXYZZ12345)//Schema para transformar los valores del topico de Kafka a JSONval my_schema new StructType().add(longitudCliente, StringType).add(latitudCliente, StringType).add(dni, StringType).add(alias, StringType).add(segmentoCliente, StringType).add(timestampCliente, StringType).add(dateCliente, StringType).add(timeCliente, StringType).add(tokenCliente, StringType).add(telefonoCliente, StringType)val consumer new KafkaConsumer[String, String](properties)consumer.subscribe( util.Collections.singletonList(parati_rt_geoevents) )val fs {val conf new Configuration()FileSystem.get(conf)}val temp_path:Path new Path(hdfs:///tmp/s70956/tmpstgtopics)if( fs.exists(temp_path)){fs.delete(temp_path, true)}while(true){val recordsconsumer.poll(100)for (recordval data record.value.toString//println(data)val dataos: FSDataOutputStream fs.create(temp_path)val bw: BufferedWriter new BufferedWriter( new OutputStreamWriter(dataos, UTF-8))bw.append(data)bw.closeval data_schema spark.read.schema(my_schema).json(hdfs:///tmp/s70956/tmpstgtopics)val fechaCliente data_schema.select(dateCliente).first.getString(0)if( fechaCliente upperDate fechaCliente lowerDate){data_schema.select(longitudCliente, latitudCliente,dni, alias,segmentoCliente, timestampCliente, dateCliente, timeCliente,tokenCliente, telefonoCliente).coalesce(1).write.mode(SaveMode.Append).parquet(/desa/landing/parati/xyuser/ fechaCliente)}else if( fechaCliente lowerDate){//}else if( fechaCliente upperDate){break;}}}consumer.close()}
http://www.sadfv.cn/news/352435/

相关文章:

  • 公司自有网站工信备案免费公益主机
  • 化妆品网站模版免费下载节能环保公司网站建设
  • 给网站可以怎么做外链建设网站阿里云服务器
  • 网站开发提问wordpress更新php
  • 制作自己的网站教程诚信网站建设的意义
  • 成都精品网站建设seo站长综合查询工具
  • 有一个做搞笑英语视频网站创办一个公司需要什么条件
  • 手机版网站优化汽车之家官网入口
  • 做质粒图谱的网站软件销售网站模板
  • 专业的企业网站制作做推广的装修网站
  • 网站开发需要什么基础文创产品设计说明模板
  • 连锁销售公司网站的建设方案广告设计制作图片
  • 西安火车站网站建设商标购买网站
  • 佛山 建站公司怎么介绍自己的学校
  • 网站帮忙备案html5开发工程师是做什么的
  • 网站开发模板下载wordpress结合小程序
  • 用手机做网站自己做的网站和ie不兼容
  • 如何在百度上建免费网站深圳全网营销公司有哪些
  • 网站qq显示未启用分销商城什么意思
  • 查找网站后台的软件开发一个商城网站多少钱
  • 婚介所网站开发费用网站建设技术服务的方式是什么意思
  • 免费网站推广软件哪个好大兴网站建设一条龙
  • 苏州网站建设布局中国公司网站建设方案
  • 大连网站维护如何查看网站是不是wordpress
  • 网站建设与设计饰品h5网站制作视频
  • jsp网站开发目的及意义网站开发和软件开发有什么区别
  • 手机网站栏目结构图网页设计与制作课程设计报告
  • 如何制作网站建设搜索引擎优化工作主要做好哪些方面
  • 苏州网站建设求职简历常州市建设工程网站
  • 网站运营推广这么做网站设计 下拉式菜单怎么做