RDD Transformation?

  • 간단히 설명하자면, 결과 값, 즉 return이 RDD로 반환되는 연산이다.
  • RDD들은 지속적으로 기록만 하다가, RDD Action을 수행할 때 연쇄적으로 수행된다.
    • lazy evaluation이다.

사용 변수들

val numericRDD     = sc.parallelize(1 to 4, 3)  
val characterRDD   = sc.parallelize(Seq("good,bad", "good,bad", "best,worst", "plus,minus"), 3)  
val characterRDD2  = sc.parallelize(Seq("good,bad", "good,bad", "plus,minus"), 3)  
val pairRDD        = characterRDD.zipWithIndex.map {case (key, value) => (key, value + 2) }  
val pairRDD2       = characterRDD.zipWithIndex.map {case (key, value) => (key, value + 10) }  
val pairRDD3       = characterRDD.zipWithIndex.map {case (key, value) => (value + 1, key) }  
val numericPairRDD = sc.parallelize(for (i <- 1 to 10) yield (scala.util.Random.nextInt(1000), "-"))
  • 기본적으로 위와 같이 RDD생성을 함

Map

Map

/*map*/
println(characterRDD.map(_.split(",")).collect().map(_.mkString("{", ",", "}")).mkString("(", ",", ")"))  
//  ({good,bad},{best,worst},{plus,minus})

/*mapValues*/
println(pairRDD.mapValues(num => num + 1).collect().mkString(", "))  
// (good,bad,3), (good,bad,4), (best,worst,5), (plus,minus,6)
  • map 실행 순서
    1. characterRDD.map(_.split(","))
      1. “good,bad”, “good,bad”, “best,worst”, “plus,minus” 를 “,” 로 분할
      2. seq((“good”,“bad”), (“good”,“bad”), (“best”,“worst”), (“plus”,“minus”))
    2. map(_.mkString("{", ",", "}")) 를 이용하여 문자열생성
      1. seq({“good”,“bad”}, {“good”,“bad”}, {“best”,“worst”}, {“plus”,“minus”})
    3. .mkString("(", ",", ")"))를 이용하여 최종 출력 모양 생성
      1. ({“good”,“bad”}, {“good”,“bad”}, {“best”,“worst”}, {“plus”,“minus”})
  • mapValues
    • Map과 동일하게 처리하는데, Value의 값을 조작하고 싶을 때 사용.
    • 위 코드는 각 value에 1을 더하는 연산이다

Flatmap

Monad 개념인데, 이것은 추후 공부할 예정

/*flatMap*/
println(characterRDD.flatMap(_.split(",")).collect().mkString("(", ",", ")"))  
//  (good,bad,best,worst,plus,minus)  

/*flatMapValues*/
val pairRDD3 = characterRDD.zipWithIndex.map {case (key, value) => (value + 1, key) }  
println(pairRDD2.flatMapValues(_.split(",")).collect().mkString(", "))  
// (1,good), (1,bad), (2,good), (2,bad), (3,best), (3,worst), (4,plus), (4,minus)
  • flatmap 실행 순서
    1. characterRDD.flatMap(_.split(","))
      1. 모든 값들을 풀어서 ","로 분할
      2. seq(“good”,“bad”,“good”,“bad”,“best”,“worst”,“plus”,“minus”)
    2. .mkString("(", ",", ")"))를 이용하여 최종 출력 모양 생성
      1. (“good”,“bad”,“good”,“bad”,“best”,“worst”,“plus”,“minus”)
  • flatMapValues
    • flatMap과 동일하게 처리하는데, Value의 값을 조작하고 싶을 때 사용.
    • key를 기준으로 value를 나누는 연산
      • Seq(1,“good,bad”), (2,“good,bad”), (3,“best,worst”), (4,“plus,minus”) 를
      • (1,good), (1,bad), (2,good), (2,bad), (3,best), (3,worst), (4,plus), (4,minus)로 변환

mapPartitions

println(  
  numericRDD.mapPartitions(nums => {  
	println("MapPartition loop")  
	nums.map(num => num * 2)  
  }).collect().mkString(", ")  
  )  
