[Original] Uncle's Troubleshooting Series (11): Why does adding limit to a large-table subquery in Spark trigger a Broadcast timeout error
Published: 2019-06-09


When two tables need to be joined and one is large while the other is small, the normal map-reduce flow requires a shuffle, which moves the large table's data across the network between nodes. A common optimization is to read the small table into memory and broadcast it to the nodes processing the large table, avoiding the shuffle and reduce.

In Hive this is called a map join (map-side join), enabled via the configuration hive.auto.convert.join.

In Spark it is called BroadcastHashJoin (broadcast hash join):

Spark SQL uses broadcast join (aka broadcast hash join) instead of hash join to optimize join queries when the size of one side's data is below spark.sql.autoBroadcastJoinThreshold.

Broadcast join can be very efficient for joins between a large table (fact) with relatively small tables (dimensions) that could then be used to perform a star-schema join. It can avoid sending all data of the large table over the network.
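The threshold referenced above is spark.sql.autoBroadcastJoinThreshold, which defaults to 10 MB. A minimal sketch of inspecting and tuning it, assuming an existing SparkSession named spark:

// Inspect and tune the auto-broadcast threshold
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")            // "10485760" (10 MB default)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 52428800)  // raise the cap to 50 MB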

There are several ways to trigger it:

1) SQL hint (supported since Spark 2.3)

SELECT /*+ MAPJOIN(b) */ ...
SELECT /*+ BROADCASTJOIN(b) */ ...
SELECT /*+ BROADCAST(b) */ ...
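A usage sketch through the SQL API; spark is an existing SparkSession and small_table is a hypothetical dimension table:

// Force broadcasting side b via a SQL hint
val df = spark.sql(
  """SELECT /*+ BROADCAST(b) */ a.*
    |FROM big_table1 a JOIN small_table b ON a.id = b.id""".stripMargin)
df.explain()  // the plan should show BroadcastHashJoin ... BuildRight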

2) The broadcast function (org.apache.spark.sql.functions.broadcast), applied to a DataFrame:

import org.apache.spark.sql.functions.broadcast

val testTable3 = testTable1.join(broadcast(testTable2), Seq("id"), "right_outer")

3) Automatic optimization, decided in:

org.apache.spark.sql.execution.SparkStrategies.JoinSelection

private def canBroadcast(plan: LogicalPlan): Boolean = {
  plan.statistics.isBroadcastable ||
    (plan.statistics.sizeInBytes >= 0 &&
      plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold)
}
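This check is the crux of the question in the title: a limit caps the plan's estimated sizeInBytes, so even a subquery over a huge table can satisfy canBroadcast. A sketch for inspecting the estimate (the exact API varies by version; in Spark 2.3+ the statistics hang off queryExecution.optimizedPlan.stats):

// The optimizer's size estimate for a limited subquery is tiny,
// so JoinSelection will happily plan a broadcast for it
val limited = spark.sql("SELECT * FROM big_table2 LIMIT 10")
println(limited.queryExecution.optimizedPlan.stats.sizeInBytes)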

For example, joining a large table against a limited subquery over another large table:

spark-sql> explain select * from big_table1 a, (select * from big_table2 limit 10) b where a.id = b.id;

18/09/17 18:14:09 339 WARN Utils66: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.

== Physical Plan ==

BroadcastHashJoin [id#5], [id#14], Inner, BuildRight

:- *Filter isnotnull(id#5)

:  +- HiveTableScan [name#4, id#5], MetastoreRelation big_table1

+- BroadcastExchange HashedRelationBroadcastMode(List(input[6, string, false]))

   +- Filter isnotnull(id#14)

      +- GlobalLimit 10

         +- Exchange SinglePartition

            +- LocalLimit 10

               +- HiveTableScan [id#14, ... 187 more fields], MetastoreRelation big_table2

Time taken: 4.216 seconds, Fetched 1 row(s)

Note that in the plan above, the limited subquery over big_table2 is chosen as the broadcast side (BuildRight) even though the underlying table is huge. BroadcastExchange executes as follows:

org.apache.spark.sql.execution.exchange.BroadcastExchangeExec

override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
  ThreadUtils.awaitResultInForkJoinSafely(relationFuture, timeout)
    .asInstanceOf[broadcast.Broadcast[T]]
}

Here timeout refers to spark.sql.broadcastTimeout, which defaults to 300 seconds.
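A sketch of raising it, assuming a SparkSession named spark (the value is interpreted as seconds):

// Give slow broadcast builds more time than the default 300s
spark.conf.set("spark.sql.broadcastTimeout", 1200)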

private lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
  // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
  val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
  Future {
    // This will run in another thread. Set the execution id so that we can connect these jobs
    // with the correct execution.
    SQLExecution.withExecutionId(sparkContext, executionId) {
      try {
        val beforeCollect = System.nanoTime()
        // Note that we use .executeCollect() because we don't want to convert data to Scala types
        val input: Array[InternalRow] = child.executeCollect()
        if (input.length >= 512000000) {
          throw new SparkException(
            s"Cannot broadcast the table with more than 512 millions rows: ${input.length} rows")
        }
        val beforeBuild = System.nanoTime()
        longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
        val dataSize = input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
        longMetric("dataSize") += dataSize
        if (dataSize >= (8L << 30)) {
          throw new SparkException(
            s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
        }

        // Construct and broadcast the relation.
        val relation = mode.transform(input)
        val beforeBroadcast = System.nanoTime()
        longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000

        val broadcasted = sparkContext.broadcast(relation)
        longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000

        SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
        broadcasted
        // ... (error handling and the closing of the method are elided in the original excerpt)

So to broadcast a table, Spark first computes it and collects the result to the driver, then broadcasts it via SparkContext; the whole sequence runs asynchronously in a separate thread, bounded by spark.sql.broadcastTimeout. This answers the question in the title: the limit shrinks the subquery's estimated statistics enough to pass canBroadcast, but actually producing those rows still means scanning big_table2 and shuffling through the Exchange SinglePartition seen in the plan, which on a large table can easily take longer than 300s, at which point the broadcast future times out.
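One hedged workaround for this scenario, besides raising spark.sql.broadcastTimeout as shown above, is to stop the planner from broadcasting the limited subquery at all, so the join falls back to a shuffle-based join such as SortMergeJoin:

// Disable automatic broadcast joins for this session
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)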

 

Reposted from: https://www.cnblogs.com/barneywill/p/10109434.html
