浪院长 浪尖聊大数据 hashjoinjoin是作为业务开发绕不开的SQL话题,无论是传统的数据库join,还是大数据里的join。 Spark BroadCastHashJoin翻过源码之后你就会发现,Spark 1.6之前实现BroadCastHashJoin就是利用的Java的HashMap来实现的。大家感兴趣可以去Spark 1.6的源码里搜索BroadCastHashJoin,HashedRelation,探查一下源码。 ShuffledHashJoinBroadCastHashJoin适合的是大表和小表的join策略,将整个小表广播。很多时候,参与join的表本身都不适合广播,也不适合放入内存,但是按照一定分区拆开后就可以放入内存构建为HashRelation。这个就是分治思想了,将两张表按照相同的hash分区器及分区数进行,对join条件进行分区,那么需要join的key就会落入相同的分区里,然后就可以利用本地join的策略来进行join了。
SortMergeJoin上面两张情况都是小表本身适合放入内存或者中表经过分区治理后适合放入内存,来完成本地化hashedjoin,小表数据放在内存中,很奢侈的,所以经常会遇到join,就oom。小表,中表都是依据内存说的,你内存无限,那是最好。
Spark SQL的join方式选择假如用户使用Spark SQL的适合用了hints,那Spark会先采用Hints提示的join方式。 -- 支持 BROADCAST, BROADCASTJOIN and MAPJOIN 来表达 broadcast hint SELECT /* BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key ShuffledHashJoin,hints的sql写法如下: -- 仅支持 SHUFFLE_HASH 来表达 ShuffledHashJoin hint SELECT /* SHUFFLE_HASH(r) */ * FROM records r JOIN src s ON r.key = s.key SortMergeJoin,hints的SQL写法如下: -- 支持 SHUFFLE_MERGE, MERGE and MERGEJOIN 来表达 SortMergeJoin hintSELECT /* MERGEJOIN(r) */ * FROM records r JOIN src s ON r.key = s.key 假设用户没有使用hints,默认顺序是: plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold 参数:spark.sql.autoBroadcastJoinThreshold 假设两张表都满足广播需求,选最小的。 spark.sql.join.preferSortMergeJoin=true, 还有两个条件,根据统计信息,表的bytes是广播的阈值*总并行度: plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions 并且该表bytes乘以3要小于等于另一张表的bytes: a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes 那么这张表就适合分治之后,作为每个分区构建本地hashtable的表。 def createSortMergeJoin() = { if (RowOrdering.isOrderable(leftKeys)) { Some(Seq(joins.SortMergeJoinExec( leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)))) } else { None } } 这段代码是在SparkStrageties类,JoinSelection单例类内部。 createBroadcastHashJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint)) .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None } .orElse(createShuffleHashJoin(hintToShuffleHashLeft(hint), hintToShuffleHashRight(hint))) .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None } .getOrElse(createJoinWithoutHint()) 当然,这三种join都是等值join,之前的版本Spark仅仅支持等值join但是不支持非等值join,常见的业务开发中确实存在非等值join的情况,spark目前支持非等值join的实现有以下两种,由于实现问题,确实很容易oom。 Broadcast nested loop joinShuffle-and-replicate nested loop join。 |
|