// MapPartition loop
// MapPartition loop
// MapPartition loop
// mapPartitions
// 2, 4, 6, 8

// 짝수만 출력
println(  
  numericRDD.mapPartitionsWithIndex((idx, nums) => {  
	println("mapPartitionsWithIndex loop")  
	nums.flatMap {  
	  case num if idx % 2 == 0 => Option(num * 2)  
	  case _                   => None  
	}  
  }).collect().mkString(", ")  
  )  
// mapPartitionsWithIndex loop
// mapPartitionsWithIndex loop
// mapPartitionsWithIndex loop
// 2, 6, 8
  • Partition를 기준으로 Map을 수행함
    • 3개의 Partition이 존재하면 3번 순회한다.
  • mapPartitions vs mapPartitionsWithIndex
    • 간단히 말하면 둘다 Partition 기준으로 순회한다
    • 차이점 : mapPartitionsWithIndex은 각 Row에 Index를 붙여서 반환한다.

Group

Zip

println(characterRDD.zip(numericRDD).collect().mkString("(", ",", ")"))  
//   ((good,bad,1),(good,bad,2),(best,worst,3),(plus,minus,4))  
  
println(characterRDD.zipPartitions(numericRDD, numericRDD) {  
  (key, num, num2) => for {  
    key_result <- key  
    num_result <- num  
    num_result2 <- num2.map(_ + 1)  
  } yield (key_result, num_result, num_result2)  
}.collect().mkString("(", ",", ")"))  
//  ((good,bad,1,2),(good,bad,2,3),(best,worst,3,4),(best,worst,3,5))  
  • zip
    • 2개의 RDD를 Key : value 로 묶어줌
    • 1 : 1 로 같은 인덱스 위치에 있는 것끼리 묶이기 때문에 길이가 및 파티션 수가 동일해야 함
  • zipPartitions
    • Partition을 요소로 사용하여 새로운 파티션을 생성
    • spark 2.4에서는 인자로 RDD 4개까지 가능
    • Partition 수가 동일해야함
numericRDD.groupBy {  
  case num: Int if num % 2 == 0 => "even"  
  case _                        => "odd"  
}.collect() foreach {  
  row => println(s"${row._1}, [${row._2.mkString(",")}]")  
}  
//      even, [2,4]  
//      odd, [1,3]  
  
pairRDD.groupByKey().collect() foreach {  
  row => println(s"${row._1}, [${row._2.mkString(",")}]")  
}  
//    best,worst, [3]  
//    good,bad, [1,2]  
//    plus,minus, [4]  
  
val rdd1   = sc.parallelize(Seq(("apple", 100), ("banana", 500), ("apple", 200)))  
val rdd2   = sc.parallelize(Seq(("banana", 400), ("apple", 300)))  
val result = rdd1.cogroup(rdd2)  
result.collect.foreach {  
  case (k, (rdd1_val, rdd2_val)) =>  
    println(s"($k, [${rdd1_val.mkString(",")}], [${rdd2_val.mkString(",")}])")  
}  
//    (apple, [100,200], [300])  
//    (banana, [500], [400])
  • groupBy
    • 조건에 의한 Grouping
  • groupByKey
    • Key값을 기준으로 grouping
  • cogroup
    • Key를 기준으로 각 RDD에 값을 묶음
    • key , rdd1의 values, rdd2의 values

Set

Distinct

