spark数据倾斜解决方案五:将reduce join转为map join

sapv博客之家,spark数据倾斜解决方案五:将reduce join转为map join

方案适用场景:在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中的一个RDD或表的数据量比较小(比如几百M或者一两G),比较适用此方案。

      方案实现思路:不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。

      方案实现原理:普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜。具体原理如下图所示。

      方案优点:对join操作导致的数据倾斜,效果非常好,因为根本就不会发生shuffle,也就根本不会发生数据倾斜。

      方案缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。毕竟我们需要将小表进行广播,此时会比较消耗内存资源,driver和每个Executor内存中都会驻留一份小RDD的全量数据。如果我们广播出去的RDD数据比较大,比如10G以上,那么就可能发生内存溢出了。因此并不适合两个都是大表的情况。

 

[plain] view plain copy

  1. <span style="font-size:12px;">// 首先将数据量比较小的RDD的数据,collect到Driver中来。    
  2. List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()    
  3. // 然后使用Spark的广播功能,将小RDD的数据转换成广播变量,这样每个Executor就只有一份RDD的数据。    
  4. // 可以尽可能节省内存空间,并且减少网络传输性能开销。    
  5. final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);    
  6.     
  7. // 对另外一个RDD执行map类操作,而不再是join类操作。    
  8. JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(    
  9.         new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() {    
  10.             private static final long serialVersionUID = 1L;    
  11.             @Override    
  12.             public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple)    
  13.                     throws Exception {    
  14.                 // 在算子函数中,通过广播变量,获取到本地Executor中的rdd1数据。    
  15.                 List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();    
  16.                 // 可以将rdd1的数据转换为一个Map,便于后面进行join操作。    
  17.                 Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();    
  18.                 for(Tuple2<Long, Row> data : rdd1Data) {    
  19.                     rdd1DataMap.put(data._1, data._2);    
  20.                 }    
  21.                 // 获取当前RDD数据的key以及value。    
  22.                 String key = tuple._1;    
  23.                 String value = tuple._2;    
  24.                 // 从rdd1数据Map中,根据key获取到可以join到的数据。    
  25.                 Row rdd1Value = rdd1DataMap.get(key);    
  26.                 return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));    
  27.             }    
  28.         });    
  29.     
  30. // 这里得提示一下。    
  31. // 上面的做法,仅仅适用于rdd1中的key没有重复,全部是唯一的场景。    
  32. // 如果rdd1中有多个相同的key,那么就得用flatMap类的操作,在进行join的时候不能用map,而是得遍历rdd1所有数据进行join。    
  33. // rdd2中每条数据都可能会返回多条join后的数据。</span>  
  • 发表于 2018-05-23 22:45
  • 阅读 ( 785 )
  • 分类:大数据

0 条评论

请先 登录 后评论
不写代码的码农
张鹏

大数据工程师

94 篇文章

作家榜 »

  1. 张鹏 94 文章
  2. 0 文章
  3. 赵科 0 文章
  4. 王孖珺397954227 0 文章