RDD Action?

  • 간단히 설명하자면, 결과 값, 즉 return이 RDD가 아닌 다른 것으로 반환되는 연산이다.
  • RDD Transformation을 수행하게 만드는 도화선이라고 봐도 된다.
    • 즉 N개의 Transformation Method들이 쌓여도 수행하지 않다가 Action Method가 호출되면 그 때 순차적으로 수행한다

Output Function

기본적으로 Collection Type으로 반환하는 경우가 많기 때문에 너무 많은 개수를 반환하면, OOM 이 발생할 수 있다.

데이터 추출

// collect
characterRDD.collect.mkString("Array(", ", ", ")")
//Array(good,bad, good,bad, best,worst, plus,minus)

// first
numericRDD.first()
// 1

// take
numericRDD.take(2).mkString("Array(", ", ", ")")
// Array(1, 2)

// takeSample
numericRDD.takeSample(withReplacement = false, 3).mkString("Array(", ", ", ")")
numericRDD.takeSample(withReplacement = true, 3).mkString("Array(", ", ", ")")
// Array(1, 2, 3)
// Array(4, 2, 1)
  • collect
    • RDD를 Array[T]형태로 반환
    • 굳이 Mysql로 따지면 select라고 봐도 될듯하다.
  • first
    • 최초 1개의 값을 반환
    • mysql에서 limit 1이라고 생각해도 무방
  • take
    • 최초 값 부터 N개를 반환
    • mysql에서 limit n이라고 생각해도 무방
  • takeSample
    • sample과 동일하지만, 다른것은 RDD가 아닌 Collection Type으로 반환

Count

characterRDD.count  
// 4
characterRDD.countByValue
// Map(best,worst -> 1, good,bad -> 2, plus,minus -> 1)
  • count
    • 모든 요소 개수 반환
  • countByValue
    • 요소 별 개수

Reduce / Fold / aggregate

numericRDD.reduce(_ * _)
//reduce : 24
numericRDD.fold(1)(_ * _)
//fold : 24
numericRDD.fold(0)(_ * _)
//fold : 0
numericRDD.aggregate(zeroValue = Fruit(0, 0))(seqOp = (f: Fruit, v: Int) => f.add(v), combOp = (f1: Fruit, f2: Fruit) => f1.add(f2))
numericRDD.aggregate(zeroValue = Fruit(0, 0))(seqOp = _.add(_), combOp = _.add(_))
numericRDD.aggregate(zeroValue = Fruit(0, 0))(seqOp = _ add _, combOp = _ add _)
numericRDD.aggregate(Fruit(0, 0))(_ add _, _ add _)
// Fruit(10,4)
  • RDD의 요소에 관해 연산 후 결과값을 반환
    • reduce , fold 모두 병렬처리 하기 때문에 결합법칙을 만족해야한다
  • 결과 도출 사유
    • reduce
      • 초기 값 없이 보유한 요소 만으로 연산
      • 1 * 2 * 3 * 4 = 24
    • fold
      • 초기 값이 존재함
      • 1 * 1 * 2 * 3 * 4 = 24
      • 0 * 1 * 2 * 3 * 4 = 0
    • 위와 같은 이유로 코드 구현 시 잘 고민하고 구현해야 한다
  • aggregate
    • Fruit은 RDD Transformation 의 combineByKey / aggregateByKey 구현해 놓았다.
      • amount : Long => Int로 변경함
    • 인자별 의미
      • aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
      • zeroValue
        • 초기 값
      • seqOp
        • (U, T) => U
          • T : RDD의 요소들이 갖는 타입
        • 각 Partition에서 병합하는 함수
      • combOp
        • (U, U) => U
          • 인자 타입과 반환타입이 동일함
        • 모든 Partition들을 최종적으로 병합하는 함수
    • aggregate 은 scala에서 위 코드 순서대로 요약하여 사용할 수 있다.

Foreach / foreachPartition

numericRDD.foreach {  
  v => println(s"foreach values = $v")  
}  
  
