Spark Setting

Spark를 실행할 때 여러가지 설정을 할 수 있다.
상세한 정보들은 너무 많기 때문에 help 명령어를 사용하여 어떤 설정을 할 수 있는지 확인해 볼 수 있다.
Spark 공식홈페이지에서 본인이 사용하는 Spark 버전에 따라 확인해봐도 된다.

간단 예시 spark-submit 예시

spark-submit \
  --class {실행할 파일의 클래스} \
  --name {어블리케이션 명} \
  --master {클러스터 메니져 정보} \
  --deploy-mode {디플로이 모드} \
  --files {추가할 파일} \
  --jars {추가할 Jars 파일들} \
  --conf {추가할 설정} \  
  --driver-cores {드라이버 코어수} \
  --driver-memory {드라이버 메모리} \
  --executor-cores {익세큐터 코어수} \
  --executor-memory {익세큐터 메모리} \
  --num-executors {익세큐터 수} \
  {실행할 파일 경로}

위와 같이 Spark Submit할 때 해도 되지만, spark-defaults.conf에 미리 설정하여 사용해도 된다.
Spark Submit혹은 spark-defaults.conf에 추가한 값들이 모여 Spark Properties에 반영된다.
설정 우선순위 : code -> spark-submit -> spark-defaults.conf

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

SparkSql  (0) 2023.11.28
spark-scheduling  (0) 2023.11.28
WordCount  (0) 2023.10.13
RDD Action  (0) 2023.10.13
RDD Transformation  (0) 2023.10.13

Spark-Scheduling?

개인적으로 Spark를 사용하는 이유는 cluster-mode를 사용하기 위해라고 생각한다
Yarn등을 이용하여 각 Driver,Excutor에게 core,memory등을 할당 하는데, 많이 할당한다고 항상 좋은 결과를 보는 것은 아니다
어디서 본지 기억이 나지 않지만, core는 1~5개, memory는 4gb이상을 사용하는 것이 좋다고 봤었다.

Cluster Scheduling

Static resource allocation

정적(고정) 자원 할당

  • 명칭 그대로 최초 할당한 그대로 어플리케이션이 종료될 때 까지 자원을 점유하는 방식
  • Pipe line에 따라 데이터를 처리하여 필요한 자원 사전 계산 후 할당하면 좋은 방식이다.

Dynamic resource allocation

동적 자원 할당

  • 자원을 필요에 따라 동적으로 할당하는 방식

    • Spark-Shell , 혹은 web기반으로 동작하면 항상 많은 자원을 할당받아 자원 낭비가 발생할 수 있는 상황에 합리적인 선택
  • 공통

    • spark.dynamicAllocation.enabled true 설정을 해야한다
  • Standalone

    • spark.shuffle.service.enabled true
  • Yarn

    • 모든 node manager에 spark-{version}-yarn-shuffle.jar를 class path등록해야 한다
    • yarn-site.xml에 아래와 같은 설정이 있어야 한다.(안 되어 있으면 설정 후 재시작)
    	<property>
            <name>yarn.nodemanager.aux-services</name>
            <value>mapreduce_shuffle,spark_shuffle</value>
        </property>
        <property>
            <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
            <value>org.apache.spark.network.yarn.YarnShuffleService</value>
        </property>
        <property>
            <name>spark.shuffle.service.enabled</name>
            <value>true</value>
        </property>
    
  • yarn에서 별도 Shuffle Service를 실행시킨 이유는 동작 도중 Executor가 제거될 수 있기 때문

    • Mapper에 저장된 데이터Reducer단계에 Network를 통해 데이터를 읽어 가는데 이때 Executor가 삭제되면 해당 데이터는 삭제
    • 그러한 문제가 발생하지 않기 위해 Shuffle data를 따로 관리 할 수 있는 Shuffle Process를 설정하는 것

Application Scheduling

Spark Context는 기본적으로 Multi-Thread 방식으로 실행된다.
그로인하여 N+1개의 Action Function을 동시 실행해도 문제가 발생하지 않는다.

  • Scheduling으로 FIFO방식으로 구동 된다
    • 후속 작업은 이전 작업이 끝날 때 까지 대기해야한다
    • Fair scheduler 방식도 가능하다
      • Round robin
    • 설정
      • Code 단 : conf.set("spark.scheduler.mode", "Fair")
      • spark-default.conf에서 설정해도 된다.

fairscheduler.xml에 poll 설정을 하여 원하는 Pool이용하는 코드를 작성할 수 있다

<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>
  • fairscheduler.xml에 기본적으로 작성되어있는 것을 가져왔다.
    • weight : 우선순위
    • minShare : Pool의 최초 CPU Core
val conf = new SparkConf()  
  .setAppName("application_name")  
  .setMaster("local[*]")  
  .set("spark.local.ip", "localhost")  
  .set("spark.driver.host", "localhost")  
  // .set("spark.scheduler.mode", "Fair")  //이와 같이 특정 scheduler 사용가능
  .set("spark.scheduler.allocation.file" ,"file path") // 
val sc = new SparkContext(conf)
sc.setLocalProperty("spark.scheduler.pool", "production") // pool설정  
sc.setLocalProperty("spark.scheduler.pool", null) // 기본 pool설정
  • 위와 같이 사용하면 scheduler mode : FAIR , 최소 CPU core : 2
  • 이와 같이 N+1명의 사용자가 접근하면 사용자에 따른 자원 할당을 할 수 있다

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

SparkSql  (0) 2023.11.28
spark-Setting  (0) 2023.11.28
WordCount  (0) 2023.10.13
RDD Action  (0) 2023.10.13
RDD Transformation  (0) 2023.10.13

HadoopSpark를 공부하기 시작하면 가장 처음 나오는 것이 Map /Reduce이다
- Map /Reduce 설명은 Hadoop링크로 들어가 보면 설명을 해놓았다.
이것을 가장 이해하기 편하게 예시를 드는 것이 WordCount이다.

간단 예시

구현

아래와 같이 구현을 하고 , args 를 local[*] 텍스트파일 경로 적재 위치 순으로 넣어서 실행해보면 된다.

import org.apache.spark.rdd.RDD  
import org.apache.spark.{SparkConf, SparkContext}  
  
