当前位置: 首页 > news >正文

公司网站建立流程关联词有哪些

公司网站建立流程,关联词有哪些,app软件下载入口,国内网站为什么要备案Flink系列之#xff1a;JDBC SQL 连接器 一、JDBC SQL 连接器二、依赖三、创建 JDBC 表四、连接器参数五、键处理六、分区扫描七、Lookup Cache八、幂等写入九、JDBC Catalog十、JDBC Catalog 的使用十一、JDBC Catalog for PostgreSQL十二、JDBC Catalog for MySQL十三、数据… Flink系列之JDBC SQL 连接器 一、JDBC SQL 连接器二、依赖三、创建 JDBC 表四、连接器参数五、键处理六、分区扫描七、Lookup Cache八、幂等写入九、JDBC Catalog十、JDBC Catalog 的使用十一、JDBC Catalog for PostgreSQL十二、JDBC Catalog for MySQL十三、数据类型映射 一、JDBC SQL 连接器 Scan Source: BoundedLookup Source: Sync ModeSink: BatchSink: Streaming Append Upsert Mode JDBC 连接器允许使用 JDBC 驱动向任意类型的关系型数据库读取或者写入数据。本文档描述了针对关系型数据库如何通过建立 JDBC 连接器来执行 SQL 查询。 如果在 DDL 中定义了主键JDBC sink 将以 upsert 模式与外部系统交换 UPDATE/DELETE 消息否则它将以 append 模式与外部系统交换消息且不支持消费 UPDATE/DELETE 消息。 二、依赖 在连接到具体数据库时也需要对应的驱动依赖目前支持的驱动如下 DriverGroup IdArtifact IdJARMySQLmysqlmysql-connector-java下载Oraclecom.oracle.database.jdbcojdbc8下载PostgreSQLorg.postgresqlpostgresql下载Derbyorg.apache.derbyderby下载SQL Servercom.microsoft.sqlservermssql-jdbc下载 三、创建 JDBC 表 JDBC table 可以按如下定义 -- 在 Flink SQL 中注册一张 MySQL 表 users CREATE TABLE MyUserTable (id BIGINT,name STRING,age INT,status BOOLEAN,PRIMARY KEY (id) NOT ENFORCED ) WITH (connector jdbc,url jdbc:mysql://localhost:3306/mydatabase,table-name users );-- 从另一张表 T 将数据写入到 JDBC 表中 INSERT INTO MyUserTable SELECT id, name, age, status FROM T;-- 查看 JDBC 表中的数据 SELECT id, name, age, status FROM MyUserTable;-- JDBC 表在时态表关联中作为维表 SELECT * FROM myTopic LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime ON myTopic.key MyUserTable.id;这段代码是在Flink SQL中注册一个MySQL表users表结构包括id、name、age、status等字段并设置id作为主键。 接下来通过INSERT INTO语句从名为T的表中将数据写入到JDBC表MyUserTable中。INSERT INTO语句用于将T表中的id、name、age、status字段的值插入到MyUserTable表中。 然后使用SELECT语句查看JDBC表中的数据返回id、name、age、status字段的值。 最后一段语句是将JDBC表MyUserTable作为维表与实时流数据源myTopic进行时态表关联。通过FOR SYSTEM_TIME AS OF子句指定以myTopic.proctime字段的时间为基准将myTopic的key字段与MyUserTable的id字段进行关联查询返回所有字段的值。 通过这段代码可以实现将数据从一个表写入到MySQL表中并在流处理中进行关联查询从而实现时态表的操作。 四、连接器参数 参数是否必填默认值类型描述connector必填(none)String指定使用什么类型的连接器这里应该是’jdbc’。url必填(none)StringJDBC 数据库 url。table-name必填(none)String连接到 JDBC 表的名称。driver可选(none)String用于连接到此 URL 的 JDBC 驱动类名如果不设置将自动从 URL 中推导。username可选(none)StringJDBC 用户名。如果指定了 ‘username’ 和 ‘password’ 中的任一参数则两者必须都被指定。password可选(none)StringJDBC 密码。connection.max-retry-timeout可选60sDuration最大重试超时时间以秒为单位且不应该小于 1 秒。scan.partition.column可选(none)String用于将输入进行分区的列名。scan.partition.num可选(none)Integer分区数。scan.partition.lower-bound可选(none)Integer第一个分区的最小值。scan.partition.upper-bound可选(none)Integer最后一个分区的最大值。scan.fetch-size可选0Integer每次循环读取时应该从数据库中获取的行数。如果指定的值为 ‘0’则该配置项会被忽略。scan.auto-commit可选trueBoolean在 JDBC 驱动程序上设置 auto-commit 标志 它决定了每个语句是否在事务中自动提交。有些 JDBC 驱动程序特别是 Postgres可能需要将此设置为 false 以便流化结果。lookup.cache可选(none)枚举类型可选值: NONE, PARTIAL维表的缓存策略。 目前支持 NONE不缓存和 PARTIAL只在外部数据库中查找数据时缓存。lookup.cache.max-rows可选(none)Integer维表缓存的最大行数若超过该值则最老的行记录将会过期。 使用该配置时 “lookup.cache” 必须设置为 PARTIAL”。lookup.partial-cache.expire-after-write可选(none)Duration在记录写入缓存后该记录的最大保留时间。 使用该配置时 “lookup.cache” 必须设置为 PARTIAL”。lookup.partial-cache.expire-after-access可选(none)Duration在缓存中的记录被访问后该记录的最大保留时间。 使用该配置时 “lookup.cache” 必须设置为 PARTIAL”。lookup.partial-cache.cache-missing-key可选(none)Boolean是否缓存维表中不存在的键默认为true。 使用该配置时 “lookup.cache” 必须设置为 PARTIAL”。lookup.max-retries可选3Integer查询数据库失败的最大重试次数。sink.buffer-flush.max-rows可选100Integerflush 前缓存记录的最大值可以设置为 ‘0’ 来禁用它。sink.buffer-flush.interval可选1sDurationflush 间隔时间超过该时间后异步线程将 flush 数据。可以设置为 ‘0’ 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件可以将 ‘sink.buffer-flush.max-rows’ 设置为 ‘0’ 并配置适当的 flush 时间间隔。sink.max-retries可选3Integer写入记录到数据库失败后的最大重试次数。sink.parallelism可选(none)Integer用于定义 JDBC sink 算子的并行度。默认情况下并行度是由框架决定使用与上游链式算子相同的并行度。 五、键处理 当写入数据到外部数据库时Flink 会使用 DDL 中定义的主键。如果定义了主键则连接器将以 upsert 模式工作否则连接器将以 append 模式工作。 在 upsert 模式下Flink 将根据主键判断插入新行或者更新已存在的行这种方式可以确保幂等性。为了确保输出结果是符合预期的推荐为表定义主键并且确保主键是底层数据库中表的唯一键或主键。在 append 模式下Flink 会把所有记录解释为 INSERT 消息如果违反了底层数据库中主键或者唯一约束INSERT 插入可能会失败。 六、分区扫描 为了在并行 Source task 实例中加速读取数据Flink 为 JDBC table 提供了分区扫描的特性。 如果下述分区扫描参数中的任一项被指定则下述所有的分区扫描参数必须都被指定。这些参数描述了在多个 task 并行读取数据时如何对表进行分区。 scan.partition.column 必须是相关表中的数字、日期或时间戳列。注意scan.partition.lower-bound 和 scan.partition.upper-bound 用于决定分区的起始位置和过滤表中的数据。如果是批处理作业也可以在提交 flink 作业之前获取最大值和最小值。 scan.partition.column输入用于进行分区的列名。scan.partition.num分区数。scan.partition.lower-bound第一个分区的最小值。scan.partition.upper-bound最后一个分区的最大值。 七、Lookup Cache JDBC 连接器可以用在时态表关联中作为一个可 lookup 的 source (又称为维表)当前只支持同步的查找模式。 默认情况下lookup cache 是未启用的你可以将 lookup.cache 设置为 PARTIAL 参数来启用。 lookup cache 的主要目的是用于提高时态表关联 JDBC 连接器的性能。默认情况下lookup cache 不开启所以所有请求都会发送到外部数据库。 当 lookup cache 被启用时每个进程即 TaskManager将维护一个缓存。Flink 将优先查找缓存只有当缓存未查找到时才向外部数据库发送请求并使用返回的数据更新缓存。 当缓存命中最大缓存行 lookup.partial-cache.max-rows 或当行超过 lookup.partial-cache.expire-after-write 或 lookup.partial-cache.expire-after-access 指定的最大存活时间时缓存中的行将被设置为已过期。 缓存中的记录可能不是最新的用户可以将缓存记录超时设置为一个更小的值以获得更好的刷新数据但这可能会增加发送到数据库的请求数。所以要做好吞吐量和正确性之间的平衡。 默认情况下flink 会缓存主键的空查询结果你可以通过将 lookup.partial-cache.cache-missing-key 设置为 false 来切换行为。 八、幂等写入 如果在 DDL 中定义了主键JDBC sink 将使用 upsert 语义而不是普通的 INSERT 语句。upsert 语义指的是如果底层数据库中存在违反唯一性约束则原子地添加新行或更新现有行这种方式确保了幂等性。 如果出现故障Flink 作业会从上次成功的 checkpoint 恢复并重新处理这可能导致在恢复过程中重复处理消息。强烈推荐使用 upsert 模式因为如果需要重复处理记录它有助于避免违反数据库主键约束和产生重复数据。 除了故障恢复场景外数据源kafka topic也可能随着时间的推移自然地包含多个具有相同主键的记录这使得 upsert 模式是用户期待的。 由于 upsert 没有标准的语法因此下表描述了不同数据库的 DML 语法 数据库更新语法MySQLINSERT … ON DUPLICATE KEY UPDATE …OracleMERGE INTO … USING (…) ON (…) WHEN MATCHED THEN UPDATE SET (…) WHEN NOT MATCHED THEN INSERT (…) VALUES (…)PostgreSQLINSERT … ON CONFLICT … DO UPDATE SET …MS SQL ServerMERGE INTO … USING (…) ON (…) WHEN MATCHED THEN UPDATE SET (…) WHEN NOT MATCHED THEN INSERT (…) VALUES (…) 九、JDBC Catalog JdbcCatalog 允许用户通过 JDBC 协议将 Flink 连接到关系数据库。 目前JDBC Catalog 有两个实现即 Postgres Catalog 和 MySQL Catalog。目前支持如下 catalog 方法。其他方法目前尚不支持。 // Postgres Catalog MySQL Catalog 支持的方法 databaseExists(String databaseName); listDatabases(); getDatabase(String databaseName); listTables(String databaseName); getTable(ObjectPath tablePath); tableExists(ObjectPath tablePath);其他的 Catalog 方法现在尚不支持。 十、JDBC Catalog 的使用 本小节主要描述如果创建并使用 Postgres Catalog 或 MySQL Catalog。 请参阅 Dependencies 部分了解如何配置 JDBC 连接器和相应的驱动。 JDBC catalog 支持以下参数: name必填catalog 的名称。default-database必填默认要连接的数据库。username必填Postgres/MySQL 账户的用户名。password必填账户的密码。base-url必填不应该包含数据库名 对于 Postgres Catalog base-url 应为 “jdbc:postgresql://:” 的格式。对于 MySQL Catalog base-url 应为 “jdbc:mysql://:” 的格式。 SQL CREATE CATALOG my_catalog WITH(type jdbc,default-database ...,username ...,password ...,base-url ... );USE CATALOG my_catalog;Java EnvironmentSettings settings EnvironmentSettings.inStreamingMode(); TableEnvironment tableEnv TableEnvironment.create(settings);String name my_catalog; String defaultDatabase mydb; String username ...; String password ...; String baseUrl ...JdbcCatalog catalog new JdbcCatalog(name, defaultDatabase, username, password, baseUrl); tableEnv.registerCatalog(my_catalog, catalog);// 设置 JdbcCatalog 为会话的当前 catalog tableEnv.useCatalog(my_catalog);Scala val settings EnvironmentSettings.inStreamingMode() val tableEnv TableEnvironment.create(settings)val name my_catalog val defaultDatabase mydb val username ... val password ... val baseUrl ...val catalog new JdbcCatalog(name, defaultDatabase, username, password, baseUrl) tableEnv.registerCatalog(my_catalog, catalog)// 设置 JdbcCatalog 为会话的当前 catalog tableEnv.useCatalog(my_catalog)Python from pyflink.table.catalog import JdbcCatalogenvironment_settings EnvironmentSettings.in_streaming_mode() t_env TableEnvironment.create(environment_settings)name my_catalog default_database mydb username ... password ... base_url ...catalog JdbcCatalog(name, default_database, username, password, base_url) t_env.register_catalog(my_catalog, catalog)# 设置 JdbcCatalog 为会话的当前 catalog t_env.use_catalog(my_catalog)YAML execution:...current-catalog: my_catalog # 设置目标 JdbcCatalog 为会话的当前 catalogcurrent-database: mydbcatalogs:- name: my_catalogtype: jdbcdefault-database: mydbusername: ...password: ...base-url: ...十一、JDBC Catalog for PostgreSQL PostgreSQL 元空间映射 除了数据库之外postgreSQL 还有一个额外的命名空间 schema。一个 Postgres 实例可以拥有多个数据库每个数据库可以拥有多个 schema其中一个 schema 默认名为 “public”每个 schema 可以包含多张表。 在 Flink 中当查询由 Postgres catalog 注册的表时用户可以使用 schema_name.table_name 或只有 table_name其中 schema_name 是可选的默认值为 “public”。 因此Flink Catalog 和 Postgres 之间的元空间映射如下 Flink 目录元空间结构Postgres 元空间结构catalog name (defined in Flink only)N/Adatabase namedatabase nametable name[schema_name.]table_name Flink 中的 Postgres 表的完整路径应该是 “..schema.table”。如果指定了 schema请注意需要转义 schema.table。 这里提供了一些访问 Postgres 表的例子 -- 扫描 public schema即默认 schema中的 test_table 表schema 名称可以省略 SELECT * FROM mypg.mydb.test_table; SELECT * FROM mydb.test_table; SELECT * FROM test_table;-- 扫描 custom_schema schema 中的 test_table2 表 -- 自定义 schema 不能省略并且必须与表一起转义。 SELECT * FROM mypg.mydb.custom_schema.test_table2 SELECT * FROM mydb.custom_schema.test_table2; SELECT * FROM custom_schema.test_table2;十二、JDBC Catalog for MySQL MySQL 元空间映射 MySQL 实例中的数据库与 MySQL Catalog 注册的 catalog 下的数据库处于同一个映射层级。一个 MySQL 实例可以拥有多个数据库每个数据库可以包含多张表。 在 Flink 中当查询由 MySQL catalog 注册的表时用户可以使用 database.table_name 或只使用 table_name其中 database 是可选的默认值为创建 MySQL Catalog 时指定的默认数据库。 因此Flink Catalog 和 MySQL catalog 之间的元空间映射如下 Flink 目录元空间结构Mysql 元空间结构catalog name (defined in Flink only)N/Adatabase namedatabase nametable nametable_name Flink 中的 MySQL 表的完整路径应该是 “catalog.db.table”。 这里提供了一些访问 MySQL 表的例子 -- 扫描 默认数据库中的 test_table 表 SELECT * FROM mysql_catalog.mydb.test_table; SELECT * FROM mydb.test_table; SELECT * FROM test_table;-- 扫描 given_database 数据库中的 test_table2 表 SELECT * FROM mysql_catalog.given_database.test_table2; SELECT * FROM given_database.test_table2;十三、数据类型映射 Flink 支持连接到多个使用方言dialect的数据库如 MySQL、Oracle、PostgreSQL、Derby 等。其中Derby 通常是用于测试目的。下表列出了从关系数据库数据类型到 Flink SQL 数据类型的类型映射映射表可以使得在 Flink 中定义 JDBC 表更加简单。 MySQL typeOracle typePostgreSQL typeSQL Server typeFlink SQL typeTINYINTTINYINTTINYINTSMALLINT, TINYINT UNSIGNEDSMALLINT, INT2, SMALLSERIAL, SERIAL2SMALLINTSMALLINTINT, MEDIUMINT, SMALLINT UNSIGNEDINTEGER, SERIALINTINTBIGINT, INT UNSIGNEDBIGINT, BIGSERIALBIGINTBIGINTBIGINT UNSIGNEDDECIMAL(20, 0)BIGINTBIGINTBIGINTBIGINTFLOATBINARY_FLOATREAL, FLOAT4REALFLOATDOUBLE, DOUBLE PRECISIONBINARY_DOUBLEFLOAT8, DOUBLE PRECISIONFLOATDOUBLENUMERIC(p, s), DECIMAL(p, s)SMALLINT, FLOAT(s), DOUBLE PRECISION, REAL, NUMBER(p, s)NUMERIC(p, s), DECIMAL(p, s)DECIMAL(p, s)DECIMAL(p, s)BOOLEAN, TINYINT(1)BOOLEANBITBOOLEANDATEDATEDATEDATEDATETIME [§]DATETIME [§] [WITHOUT TIMEZONE]TIME(0)TIME [§] [WITHOUT TIMEZONE]DATETIME [§]TIMESTAMP [§] [WITHOUT TIMEZONE]TIMESTAMP [§] [WITHOUT TIMEZONE]DATETIME, DATETIME2TIMESTAMP [§] [WITHOUT TIMEZONE]CHAR(n), VARCHAR(n), TEXTCHAR(n), VARCHAR(n), CLOBCHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXTCHAR(n), NCHAR(n), VARCHAR(n), NVARCHAR(n), TEXT, NTEXTSTRINGBINARY, VARBINARY, BLOBRAW(s), BLOBBYTEABINARY(n), VARBINARY(n)BYTESARRAYARRAY
http://www.yutouwan.com/news/463985/

