RDD Transformation?
- 간단히 설명하자면, 결과 값, 즉 return이
RDD로 반환
되는 연산이다. - RDD들은 지속적으로 기록만 하다가, RDD Action을 수행할 때 연쇄적으로 수행된다.
- lazy evaluation이다.
사용 변수들
val numericRDD = sc.parallelize(1 to 4, 3)
val characterRDD = sc.parallelize(Seq("good,bad", "good,bad", "best,worst", "plus,minus"), 3)
val characterRDD2 = sc.parallelize(Seq("good,bad", "good,bad", "plus,minus"), 3)
val pairRDD = characterRDD.zipWithIndex.map {case (key, value) => (key, value + 2) }
val pairRDD2 = characterRDD.zipWithIndex.map {case (key, value) => (key, value + 10) }
val pairRDD3 = characterRDD.zipWithIndex.map {case (key, value) => (value + 1, key) }
val numericPairRDD = sc.parallelize(for (i <- 1 to 10) yield (scala.util.Random.nextInt(1000), "-"))
- 기본적으로 위와 같이 RDD생성을 함
Map
Map
/*map*/
println(characterRDD.map(_.split(",")).collect().map(_.mkString("{", ",", "}")).mkString("(", ",", ")"))
// ({good,bad},{best,worst},{plus,minus})
/*mapValues*/
println(pairRDD.mapValues(num => num + 1).collect().mkString(", "))
// (good,bad,3), (good,bad,4), (best,worst,5), (plus,minus,6)
- map 실행 순서
characterRDD.map(_.split(","))
- “good,bad”, “good,bad”, “best,worst”, “plus,minus” 를 “,” 로 분할
- seq((“good”,“bad”), (“good”,“bad”), (“best”,“worst”), (“plus”,“minus”))
map(_.mkString("{", ",", "}"))
를 이용하여 문자열생성- seq({“good”,“bad”}, {“good”,“bad”}, {“best”,“worst”}, {“plus”,“minus”})
.mkString("(", ",", ")"))
를 이용하여 최종 출력 모양 생성- ({“good”,“bad”}, {“good”,“bad”}, {“best”,“worst”}, {“plus”,“minus”})
- mapValues
- Map과 동일하게 처리하는데, Value의 값을 조작하고 싶을 때 사용.
- 위 코드는 각 value에 1을 더하는 연산이다
Flatmap
Monad 개념인데, 이것은 추후 공부할 예정
/*flatMap*/
println(characterRDD.flatMap(_.split(",")).collect().mkString("(", ",", ")"))
// (good,bad,best,worst,plus,minus)
/*flatMapValues*/
val pairRDD3 = characterRDD.zipWithIndex.map {case (key, value) => (value + 1, key) }
println(pairRDD2.flatMapValues(_.split(",")).collect().mkString(", "))
// (1,good), (1,bad), (2,good), (2,bad), (3,best), (3,worst), (4,plus), (4,minus)
- flatmap 실행 순서
characterRDD.flatMap(_.split(","))
- 모든 값들을 풀어서 ","로 분할
- seq(“good”,“bad”,“good”,“bad”,“best”,“worst”,“plus”,“minus”)
.mkString("(", ",", ")"))
를 이용하여 최종 출력 모양 생성- (“good”,“bad”,“good”,“bad”,“best”,“worst”,“plus”,“minus”)
- flatMapValues
- flatMap과 동일하게 처리하는데, Value의 값을 조작하고 싶을 때 사용.
- key를 기준으로 value를 나누는 연산
- Seq(1,“good,bad”), (2,“good,bad”), (3,“best,worst”), (4,“plus,minus”) 를
- (1,good), (1,bad), (2,good), (2,bad), (3,best), (3,worst), (4,plus), (4,minus)로 변환
mapPartitions
println(
numericRDD.mapPartitions(nums => {
println("MapPartition loop")
nums.map(num => num * 2)
}).collect().mkString(", ")
)
// MapPartition loop
// MapPartition loop
// MapPartition loop
// mapPartitions
// 2, 4, 6, 8
// 짝수만 출력
println(
numericRDD.mapPartitionsWithIndex((idx, nums) => {
println("mapPartitionsWithIndex loop")
nums.flatMap {
case num if idx % 2 == 0 => Option(num * 2)
case _ => None
}
}).collect().mkString(", ")
)
// mapPartitionsWithIndex loop
// mapPartitionsWithIndex loop
// mapPartitionsWithIndex loop
// 2, 6, 8
- Partition를 기준으로 Map을 수행함
- 3개의 Partition이 존재하면 3번 순회한다.
- mapPartitions vs mapPartitionsWithIndex
- 간단히 말하면 둘다 Partition 기준으로 순회한다
- 차이점 : mapPartitionsWithIndex은 각 Row에 Index를 붙여서 반환한다.
Group
Zip
println(characterRDD.zip(numericRDD).collect().mkString("(", ",", ")"))
// ((good,bad,1),(good,bad,2),(best,worst,3),(plus,minus,4))
println(characterRDD.zipPartitions(numericRDD, numericRDD) {
(key, num, num2) => for {
key_result <- key
num_result <- num
num_result2 <- num2.map(_ + 1)
} yield (key_result, num_result, num_result2)
}.collect().mkString("(", ",", ")"))
// ((good,bad,1,2),(good,bad,2,3),(best,worst,3,4),(best,worst,3,5))
- zip
- 2개의 RDD를 Key : value 로 묶어줌
- 1 : 1 로 같은 인덱스 위치에 있는 것끼리 묶이기 때문에 길이가 및 파티션 수가 동일해야 함
- zipPartitions
- Partition을 요소로 사용하여 새로운 파티션을 생성
- spark 2.4에서는 인자로 RDD 4개까지 가능
- Partition 수가 동일해야함
numericRDD.groupBy {
case num: Int if num % 2 == 0 => "even"
case _ => "odd"
}.collect() foreach {
row => println(s"${row._1}, [${row._2.mkString(",")}]")
}
// even, [2,4]
// odd, [1,3]
pairRDD.groupByKey().collect() foreach {
row => println(s"${row._1}, [${row._2.mkString(",")}]")
}
// best,worst, [3]
// good,bad, [1,2]
// plus,minus, [4]
val rdd1 = sc.parallelize(Seq(("apple", 100), ("banana", 500), ("apple", 200)))
val rdd2 = sc.parallelize(Seq(("banana", 400), ("apple", 300)))
val result = rdd1.cogroup(rdd2)
result.collect.foreach {
case (k, (rdd1_val, rdd2_val)) =>
println(s"($k, [${rdd1_val.mkString(",")}], [${rdd2_val.mkString(",")}])")
}
// (apple, [100,200], [300])
// (banana, [500], [400])
- groupBy
- 조건에 의한 Grouping
- groupByKey
- Key값을 기준으로 grouping
- cogroup
- Key를 기준으로 각 RDD에 값을 묶음
- key , rdd1의 values, rdd2의 values
Set
Distinct
println(
s"""distinct
|${characterRDD.distinct.collect.mkString(", ")}
|""".stripMargin
)
// best,worst, good,bad, plus,minus
- 이름 그대로 중복을 제거하는 역할
cartesian
println(
s"""cartesian
|${characterRDD.cartesian(numericRDD).collect.mkString(", ")}
|""".stripMargin
)
// (good,bad,1), (good,bad,2), (good,bad,3), (good,bad,4),(good,bad,1), (good,bad,2), (good,bad,3), (good,bad,4),(best,worst,1), (plus,minus,1), (best,worst,2), (plus,minus,2), (best,worst,3), (best,worst,4), (plus,minus,3), (plus,minus,4)
- 카르테시안 곱
- 모든 경우의 수를 조합하는 것이라고 보면된다.
- Sql에서 Join 조건을 주지 않은 경우, Cross Join
subtract
println(
s"""subtract
|${characterRDD.subtract(characterRDD2).collect.mkString(", ")}
|""".stripMargin
)
// best,worst
- rdd1에 존재하지만, rdd2에 존재하지 않은 값들을 RDD로 생성하여 반환
- 예시
- characterRDD :
"good,bad", "good,bad", "best,worst", "plus,minus"
- characterRDD2 :
"good,bad", "good,bad", "plus,minus"
- characterRDD에 “best,worst” 가 존재하지만 characterRDD2 에는 존재하지 않음
- best,worst 반환
- characterRDD :
Union
println(
s"""union
|${characterRDD.union(characterRDD2).collect.mkString(", ")}
|""".stripMargin
)
// good,bad, good,bad, best,worst, plus,minus, good,bad, good,bad, plus,minus
- 2개의 RDD를 병합하는 것
- 자료형이 동일해야함
intersection
// 교집합을 구하는것
println(
s"""intersection
|${characterRDD.intersection(characterRDD2).collect.mkString(", ")}
|""".stripMargin
)
// good,bad, plus,minus
Join
println(
s"""join
|${pairRDD.join(pairRDD2).collect.mkString(", ")}
|""".stripMargin
)
//(best,worst,(3,12)), (good,bad,(1,10)), (good,bad,(1,11)), (good,bad,(2,10)), (good,bad,(2,11)), (plus,minus,(4,13))
println(
s"""leftOuterJoin
|${pairRDD.leftOuterJoin(pairRDD2).collect.mkString(", ")}
|""".stripMargin
)
// (best,worst,(3,Some(12))), (good,bad,(1,Some(10))), (good,bad,(1,Some(11))), (good,bad,(2,Some(10))), (good,bad,(2,Some(11))), (plus,minus,(4,Some(13)))
println(
s"""leftOuterJoin
|${pairRDD.subtractByKey(pairRDD2).collect.mkString(", ")}
|""".stripMargin
)
//(best,worst,3)
- Join
- Key값을 기준으로 Join연산
- (key , (rdd01 value, rdd2 value)) 조합으로 모든 값 Join
- leftOuterJoin / rightOuterJoin
- left join 연산을 한다. join결과값이 없을 수 있음으로 Option 타입으로 반환한다.
- subtractByKey
- rdd01.subtractByKey(rdd2)
- rdd01가 rdd2에 없는 key만 추출
Aggregate
reduceByKey / foldByKey
println(
s"""reduceByKey
|${pairRDD.reduceByKey(_ * _).collect().mkString(", ")}
|""".stripMargin
)
// (best,worst,4), (good,bad,6), (plus,minus,5)
println(
s"""foldByKey
|${pairRDD.foldByKey(1)(_ * _).collect().mkString(", ")}
|""".stripMargin
)
// (best,worst,4), (good,bad,6), (plus,minus,5)
- reduceByKey
- Key값을 기준으로 value에 주어진 연산 수행
- 기본 값이 존재하지 않기 때문에 교환 법칙 및 결합 법칙을 만족해야한다.
- 교환 법칙 : 상수의 위치가 변경돼도 가능한 것.
- 1 + 2 == 2 + 1 ( OK )
- 1 / 2 != 2 / 1 ( X )
- 결합 법칙 : 연산의 순서가 변경돼도 가능한 것.
- 간단히 말하면 임의로 괄호로 묶어 연산해도 동일한 결과
- 1 + 2 + 3 == 1 + (2 + 3) ( OK )
- 1 + 2 * 2 != ( 1 + 2 ) * 2 ( X )
- 교환 법칙 : 상수의 위치가 변경돼도 가능한 것.
- 위 결과 도출 방법
- best,worst : 4
- good,bad : 2 * 3
- plus,minus : 5
foldByKey
- Key값 기준으로 Value연산
- reducebykey와 동일하지만, 차이점은 기본 값이 있는 것이다.
- 위 결과 도출 방법
- best,worst : 1 * 4
- good,bad : 1 * 2 * 3
- plus,minus : 1 * 5
종류 | 교환법칙 | 결합법칙 | 초기값 |
---|---|---|---|
reduceByKey | o | o | x |
foldByKey | x | o | o |
combineByKey / aggregateByKey
case class Fruit(var price: Long, var amount: Long = 1) {
def add(p: Long): Fruit = {
add(Fruit(p))
}
def add(other: Fruit): Fruit = {
this.price += other.price
this.amount += other.amount
this
}
}
val fruit_rdd = sc.parallelize(Seq(("Apple", 1000L), ("banana", 2000L), ("Apple", 1500L), ("banana", 2600L), ("banana", 1300L), ("Apple", 3000L)))
println(
s"""combineByKey
|${
fruit_rdd.combineByKey(
createCombiner = (p: Long) => Fruit(p)
, mergeValue = (f: Fruit, v: Long) => f.add(v)
, mergeCombiners = (f1: Fruit, f2: Fruit) => f1.add(f2)
).collect().mkString(", ")
}
|""".stripMargin
)
println(
s"""combineByKey
|${
fruit_rdd.aggregateByKey(Fruit(3000, 0))(
(f: Fruit, v: Long) => f.add(v)
, (f1: Fruit, f2: Fruit) => f1.add(f2)
).collect().mkString(", ")
}
|""".stripMargin
)
- combineByKey
- 새로운 타입으로 반환 가능하게 만듦
- 아래 예시를 통해 Fruit 이라는 Case Class로 반환하는 예시
- createCombiner
- 연산을 위해 컴파이너를 생성하는 것.
- 개인적으로 이해한 바는 타입 변경하여 초기 값을 생성하는 작업
- mergeValue
- Combiner가 존재하면 추후 실행할 연산, 이 연산을 통해 값이 누적되어 쌓인다
- 동을한 key를 가진 값들을 병합
- mergeCombiners
- 병합이 끝난 Combiner들을 최종적으로 병합한 Combiner를 생성
- 다른 Partition에 존재하는 결과 값을 병합
- 조금 더 알아봐야겠다.
- 위 예시 결과 값 도출 사유
- 결과 값 : (banana,Fruit(5900,3)), (Apple,Fruit(5500,3))
- banana : 2000L + 2600L + 1300L , count = 3
- Apple : 1000L + 1500L + 3000L , count = 3
- aggregateByKey
- 초기값이 존재하는 combineByKey
- 즉 createCombiner를 생성 후 수행하는 CombineByKey
- 위 예시 결과 값 도출 사유
- 결과 값 : (banana,Fruit(14900,3)), (Apple,Fruit(14500,3))
- banana : (3000 + 2000L) + (3000 + 2600L) +(3000 + 1300L) , count = 3
- Apple : (3000 + 1000L) + (3000 + 1500L) +(3000 + 3000L) , count = 3
- 초기값이 존재하는 combineByKey
Pipe
println(
s"""pipe
|${characterRDD.pipe("cut -d , -f 2").collect().mkString(", ")}
|""".stripMargin )
// bad, bad, worst, minus
- 결과 값을 bshell 등 외부 프로세스의 인자 값으로 실행시킴
값추출
println(
s"""
|keys : ${numericPairRDD.keys.collect.mkString(", ")}
|value: ${numericPairRDD.values.collect.mkString(", ")}
|""".stripMargin
)
// keys : 776, 377, 559, 275, 818, 457, 173, 248, 135, 679
// value: -, -, -, -, -, -, -, -, -, -
println(
s"""
|중복 불가 : ${numericPairRDD.sample(withReplacement = false,fraction = 0.7).collect.mkString(", ")}
|중복 허용 : ${numericPairRDD.sample(withReplacement = true,fraction = 0.7).collect.mkString(", ")}
|""".stripMargin
)
// 중복 불가 (306,-), (376,-), (630,-), (417,-), (439,-), (738,-), (691,-), (435,-)
// 중복 허용 (210,-), (376,-), (630,-), (417,-), (417,-), (439,-), (435,-)
- keys
- key값 반환
- values
- value값을 반환
- sample
- 데이터 샘플링.
- withReplacement : 데이터 중복 가능 여부
- fraction : 전체 중 n% 추출 (0 ~ 1 사이 값)
- 데이터 샘플링.
filter
// 이름 그대로 filter조건에 부합하는 것을 걸러줌
println(
s"""filter
|${numericRDD.filter(_ > 2).collect.mkString(", ")}
|""".stripMargin)
// 3, 4
// 1,2,3,4 중 3,4만 출력
- 값 필터링
- SQL의 WHERE절이라고 생각하면 편하다
SortByKey
// Key값을 기준으로 정렬시켜줌
println(
s"""sortByKey
|${numericPairRDD.sortByKey().collect.mkString(", ")}
|""".stripMargin
)
// (135, -), (173, -), (248, -), (275, -), (377, -), (457, -), (559, -), (679, -), (776, -), (818, -)
- Key값을 기준으로 정렬
Partition / Coalesce
println(
s"""repartition
|default = ${characterRDD.getNumPartitions}
|coalesce = ${characterRDD.coalesce(1).getNumPartitions}
|repartition = ${characterRDD.repartition(5).getNumPartitions}
|""".stripMargin
// default = 3
// coalesce = 1
// repartition = 5
)
- Partition 수를 증감시킴
- repartition 는 Shuffle이 발생하면서 partition 재분배
- Shuffle이 발생하지 않아 Network 통신을 하여 상대적으로 느리다
- 데이터의 Parition 분배가 될 수 있다.
- partition 수 증감 모두 가능
- Shuffle이 발생하지 않아 Network 통신을 하여 상대적으로 느리다
- coalesce 는 shuffle이 발생하지 않고 partition 재분배
- Shuffle이 발생하지 않아 Network 통신을 하지 않아 상대적으로 빠르다
- 데이터의 Parition 분배는 일어나지 않는다
- 간단히 생각하면
동일 Node에 있는 Partition들을 Union
한다고 생각하면 편하다
- partition 수 감소만 가능
- Shuffle이 발생하지 않아 Network 통신을 하지 않아 상대적으로 빠르다
- repartition 는 Shuffle이 발생하면서 partition 재분배
partitionBy / repartitionAndSortWithinPartitions
numericPairRDD.partitionBy(new HashPartitioner(3))
.foreachPartition(it => {
println(s"-------")
it.foreach(v => println(v))
})
numericPairRDD.repartitionAndSortWithinPartitions(new HashPartitioner(3))
.foreachPartition(it => {
println(s"==========")
it.foreach(v => println(v))
})
- 공통
- RDD를 N+1개의 Partition으로 특정 기준(Partitioner)에 따라 분배 해준다
- Partitioner : Partition을 나누는 역할을 하는 객체
- HASH Partitioner
- Hash값을 이용하여 분할
- RANGE Partitioner
- 범위 혹은 순서로 분할
- HASH Partitioner
- Partitioner : Partition을 나누는 역할을 하는 객체
(key - value)
로 구성되어 있어야 한다.- key 값을 기준으로 분리한다
- 지정한 Partitioner를 기준으로 분배하기 때문에, Skew현상이 발생할 수 있다.
- Skew : 데이터 쏠림 현상
- 위 예시를 실행할 때 아래와 같이 데이터 Skew현상이 발생할 수 있다.
- RDD를 N+1개의 Partition으로 특정 기준(Partitioner)에 따라 분배 해준다
partition | partitionBy | repartitionAndSortWithinPartitions |
---|---|---|
partition 3 | (89,-) | (150,-) |
partition 3 | (780,-) | (89,-) |
partition 3 | (622,-) | (175,-) |
partition 3 | (548,-) | (134,-) |
partition 3 | (150,-) | (530,-) |
partition 3 | (646,-) | (548,-) |
partition 3 | (134,-) | (780,-) |
partition 3 | (175,-) | (622,-) |
partition 3 | (966,-) | (966,-) |
partition 3 | (530,-) | (646,-) |
[ Reference ]
개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!
'프로그래밍 및 IT > Spark' 카테고리의 다른 글
spark-scheduling (0) | 2023.11.28 |
---|---|
WordCount (0) | 2023.10.13 |
RDD Action (0) | 2023.10.13 |
Spark_RDD (0) | 2023.10.05 |
Spark (0) | 2023.10.04 |