object wordCount {  
  def main(args: Array[String]): Unit = {  
    // Args 3개 필수  
    require(args.length == 3, "Usage : <Master> <Input> <Output>")  
  
    // Spark 설정  
    val conf = new SparkConf()  
      .setAppName("WorkCount")  
      .setMaster(args(0))  
      .set("spark.local.ip", "localhost")  
      .set("spark.driver.host", "localhost")  
  
    // SparkContext생성  
    val sc   = new SparkContext(conf)  
    val input  = readFile(sc, args(1))  
    val result = process(input)  
    saveFile(result, args(2))  
  
    sc.stop()  
  }  
  
  def readFile(sc: SparkContext, path: String): RDD[String] = {  
    sc.textFile(path) // 파일을 읽고  
  }  
  
  def process(input: RDD[String]): RDD[(String, Int)] = {  
    input.flatMap(str => str.split(" ")) //공백으로 분류  
         .map((_, 1)) // 각각 key : 1 로 만듦  
         .reduceByKey(_ + _) // value들을 key를 기준으로 합산  
  }  
  
  def saveFile(output: RDD[(String, Int)], path: String): Unit = {  
    output.saveAsTextFile(path) // 적재  
  }  
}
  • 위 코드를 실행시키면 디렉토리가 생성되고 안에는 여러 파일들이 존재한다
    • 그 중 part-00000를 출력해보면 WordCount가 잘 동작했는지 확인 가능하다
    • tail,head,cat 원하는 것으로 확인해 보면 된다.
C:\Apps\hadoop-3.1.2\output>cat part-00000
(Unless,4)
(works,,4)
...

테스트 코드

  • JUnit을 이용하면 아래와같이
import org.apache.spark.{SparkConf, SparkContext}  
import org.junit.jupiter.api.Test  
  
import scala.collection.mutable.ListBuffer  
  
class wordCountTest {  
  
  @Test  
  def test(): Unit = {  
    val conf = new SparkConf()  
      .setMaster("local[*]")  
      .setAppName("wordCountTest")  
      .set("spark.local.ip", "localhost")  
      .set("spark.driver.host", "localhost")  
    val sc   = new SparkContext(conf)  
    val input = new ListBuffer[String]  
    input += "License shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document."  
    input += "Licensor shall mean the copyright owner or entity authorized by the copyright owner that is granting the License"  
    input.toList  
  
    val inputRDD = sc.parallelize(input)  
    val result = wordCount.process(inputRDD)  
    val resultMap = result.collectAsMap()  
  
    assert(resultMap("shall") == 2)  
    assert(resultMap("copyright") == 2)  
    assert(resultMap("License") == 2)  
    println(resultMap)
  
    sc.stop()  
  }  
}
  • FlatSpec을 이용하면 아래와 같이 구현하면 된다.
import org.apache.spark.{SparkConf, SparkContext}  
import org.scalatest.flatspec.FlatSpec  
import org.scalatest.matchers.should.Matchers  
  
  
class workCountTest_Spec extends FlatSpec with Matchers {  
  val conf = new SparkConf()  
    .setMaster("local[*]")  
    .setAppName("wordCountTest")  
    .set("spark.local.ip", "localhost")  
    .set("spark.driver.host", "localhost")  
  val sc   = new SparkContext(conf)  
  
  "WorkCount Process" should "Correctly count words in the input" in {  
    val inputRDD = sc.parallelize(Seq(  
      "License shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document."  
      , "Licensor shall mean the copyright owner or entity authorized by the copyright owner that is granting the License"  
      ))  
    val result = wordCount.process(inputRDD)  
  
    val resultMap = result.collectAsMap()  
    println(resultMap)  
    resultMap("shall") shouldEqual 2  
    resultMap("copyright") shouldEqual 2  
    resultMap("License") shouldEqual 2  
//    resultMap("License") shouldEqual 1
  }  
  
}

성공 시 성공했다고만 하고
실패시 아래와 같이 에러 발생한다
!Pasted image 20231005140735.png
Pasted image 20231005140926.png

  • Intellij에서 구동하였고, build tool을 Gradle로 해서 그런지 :test ....를 못 찾는다고 에러가 발생했다.
    • 간단 해결방법 아래와 같이 setting을 설정해 주면 된다
      • Run Tests using 을 Gradle -> Intellij IDEA로 변경
    • 추가적으로 intellij IDEA로 하면 조금 더 빠르다는 이야기가 있다.
      Pasted image 20231005141130.png

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

spark-Setting  (0) 2023.11.28
spark-scheduling  (0) 2023.11.28
RDD Action  (0) 2023.10.13
RDD Transformation  (0) 2023.10.13
Spark_RDD  (0) 2023.10.05

RDD Action?

  • 간단히 설명하자면, 결과 값, 즉 return이 RDD가 아닌 다른 것으로 반환되는 연산이다.
  • RDD Transformation을 수행하게 만드는 도화선이라고 봐도 된다.
    • 즉 N개의 Transformation Method들이 쌓여도 수행하지 않다가 Action Method가 호출되면 그 때 순차적으로 수행한다

Output Function

기본적으로 Collection Type으로 반환하는 경우가 많기 때문에 너무 많은 개수를 반환하면, OOM 이 발생할 수 있다.

데이터 추출

// collect
characterRDD.collect.mkString("Array(", ", ", ")")
//Array(good,bad, good,bad, best,worst, plus,minus)

// first
numericRDD.first()
// 1

// take
numericRDD.take(2).mkString("Array(", ", ", ")")
// Array(1, 2)

// takeSample
numericRDD.takeSample(withReplacement = false, 3).mkString("Array(", ", ", ")")
numericRDD.takeSample(withReplacement = true, 3).mkString("Array(", ", ", ")")
// Array(1, 2, 3)
// Array(4, 2, 1)
  • collect
    • RDD를 Array[T]형태로 반환
    • 굳이 Mysql로 따지면 select라고 봐도 될듯하다.
  • first
    • 최초 1개의 값을 반환
    • mysql에서 limit 1이라고 생각해도 무방
  • take
    • 최초 값 부터 N개를 반환
    • mysql에서 limit n이라고 생각해도 무방
  • takeSample
    • sample과 동일하지만, 다른것은 RDD가 아닌 Collection Type으로 반환