相关文章:

  • 芜湖做网站的邓健照片厦门专业的网站建设
  • 东莞网站建设制作价格做视频网站 服务器
  • 去哪个网站找题目给孩子做网站建设排名优化公司
  • 郑州pc网站开发wap建站程序
  • 网站免费一站二站四站模板制作工艺流程
  • 什么是网站建设外包wordpress 商场插件
  • 百度站长平台网址wordpress外部链接
  • html5自适应网站模板广州安全教育平台登录入口账号
  • 没备案的网站能用吗有源码怎么做app
  • 制作网站 美工天津做一个简单的网站首页
  • 东莞php网站建设价格网站前置审批类型
  • 网站建设的步骤及方法哪些是用vue做的网站
  • 广东广州电脑个人建站wordpress怎么把图片存七牛
  • 保险官方网站顺德手机网站设计信息
  • 做网站工作室互联网商城建设
  • 什么网站可以做高三英语试题吉林省吉林市舒兰市
  • 免费网站商城建设网站图片自动切换怎么做
  • 建网站发信息做推广云主机怎么建网站
  • 电影网站系统源码wordpress缩略图延时加载
  • 外贸网站平台是不是很难做莱芜网站开发
  • 音乐网站开发案例微网站开发 php
  • 安徽平台网站建设哪里好在社保网站上怎么做员工的退费
  • 做网站需要备注号码安徽建设住房建设厅网站
  • 建设银行有招投标网站吗网站入口你明白我的意思吧
  • 苏州网站建设万户微信群二维码大全网站
  • 法律对网站建设的规制制作网站 公司简介
  • 可以做国外购物的网站赌粉在哪个平台引流
  • 设计相关的网站哪个网站做照片书最好
  • 做准考证的网站深圳网约车
  • 苏州网站建设公司电话深圳画册设计排版