Spark:repartition和repartitionByRange有什么区别?

我在这里浏览了文档:https : //spark.apache.org/docs/latest/api/python/pyspark.sql.html

它说:

  • 用于重新分区:生成的 DataFrame 是散列分区的。
  • 对于 repartitionByRange:生成的 DataFrame 是范围分区的。

和前面的问题也提到了它。但是,我仍然不明白它们究竟有何不同,以及在选择其中一个时会产生什么影响?

更重要的是,如果 repartition 进行哈希分区,提供列作为其参数有什么影响?

回答

我认为最好通过一些实验来研究差异。

测试数据帧

对于这个实验,我使用了以下两个数据帧(我在 Scala 中展示了代码,但概念与 Python API 相同):

// Dataframe with one column "value" containing the values ranging from 0 to 1000000
val df = Seq(0 to 1000000: _*).toDF("value")

// Dataframe with one column "value" containing 1000000 the number 0 in addition to the numbers 5000, 10000 and 100000
val df2 = Seq((0 to 1000000).map(_ => 0) :+ 5000 :+ 10000 :+ 100000: _*).toDF("value")

理论

  • repartition应用HashPartitioner何时提供一列或多列,以及RoundRobinPartitioner在提供的分区数量上均匀分布数据。如果提供了一列(或更多),这些值将被散列并用于通过计算类似的东西来确定分区号partition = hash(columns) % numberOfPartitions

  • repartitionByRange将根据列值的范围对数据进行分区。这通常用于连续(非离散)值,例如任何类型的数字。请注意,由于性能原因,此方法使用采样来估计范围。因此,输出可能不一致,因为采样可能返回不同的值。样本大小可以由 config 控制spark.sql.execution.rangeExchange.sampleSizePerPartition

还值得一提的是,对于这两种方法,如果没有numPartitions给出,默认情况下它会将 Dataframe 数据分区到spark.sql.shuffle.partitions您的 Spark 会话中配置,并且可以通过自适应查询执行(自 Spark 3.x 起可用)合并。

测试设置

基于给定的 Testdata 我总是应用相同的代码:

val testDf = df
// here I will insert the partition logic
    .withColumn("partition", spark_partition_id()) // applying SQL built-in function to determin actual partition
    .groupBy(col("partition"))
    .agg(
      count(col("value")).as("count"),
      min(col("value")).as("min_value"),
      max(col("value")).as("max_value"))
    .orderBy(col("partition"))

testDf.show(false)

检测结果

df.repartition(4, col("value"))

正如预期的那样,我们得到了 4 个分区,因为 的值df范围从 0 到 1000000,我们看到它们的散列值将产生一个分布良好的 Dataframe。

+---------+------+---------+---------+
|partition|count |min_value|max_value|
+---------+------+---------+---------+
|0        |249911|12       |1000000  |
|1        |250076|6        |999994   |
|2        |250334|2        |999999   |
|3        |249680|0        |999998   |
+---------+------+---------+---------+

df.repartitionByRange(4, col("value"))

同样在这种情况下,我们得到 4 个分区,但这次最小值和最大值清楚地显示了分区内的值范围。它几乎均匀分布,每个分区有 250000 个值。

+---------+------+---------+---------+
|partition|count |min_value|max_value|
+---------+------+---------+---------+
|0        |244803|0        |244802   |
|1        |255376|244803   |500178   |
|2        |249777|500179   |749955   |
|3        |250045|749956   |1000000  |
+---------+------+---------+---------+

df2.repartition(4, col("value"))

现在,我们正在使用另一个 Dataframe df2。这里,散列算法对只有 0、5000、10000 或 100000 的值进行散列。当然,值 0 的散列将始终相同,因此所有零最终都在同一个分区中(在这种情况下,分区 3 )。其他两个分区只包含一个值。

+---------+-------+---------+---------+
|partition|count  |min_value|max_value|
+---------+-------+---------+---------+
|0        |1      |100000   |100000   |
|1        |1      |10000    |10000    |
|2        |1      |5000     |5000     |
|3        |1000001|0        |0        |
+---------+-------+---------+---------+

df2.repartition(4)

如果不使用“value”列的内容,该repartition方法将在 RoundRobin 的基础上分发消息。所有分区的数据量几乎相同。

+---------+------+---------+---------+
|partition|count |min_value|max_value|
+---------+------+---------+---------+
|0        |250002|0        |5000     |
|1        |250002|0        |10000    |
|2        |249998|0        |100000   |
|3        |250002|0        |0        |
+---------+------+---------+---------+

df2.repartitionByRange(4, col("value"))

这种情况表明数据帧df2没有很好地定义用于按范围重新分区,因为几乎所有值都是 0。因此,我们甚至最终只有两个分区,而分区 0 包含所有零。

+---------+-------+---------+---------+
|partition|count  |min_value|max_value|
+---------+-------+---------+---------+
|0        |1000001|0        |0        |
|1        |3      |5000     |100000   |
+---------+-------+---------+---------+


以上是Spark:repartition和repartitionByRange有什么区别?的全部内容。
THE END
分享
二维码
< <上一篇
下一篇>>