Count

characterRDD.count  
// 4
characterRDD.countByValue
// Map(best,worst -> 1, good,bad -> 2, plus,minus -> 1)
  • count
    • 모든 요소 개수 반환
  • countByValue
    • 요소 별 개수

Reduce / Fold / aggregate

numericRDD.reduce(_ * _)
//reduce : 24
numericRDD.fold(1)(_ * _)
//fold : 24
numericRDD.fold(0)(_ * _)
//fold : 0
numericRDD.aggregate(zeroValue = Fruit(0, 0))(seqOp = (f: Fruit, v: Int) => f.add(v), combOp = (f1: Fruit, f2: Fruit) => f1.add(f2))
numericRDD.aggregate(zeroValue = Fruit(0, 0))(seqOp = _.add(_), combOp = _.add(_))
numericRDD.aggregate(zeroValue = Fruit(0, 0))(seqOp = _ add _, combOp = _ add _)
numericRDD.aggregate(Fruit(0, 0))(_ add _, _ add _)
// Fruit(10,4)
  • RDD의 요소에 관해 연산 후 결과값을 반환
    • reduce , fold 모두 병렬처리 하기 때문에 결합법칙을 만족해야한다
  • 결과 도출 사유
    • reduce
      • 초기 값 없이 보유한 요소 만으로 연산
      • 1 * 2 * 3 * 4 = 24
    • fold
      • 초기 값이 존재함
      • 1 * 1 * 2 * 3 * 4 = 24
      • 0 * 1 * 2 * 3 * 4 = 0
    • 위와 같은 이유로 코드 구현 시 잘 고민하고 구현해야 한다
  • aggregate
    • Fruit은 RDD Transformation 의 combineByKey / aggregateByKey 구현해 놓았다.
      • amount : Long => Int로 변경함
    • 인자별 의미
      • aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
      • zeroValue
        • 초기 값
      • seqOp
        • (U, T) => U
          • T : RDD의 요소들이 갖는 타입
        • 각 Partition에서 병합하는 함수
      • combOp
        • (U, U) => U
          • 인자 타입과 반환타입이 동일함
        • 모든 Partition들을 최종적으로 병합하는 함수
    • aggregate 은 scala에서 위 코드 순서대로 요약하여 사용할 수 있다.

Foreach / foreachPartition

numericRDD.foreach {  
  v => println(s"foreach values = $v")  
}  
  