println(  
  s"""distinct  
     |${characterRDD.distinct.collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    best,worst, good,bad, plus,minus  
  • 이름 그대로 중복을 제거하는 역할

cartesian

println(  
  s"""cartesian  
     |${characterRDD.cartesian(numericRDD).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    (good,bad,1), (good,bad,2), (good,bad,3), (good,bad,4),(good,bad,1), (good,bad,2), (good,bad,3), (good,bad,4),(best,worst,1), (plus,minus,1), (best,worst,2), (plus,minus,2), (best,worst,3), (best,worst,4), (plus,minus,3), (plus,minus,4)  
  • 카르테시안 곱
  • 모든 경우의 수를 조합하는 것이라고 보면된다.
  • Sql에서 Join 조건을 주지 않은 경우, Cross Join

subtract

println(  
  s"""subtract  
     |${characterRDD.subtract(characterRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    best,worst  
  • rdd1에 존재하지만, rdd2에 존재하지 않은 값들을 RDD로 생성하여 반환
  • 예시
    • characterRDD : "good,bad", "good,bad", "best,worst", "plus,minus"
    • characterRDD2 : "good,bad", "good,bad", "plus,minus"
    • characterRDD에 “best,worst” 가 존재하지만 characterRDD2 에는 존재하지 않음
      • best,worst 반환

Union

println(  
  s"""union  
     |${characterRDD.union(characterRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    good,bad, good,bad, best,worst, plus,minus, good,bad, good,bad, plus,minus  
  • 2개의 RDD를 병합하는 것
  • 자료형이 동일해야함

intersection

// 교집합을 구하는것  
println(  
  s"""intersection  
     |${characterRDD.intersection(characterRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    good,bad, plus,minus 

Join

println(  
  s"""join  
     |${pairRDD.join(pairRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//(best,worst,(3,12)), (good,bad,(1,10)), (good,bad,(1,11)), (good,bad,(2,10)), (good,bad,(2,11)), (plus,minus,(4,13))  
  

println(  
  s"""leftOuterJoin  
     |${pairRDD.leftOuterJoin(pairRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
// (best,worst,(3,Some(12))), (good,bad,(1,Some(10))), (good,bad,(1,Some(11))), (good,bad,(2,Some(10))), (good,bad,(2,Some(11))), (plus,minus,(4,Some(13)))  
  

println(  
  s"""leftOuterJoin  
     |${pairRDD.subtractByKey(pairRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//(best,worst,3)
  • Join
    • Key값을 기준으로 Join연산
    • (key , (rdd01 value, rdd2 value)) 조합으로 모든 값 Join
  • leftOuterJoin / rightOuterJoin
    • left join 연산을 한다. join결과값이 없을 수 있음으로 Option 타입으로 반환한다.
  • subtractByKey
    • rdd01.subtractByKey(rdd2)
    • rdd01가 rdd2에 없는 key만 추출

Aggregate

reduceByKey / foldByKey

    println(  
      s"""reduceByKey  
         |${pairRDD.reduceByKey(_ * _).collect().mkString(", ")}  
         |""".stripMargin  
      )  
    // (best,worst,4), (good,bad,6), (plus,minus,5)  

    println(  
      s"""foldByKey  
         |${pairRDD.foldByKey(1)(_ * _).collect().mkString(", ")}  
         |""".stripMargin  
      )  
    // (best,worst,4), (good,bad,6), (plus,minus,5)  
  • reduceByKey
    • Key값을 기준으로 value에 주어진 연산 수행
    • 기본 값이 존재하지 않기 때문에 교환 법칙 및 결합 법칙을 만족해야한다.
      • 교환 법칙 : 상수의 위치가 변경돼도 가능한 것.
        • 1 + 2 == 2 + 1 ( OK )
        • 1 / 2 != 2 / 1 ( X )
      • 결합 법칙 : 연산의 순서가 변경돼도 가능한 것.
        • 간단히 말하면 임의로 괄호로 묶어 연산해도 동일한 결과
        • 1 + 2 + 3 == 1 + (2 + 3) ( OK )
        • 1 + 2 * 2 != ( 1 + 2 ) * 2 ( X )
    • 위 결과 도출 방법
      • best,worst : 4
      • good,bad : 2 * 3
      • plus,minus : 5
        foldByKey
    • Key값 기준으로 Value연산
    • reducebykey와 동일하지만, 차이점은 기본 값이 있는 것이다.
    • 위 결과 도출 방법
      • best,worst : 1 * 4
      • good,bad : 1 * 2 * 3
      • plus,minus : 1 * 5
종류 교환법칙 결합법칙 초기값
reduceByKey o o x
foldByKey x o o

combineByKey / aggregateByKey

case class Fruit(var price: Long, var amount: Long = 1) {  
  def add(p: Long): Fruit = {  
	add(Fruit(p))  
  }  
  def add(other: Fruit): Fruit = {  
	this.price += other.price  
	this.amount += other.amount  
	this  
  }  
}  

val fruit_rdd = sc.parallelize(Seq(("Apple", 1000L), ("banana", 2000L), ("Apple", 1500L), ("banana", 2600L), ("banana", 1300L), ("Apple", 3000L)))  
println(  
  s"""combineByKey  
	 |${  
	fruit_rdd.combineByKey(  
	  createCombiner = (p: Long) => Fruit(p)  
	  , mergeValue = (f: Fruit, v: Long) => f.add(v)  
	  , mergeCombiners = (f1: Fruit, f2: Fruit) => f1.add(f2)  
	  ).collect().mkString(", ")  
  }  
	 |""".stripMargin  
  )
println(  
  s"""combineByKey  
	 |${  
	fruit_rdd.aggregateByKey(Fruit(3000, 0))(  
	  (f: Fruit, v: Long) => f.add(v)  
	  , (f1: Fruit, f2: Fruit) => f1.add(f2)  
	  ).collect().mkString(", ")  
  }  
	 |""".stripMargin  
  )  
  • combineByKey
    • 새로운 타입으로 반환 가능하게 만듦
    • 아래 예시를 통해 Fruit 이라는 Case Class로 반환하는 예시
    • createCombiner
      • 연산을 위해 컴파이너를 생성하는 것.
      • 개인적으로 이해한 바는 타입 변경하여 초기 값을 생성하는 작업
    • mergeValue
      • Combiner가 존재하면 추후 실행할 연산, 이 연산을 통해 값이 누적되어 쌓인다
      • 동을한 key를 가진 값들을 병합
    • mergeCombiners
      • 병합이 끝난 Combiner들을 최종적으로 병합한 Combiner를 생성
      • 다른 Partition에 존재하는 결과 값을 병합
    • 조금 더 알아봐야겠다.
    • 위 예시 결과 값 도출 사유
      • 결과 값 : (banana,Fruit(5900,3)), (Apple,Fruit(5500,3))
      • banana : 2000L + 2600L + 1300L , count = 3
      • Apple : 1000L + 1500L + 3000L , count = 3
  • aggregateByKey
    • 초기값이 존재하는 combineByKey
      • 즉 createCombiner를 생성 후 수행하는 CombineByKey
    • 위 예시 결과 값 도출 사유
      • 결과 값 : (banana,Fruit(14900,3)), (Apple,Fruit(14500,3))
      • banana : (3000 + 2000L) + (3000 + 2600L) +(3000 + 1300L) , count = 3
      • Apple : (3000 + 1000L) + (3000 + 1500L) +(3000 + 3000L) , count = 3

Pipe

println(  
  s"""pipe
	 |${characterRDD.pipe("cut -d , -f 2").collect().mkString(", ")}
	 |""".stripMargin      )
// bad, bad, worst, minus  
  • 결과 값을 bshell 등 외부 프로세스의 인자 값으로 실행시킴

값추출

println(  
  s"""  
	 |keys : ${numericPairRDD.keys.collect.mkString(", ")}  
	 |value: ${numericPairRDD.values.collect.mkString(", ")}  
	 |""".stripMargin  
)  
//    keys : 776, 377, 559, 275, 818, 457, 173, 248, 135, 679  
//    value: -, -, -, -, -, -, -, -, -, -  
println(  
  s"""  
	 |중복 불가 : ${numericPairRDD.sample(withReplacement = false,fraction = 0.7).collect.mkString(", ")}  
	 |중복 허용 : ${numericPairRDD.sample(withReplacement = true,fraction = 0.7).collect.mkString(", ")}  
	 |""".stripMargin  
)  
//    중복 불가 (306,-), (376,-), (630,-), (417,-), (439,-), (738,-), (691,-), (435,-)
//    중복 허용 (210,-), (376,-), (630,-), (417,-), (417,-), (439,-), (435,-)
  • keys
    • key값 반환
  • values
    • value값을 반환
  • sample
    • 데이터 샘플링.
      • withReplacement : 데이터 중복 가능 여부
      • fraction : 전체 중 n% 추출 (0 ~ 1 사이 값)

filter

// 이름 그대로 filter조건에 부합하는 것을 걸러줌  
println(  
  s"""filter  
	 |${numericRDD.filter(_ > 2).collect.mkString(", ")}  
	 |""".stripMargin)  
// 3, 4  
// 1,2,3,4 중 3,4만 출력  
  • 값 필터링
    • SQL의 WHERE절이라고 생각하면 편하다

SortByKey

// Key값을 기준으로 정렬시켜줌  
println(  
  s"""sortByKey  
	 |${numericPairRDD.sortByKey().collect.mkString(", ")}  
	 |""".stripMargin  
)  
// (135, -), (173, -), (248, -), (275, -), (377, -), (457, -), (559, -), (679, -), (776, -), (818, -)  
  • Key값을 기준으로 정렬

Partition / Coalesce

println(  
  s"""repartition  
	 |default = ${characterRDD.getNumPartitions}  
	 |coalesce = ${characterRDD.coalesce(1).getNumPartitions}  
	 |repartition = ${characterRDD.repartition(5).getNumPartitions}  
	 |""".stripMargin  
	 // default = 3
     // coalesce = 1
     // repartition = 5
  )  
  • Partition 수를 증감시킴
    • repartition 는 Shuffle이 발생하면서 partition 재분배
      • Shuffle이 발생하지 않아 Network 통신을 하여 상대적으로 느리다
        • 데이터의 Parition 분배가 될 수 있다.
      • partition 수 증감 모두 가능
    • coalesce 는 shuffle이 발생하지 않고 partition 재분배
      • Shuffle이 발생하지 않아 Network 통신을 하지 않아 상대적으로 빠르다
        • 데이터의 Parition 분배는 일어나지 않는다
        • 간단히 생각하면 동일 Node에 있는 Partition들을 Union 한다고 생각하면 편하다
      • partition 수 감소만 가능

partitionBy / repartitionAndSortWithinPartitions

numericPairRDD.partitionBy(new HashPartitioner(3))  
			  .foreachPartition(it => {  
				println(s"-------")  
				it.foreach(v => println(v))  
			  })  
 
numericPairRDD.repartitionAndSortWithinPartitions(new HashPartitioner(3))  
			  .foreachPartition(it => {  
				println(s"==========")  
				it.foreach(v => println(v))  
			  })  
  • 공통
    • RDD를 N+1개의 Partition으로 특정 기준(Partitioner)에 따라 분배 해준다
      • Partitioner : Partition을 나누는 역할을 하는 객체
        • HASH Partitioner
          • Hash값을 이용하여 분할
        • RANGE Partitioner
          • 범위 혹은 순서로 분할
    • (key - value)로 구성되어 있어야 한다.
      • key 값을 기준으로 분리한다
    • 지정한 Partitioner를 기준으로 분배하기 때문에, Skew현상이 발생할 수 있다.
      • Skew : 데이터 쏠림 현상
      • 위 예시를 실행할 때 아래와 같이 데이터 Skew현상이 발생할 수 있다.
partition partitionBy repartitionAndSortWithinPartitions
partition 3 (89,-) (150,-)
partition 3 (780,-) (89,-)
partition 3 (622,-) (175,-)
partition 3 (548,-) (134,-)
partition 3 (150,-) (530,-)
partition 3 (646,-) (548,-)
partition 3 (134,-) (780,-)
partition 3 (175,-) (622,-)
partition 3 (966,-) (966,-)
partition 3 (530,-) (646,-)

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

spark-scheduling  (0) 2023.11.28
WordCount  (0) 2023.10.13
RDD Action  (0) 2023.10.13
Spark_RDD  (0) 2023.10.05
Spark  (0) 2023.10.04

+ Recent posts