邢台网红打卡地,唐山网站优化,网站设计的发展趋势,微网站如何做微信支付宝支付在上一篇源码阅读中#xff0c;我们介绍了 INSERT 的执行流程。而 INSERT IGNORE 与 INSERT 不同#xff0c;需要对插入值判断是否有 Unique Key 的冲突#xff0c;并忽略有冲突的插入值。因此本文将进一步介绍 PolarDB-X 中 INSERT IGNORE 的执行流程#xff0c;其根据插入…在上一篇源码阅读中我们介绍了 INSERT 的执行流程。而 INSERT IGNORE 与 INSERT 不同需要对插入值判断是否有 Unique Key 的冲突并忽略有冲突的插入值。因此本文将进一步介绍 PolarDB-X 中 INSERT IGNORE 的执行流程其根据插入的表是否有 GSI 也有所变化。
下推执行 如果插入的表只有一张主表没有 GSI那么只需要将 INSERT IGNORE 直接发送到对应的物理表上由 DN 自行忽略存在冲突的值。在这种情况下INSERT IGNORE 的执行过程和 INSERT 基本上相同读者可以参考之前的源码阅读文章。
逻辑执行 而在有 GSI 的情况下就不能简单地将 INSERT IGNORE 分别下发到主表和 GSI 对应的物理分表上否则有可能出现主表和 GSI 数据不一致的情况。举个例子
create table t1 (a int primary key, b int, global index g1(b) dbpartition by hash(b)) dbpartition by hash(a);
insert ignore into t1 values (1,1),(1,2);
对于插入的两条记录它们在主表上位于同一个物理表a 相同但是在 GSI 上位于不同的物理表b 不相同如果直接下发 INSERT IGNORE 的话主表上只有 (1,1) 能够成功插入主键冲突而在 GSI 上 (1,1) 和 (1,2) 都能成功插入于是 GSI 比主表多了一条数据。
针对这种情况一种解决方案是根据插入值中的 Unique Key先到数据库中 SELECT 出有可能冲突的数据到 CN然后在 CN 判断冲突的值并删除。
进行 SELECT 的时候最简单的方式就是将所有的 SELECT 直接发送到主表上但是主表上可能没有对应的 Unique Key这就导致 SELECT 的时候会进行全表扫描影响性能。所以在优化器阶段我们会根据 Unique Key 是在主表还是 GSI 上定义的来确定相应的 SELECT 需要发送到主表还是 GSI具体代码位置
com.alibaba.polardbx.optimizer.core.planner.rule.OptimizeLogicalInsertRule#groupUkByTable
protected MapString, ListListString groupUkByTable(LogicalInsertIgnore insertIgnore,ExecutionContext executionContext) {// 找到每个 Unique Key 在主表和哪些 GSI 中存在MapInteger, ListString ukAllTableMap new HashMap();for (int i 0; i uniqueKeys.size(); i) {ListString uniqueKey uniqueKeys.get(i);for (Map.EntryString, MapString, SetString e : writableTableUkMap.entrySet()) {String currentTableName e.getKey().toUpperCase();MapString, SetString currentUniqueKeys e.getValue();boolean found false;for (SetString currentUniqueKey : currentUniqueKeys.values()) {if (currentUniqueKey.size() ! uniqueKey.size()) {continue;}boolean match currentUniqueKey.containsAll(uniqueKey);if (match) {found true;break;}}if (found) {ukAllTableMap.computeIfAbsent(i, k - new ArrayList()).add(currentTableName);}}}// 确定是在哪一个表上进行 SELECTfor (Map.EntryInteger, ListString e : ukAllTableMap.entrySet()) {ListString tableNames e.getValue();if (tableNames.contains(primaryTableName.toUpperCase())) {tableUkMap.computeIfAbsent(primaryTableName.toUpperCase(), k - new ArrayList()).add(uniqueKeys.get(e.getKey()));} else {final boolean onlyNonPublicGsi tableNames.stream().noneMatch(tn - GlobalIndexMeta.isPublished(executionContext, sm.getTable(tn)));boolean found false;for (String tableName : tableNames) {if (!onlyNonPublicGsi GlobalIndexMeta.isPublished(executionContext, sm.getTable(tableName))) {tableUkMap.computeIfAbsent(tableName, k - new ArrayList()).add(uniqueKeys.get(e.getKey()));found true;break;} else if (onlyNonPublicGsi GlobalIndexMeta.canWrite(executionContext, sm.getTable(tableName))) {tableUkMap.computeIfAbsent(tableName, k - new ArrayList()).add(uniqueKeys.get(e.getKey()));found true;break;}}}}return tableUkMap;}
而到了执行阶段我们在 LogicalInsertIgnoreHandler 中处理 INSERT IGNORE。我们首先会进入 getDuplicatedValues 函数其通过下发 SELECT 的方式查找表中已有的冲突的 Unique Key 的记录。我们将下发的 SELECT 语句中选择的列设置为 (value_index, uk_index, pk)。其中 value_index 和 uk_index 均为的常量。
举个例子假设有表
CREATE TABLE t (id int(11) NOT NULL,a int(11) NOT NULL,b int(11) NOT NULL,PRIMARY KEY (id),UNIQUE GLOBAL KEY g_i_a (a) COVERING (id) DBPARTITION BY HASH(a)
) DBPARTITION BY HASH(id)
以及一条 INSERT IGNORE 语句
INSERT IGNORE INTO t VALUES (1,2,3),(2,3,4),(3,4,5);
假设在 PolarDB-X 中执行时其会将 Unique Key 编号为
0: id
1: g_i_a
INSERT IGNORE 语句中插入的每个值分别编号为
0: (1,2,3)
1: (2,3,4)
2: (3,4,5)
那么对于 (2,3,4) 的 UNIQUE KEY 构造的 GSI 上的 SELECT 即为
查询 GSI
SELECT 1 as value_index, 1 as uk_index, id
FROM g_i_a_xxxx
WHERE a in 3;
假设表中已经存在 (5,3,6)那么这条 SELECT 的返回结果即为 (1,1,5)。此外由于不同的 Unique Key 的 SELECT 返回格式是相同的所以我们会将同一个物理库上不同的SELECT 查询 UNION 起来发送以一次性得到多个结果减少 CN 和 DN 之间的交互次数。只要某个 Unique Key 有重复值我们就能根据 value_index 和 uk_index 确定是插入值的哪一行的哪个 Unique Key 是重复的。
当得到所有的返回结果之后我们对数据进行去重。我们将上一步得到的冲突的的值放入一个 SET 中然后顺序扫描所有的每一行插入值如果发现有重复的就跳过该行否则就将该行也加入到 SET 中因为插入值之间也有可能存在相互冲突。去重完毕之后我们就得到了所有不存在冲突的值将这些值插入到表中之后就完成了一条 INSERT IGNORE 的执行。
逻辑执行的执行流程
com.alibaba.polardbx.repo.mysql.handler.LogicalInsertIgnoreHandler#doExecute
protected int doExecute(LogicalInsert insert, ExecutionContext executionContext,LogicalInsert.HandlerParams handlerParams) {// ...try {MapString, ListListString ukGroupByTable insertIgnore.getUkGroupByTable();ListMapInteger, ParameterContext deduplicated;ListListObject duplicateValues;// 获取表中已有的 Unique Key 冲突值duplicateValues getDuplicatedValues(insertIgnore, LockMode.SHARED_LOCK, executionContext, ukGroupByTable,(rowCount) - memoryAllocator.allocateReservedMemory(MemoryEstimator.calcSelectValuesMemCost(rowCount, selectRowType)), selectRowType, true,handlerParams);final ListMapInteger, ParameterContext batchParameters executionContext.getParams().getBatchParameters();// 根据上一步得到的结果去掉 INSERT IGNORE 中的冲突值deduplicated getDeduplicatedParams(insertIgnore.getUkColumnMetas(), insertIgnore.getBeforeUkMapping(),insertIgnore.getAfterUkMapping(), RelUtils.getRelInput(insertIgnore), duplicateValues,batchParameters, executionContext);if (!deduplicated.isEmpty()) {insertEc.setParams(new Parameters(deduplicated));} else {// All duplicatedreturn affectRows;}// 执行 INSERTtry {if (gsiConcurrentWrite) {affectRows concurrentExecute(insertIgnore, insertEc);} else {affectRows sequentialExecute(insertIgnore, insertEc);}} catch (Throwable e) {handleException(executionContext, e, GeneralUtil.isNotEmpty(insertIgnore.getGsiInsertWriters()));}} finally {selectValuesPool.destroy();}return affectRows;}
RETURNING 优化 上一节提到的 INSERT IGNORE 的逻辑执行方式虽然保证了数据的正确性但是也使得一条 INSERT IGNORE 语句至少需要 CN 和 DN 的两次交互才能完成第一次 SELECT第二次 INSERT影响了 INSERT IGNORE 的执行性能。
目前的 DN 已经支持了 AliSQL 的 RETURNING 优化其可以在 DN 的 INSERT IGNORE 执行完毕之后返回成功插入的值。利用这一功能PolarDB-X 对 INSERT IGNORE 进行了进一步的优化直接将 INSERT IGNORE 下发如果在主表和 GSI 上全部成功返回那么就说明插入值中没有冲突于是就成功完成该条 INSERT IGNORE 的执行否则就将多插入的值删除。
执行时CN 首先会根据上文中的语法下发带有 RETURNING 的物理 INSERT IGNORE 语句到 DN比如
call dbms_trans.returning(a, insert into t1_xxxx values(1,1));
其中返回列是主键用来标识插入的一批数据中哪些被成功插入了t1_xxxx 是逻辑表 t1 的一个物理分表。当主表和 GSI 上的所有 INSERT IGNORE 执行完毕之后我们计算主表和 GSI 中成功插入值的交集作为最后的结果然后删除多插入的值。这部分代码在
com.alibaba.polardbx.repo.mysql.handler.LogicalInsertIgnoreHandler#getRowsToBeRemoved
private MapString, ListListObject getRowsToBeRemoved(String tableName,MapString, ListListObject tableInsertedValues,ListInteger beforePkMapping,ListColumnMeta pkColumnMetas) {final MapString, SetGroupKey tableInsertedPks new TreeMap(String.CASE_INSENSITIVE_ORDER);final MapString, ListPairGroupKey, ListObject tablePkRows new TreeMap(String.CASE_INSENSITIVE_ORDER);tableInsertedValues.forEach((tn, insertedValues) - {final SetGroupKey insertedPks new TreeSet();final ListPairGroupKey, ListObject pkRows new ArrayList();for (ListObject inserted : insertedValues) {final Object[] groupKeys beforePkMapping.stream().map(inserted::get).toArray();final GroupKey pk new GroupKey(groupKeys, pkColumnMetas);insertedPks.add(pk);pkRows.add(Pair.of(pk, inserted));}tableInsertedPks.put(tn, insertedPks);tablePkRows.put(tn, pkRows);});// Get intersect of inserted valuesfinal SetGroupKey distinctPks new TreeSet();for (GroupKey pk : tableInsertedPks.get(tableName)) {if (tableInsertedPks.values().stream().allMatch(pks - pks.contains(pk))) {distinctPks.add(pk);}}// Remove values which not exists in at least one insert resultsfinal MapString, ListListObject tableDeletePks new TreeMap(String.CASE_INSENSITIVE_ORDER);tablePkRows.forEach((tn, pkRows) - {final ListListObject deletePks new ArrayList();pkRows.forEach(pkRow - {if (!distinctPks.contains(pkRow.getKey())) {deletePks.add(pkRow.getValue());}});if (!deletePks.isEmpty()) {tableDeletePks.put(tn, deletePks);}});return tableDeletePks;}
与上一节的逻辑执行的“悲观执行”相比使用 RETURNING 优化的 INSERT IGNORE 相当于“乐观执行”如果插入的值本身没有冲突那么一条 INSERT IGNORE 语句 CN 和 DN 间只需要一次交互即可而在有冲突的情况下我们需要下发 DELETE 语句将主表或 GSI 中多插入的值删除于是 CN 和 DN 间需要两次交互。可以看出即便是有冲突的情况CN 和 DN 间的交互次数也不会超过上一节的逻辑执行。因此在无法直接下推的情况下INSERT IGNORE 的执行策略是默认使用 RETURNING 优化执行。
当然 RETURNING 优化的使用也有一些限制比如插入的 Value 有重复主键时就不能使用因为这种情况下无法判断具体是哪一行被成功插入哪一行需要删除具体可以阅读代码中的条件判断。当不能使用 RETURNING 优化时系统会自动选择上一节中的逻辑执行方式执行该条 INSERT IGNORE 语句以保证数据的正确性。
使用 RETURNING 优化的执行流程
com.alibaba.polardbx.repo.mysql.handler.LogicalInsertIgnoreHandler#doExecute
protected int doExecute(LogicalInsert insert, ExecutionContext executionContext,LogicalInsert.HandlerParams handlerParams) {// ...// 判断能否使用 RETURNING 优化boolean canUseReturning executorContext.getStorageInfoManager().supportsReturning() executionContext.getParamManager().getBoolean(ConnectionParams.DML_USE_RETURNING) allDnUseXDataSource gsiCanUseReturning !isBroadcast !ComplexTaskPlanUtils.canWrite(tableMeta);if (canUseReturning) {canUseReturning noDuplicateValues(insertIgnore, insertEc);}if (canUseReturning) {// 执行 INSERT IGNORE 并获得返回结果final ListRelNode allPhyPlan new ArrayList(replaceSeqAndBuildPhyPlan(insertIgnore, insertEc, handlerParams));getPhysicalPlanForGsi(insertIgnore.getGsiInsertIgnoreWriters(), insertEc, allPhyPlan);final MapString, ListListObject tableInsertedValues executeAndGetReturning(executionContext, allPhyPlan, insertIgnore, insertEc, memoryAllocator,selectRowType);// ...// 生成 DELETEfinal boolean removeAllInserted targetTableNames.stream().anyMatch(tn - !tableInsertedValues.containsKey(tn));if (removeAllInserted) {affectedRows -removeInserted(insertIgnore, schemaName, tableName, isBroadcast, insertEc, tableInsertedValues);if (returnIgnored) {ignoredRows totalRows;}} else {final ListInteger beforePkMapping insertIgnore.getBeforePkMapping();final ListColumnMeta pkColumnMetas insertIgnore.getPkColumnMetas();// 计算所有插入值的交集final MapString, ListListObject tableDeletePks getRowsToBeRemoved(tableName, tableInsertedValues, beforePkMapping, pkColumnMetas);affectedRows -removeInserted(insertIgnore, schemaName, tableName, isBroadcast, insertEc, tableDeletePks);if (returnIgnored) {ignoredRows Optional.ofNullable(tableDeletePks.get(insertIgnore.getLogicalTableName())).map(List::size).orElse(0);}}handlerParams.optimizedWithReturning true;if (returnIgnored) {return ignoredRows;} else {return affectedRows;}} else {handlerParams.optimizedWithReturning false;}// ... }
最后以一个例子来展现 RETURNING 优化的执行流程与逻辑执行的不同。通过 /TDDL:CMD_EXTRA(DML_USE_RETURNINGTRUE)/ 这条 HINT用户可以手动控制是否使用 RETURNING 优化。
首先建表并插入一条数据
CREATE TABLE t (id int(11) NOT NULL,a int(11) NOT NULL,b int(11) NOT NULL,PRIMARY KEY (id),UNIQUE GLOBAL KEY g_i_a (a) COVERING (id) DBPARTITION BY HASH(a)
) DBPARTITION BY HASH(id);INSERT INTO t VALUES (1,3,3);
再执行一条 INSERT IGNORE
INSERT IGNORE INTO t VALUES (1,2,3),(2,3,4),(3,4,5);
其中 (1,2,3) 与 (1,3,3) 主键冲突(2,3,4) 与 (1,3,3) 对于 Unique Key g_i_a 冲突。如果是 RETURNING 优化 可以看到 PolarDB-X 先进行了 INSERT IGNORE再将多插入的数据删除(1,2,3) 在主表上冲突在 UGSI 上成功插入(2,3,4) 在 UGSI 上冲突在主表上成功插入因此分别下发对应的 DELETE 到 UGSI 和主表上。
如果关闭 RETURNING 优化逻辑执行 可以看到 PolarDB-X 先进行了 SELECT再将没有冲突的数据 (3,4,5) 插入。
小结 本文介绍了 PolarDB-X 中 INSERT IGNORE 的执行流程。除了 INSERT IGNORE 之外还有一些 DML 语句在执行时也需要进行重复值的判断比如 REPLACE、INSERT ON DUPLICATE KEY UPDATE 等这些语句在有 GSI 的情况下均采用了逻辑执行的方式即先进行 SELECT 再进行判重、更新等操作感兴趣的读者可以自行阅读相关代码。
作者潜璟
原文链接
本文为阿里云原创内容未经允许不得转载