numericRDD.foreachPartition {  
  println("foreachPartition partition")  
  values => for (v <- values) println(s"foreachPartition $v")
  • 모든 RDD요소에 특정 함수를 적용시키는 것.
    • 모든 요소를 순회
  • Map과 다른점은 Map은 반환값이 존재하지만, foreach는 반환값이 존재하지 않고 행위만 존재한다
  • foreach - foreachPartition 차이점
    • foreach는 요소단위로 수행
    • foreachPartition은 Partition단위로 수행
  • Driver에서 수행하는 것이 아닌 개별 노드에서 실행된다
    • 분산 처리 하기 때문에 의도한 것과 구현한 것이 동일한지 잘 고민 해봐야 한다

cache / persist / unpersist

numericRDD.persist()  
numericRDD.unpersist()

numericRDD.cache()  
numericRDD.persist(StorageLevel.MEMORY_ONLY)  

Spark RDD는 수행할 때 마다 연산을 통해 새로은 RDD를 사용한다
이 때 RDD를 재사용한다면, 해당 연산을 통새 생성된 RDD를 저장하는 것을 위 메소드로 구현한다

  • persist / unpersist
    • 메모리 / 디스크에 저장하는 메소드
    • persist 로 저장하고 unpersist 로 제거한다
    • 옵션으로 아래와 같은것들을 통해 메모리, 디스크, 메모리 + 디스크 에 적재 할지 고를 수 있다.
      • NONE , DISK_ONLY , DISK_ONLY_2 , MEMORY_ONLY , MEMORY_ONLY_2 , MEMORY_ONLY_SER , MEMORY_ONLY_SER_2 , MEMORY_AND_DISK , MEMORY_AND_DISK_2 , MEMORY_AND_DISK_SER , MEMORY_AND_DISK_SER_2 , OFF_HEAP
  • cache
    • 메모리에 RDD를 적재하는 메소드
    • persist(StorageLevel.MEMORY_ONLY) 와 동일하다
      • API따라 들어가다 보면 위와 같음

File IO

characterRDD.saveAsTextFile("file://<File_Path>/test01")  
characterRDD.saveAsTextFile("file://<File_Path>/test02", classOf[org.apache.hadoop.io.compress.BZip2Codec])  
  
characterRDD2.saveAsObjectFile("file://<File_Path>/obj01")  
val rdd = sc.objectFile[String]("file://<File_Path>/obj01")  
rdd.collect().foreach (x => println(x))  
  
pairRDD.saveAsSequenceFile("file://<File_Path>/seq01")  
val rdd2 = sc.sequenceFile[String, Long]("file://<File_Path>/seq01")  
println(rdd2.collect().mkString(","))
  • saveAsTextFile
    • 기본적으로 TestFile 형태로 적재하는 방식
    • 위에 작성한 것과 같이 saveAsTextFile(경로 , 코덱)
      • 코덱을 통해 원하는 압축 형식을 지정할 수 있다.
      • bzip2,lz4,snappy,zlib
  • saveAsObjectFile
    • Object형태로 적재하는 방식
    • Java/ Scala에서는 자바 표준 직렬화를 사용한다.
    • SequenceFile와 비교하여 상대적으로 느리다
  • saveAsSequenceFile
    • objectFile 과 동일하게 Binary 파일 포멧으로 저장
      • 차이점
        • Hadoop자체에서 정의한 직렬화 프레임워크 사용
        • Hadoop Writeable 인터페이스를 구현하고 있어야 한다.
        • Scala의 경우 saveAsObjectFile와 차이 없이 간단하게 사용 가능

공유 변수

Broadcast Variables

val broadcastChar = sc.broadcast(Set("good,bad", "plus,minus"))
  • Spark Job이 실행되는 동안 모든 노드에 공유하는 읽기 전용 자원
    • 모든 노드에 공유 즉 복제 되어 사용한다
      • Network 비용이 절감하여 성능 향상을 기대할 수 있다.
    • 해당 변수는 Heap영역에 저장된다
      • 사용하는 동안 메모리를 지속적으로 사용한다.
      • unpersist를 이용하여 제거할 수 있다.

Accumulators

val error_cnt = sc.longAccumulator("invalidFormat")  
val error_data = sc.collectionAccumulator[String]("invalidFormat2")  
val data = List("Log01:haha", "Log02:hoho", "Log03::", "Log04:aaa", "Log05||Addr5", "Log06::Log7")  
sc.parallelize(data, 3).foreach {  
  v =>  
    if (v.split(":").length != 2) {  
      error_cnt.add(1L)  
      error_data.add(v)  
    }  
}  
println("잘못된 데이터 수:" + error_cnt.value)  
println("잘못된 데이터:" + error_data.value)
// 잘못된 데이터 수:3
// 잘못된 데이터:[Log05||Addr5, Log06::Log7, Log03::]
  • 실제로 사용 해본 적이 없어 완벽하게 이해가 되지 않아 조금 더 이해해야 할 것 같다
  • Broadcast 와 반대로 쓰기 동작을 위한 작업
  • 여러 노드에서 수행되는 동작을 한곳에 적재하는 방법
    • 생성,초기화는 드라이버 프로그램에서 동작한다
    • 클러스터의 여러 Task에서 병렬로 쓰여질 수 있다.
    • Accumulator의 값을 증가시킬 수 있지만, 값을 참조해서 사용하는 것은 불가능하다.
  • 특별한 목적이 없으면, Action Function을 수행할 때 사용해야한다
    • 불필요한 반복이 발생할 가능성이 높기 때문
    • eg. map, flatmap과 같은 것을 동작하면 불필요하게 값을 증가시킬 가능성이 높다
  • 기본적으로 제공하는 Accumulators이 존재한다
    • long,collection,double, 등이 존재한다
  • 위 코드와 같이 만들면, 각 노드에서 병렬로 실행 시 문제가 발생한 Log를 확인할 수 있다.
  • 사용자 지정으로 생성 할 수 있다.
    • 아래와 같이 case class를 생성하여 사용할 수 있다.
    • Python에서는 register를 하지 않고 바로 사용할 수 있다
    • 주요 메소드
      1. isZero
        1. 초기 상태 여부 판단
        2. 아래와 같이 조건문 등을 이용하여 판단할 수 있다.
      2. reset
        1. 이름 그대로 초기 상태로 되돌리는 메소드
      3. add
        1. 값을 추가할 때 사용하는 메소드. 사용자 정의시 아래와 같이 case class의 메소드를 이용하여 동작할 수 있다.
        2. 값 추가는 Accumulator 내부에서 동작한다.
      4. copy
        1. 해당 Accumulator를 복제하는 메소드
        2. 미리 데이터를 복제하여 안전하게 데이터를 읽을 때 혹은 병렬로 수정할 때 사용
      5. merge
        1. n+1개의 Accumulator를 병합하는 메소드
          1. n+1개의 테스크에서 생성된 Accumulato를 하나의 Accumulator로 병합할 때 사용
      6. value
        1. Accumulator의 값을 반환하는 메소드
package main.RDD_action  
  
import io.netty.handler.logging.LogLevel  
import org.apache.spark.util.AccumulatorV2  
import org.apache.spark.{SparkConf, SparkContext}  
  
case class Fruit(var price: Long, var amount: Long = 1) {  
  def add(p: Long): Fruit = add(Fruit(p))  
  def add(other: Fruit): Fruit = {  
    this.price += other.price  
    this.amount += other.amount  
    this  
  }  
}  
  
class fruitAccumulator extends AccumulatorV2[Fruit, (Long,Long)] {  
  private var acc = Fruit(0)  
  override def isZero: Boolean = acc.price == 0L && acc.amount == 1L  
  override def copy(): AccumulatorV2[Fruit, (Long,Long)] = {  
    val newAcc = new fruitAccumulator  
    newAcc.acc = Fruit(acc.price, acc.amount)  
    newAcc  
  }  
  override def reset(): Unit = {  
    acc.price = 0L  
    acc.amount = 1L  
  }  
  override def add(other: Fruit): Unit = acc.add(other)  
  override def merge(other: AccumulatorV2[Fruit, (Long,Long)]): Unit = other match{  
    case o: fruitAccumulator => acc.add(o.acc)  
    case _                   => throw new RuntimeException()  
  }  
  override def value: (Long,Long) = {  
    (acc.price, acc.amount)  
  }  
}  
  
object fruitAccumulator {  
  def main(args: Array[String]): Unit = {  
    // Spark 설정  
    val conf = new SparkConf()  
      .setAppName("fruitAccumulator")  
      .setMaster("local[*]")  
      .set("spark.local.ip", "localhost")  
      .set("spark.driver.host", "localhost")  
  
    // SparkContext생성  
    val sc = new SparkContext(conf)  
    sc.setLogLevel(LogLevel.WARN.name())  
  
    val acc = new fruitAccumulator  
    sc.register(acc , "fruitAccumulator")  
    val data =List("Fruit:2000", "Fruit02:3000", "Fruit03::", "Fruit04:2000", "Fruit05||4000", "Fruit06::Log7")  
    sc.parallelize(data,3).foreach {  
      v =>  
//        println(s"v : $v ")  
        if(v.split(":").length != 2){  
          println(s"added v : $v ")  
          acc.add(Fruit(2))  
        }  
    }  
    println(acc.value)  
  }  
}

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

spark-scheduling  (0) 2023.11.28
WordCount  (0) 2023.10.13
RDD Transformation  (0) 2023.10.13
Spark_RDD  (0) 2023.10.05
Spark  (0) 2023.10.04

RDD Transformation?

  • 간단히 설명하자면, 결과 값, 즉 return이 RDD로 반환되는 연산이다.
  • RDD들은 지속적으로 기록만 하다가, RDD Action을 수행할 때 연쇄적으로 수행된다.
    • lazy evaluation이다.

사용 변수들

val numericRDD     = sc.parallelize(1 to 4, 3)  
val characterRDD   = sc.parallelize(Seq("good,bad", "good,bad", "best,worst", "plus,minus"), 3)  
val characterRDD2  = sc.parallelize(Seq("good,bad", "good,bad", "plus,minus"), 3)  
val pairRDD        = characterRDD.zipWithIndex.map {case (key, value) => (key, value + 2) }  
val pairRDD2       = characterRDD.zipWithIndex.map {case (key, value) => (key, value + 10) }  
val pairRDD3       = characterRDD.zipWithIndex.map {case (key, value) => (value + 1, key) }  
val numericPairRDD = sc.parallelize(for (i <- 1 to 10) yield (scala.util.Random.nextInt(1000), "-"))
  • 기본적으로 위와 같이 RDD생성을 함

Map

Map

/*map*/
println(characterRDD.map(_.split(",")).collect().map(_.mkString("{", ",", "}")).mkString("(", ",", ")"))  
//  ({good,bad},{best,worst},{plus,minus})

/*mapValues*/
println(pairRDD.mapValues(num => num + 1).collect().mkString(", "))  
// (good,bad,3), (good,bad,4), (best,worst,5), (plus,minus,6)
  • map 실행 순서
    1. characterRDD.map(_.split(","))
      1. “good,bad”, “good,bad”, “best,worst”, “plus,minus” 를 “,” 로 분할
      2. seq((“good”,“bad”), (“good”,“bad”), (“best”,“worst”), (“plus”,“minus”))
    2. map(_.mkString("{", ",", "}")) 를 이용하여 문자열생성
      1. seq({“good”,“bad”}, {“good”,“bad”}, {“best”,“worst”}, {“plus”,“minus”})
    3. .mkString("(", ",", ")"))를 이용하여 최종 출력 모양 생성
      1. ({“good”,“bad”}, {“good”,“bad”}, {“best”,“worst”}, {“plus”,“minus”})
  • mapValues
    • Map과 동일하게 처리하는데, Value의 값을 조작하고 싶을 때 사용.
    • 위 코드는 각 value에 1을 더하는 연산이다

