当前位置: 首页 > news >正文

顺德网站建设如何镇江唐唐网络科技有限公司

顺德网站建设如何,镇江唐唐网络科技有限公司,织梦dedecms网站内容页,免费建站建设网站目录 举个例子 连接器 下载连接器#xff08;connector#xff09;和格式#xff08;format#xff09;jar 包 依赖管理 如何使用连接器 举个例子 StreamExecutionEnvironment集成了DataStream API#xff0c;通过额外的函数扩展了TableEnvironment。 下面代码演示两…目录 举个例子 连接器 下载连接器connector和格式formatjar 包 依赖管理 如何使用连接器 举个例子 StreamExecutionEnvironment集成了DataStream API通过额外的函数扩展了TableEnvironment。 下面代码演示两种API如何互转 from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment from pyflink.common.typeinfo import Typesenv StreamExecutionEnvironment.get_execution_environment() t_env StreamTableEnvironment.create(env) # create a DataStream ds env.from_collection([Alice, Bob, John], Types.STRING())# interpret the insert-only DataStream as a Table t t_env.from_data_stream(ds)# register the Table object as a view and query it t_env.create_temporary_view(InputTable, t) res_table t_env.sql_query(SELECT UPPER(f0) FROM InputTable)# interpret the insert-only Table as a DataStream again res_ds t_env.to_data_stream(res_table)# add a printing sink and execute in DataStream API res_ds.print()env.execute() TableEnvironment将采用StreamExecutionEnvironment所有的配置选项。 建议在转换为Table API之前设置DataStream API的所有配置选项如下代码。 from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment from pyflink.datastream.checkpointing_mode import CheckpointingMode# create Python DataStream API env StreamExecutionEnvironment.get_execution_environment()# set various configuration early env.set_max_parallelism(256)env.get_config().add_default_kryo_serializer(type_class_name, serializer_class_name) env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)# then switch to Python Table API t_env StreamTableEnvironment.create(env) # set configuration early t_env.get_config().set_local_timezone(Europe/Berlin)# start defining your pipelines in both APIs... 连接器 下载连接器connector和格式formatjar 包 由于Flink是一个基于 Java/Scala 的项目连接器connector和格式format的实现是作为 jar 包存在的 要在 PyFlink 作业中使用首先需要将其指定为作业的依赖。 如果使用第三方JAR可以在Python Table API中指定JAR如下所示 table_env.get_config().get_configuration().set_string(pipeline.jars, file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar) or table_env.get_config().get_configuration().set_string(pipeline.classpaths, file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar) 依赖管理 需要在Python API程序中使用依赖项。例如Python用户自定义函数中使用第三方Python库。此外在用机器学习模型预测等场景中用户可能希望在Python自定义函数中加载机器学习模型。 当PyFlink作业在本地执行时可以将第三方Python库安装到本地Python环境中将机器学习模型下载到本地等等。 然而当用户想要将PyFlink任务提交到远程集群时这种方法并不奏效。 除了Table API 在Python DataStream API中则如下配置 stream_execution_environment.add_jars(file:///my/jar/path/connector1.jar, file:///my/jar/path/connector2.jar) stream_execution_environment.add_jars(file:///E:/my/jar/path/connector1.jar, file:///E:/my/jar/path/connector2.jar) # NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the # URLs are accessible on both the client and the cluster. stream_execution_environment.add_classpaths(file:///my/jar/path/connector1.jar, file:///my/jar/path/connector2.jar) 如何使用连接器 在 PyFlink Table API 中DDL 是定义 source 和 sink 比较推荐的方式这可以通过 TableEnvironment 中的 execute_sql() 方法来完成然后就可以在作业中使用这张表了。 --下面是如何在 PyFlink 中使用 Kafka source/sink 和 JSON 格式的完整示例。 from pyflink.table import TableEnvironment, Environmentsettingsdef log_processing():env_settings Environmentsettings.in_streaming_mode()t_env TableEnvironment.create(env_settings)# specify connector and format jarst_env.get_config().get_configuration().set_string(pipeline.jars, file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar)source_ddl CREATE TABLE source_table(a VARCHAR,b INT) WITH (connector kafka,topic source_topic,properties.bootstrap.servers kafka:9092,properties.group.id test_3,scan.startup.mode latest-offset,format json)sink_ddl CREATE TABLE sink_table(a VARCHAR) WITH (connector kafka,topic sink_topic,properties.bootstrap.servers kafka:9092,format json)t_env.execute_sql(source_ddl)t_env.execute_sql(sink_ddl)t_env.sql_query(SELECT a FROM source_table) \.execute_insert(sink_table).wait()if __name__ __main__:log_processing()
http://www.sadfv.cn/news/312696/

相关文章:

  • 太仓企业网站建设价格新乡网站建设方案
  • 商丘做建设网站的公司网站app在线生成器
  • 大连装修网站推广网站qq在线状态
  • 成都青羊区网站建设精湛的佛山网站设计
  • idc网站源码下载wordpress 页面挂件
  • 开发平台指什么南宁seo计费管理
  • 西安百度网站快速优化深圳 设计
  • 黄山网站建设公司线上平面设计哪家培训好
  • 网站设计论文答辩问题seo排名软件哪个好
  • 四川手机响应式网站建设推荐化工网站建站模板下载
  • 网站建设html5源码众筹网站的分析与设计
  • 山西网站制作应用订制型网站费用
  • 网站开发大约多少钱wordpress 英文采集
  • 在线做网站 自动生成手机版网站开发制作价格
  • 十大图片素材网站5个常见的电子商务网站
  • 龙海做网站费用最新远程网站建设服务
  • 微信商城与网站一体男直接做的视频网站
  • 专业网站建设公司哪里济南兴田德润什么活动关于《大学物理》网站资源建设的思路
  • 信息化建设 网站新网页游戏开服表
  • 网站建设哪家服务好优秀营销软文100篇
  • 阜阳企业做网站荆门网站建设电话咨询
  • 如何申请网站域名wordpress 对联广告
  • 网站内页关键词密度长沙正规网站制作公司
  • 手机网站关键词快速排名艺术家网站源码
  • 做php网站用的软件思源黑体可以做网站
  • sharepoint网站制作wordpress在哪设置评论
  • 甘肃做网站的公司有哪些网站导航设置
  • 网站 售后服务做网站推广的话术
  • 广安商城网站建设大型网站有哪些用php做的
  • 毕业设计网站方向wordpress静态优化