RDD Action?
- 간단히 설명하자면, 결과 값, 즉 return이
RDD가 아닌 다른 것
으로 반환되는 연산이다.
- RDD Transformation을 수행하게 만드는 도화선이라고 봐도 된다.
- 즉 N개의
Transformation Method
들이 쌓여도 수행하지 않다가 Action Method
가 호출되면 그 때 순차적으로 수행한다
Output Function
기본적으로 Collection Type으로 반환하는 경우가 많기 때문에 너무 많은 개수를 반환하면, OOM
이 발생할 수 있다.
데이터 추출
// collect
characterRDD.collect.mkString("Array(", ", ", ")")
//Array(good,bad, good,bad, best,worst, plus,minus)
// first
numericRDD.first()
// 1
// take
numericRDD.take(2).mkString("Array(", ", ", ")")
// Array(1, 2)
// takeSample
numericRDD.takeSample(withReplacement = false, 3).mkString("Array(", ", ", ")")
numericRDD.takeSample(withReplacement = true, 3).mkString("Array(", ", ", ")")
// Array(1, 2, 3)
// Array(4, 2, 1)
- collect
- RDD를 Array[T]형태로 반환
- 굳이 Mysql로 따지면 select라고 봐도 될듯하다.
- first
- 최초 1개의 값을 반환
- mysql에서 limit 1이라고 생각해도 무방
- take
- 최초 값 부터 N개를 반환
- mysql에서 limit n이라고 생각해도 무방
- takeSample
- sample과 동일하지만, 다른것은 RDD가 아닌 Collection Type으로 반환
Count
characterRDD.count
// 4
characterRDD.countByValue
// Map(best,worst -> 1, good,bad -> 2, plus,minus -> 1)
Reduce / Fold / aggregate
numericRDD.reduce(_ * _)
//reduce : 24
numericRDD.fold(1)(_ * _)
//fold : 24
numericRDD.fold(0)(_ * _)
//fold : 0
numericRDD.aggregate(zeroValue = Fruit(0, 0))(seqOp = (f: Fruit, v: Int) => f.add(v), combOp = (f1: Fruit, f2: Fruit) => f1.add(f2))
numericRDD.aggregate(zeroValue = Fruit(0, 0))(seqOp = _.add(_), combOp = _.add(_))
numericRDD.aggregate(zeroValue = Fruit(0, 0))(seqOp = _ add _, combOp = _ add _)
numericRDD.aggregate(Fruit(0, 0))(_ add _, _ add _)
// Fruit(10,4)
- RDD의 요소에 관해 연산 후 결과값을 반환
- reduce , fold 모두 병렬처리 하기 때문에
결합법칙을 만족
해야한다
- 결과 도출 사유
- reduce
- 초기 값 없이 보유한 요소 만으로 연산
- 1 * 2 * 3 * 4 = 24
- fold
- 초기 값이 존재함
- 1 * 1 * 2 * 3 * 4 = 24
- 0 * 1 * 2 * 3 * 4 = 0
- 위와 같은 이유로 코드 구현 시 잘 고민하고 구현해야 한다
- aggregate
- Fruit은 RDD Transformation 의 combineByKey / aggregateByKey 구현해 놓았다.
- amount : Long => Int로 변경함
- 인자별 의미
aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
- zeroValue
- seqOp
- (U, T) => U
- 각 Partition에서 병합하는 함수
- combOp
- (U, U) => U
- 모든 Partition들을 최종적으로 병합하는 함수
- aggregate 은 scala에서 위 코드 순서대로 요약하여 사용할 수 있다.
Foreach / foreachPartition
numericRDD.foreach {
v => println(s"foreach values = $v")
}
numericRDD.foreachPartition {
println("foreachPartition partition")
values => for (v <- values) println(s"foreachPartition $v")
- 모든 RDD요소에 특정 함수를 적용시키는 것.
- Map과 다른점은 Map은 반환값이 존재하지만, foreach는 반환값이 존재하지 않고 행위만 존재한다
- foreach - foreachPartition 차이점
- foreach는 요소단위로 수행
- foreachPartition은 Partition단위로 수행
- Driver에서 수행하는 것이 아닌 개별 노드에서 실행된다
- 분산 처리 하기 때문에 의도한 것과 구현한 것이 동일한지 잘 고민 해봐야 한다
cache / persist / unpersist
numericRDD.persist()
numericRDD.unpersist()
numericRDD.cache()
numericRDD.persist(StorageLevel.MEMORY_ONLY)
Spark RDD는 수행할 때 마다 연산을 통해 새로은 RDD를 사용한다
이 때 RDD를 재사용한다면, 해당 연산을 통새 생성된 RDD를 저장하는 것을 위 메소드로 구현한다
- persist / unpersist
- 메모리 / 디스크에 저장하는 메소드
persist
로 저장하고 unpersist
로 제거한다
- 옵션으로 아래와 같은것들을 통해
메모리
, 디스크
, 메모리 + 디스크
에 적재 할지 고를 수 있다.
- NONE , DISK_ONLY , DISK_ONLY_2 , MEMORY_ONLY , MEMORY_ONLY_2 , MEMORY_ONLY_SER , MEMORY_ONLY_SER_2 , MEMORY_AND_DISK , MEMORY_AND_DISK_2 , MEMORY_AND_DISK_SER , MEMORY_AND_DISK_SER_2 , OFF_HEAP
- cache
- 메모리에 RDD를 적재하는 메소드
- persist(StorageLevel.MEMORY_ONLY) 와 동일하다
File IO
characterRDD.saveAsTextFile("file://<File_Path>/test01")
characterRDD.saveAsTextFile("file://<File_Path>/test02", classOf[org.apache.hadoop.io.compress.BZip2Codec])
characterRDD2.saveAsObjectFile("file://<File_Path>/obj01")
val rdd = sc.objectFile[String]("file://<File_Path>/obj01")
rdd.collect().foreach (x => println(x))
pairRDD.saveAsSequenceFile("file://<File_Path>/seq01")
val rdd2 = sc.sequenceFile[String, Long]("file://<File_Path>/seq01")
println(rdd2.collect().mkString(","))
- saveAsTextFile
- 기본적으로 TestFile 형태로 적재하는 방식
- 위에 작성한 것과 같이 saveAsTextFile(경로 , 코덱)
- 코덱을 통해 원하는 압축 형식을 지정할 수 있다.
- bzip2,lz4,snappy,zlib
- saveAsObjectFile
- Object형태로 적재하는 방식
- Java/ Scala에서는 자바 표준 직렬화를 사용한다.
- SequenceFile와 비교하여 상대적으로 느리다
- saveAsSequenceFile
- objectFile 과 동일하게 Binary 파일 포멧으로 저장
- 차이점
- Hadoop자체에서 정의한 직렬화 프레임워크 사용
- Hadoop Writeable 인터페이스를 구현하고 있어야 한다.
- Scala의 경우 saveAsObjectFile와 차이 없이 간단하게 사용 가능
공유 변수
Broadcast Variables
val broadcastChar = sc.broadcast(Set("good,bad", "plus,minus"))
- Spark Job이 실행되는 동안 모든 노드에 공유하는 읽기 전용 자원
- 모든 노드에 공유 즉 복제 되어 사용한다
- Network 비용이 절감하여 성능 향상을 기대할 수 있다.
- 해당 변수는 Heap영역에 저장된다
- 사용하는 동안 메모리를 지속적으로 사용한다.
- unpersist를 이용하여 제거할 수 있다.
Accumulators
val error_cnt = sc.longAccumulator("invalidFormat")
val error_data = sc.collectionAccumulator[String]("invalidFormat2")
val data = List("Log01:haha", "Log02:hoho", "Log03::", "Log04:aaa", "Log05||Addr5", "Log06::Log7")
sc.parallelize(data, 3).foreach {
v =>
if (v.split(":").length != 2) {
error_cnt.add(1L)
error_data.add(v)
}
}
println("잘못된 데이터 수:" + error_cnt.value)
println("잘못된 데이터:" + error_data.value)
// 잘못된 데이터 수:3
// 잘못된 데이터:[Log05||Addr5, Log06::Log7, Log03::]
- 실제로 사용 해본 적이 없어 완벽하게 이해가 되지 않아 조금 더 이해해야 할 것 같다
- Broadcast 와 반대로 쓰기 동작을 위한 작업
- 여러 노드에서 수행되는 동작을 한곳에 적재하는 방법
- 생성,초기화는 드라이버 프로그램에서 동작한다
- 클러스터의 여러 Task에서 병렬로 쓰여질 수 있다.
- Accumulator의 값을 증가시킬 수 있지만, 값을 참조해서 사용하는 것은 불가능하다.
- 특별한 목적이 없으면, Action Function을 수행할 때 사용해야한다
- 불필요한 반복이 발생할 가능성이 높기 때문
- eg. map, flatmap과 같은 것을 동작하면 불필요하게 값을 증가시킬 가능성이 높다
- 기본적으로 제공하는 Accumulators이 존재한다
- long,collection,double, 등이 존재한다
- 위 코드와 같이 만들면, 각 노드에서 병렬로 실행 시 문제가 발생한 Log를 확인할 수 있다.
- 사용자 지정으로 생성 할 수 있다.
- 아래와 같이 case class를 생성하여 사용할 수 있다.
- Python에서는 register를 하지 않고 바로 사용할 수 있다
- 주요 메소드
- isZero
- 초기 상태 여부 판단
- 아래와 같이 조건문 등을 이용하여 판단할 수 있다.
- reset
- 이름 그대로 초기 상태로 되돌리는 메소드
- add
- 값을 추가할 때 사용하는 메소드. 사용자 정의시 아래와 같이 case class의 메소드를 이용하여 동작할 수 있다.
- 값 추가는 Accumulator 내부에서 동작한다.
- copy
- 해당 Accumulator를 복제하는 메소드
- 미리 데이터를 복제하여 안전하게 데이터를 읽을 때 혹은 병렬로 수정할 때 사용
- merge
- n+1개의 Accumulator를 병합하는 메소드
- n+1개의 테스크에서 생성된 Accumulato를 하나의 Accumulator로 병합할 때 사용
- value
- Accumulator의 값을 반환하는 메소드
package main.RDD_action
import io.netty.handler.logging.LogLevel
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
case class Fruit(var price: Long, var amount: Long = 1) {
def add(p: Long): Fruit = add(Fruit(p))
def add(other: Fruit): Fruit = {
this.price += other.price
this.amount += other.amount
this
}
}
class fruitAccumulator extends AccumulatorV2[Fruit, (Long,Long)] {
private var acc = Fruit(0)
override def isZero: Boolean = acc.price == 0L && acc.amount == 1L
override def copy(): AccumulatorV2[Fruit, (Long,Long)] = {
val newAcc = new fruitAccumulator
newAcc.acc = Fruit(acc.price, acc.amount)
newAcc
}
override def reset(): Unit = {
acc.price = 0L
acc.amount = 1L
}
override def add(other: Fruit): Unit = acc.add(other)
override def merge(other: AccumulatorV2[Fruit, (Long,Long)]): Unit = other match{
case o: fruitAccumulator => acc.add(o.acc)
case _ => throw new RuntimeException()
}
override def value: (Long,Long) = {
(acc.price, acc.amount)
}
}
object fruitAccumulator {
def main(args: Array[String]): Unit = {
// Spark 설정
val conf = new SparkConf()
.setAppName("fruitAccumulator")
.setMaster("local[*]")
.set("spark.local.ip", "localhost")
.set("spark.driver.host", "localhost")
// SparkContext생성
val sc = new SparkContext(conf)
sc.setLogLevel(LogLevel.WARN.name())
val acc = new fruitAccumulator
sc.register(acc , "fruitAccumulator")
val data =List("Fruit:2000", "Fruit02:3000", "Fruit03::", "Fruit04:2000", "Fruit05||4000", "Fruit06::Log7")
sc.parallelize(data,3).foreach {
v =>
// println(s"v : $v ")
if(v.split(":").length != 2){
println(s"added v : $v ")
acc.add(Fruit(2))
}
}
println(acc.value)
}
}
[ Reference ]
개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!