Flatmap

Monad 개념인데, 이것은 추후 공부할 예정

/*flatMap*/
println(characterRDD.flatMap(_.split(",")).collect().mkString("(", ",", ")"))  
//  (good,bad,best,worst,plus,minus)  

/*flatMapValues*/
val pairRDD3 = characterRDD.zipWithIndex.map {case (key, value) => (value + 1, key) }  
println(pairRDD2.flatMapValues(_.split(",")).collect().mkString(", "))  
// (1,good), (1,bad), (2,good), (2,bad), (3,best), (3,worst), (4,plus), (4,minus)
  • flatmap 실행 순서
    1. characterRDD.flatMap(_.split(","))
      1. 모든 값들을 풀어서 ","로 분할
      2. seq(“good”,“bad”,“good”,“bad”,“best”,“worst”,“plus”,“minus”)
    2. .mkString("(", ",", ")"))를 이용하여 최종 출력 모양 생성
      1. (“good”,“bad”,“good”,“bad”,“best”,“worst”,“plus”,“minus”)
  • flatMapValues
    • flatMap과 동일하게 처리하는데, Value의 값을 조작하고 싶을 때 사용.
    • key를 기준으로 value를 나누는 연산
      • Seq(1,“good,bad”), (2,“good,bad”), (3,“best,worst”), (4,“plus,minus”) 를
      • (1,good), (1,bad), (2,good), (2,bad), (3,best), (3,worst), (4,plus), (4,minus)로 변환

mapPartitions

println(  
  numericRDD.mapPartitions(nums => {  
	println("MapPartition loop")  
	nums.map(num => num * 2)  
  }).collect().mkString(", ")  
  )  
// MapPartition loop
// MapPartition loop
// MapPartition loop
// mapPartitions
// 2, 4, 6, 8

// 짝수만 출력
println(  
  numericRDD.mapPartitionsWithIndex((idx, nums) => {  
	println("mapPartitionsWithIndex loop")  
	nums.flatMap {  
	  case num if idx % 2 == 0 => Option(num * 2)  
	  case _                   => None  
	}  
  }).collect().mkString(", ")  
  )  
// mapPartitionsWithIndex loop
// mapPartitionsWithIndex loop
// mapPartitionsWithIndex loop
// 2, 6, 8
  • Partition를 기준으로 Map을 수행함
    • 3개의 Partition이 존재하면 3번 순회한다.
  • mapPartitions vs mapPartitionsWithIndex
    • 간단히 말하면 둘다 Partition 기준으로 순회한다
    • 차이점 : mapPartitionsWithIndex은 각 Row에 Index를 붙여서 반환한다.

Group

Zip

println(characterRDD.zip(numericRDD).collect().mkString("(", ",", ")"))  
//   ((good,bad,1),(good,bad,2),(best,worst,3),(plus,minus,4))  
  
println(characterRDD.zipPartitions(numericRDD, numericRDD) {  
  (key, num, num2) => for {  
    key_result <- key  
    num_result <- num  
    num_result2 <- num2.map(_ + 1)  
  } yield (key_result, num_result, num_result2)  
}.collect().mkString("(", ",", ")"))  
//  ((good,bad,1,2),(good,bad,2,3),(best,worst,3,4),(best,worst,3,5))  
  • zip
    • 2개의 RDD를 Key : value 로 묶어줌
    • 1 : 1 로 같은 인덱스 위치에 있는 것끼리 묶이기 때문에 길이가 및 파티션 수가 동일해야 함
  • zipPartitions
    • Partition을 요소로 사용하여 새로운 파티션을 생성
    • spark 2.4에서는 인자로 RDD 4개까지 가능
    • Partition 수가 동일해야함
numericRDD.groupBy {  
  case num: Int if num % 2 == 0 => "even"  
  case _                        => "odd"  
}.collect() foreach {  
  row => println(s"${row._1}, [${row._2.mkString(",")}]")  
}  
//      even, [2,4]  
//      odd, [1,3]  
  
pairRDD.groupByKey().collect() foreach {  
  row => println(s"${row._1}, [${row._2.mkString(",")}]")  
}  
//    best,worst, [3]  
//    good,bad, [1,2]  
//    plus,minus, [4]  
  