numericRDD.foreachPartition {  
  println("foreachPartition partition")  
  values => for (v <- values) println(s"foreachPartition $v")
  • 모든 RDD요소에 특정 함수를 적용시키는 것.
    • 모든 요소를 순회
  • Map과 다른점은 Map은 반환값이 존재하지만, foreach는 반환값이 존재하지 않고 행위만 존재한다
  • foreach - foreachPartition 차이점
    • foreach는 요소단위로 수행
    • foreachPartition은 Partition단위로 수행
  • Driver에서 수행하는 것이 아닌 개별 노드에서 실행된다
    • 분산 처리 하기 때문에 의도한 것과 구현한 것이 동일한지 잘 고민 해봐야 한다

cache / persist / unpersist

numericRDD.persist()  
numericRDD.unpersist()

numericRDD.cache()  
numericRDD.persist(StorageLevel.MEMORY_ONLY)  

Spark RDD는 수행할 때 마다 연산을 통해 새로은 RDD를 사용한다
이 때 RDD를 재사용한다면, 해당 연산을 통새 생성된 RDD를 저장하는 것을 위 메소드로 구현한다

  • persist / unpersist
    • 메모리 / 디스크에 저장하는 메소드
    • persist 로 저장하고 unpersist 로 제거한다
    • 옵션으로 아래와 같은것들을 통해 메모리, 디스크, 메모리 + 디스크 에 적재 할지 고를 수 있다.
      • NONE , DISK_ONLY , DISK_ONLY_2 , MEMORY_ONLY , MEMORY_ONLY_2 , MEMORY_ONLY_SER , MEMORY_ONLY_SER_2 , MEMORY_AND_DISK , MEMORY_AND_DISK_2 , MEMORY_AND_DISK_SER , MEMORY_AND_DISK_SER_2 , OFF_HEAP
  • cache
    • 메모리에 RDD를 적재하는 메소드
    • persist(StorageLevel.MEMORY_ONLY) 와 동일하다
      • API따라 들어가다 보면 위와 같음

File IO

characterRDD.saveAsTextFile("file://<File_Path>/test01")  
characterRDD.saveAsTextFile("file://<File_Path>/test02", classOf[org.apache.hadoop.io.compress.BZip2Codec])  
  
characterRDD2.saveAsObjectFile("file://<File_Path>/obj01")  
val rdd = sc.objectFile[String]("file://<File_Path>/obj01")  
rdd.collect().foreach (x => println(x))  
  
pairRDD.saveAsSequenceFile("file://<File_Path>/seq01")  
val rdd2 = sc.sequenceFile[String, Long]("file://<File_Path>/seq01")  
println(rdd2.collect().mkString(","))
  • saveAsTextFile
    • 기본적으로 TestFile 형태로 적재하는 방식
    • 위에 작성한 것과 같이 saveAsTextFile(경로 , 코덱)
      • 코덱을 통해 원하는 압축 형식을 지정할 수 있다.
      • bzip2,lz4,snappy,zlib
  • saveAsObjectFile
    • Object형태로 적재하는 방식
    • Java/ Scala에서는 자바 표준 직렬화를 사용한다.
    • SequenceFile와 비교하여 상대적으로 느리다
  • saveAsSequenceFile
    • objectFile 과 동일하게 Binary 파일 포멧으로 저장
      • 차이점
        • Hadoop자체에서 정의한 직렬화 프레임워크 사용
        • Hadoop Writeable 인터페이스를 구현하고 있어야 한다.
        • Scala의 경우 saveAsObjectFile와 차이 없이 간단하게 사용 가능

공유 변수

Broadcast Variables

val broadcastChar = sc.broadcast(Set("good,bad", "plus,minus"))
  • Spark Job이 실행되는 동안 모든 노드에 공유하는 읽기 전용 자원
    • 모든 노드에 공유 즉 복제 되어 사용한다
      • Network 비용이 절감하여 성능 향상을 기대할 수 있다.
    • 해당 변수는 Heap영역에 저장된다
      • 사용하는 동안 메모리를 지속적으로 사용한다.
      • unpersist를 이용하여 제거할 수 있다.

Accumulators

val error_cnt = sc.longAccumulator("invalidFormat")  
val error_data = sc.collectionAccumulator[String]("invalidFormat2")  
val data = List("Log01:haha", "Log02:hoho", "Log03::", "Log04:aaa", "Log05||Addr5", "Log06::Log7")  
sc.parallelize(data, 3).foreach {  
  v =>  
    if (v.split(":").length != 2) {  
      error_cnt.add(1L)  
      error_data.add(v)  
    }  
}  
println("잘못된 데이터 수:" + error_cnt.value)  
println("잘못된 데이터:" + error_data.value)
// 잘못된 데이터 수:3
// 잘못된 데이터:[Log05||Addr5, Log06::Log7, Log03::]
  • 실제로 사용 해본 적이 없어 완벽하게 이해가 되지 않아 조금 더 이해해야 할 것 같다
  • Broadcast 와 반대로 쓰기 동작을 위한 작업
  • 여러 노드에서 수행되는 동작을 한곳에 적재하는 방법
    • 생성,초기화는 드라이버 프로그램에서 동작한다
    • 클러스터의 여러 Task에서 병렬로 쓰여질 수 있다.
    • Accumulator의 값을 증가시킬 수 있지만, 값을 참조해서 사용하는 것은 불가능하다.
  • 특별한 목적이 없으면, Action Function을 수행할 때 사용해야한다
    • 불필요한 반복이 발생할 가능성이 높기 때문
    • eg. map, flatmap과 같은 것을 동작하면 불필요하게 값을 증가시킬 가능성이 높다
  • 기본적으로 제공하는 Accumulators이 존재한다
    • long,collection,double, 등이 존재한다
  • 위 코드와 같이 만들면, 각 노드에서 병렬로 실행 시 문제가 발생한 Log를 확인할 수 있다.
  • 사용자 지정으로 생성 할 수 있다.
    • 아래와 같이 case class를 생성하여 사용할 수 있다.
    • Python에서는 register를 하지 않고 바로 사용할 수 있다
    • 주요 메소드
      1. isZero
        1. 초기 상태 여부 판단
        2. 아래와 같이 조건문 등을 이용하여 판단할 수 있다.
      2. reset
        1. 이름 그대로 초기 상태로 되돌리는 메소드
      3. add
        1. 값을 추가할 때 사용하는 메소드. 사용자 정의시 아래와 같이 case class의 메소드를 이용하여 동작할 수 있다.
        2. 값 추가는 Accumulator 내부에서 동작한다.
      4. copy
        1. 해당 Accumulator를 복제하는 메소드
        2. 미리 데이터를 복제하여 안전하게 데이터를 읽을 때 혹은 병렬로 수정할 때 사용
      5. merge
        1. n+1개의 Accumulator를 병합하는 메소드
          1. n+1개의 테스크에서 생성된 Accumulato를 하나의 Accumulator로 병합할 때 사용
      6. value
        1. Accumulator의 값을 반환하는 메소드
package main.RDD_action  
  
import io.netty.handler.logging.LogLevel  
import org.apache.spark.util.AccumulatorV2  
import org.apache.spark.{SparkConf, SparkContext}  
  
case class Fruit(var price: Long, var amount: Long = 1) {  
  def add(p: Long): Fruit = add(Fruit(p))  
  def add(other: Fruit): Fruit = {  
    this.price += other.price  
    this.amount += other.amount  
    this  
  }  
}  
  
class fruitAccumulator extends AccumulatorV2[Fruit, (Long,Long)] {  
  private var acc = Fruit(0)  
  override def isZero: Boolean = acc.price == 0L && acc.amount == 1L  
  override def copy(): AccumulatorV2[Fruit, (Long,Long)] = {  
    val newAcc = new fruitAccumulator  
    newAcc.acc = Fruit(acc.price, acc.amount)  
    newAcc  
  }  
  override def reset(): Unit = {  
    acc.price = 0L  
    acc.amount = 1L  
  }  
  override def add(other: Fruit): Unit = acc.add(other)  
  override def merge(other: AccumulatorV2[Fruit, (Long,Long)]): Unit = other match{  
    case o: fruitAccumulator => acc.add(o.acc)  
    case _                   => throw new RuntimeException()  
  }  
  override def value: (Long,Long) = {  
    (acc.price, acc.amount)  
  }  
}  
  
object fruitAccumulator {  
  def main(args: Array[String]): Unit = {  
    // Spark 설정  
    val conf = new SparkConf()  
      .setAppName("fruitAccumulator")  
      .setMaster("local[*]")  
      .set("spark.local.ip", "localhost")  
      .set("spark.driver.host", "localhost")  
  
    // SparkContext생성  
    val sc = new SparkContext(conf)  
    sc.setLogLevel(LogLevel.WARN.name())  
  
    val acc = new fruitAccumulator  
    sc.register(acc , "fruitAccumulator")  
    val data =List("Fruit:2000", "Fruit02:3000", "Fruit03::", "Fruit04:2000", "Fruit05||4000", "Fruit06::Log7")  
    sc.parallelize(data,3).foreach {  
      v =>  
//        println(s"v : $v ")  
        if(v.split(":").length != 2){  
          println(s"added v : $v ")  
          acc.add(Fruit(2))  
        }  
    }  
    println(acc.value)  
  }  
}

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

spark-scheduling  (0) 2023.11.28
WordCount  (0) 2023.10.13
RDD Transformation  (0) 2023.10.13
Spark_RDD  (0) 2023.10.05
Spark  (0) 2023.10.04

+ Recent posts