semcms外贸网站管理系统,wordpress修改访问路径,宁波谷歌seo,申请网站需要什么资料前言#xff1a;
SparkSQL和HiveSQL的Join操作中也有谓词下推#xff1f;今天就通过大神的文章来了解下。同样#xff0c;如有冒犯#xff0c;请联系。
正文
上文简要介绍了Join在大数据领域中的使用背景以及常用的几种算法#xff0d;broadcast hash join 、shuffle h…前言
SparkSQL和HiveSQL的Join操作中也有谓词下推今天就通过大神的文章来了解下。同样如有冒犯请联系。
正文
上文简要介绍了Join在大数据领域中的使用背景以及常用的几种算法broadcast hash join 、shuffle hash join以及sort merge join等对每一种算法的核心应用场景也做了相关介绍这里再重点说明一番大表与小表进行join会使用broadcast hash join一旦小表稍微大点不再适合广播分发就会选择shuffle hash join最后两张大表的话无疑选择sort merge join。 好了问题来了说是这么一说但到底选择哪种算法归根结底是SQL执行引擎干的事情按照上文逻辑SQL执行引擎肯定要知道参与Join的两表大小才能选择最优的算法喽那么斗胆问一句怎么知道两表大小衡量两表大小的是物理大小还是纪录多少抑或两者都有其实这是另一门学问基于代价优化Cost Based Optimization简称CBO它不仅能够解释Join算法的选择问题更重要的它还能确定多表联合Join场景下的Join顺序问题。 是不是对CBO很期待呢好吧这里先刨个坑下一个话题我们再聊。那今天要聊点什么呢Join算法选择、Join顺序选择确实对Join性能影响极大但还有一个很重要的因素对Join的性能至关重要那就是Join算法优化无论是broadcast hash join、shuffle hash join还是sort merge join都是最基础的join算法有没有什么优化方案呢还真有这就是今天要聊的主角Runtime Filter下文简称RF
RF预备知识bloom filter
RF说白了是使用bloomfilter对参与join的表进行过滤减少实际参与join的数据量。为了下文详细解释整个流程有必要先解释一下bloomfilter这个数据结构对之熟悉的看官可以绕道。Bloom Filter使用位数组来实现过滤初始状态下位数组每一位都为0如下图所示 假如此时有一个集合S {x1, x2, … xn}Bloom Filter使用k个独立的hash函数分别将集合中的每一个元素映射到1,…,m的范围。对于任何一个元素被映射到的数字作为对应的位数组的索引该位会被置为1。比如元素x1被hash函数映射到数字8那么位数组的第8位就会被置为1。下图中集合S只有两个元素x和y分别被3个hash函数进行映射映射到的位置分别为036和4710对应的位会被置为1: 现在假如要判断另一个元素是否是在此集合中只需要被这3个hash函数进行映射查看对应的位置是否有0存在如果有的话表示此元素肯定不存在于这个集合否则有可能存在。下图所示就表示z肯定不在集合xy中
RF算法理论
为了更好地说明整个过程这里使用一个SQL示例对RF算法进行完整讲解SQL
select item.name, order.*
from order , item
where order.item_id item.id
and item.category ‘book’其中order为订单表item为商品表两张表根据商品id字段进行join该SQL意为取出商品类别为书籍的所有订单详情。假设商品类型为书籍的商品并不多join算法因此确定为broadcast hash join。整个流程如下图所示 Step 1将item表的join字段item.id经过多个hash函数映射处理为一个bloomfilter如果对bloomfilter不了解自行google Step 2将映射好的bloomfilter分别广播到order表的所有partition上准备进行过滤 Step 3以Partition2为例存储进程比如DataNode进程将order表中join列order.item_id数据一条一条读出来使用bloomfilter进行过滤。淘汰该订单数据不是书籍相关商品的订单这条数据直接跳过否则该条订单数据有可能是待检索订单将该行数据全部扫描出来。 Step 4将所有未被bloomfilter过滤掉的订单数据通过本地socket通信发送到计算进程impalad。 Step 5再将所有书籍商品数据广播到所有Partition节点与step4所得订单数据进行真正的hashjoin操作得到最终的选择结果。
RF算法分析
上面通过一个SQL示例简单演示了整个RF算法在broadcast hash join中的操作流程根据流程对该算法进行一下理论层次分析
RF本质通过谓词 bloomfilter下推在存储层通过bloomfilter对数据进行过滤可以从三个方面实现对Join的优化。其一如果可以跳过很多记录就可以减少了数据IO扫描次数。这点需要重点解释一下许多朋友会有这样的疑问既然需要把数据扫描出来使用BloomFilter进行过滤为什么还会减少IO扫描次数呢这里需要关注一个事实大多数表存储行为都是列存列之间独立存储扫描过滤只需要扫描join列数据而不是所有列如果某一列被过滤掉了其他对应的同一行的列就不需要扫描了这样减少IO扫描次数。其二减少了数据从存储层通过socket(甚至TPC发送到计算层的开销其三减少了最终hash join执行的开销。RF代价对照未使用RF的Broadcast Hash Join来看前者主要增加了bloomfilter的生成、广播以及大表根据bloomfilter进行过滤这三个开销。通常情况下这几个步骤在小表较小的情况下代价并不大基本可以忽略。RF优化效果基本取决于bloomfilter的过滤效果如果大量数据被过滤掉了那么join的性能就会得到极大提升否则性能提升就会有限。RF实现和常见的谓词下推’‘’’’‘等一样RF实现需要在计算层以及存储层分别进行相关逻辑实现计算层要构造bloomfilter并将bloomfilter下传到存储层存储层要实现使用该bloomfilter对指定数据进行过滤。
RF效果验证
事实上RF这个东东的优化效果是在组内同事何大神做impala on parquet以及impala on kudu的基准对比测试的时候分析发现的。实际测试中impala on parquet 比之impala on kudu性能有明显优势目测至少10倍性能提升。同一SQL解析引擎不同存储引擎性能竟然天壤之别为了分析具体原因同事就使用impala的执行计划分析工具对两者的执行计划分别进行了分析才透过蛛丝马迹发现前者使用了RF而后者并没有当然可能还有其他因素但RF肯定是原因之一。 简单复盘一下这次测试吧基准测试使用TPCDS测试数据规模为1T本文使用测试过程中的一个典型SQLQ40作为示例对RF的神奇功效进行回放演示。下图是Q40的对比性能直观上来看RF可以直接带来40x的性能提升40倍哎这到底是怎么做到的 先来简单看看Q40的SQL语句如下所示看起来比较复杂核心涉及到3个表catalog_sales join date_dim 、catalog_sales join warehouse 、catalog_sales join item的join操作
select w_state ,i_item_id ,
sum(case when (cast(d_date as date) cast (‘1998-04-08’ as date)) then cs_sales_price – coalesce(cr_refunded_cash,0) else 0 end) as sales_before ,
sum(case when (cast(d_date as date) cast (‘1998-04-08’ as date)) then cs_sales_price – coalesce(cr_refunded_cash,0) else 0 end) as sales_after
from catalog_sales
left outer join catalog_returns on (catalog_sales.cs_order_number catalog_returns.cr_order_number
and catalog_sales.cs_item_sk catalog_returns.cr_item_sk) ,
warehouse ,item ,date_dim
where i_current_price between 0.99 and 1.49
and item.i_item_sk catalog_sales.cs_item_sk
and catalog_sales.cs_warehouse_sk warehouse.w_warehouse_sk
and catalog_sales.cs_sold_date_sk date_dim.d_date_sk
and date_dim.d_date between ‘1998-03-09’ and ‘1998-05-08’
group by w_state,i_item_id
order by w_state,i_item_id limit 100;典型的星型结构其中catalog_sales是事实表其他表为纬度表。本次分析选择其中catalog_sales join item这个纬度的join。因为对比测试中两者的SQL解析引擎都是使用impala所以SQL执行计划基本都相同。在此基础上来看看执行计划中单个执行节点在执行catalog_sales join item操作时由先到后的主要阶段耗时其中只贴出来重要耗时阶段Q40中Join算法为shuffle hash join与上文所举broadcast hash join示例略有不同不过不影响结论
实验项目impala on kudu(without runtime filter)impala on kudu(without runtime filter)total time43s996ms2s385msbloomfilter生成Filter 0 arrival: 857ms Filter 1 arrival: 879ms Filter 2 arrival: 939ms大表scan扫描HDFS_SCAN_NODE (id0):(Total: 3s479ms – RowsRead: 72.01M –RowsReturned: 72.01M – RowsReturnedRate: 20.69 M/sHDFS_SCAN_NODE (id0):(Total: 2s011ms – RowsRead: 72.01M – RowsReturned: 35.92K – RowsReturnedRate: 17.86 K/sec Filter 0 (1.00 MB): – Rows processed: 72.01M – Rows rejected: 71.43M – Rows total: 72.01M Filter 1 (1.00 MB): – Rows processed: 49.15K – Rows rejected: 126 – Rows total: 49.15K Filter 2 (1.00 MB): – Rows processed: 584.38K – Rows rejected: 548.46K – Rows total: 584.38K数据加载计算进程内存DataStreamSender (dst_id11):(Total: 15s984ms) – NetworkThroughput(*): 298.78 MB/sec – OverallThroughput: 100.85 MB/sec – RowsReturned: 72.01M– SerializeBatchTime: 10s567ms – TransmitDataRPCTime: 5s395msDataStreamSender (dst_id11):(Total: 10.725ms – NetworkThroughput(*): 244.06 MB/sec – OverallThroughput: 71.23 MB/sec – RowsReturned: 35.92K – SerializeBatchTime: 7.544ms – TransmitDataRPCTime: 3.130msHash JoinHASH_JOIN_NODE (id5): (Total: 19s104ms – BuildPartitionTime: 862.560ms – BuildRows: 8.99M – BuildRowsPartitioned: 8.99M – BuildTime: 373.855ms – …… – ProbeRows: 90.00M – ProbeRowsPartitioned: 0 (0) – ProbeTime: 17s628ms – RowsReturned: 90.00M – RowsReturnedRate: 985.85 K/s – SpilledPartitions: 0 (0) – UnpinTime: 960.000nsHASH_JOIN_NODE (id6): (Total: 21.707ms) – BuildPartitionTime: 3.487ms – BuildRows: 18.81K (18814) – BuildRowsPartitioned: 18.81K – BuildTime: 646.817us – …… – ProbeRows: 85.28K (85278) – ProbeRowsPartitioned: 0 (0) – ProbeTime: 6.396ms – RowsReturned: 85.27K – RowsReturnedRate: 38.88 K/s – SpilledPartitions: 0 (0) – UnpinTime: 915.000ns
经过对两种场景执行计划的解析可以基本验证上文所做的基本理论结果 1. 确认经过RF之后大表的数据量得到大量滤除只剩下少量数据参与最终的HashJoin。参见第二行大表scan扫描结果未使用rf的返回结果有7千万行纪录而经过RF过滤之后满足条件的只有3w纪录。3万相比7千万性能优化效果自然不言而喻。 2. 经过RF滤除之后少量数据经过网络从存储进程加载到计算进程内存的网络耗时大量减少。参见第三行“数据加载到计算进程内存”前者耗时15s后者耗时仅仅11ms。主要耗时分为两部分其中数据序列化时间占到2/310s左右数据经过RPC传输时间占另外1/3 5s左右。 3. 最后经过RF滤除之后参与到最终Hash Join的数据量大幅减少Hash Join耗时前者是19s后者是21ms左右。主要耗时在于大表Probe Time前者消耗了17s左右而后者仅需6ms。 说好的谓词下推呢 讲真刚开始接触RF的时候觉得这简直是一个实实在在的神器崇拜之情溢于言表。然而经过一段时间的探索消化直至把这篇文章写完也就是此时此刻忽然觉得它并不高深莫测说白了就是一个谓词下推不同的是这里的谓词稍微奇怪一点是一个bloomfilter而已。
提到谓词下推这里再引申一下下。以前经常满大街听到谓词下推然而对谓词下推却总感觉懵懵懂懂并不明白的很真切。经过RF的洗礼现在确信有了更进一步的理解。这里拿出来和大家交流交流。个人认为谓词下推有两个层面的理解 其一是逻辑执行计划优化层面的说法比如SQL语句select * from order ,item where item.id order.item_id and item.category ‘book’正常情况语法解析之后应该是先执行Join操作再执行Filter操作。通过谓词下推可以将Filter操作下推到Join操作之前执行。即将where item.category ‘book’下推到 item.id order.item_id之前先行执行。 其二是真正实现层面的说法谓词下推是将过滤条件从计算进程下推到存储进程先行执行注意这里有两种类型进程计算进程以及存储进程。计算与存储分离思想这在大数据领域相当常见比如最常见的计算进程有SparkSQL、Hive、impala等负责SQL解析优化、数据计算聚合等存储进程有HDFSDataNode、Kudu、HBase负责数据存储。正常情况下应该是将所有数据从存储进程加载到计算进程再进行过滤计算。谓词下推是说将一些过滤条件下推到存储进程直接让存储进程将数据过滤掉。这样的好处显而易见过滤的越早数据量越少序列化开销、网络开销、计算开销这一系列都会减少性能自然会提高。
写到这里忽然意识到笔者在上文出现了一个很严重的认知错误RF机制并不仅仅是一个简单的谓词下推它的精髓在于提出了一个重要的谓词bloomfilter。当前对RF支持的系统并不多笔者只知道目前唯有Impala on Parquet进行了支持。Impala on Kudu虽说Impala支持但Kudu并不支持。SparkSQL on Parqeut中虽有存储系统支持无奈计算引擎SparkSQL目前还不支持。
转自http://hbasefly.com/2017/04/10/bigdata-join-2/