val rdd1   = sc.parallelize(Seq(("apple", 100), ("banana", 500), ("apple", 200)))  
val rdd2   = sc.parallelize(Seq(("banana", 400), ("apple", 300)))  
val result = rdd1.cogroup(rdd2)  
result.collect.foreach {  
  case (k, (rdd1_val, rdd2_val)) =>  
    println(s"($k, [${rdd1_val.mkString(",")}], [${rdd2_val.mkString(",")}])")  
}  
//    (apple, [100,200], [300])  
//    (banana, [500], [400])
  • groupBy
    • 조건에 의한 Grouping
  • groupByKey
    • Key값을 기준으로 grouping
  • cogroup
    • Key를 기준으로 각 RDD에 값을 묶음
    • key , rdd1의 values, rdd2의 values

Set

Distinct

println(  
  s"""distinct  
     |${characterRDD.distinct.collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    best,worst, good,bad, plus,minus  
  • 이름 그대로 중복을 제거하는 역할

cartesian

println(  
  s"""cartesian  
     |${characterRDD.cartesian(numericRDD).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    (good,bad,1), (good,bad,2), (good,bad,3), (good,bad,4),(good,bad,1), (good,bad,2), (good,bad,3), (good,bad,4),(best,worst,1), (plus,minus,1), (best,worst,2), (plus,minus,2), (best,worst,3), (best,worst,4), (plus,minus,3), (plus,minus,4)  
  • 카르테시안 곱
  • 모든 경우의 수를 조합하는 것이라고 보면된다.
  • Sql에서 Join 조건을 주지 않은 경우, Cross Join

subtract

println(  
  s"""subtract  
     |${characterRDD.subtract(characterRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    best,worst  
  • rdd1에 존재하지만, rdd2에 존재하지 않은 값들을 RDD로 생성하여 반환
  • 예시
    • characterRDD : "good,bad", "good,bad", "best,worst", "plus,minus"
    • characterRDD2 : "good,bad", "good,bad", "plus,minus"
    • characterRDD에 “best,worst” 가 존재하지만 characterRDD2 에는 존재하지 않음
      • best,worst 반환

Union

println(  
  s"""union  
     |${characterRDD.union(characterRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    good,bad, good,bad, best,worst, plus,minus, good,bad, good,bad, plus,minus  
  • 2개의 RDD를 병합하는 것
  • 자료형이 동일해야함

intersection

// 교집합을 구하는것  
println(  
  s"""intersection  
     |${characterRDD.intersection(characterRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    good,bad, plus,minus 

Join

println(  
  s"""join  
     |${pairRDD.join(pairRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//(best,worst,(3,12)), (good,bad,(1,10)), (good,bad,(1,11)), (good,bad,(2,10)), (good,bad,(2,11)), (plus,minus,(4,13))  
  

println(  
  s"""leftOuterJoin  
     |${pairRDD.leftOuterJoin(pairRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
// (best,worst,(3,Some(12))), (good,bad,(1,Some(10))), (good,bad,(1,Some(11))), (good,bad,(2,Some(10))), (good,bad,(2,Some(11))), (plus,minus,(4,Some(13)))  
  

println(  
  s"""leftOuterJoin  
     |${pairRDD.subtractByKey(pairRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//(best,worst,3)
  • Join
    • Key값을 기준으로 Join연산
    • (key , (rdd01 value, rdd2 value)) 조합으로 모든 값 Join
  • leftOuterJoin / rightOuterJoin
    • left join 연산을 한다. join결과값이 없을 수 있음으로 Option 타입으로 반환한다.
  • subtractByKey
    • rdd01.subtractByKey(rdd2)
    • rdd01가 rdd2에 없는 key만 추출

Aggregate

reduceByKey / foldByKey

    println(  
      s"""reduceByKey  
         |${pairRDD.reduceByKey(_ * _).collect().mkString(", ")}  
         |""".stripMargin  
      )  
    // (best,worst,4), (good,bad,6), (plus,minus,5)  

    println(  
      s"""foldByKey  
         |${pairRDD.foldByKey(1)(_ * _).collect().mkString(", ")}  
         |""".stripMargin  
      )  
    // (best,worst,4), (good,bad,6), (plus,minus,5)  
  • reduceByKey
    • Key값을 기준으로 value에 주어진 연산 수행
    • 기본 값이 존재하지 않기 때문에 교환 법칙 및 결합 법칙을 만족해야한다.
      • 교환 법칙 : 상수의 위치가 변경돼도 가능한 것.
        • 1 + 2 == 2 + 1 ( OK )
        • 1 / 2 != 2 / 1 ( X )
      • 결합 법칙 : 연산의 순서가 변경돼도 가능한 것.
        • 간단히 말하면 임의로 괄호로 묶어 연산해도 동일한 결과
        • 1 + 2 + 3 == 1 + (2 + 3) ( OK )
        • 1 + 2 * 2 != ( 1 + 2 ) * 2 ( X )
    • 위 결과 도출 방법
      • best,worst : 4
      • good,bad : 2 * 3
      • plus,minus : 5
        foldByKey
    • Key값 기준으로 Value연산
    • reducebykey와 동일하지만, 차이점은 기본 값이 있는 것이다.
    • 위 결과 도출 방법
      • best,worst : 1 * 4
      • good,bad : 1 * 2 * 3
      • plus,minus : 1 * 5
종류 교환법칙 결합법칙 초기값
reduceByKey o o x
foldByKey x o o

combineByKey / aggregateByKey

case class Fruit(var price: Long, var amount: Long = 1) {  
  def add(p: Long): Fruit = {  
	add(Fruit(p))  
  }  
  def add(other: Fruit): Fruit = {  
	this.price += other.price  
	this.amount += other.amount  
	this  
  }  
}  

val fruit_rdd = sc.parallelize(Seq(("Apple", 1000L), ("banana", 2000L), ("Apple", 1500L), ("banana", 2600L), ("banana", 1300L), ("Apple", 3000L)))  
println(  
  s"""combineByKey  
	 |${  
	fruit_rdd.combineByKey(  
	  createCombiner = (p: Long) => Fruit(p)  
	  , mergeValue = (f: Fruit, v: Long) => f.add(v)  
	  , mergeCombiners = (f1: Fruit, f2: Fruit) => f1.add(f2)  
	  ).collect().mkString(", ")  
  }  
	 |""".stripMargin  
  )
println(  
  s"""combineByKey  
	 |${  
	fruit_rdd.aggregateByKey(Fruit(3000, 0))(  
	  (f: Fruit, v: Long) => f.add(v)  
	  , (f1: Fruit, f2: Fruit) => f1.add(f2)  
	  ).collect().mkString(", ")  
  }  
	 |""".stripMargin  
  )  
  • combineByKey
    • 새로운 타입으로 반환 가능하게 만듦
    • 아래 예시를 통해 Fruit 이라는 Case Class로 반환하는 예시
    • createCombiner
      • 연산을 위해 컴파이너를 생성하는 것.
      • 개인적으로 이해한 바는 타입 변경하여 초기 값을 생성하는 작업
    • mergeValue
      • Combiner가 존재하면 추후 실행할 연산, 이 연산을 통해 값이 누적되어 쌓인다
      • 동을한 key를 가진 값들을 병합
    • mergeCombiners
      • 병합이 끝난 Combiner들을 최종적으로 병합한 Combiner를 생성
      • 다른 Partition에 존재하는 결과 값을 병합
    • 조금 더 알아봐야겠다.
    • 위 예시 결과 값 도출 사유
      • 결과 값 : (banana,Fruit(5900,3)), (Apple,Fruit(5500,3))
      • banana : 2000L + 2600L + 1300L , count = 3
      • Apple : 1000L + 1500L + 3000L , count = 3
  • aggregateByKey
    • 초기값이 존재하는 combineByKey
      • 즉 createCombiner를 생성 후 수행하는 CombineByKey
    • 위 예시 결과 값 도출 사유
      • 결과 값 : (banana,Fruit(14900,3)), (Apple,Fruit(14500,3))
      • banana : (3000 + 2000L) + (3000 + 2600L) +(3000 + 1300L) , count = 3
      • Apple : (3000 + 1000L) + (3000 + 1500L) +(3000 + 3000L) , count = 3

Pipe

println(  
  s"""pipe
	 |${characterRDD.pipe("cut -d , -f 2").collect().mkString(", ")}
	 |""".stripMargin      )
// bad, bad, worst, minus  
  • 결과 값을 bshell 등 외부 프로세스의 인자 값으로 실행시킴

값추출

println(  
  s"""  
	 |keys : ${numericPairRDD.keys.collect.mkString(", ")}  
	 |value: ${numericPairRDD.values.collect.mkString(", ")}  
	 |""".stripMargin  
)  
//    keys : 776, 377, 559, 275, 818, 457, 173, 248, 135, 679  
//    value: -, -, -, -, -, -, -, -, -, -  
println(  
  s"""  
	 |중복 불가 : ${numericPairRDD.sample(withReplacement = false,fraction = 0.7).collect.mkString(", ")}  
	 |중복 허용 : ${numericPairRDD.sample(withReplacement = true,fraction = 0.7).collect.mkString(", ")}  
	 |""".stripMargin  
)  
//    중복 불가 (306,-), (376,-), (630,-), (417,-), (439,-), (738,-), (691,-), (435,-)
//    중복 허용 (210,-), (376,-), (630,-), (417,-), (417,-), (439,-), (435,-)
  • keys
    • key값 반환
  • values
    • value값을 반환
  • sample
    • 데이터 샘플링.
      • withReplacement : 데이터 중복 가능 여부
      • fraction : 전체 중 n% 추출 (0 ~ 1 사이 값)

filter

// 이름 그대로 filter조건에 부합하는 것을 걸러줌  
println(  
  s"""filter  
	 |${numericRDD.filter(_ > 2).collect.mkString(", ")}  
	 |""".stripMargin)  
// 3, 4  
// 1,2,3,4 중 3,4만 출력  
  • 값 필터링
    • SQL의 WHERE절이라고 생각하면 편하다

SortByKey

// Key값을 기준으로 정렬시켜줌  
println(  
  s"""sortByKey  
	 |${numericPairRDD.sortByKey().collect.mkString(", ")}  
	 |""".stripMargin  
)  
// (135, -), (173, -), (248, -), (275, -), (377, -), (457, -), (559, -), (679, -), (776, -), (818, -)  
  • Key값을 기준으로 정렬

Partition / Coalesce

println(  
  s"""repartition  
	 |default = ${characterRDD.getNumPartitions}  
	 |coalesce = ${characterRDD.coalesce(1).getNumPartitions}  
	 |repartition = ${characterRDD.repartition(5).getNumPartitions}  
	 |""".stripMargin  
	 // default = 3
     // coalesce = 1
     // repartition = 5
  )  
  • Partition 수를 증감시킴
    • repartition 는 Shuffle이 발생하면서 partition 재분배
      • Shuffle이 발생하지 않아 Network 통신을 하여 상대적으로 느리다
        • 데이터의 Parition 분배가 될 수 있다.
      • partition 수 증감 모두 가능
    • coalesce 는 shuffle이 발생하지 않고 partition 재분배
      • Shuffle이 발생하지 않아 Network 통신을 하지 않아 상대적으로 빠르다
        • 데이터의 Parition 분배는 일어나지 않는다
        • 간단히 생각하면 동일 Node에 있는 Partition들을 Union 한다고 생각하면 편하다
      • partition 수 감소만 가능

partitionBy / repartitionAndSortWithinPartitions

numericPairRDD.partitionBy(new HashPartitioner(3))  
			  .foreachPartition(it => {  
				println(s"-------")  
				it.foreach(v => println(v))  
			  })  
 
numericPairRDD.repartitionAndSortWithinPartitions(new HashPartitioner(3))  
			  .foreachPartition(it => {  
				println(s"==========")  
				it.foreach(v => println(v))  
			  })  
  • 공통
    • RDD를 N+1개의 Partition으로 특정 기준(Partitioner)에 따라 분배 해준다
      • Partitioner : Partition을 나누는 역할을 하는 객체
        • HASH Partitioner
          • Hash값을 이용하여 분할
        • RANGE Partitioner
          • 범위 혹은 순서로 분할
    • (key - value)로 구성되어 있어야 한다.
      • key 값을 기준으로 분리한다
    • 지정한 Partitioner를 기준으로 분배하기 때문에, Skew현상이 발생할 수 있다.
      • Skew : 데이터 쏠림 현상
      • 위 예시를 실행할 때 아래와 같이 데이터 Skew현상이 발생할 수 있다.
partition partitionBy repartitionAndSortWithinPartitions
partition 3 (89,-) (150,-)
partition 3 (780,-) (89,-)
partition 3 (622,-) (175,-)
partition 3 (548,-) (134,-)
partition 3 (150,-) (530,-)
partition 3 (646,-) (548,-)
partition 3 (134,-) (780,-)
partition 3 (175,-) (622,-)
partition 3 (966,-) (966,-)
partition 3 (530,-) (646,-)

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

spark-scheduling  (0) 2023.11.28
WordCount  (0) 2023.10.13
RDD Action  (0) 2023.10.13
Spark_RDD  (0) 2023.10.05
Spark  (0) 2023.10.04

RDD?

  • Resilient Distributed Dataset 의 약자로써 Spark의 최초에 도입된 데이터 구조
    1. Resilient ( 회복력 있는, 불변의)
      1. 메모리 내 데이터가 손실되도 복구 가능 하고, 생성된 데이터는 불변이다.
      2. 복구는 데이터를 저장하는 것이 아닌 새로 만들어 복구한다
    2. Distributed (분산된)
      1. Spark Cluster를 통해 메모리에 분산되어 있음
    3. Dataset
      1. 데이터
    • 데이터는 여러 서버에 분산되어 저장되고, 처리할 때 동시에 병렬로 처리가능하고 일부 서버에서 장애 발생시 복구가 가능하다.
      • 데이터 요소가 모인 집합
      • 스파크 클러스터에 나눠져 저장됨

Partition

  • RDD는 Partition 단위로 처리가 이루어 진다.
    • Partition단위로 스파크 클러스터에 분산 관리됨
    • Hadoop의 FS인 HDFS를 사용하면 1 Block = 1 Partition으로 구성된다
      • 기본값이 저렇게 되고, Spark API를 이용하여 Partition 수는 조정할 수 있다.
    • Partition 크기,개수에 따라 성능에 영향을 준다.
    • 처리중 Partition 재구성 혹은 Network를 통해 다른서버로 이동되는 Shuffle이 발생할 수 있다
      • Shuffle이 많이 발생하면 성능 저하를 야기할 수 있어 주의해야 한다.
      • Partition 수를 직접 지정할 수 있어 적절히 설정하는 것을 권장

Operation

Spark는 기본적으로 Lazy evaluation으로 처리된다.
RDD는 기본적으로 2개의 큰 연산으로 구분된다.

Lazy evaluation

  • Spark는 기본적으로 지연 구동이 된다.
    • 최척화를 해주는 장점이 있지만, 잘못하면 원치 않게 동작할 수 있어 잘 고민하고 구현해야한다.
  • 최적화 ?
    • Lazy로 동작하여 이미 동작해야 할 것을 알고 있기에 최적화된 방식으로 동작한다
    • Pasted image 20231006111119.png
      • 위 그림처럼 프로그래머가 최적화 하지 않고 수행한다면 A처럼 순서에 따라 동작할 것이다
      • 하지만 Lazy하게 동작하여 자동으로 최적화 된다면 B처럼 동작할 수 있다

Transformation

  • 기존의 RDD를 새로운 RDD로 생성하는 것.
    • 이름 그대로 RDD에 변형이 발생하는 것.
      • e.g) Filter , Join …
  • Transformation은 Action Function이 호출되기 전까지는 Lineage만 생성하고 쌓아둔다
  • Return 값은 RDD 이다
    • RDD01를 이용하여 RDD02를 생성하는 것.
  • Transformation은 2개로 구분된다
    • Pasted image 20231004161104.png
    1. narrow dependency
      • 위 그림과 같이 단순히 더하는 것 과 같이 Partition 이동이 없고, Shuffle이 발생하지 않는 상황
      • 즉 다른 Partition에 존재하는 데이터를 알 필요 없는 연산
        • e.g val rdd2 = rdd2.map(case (key,value) => (key,value +1))
    2. wide dependency
      • 위 그림과 같이 단순히 더하는 것 과 같이 Partition 이동이 있거나 , Shuffle이 발생하는 상황
      • 즉 서로 다른 Partition에 존재하는 데이터를 참조해야 하는 연
        • e.g val rdd3 = rdd2.reduceByKey(_ + _ )

Action

  • 실질적으로 연산을 하는 연산자
    • 즉 , return 값이 데이터 혹은 실행 결과인 것
    • Lazy evaluation에 따라 Action Function이 호출 될 때 연산을 시작한다.
      • 한번에 실행시키기 때문에 최적화된 방법으로 처리할 수 있다.
    • collect(), count() 등이 있다.

Lineage

  • Resilient를 만족 시킬때 RDD는 실질적인 데이터를 저장하는 것이 아니라, 작업 내용을 기억하고 있는 것이다.
  • 이러한 작업 과정 읽기 전용 모델로 만든 것을 Lineage라고 부른다.
  • Lineage는 Dag의 형태로 구성되어있다.

Dag

Directed Acyclic Graph 즉 방향성을 가진 그래프
Pasted image 20231004160214.png

  • 노드간 순환이 없고, 순서가 중요하다.
  • 만약 D에서 장애가 발생하면 B부터 다시 처리하면 되기 때문에 Resilient를 만족시킨다
    • 이러한 특성으로 Spark RDD는 Fault-tolerant를 보장한다.
  • Spark는 Stage로 나누어서 처리되고 Task단위로 연산한다
    • Task는 작업 단위, Task를 묶어 한번에 병렬 처리되는 것을 Job이라고 부른다.
    • E.g위 그림을 보면 Stage 1에는 B와 D의 Task 2개가 존재한다

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

spark-scheduling  (0) 2023.11.28
WordCount  (0) 2023.10.13
RDD Action  (0) 2023.10.13
RDD Transformation  (0) 2023.10.13
Spark  (0) 2023.10.04

+ Recent posts