SparkSql?

Spark에서 RDD는 값은 표현하지만, Schema에 대한 방법이 없는데 이것을 대체하는 방법이라고 보면된다.
간단히 말하면 Spark에서 Hive 혹은 Mysql과 같은 RDB의 SQL을 사용할 수 있게 만들어 주는 것이라고 보면 쉽다

  • SQL작성시 ANSI-SQL 및 Hive-QL문법을 사용가능하다.
    • Hive-QL문법을 대다수 지원하지만, 모두 지원하는 것은 아니다 예외 항목이 있으니 홈페이지를 확인해보는 것이 좋다
    • 2.0버전부터 SQL:2003표준을 지원한다

방식

기본적으로 Spark에서 데이터를 다루는 방법은 SQL를 사용하는것 혹은 DataSet API을 사용하는 방법이 있다.

  • Dataset

주요 프로그래밍 구성요소

SparkSession

  • RDD 사용시 SparkContext를 사용한것 처럼 DataSet을 사용할 때 SparkSession을 사용한다
  • UDF를 등록하기 위한 목적으로 사용
  • 생성 기본 형태
    • val ss = SparkSession.builder().config(conf).getOrCreate()
  • Spark 2.0부터 사용됨 기존에는SQLContext, HiveContext를 구분하여 사용했다
    • 2.0버전부터는 Hive를 기본 지원하기 때문에 SparkSession을 사용하면 된다
      • 지원을 한다는 것 이지 필수적으로 hive를 필수로 사용해야 하는 것은 아니다
      • 만약 Hive를 사용하고 Hive를 사용하고 싶다면, hive-site.xml, core-site.xml, hdfs-site.xml을 spark의 conf 디렉토리에 추가하면 된다
        • 저게 싫다면 spark-submit의 경우 --file 에 추가해도 된다.
      • .enableHiveSupport() 를 추가해주면 된다.

DataSet

  • 분산 오브젝트 컬렉션에 대한 프로그래밍 모델
    - Transformation, Action 연산을 포함하고 있다.
    • Spark 1.6 버전부터 DataFrame이라는 API를 사용함
    • Spark 2.0 버전부터 Dataset이 Spark SQL의 Main API로 지정
      • 이때부터 DataFrame은 미지원
        • Scala에서는 Type Alias를 허용 하여 하위 호환 유지가 됨.
          • DataSet을 따라 들어가보면 아래와 같이 구성되어 있다.
          • def toDF(): DataFrame = new Dataset[Row](sparkSession, queryExecution, RowEncoder(schema))
          • 즉 DataFrame이 있긴 하지만, Dataset으로 대체 됐다고 보면 된다.

DataFrame

  1. DataSet이 나오기 전에 사용하던 DataSet
  2. 맞는 표현은 아니지만, 간단한 비유로는 Row 와 Column구조를 가진 RDD
    1. RDD와 같이 분산 데이터 처리를 위한것.
    2. RDD : value에 초점을 맞춤
    3. DataFrame : value + schema를 다룸
  3. Row타입을 사용한다

DataFrameReader / DataFrameWriter

  1. DataFrameReader
    1. SparkSession.read를 이용하여 메소드 접근
      1. 데이터 소스를 지정하여 데이터를 읽어 데이터 프레임을 생성해준다
  2. DataFrameWriter
    1. Dataset.write를 이용하여 특정 데이터 소스로 적재한다.

Row, Column

  1. SQL의 Row,Column과 같은 표현 모델이라고 생각하면 된다.

functions

  1. 데이터 처리시 사용하는 API
    1. select, sum, filter 등 여러가지 메소드들이 존재한다
      1. 공식 docs 에서 확인할 수 있다

StructType / StructField

  1. 스키마 정보를 정의하는 API
  2. StructType (List(StructField,StructField,StructField)) 와 같은 형식으로 생성한다(Scala)

GroupedData, GroupedDataSetP

  1. groupBy 메소드 등에 이해 군집 연산을 수행할 때 사용

개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

spark-Setting  (0) 2023.11.28
spark-scheduling  (0) 2023.11.28
WordCount  (0) 2023.10.13
RDD Action  (0) 2023.10.13
RDD Transformation  (0) 2023.10.13

Spark Setting

Spark를 실행할 때 여러가지 설정을 할 수 있다.
상세한 정보들은 너무 많기 때문에 help 명령어를 사용하여 어떤 설정을 할 수 있는지 확인해 볼 수 있다.
Spark 공식홈페이지에서 본인이 사용하는 Spark 버전에 따라 확인해봐도 된다.

간단 예시 spark-submit 예시

spark-submit \
  --class {실행할 파일의 클래스} \
  --name {어블리케이션 명} \
  --master {클러스터 메니져 정보} \
  --deploy-mode {디플로이 모드} \
  --files {추가할 파일} \
  --jars {추가할 Jars 파일들} \
  --conf {추가할 설정} \  
  --driver-cores {드라이버 코어수} \
  --driver-memory {드라이버 메모리} \
  --executor-cores {익세큐터 코어수} \
  --executor-memory {익세큐터 메모리} \
  --num-executors {익세큐터 수} \
  {실행할 파일 경로}

위와 같이 Spark Submit할 때 해도 되지만, spark-defaults.conf에 미리 설정하여 사용해도 된다.
Spark Submit혹은 spark-defaults.conf에 추가한 값들이 모여 Spark Properties에 반영된다.
설정 우선순위 : code -> spark-submit -> spark-defaults.conf

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

SparkSql  (0) 2023.11.28
spark-scheduling  (0) 2023.11.28
WordCount  (0) 2023.10.13
RDD Action  (0) 2023.10.13
RDD Transformation  (0) 2023.10.13

Spark-Scheduling?

개인적으로 Spark를 사용하는 이유는 cluster-mode를 사용하기 위해라고 생각한다
Yarn등을 이용하여 각 Driver,Excutor에게 core,memory등을 할당 하는데, 많이 할당한다고 항상 좋은 결과를 보는 것은 아니다
어디서 본지 기억이 나지 않지만, core는 1~5개, memory는 4gb이상을 사용하는 것이 좋다고 봤었다.

Cluster Scheduling

Static resource allocation

정적(고정) 자원 할당

  • 명칭 그대로 최초 할당한 그대로 어플리케이션이 종료될 때 까지 자원을 점유하는 방식
  • Pipe line에 따라 데이터를 처리하여 필요한 자원 사전 계산 후 할당하면 좋은 방식이다.

Dynamic resource allocation

동적 자원 할당

  • 자원을 필요에 따라 동적으로 할당하는 방식

    • Spark-Shell , 혹은 web기반으로 동작하면 항상 많은 자원을 할당받아 자원 낭비가 발생할 수 있는 상황에 합리적인 선택
  • 공통

    • spark.dynamicAllocation.enabled true 설정을 해야한다
  • Standalone

    • spark.shuffle.service.enabled true
  • Yarn

    • 모든 node manager에 spark-{version}-yarn-shuffle.jar를 class path등록해야 한다
    • yarn-site.xml에 아래와 같은 설정이 있어야 한다.(안 되어 있으면 설정 후 재시작)
    	<property>
            <name>yarn.nodemanager.aux-services</name>
            <value>mapreduce_shuffle,spark_shuffle</value>
        </property>
        <property>
            <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
            <value>org.apache.spark.network.yarn.YarnShuffleService</value>
        </property>
        <property>
            <name>spark.shuffle.service.enabled</name>
            <value>true</value>
        </property>
    
  • yarn에서 별도 Shuffle Service를 실행시킨 이유는 동작 도중 Executor가 제거될 수 있기 때문

    • Mapper에 저장된 데이터Reducer단계에 Network를 통해 데이터를 읽어 가는데 이때 Executor가 삭제되면 해당 데이터는 삭제
    • 그러한 문제가 발생하지 않기 위해 Shuffle data를 따로 관리 할 수 있는 Shuffle Process를 설정하는 것

Application Scheduling

Spark Context는 기본적으로 Multi-Thread 방식으로 실행된다.
그로인하여 N+1개의 Action Function을 동시 실행해도 문제가 발생하지 않는다.

  • Scheduling으로 FIFO방식으로 구동 된다
    • 후속 작업은 이전 작업이 끝날 때 까지 대기해야한다
    • Fair scheduler 방식도 가능하다
      • Round robin
    • 설정
      • Code 단 : conf.set("spark.scheduler.mode", "Fair")
      • spark-default.conf에서 설정해도 된다.

fairscheduler.xml에 poll 설정을 하여 원하는 Pool이용하는 코드를 작성할 수 있다

<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>
  • fairscheduler.xml에 기본적으로 작성되어있는 것을 가져왔다.
    • weight : 우선순위
    • minShare : Pool의 최초 CPU Core
val conf = new SparkConf()  
  .setAppName("application_name")  
  .setMaster("local[*]")  
  .set("spark.local.ip", "localhost")  
  .set("spark.driver.host", "localhost")  
  // .set("spark.scheduler.mode", "Fair")  //이와 같이 특정 scheduler 사용가능
  .set("spark.scheduler.allocation.file" ,"file path") // 
val sc = new SparkContext(conf)
sc.setLocalProperty("spark.scheduler.pool", "production") // pool설정  
sc.setLocalProperty("spark.scheduler.pool", null) // 기본 pool설정
  • 위와 같이 사용하면 scheduler mode : FAIR , 최소 CPU core : 2
  • 이와 같이 N+1명의 사용자가 접근하면 사용자에 따른 자원 할당을 할 수 있다

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

SparkSql  (0) 2023.11.28
spark-Setting  (0) 2023.11.28
WordCount  (0) 2023.10.13
RDD Action  (0) 2023.10.13
RDD Transformation  (0) 2023.10.13

HadoopSpark를 공부하기 시작하면 가장 처음 나오는 것이 Map /Reduce이다
- Map /Reduce 설명은 Hadoop링크로 들어가 보면 설명을 해놓았다.
이것을 가장 이해하기 편하게 예시를 드는 것이 WordCount이다.

간단 예시

구현

아래와 같이 구현을 하고 , args 를 local[*] 텍스트파일 경로 적재 위치 순으로 넣어서 실행해보면 된다.

import org.apache.spark.rdd.RDD  
import org.apache.spark.{SparkConf, SparkContext}  
  
object wordCount {  
  def main(args: Array[String]): Unit = {  
    // Args 3개 필수  
    require(args.length == 3, "Usage : <Master> <Input> <Output>")  
  
    // Spark 설정  
    val conf = new SparkConf()  
      .setAppName("WorkCount")  
      .setMaster(args(0))  
      .set("spark.local.ip", "localhost")  
      .set("spark.driver.host", "localhost")  
  
    // SparkContext생성  
    val sc   = new SparkContext(conf)  
    val input  = readFile(sc, args(1))  
    val result = process(input)  
    saveFile(result, args(2))  
  
    sc.stop()  
  }  
  
  def readFile(sc: SparkContext, path: String): RDD[String] = {  
    sc.textFile(path) // 파일을 읽고  
  }  
  
  def process(input: RDD[String]): RDD[(String, Int)] = {  
    input.flatMap(str => str.split(" ")) //공백으로 분류  
         .map((_, 1)) // 각각 key : 1 로 만듦  
         .reduceByKey(_ + _) // value들을 key를 기준으로 합산  
  }  
  
  def saveFile(output: RDD[(String, Int)], path: String): Unit = {  
    output.saveAsTextFile(path) // 적재  
  }  
}
  • 위 코드를 실행시키면 디렉토리가 생성되고 안에는 여러 파일들이 존재한다
    • 그 중 part-00000를 출력해보면 WordCount가 잘 동작했는지 확인 가능하다
    • tail,head,cat 원하는 것으로 확인해 보면 된다.
C:\Apps\hadoop-3.1.2\output>cat part-00000
(Unless,4)
(works,,4)
...

테스트 코드

  • JUnit을 이용하면 아래와같이
import org.apache.spark.{SparkConf, SparkContext}  
import org.junit.jupiter.api.Test  
  
import scala.collection.mutable.ListBuffer  
  
class wordCountTest {  
  
  @Test  
  def test(): Unit = {  
    val conf = new SparkConf()  
      .setMaster("local[*]")  
      .setAppName("wordCountTest")  
      .set("spark.local.ip", "localhost")  
      .set("spark.driver.host", "localhost")  
    val sc   = new SparkContext(conf)  
    val input = new ListBuffer[String]  
    input += "License shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document."  
    input += "Licensor shall mean the copyright owner or entity authorized by the copyright owner that is granting the License"  
    input.toList  
  
    val inputRDD = sc.parallelize(input)  
    val result = wordCount.process(inputRDD)  
    val resultMap = result.collectAsMap()  
  
    assert(resultMap("shall") == 2)  
    assert(resultMap("copyright") == 2)  
    assert(resultMap("License") == 2)  
    println(resultMap)
  
    sc.stop()  
  }  
}
  • FlatSpec을 이용하면 아래와 같이 구현하면 된다.
import org.apache.spark.{SparkConf, SparkContext}  
import org.scalatest.flatspec.FlatSpec  
import org.scalatest.matchers.should.Matchers  
  
  
class workCountTest_Spec extends FlatSpec with Matchers {  
  val conf = new SparkConf()  
    .setMaster("local[*]")  
    .setAppName("wordCountTest")  
    .set("spark.local.ip", "localhost")  
    .set("spark.driver.host", "localhost")  
  val sc   = new SparkContext(conf)  
  
  "WorkCount Process" should "Correctly count words in the input" in {  
    val inputRDD = sc.parallelize(Seq(  
      "License shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document."  
      , "Licensor shall mean the copyright owner or entity authorized by the copyright owner that is granting the License"  
      ))  
    val result = wordCount.process(inputRDD)  
  
    val resultMap = result.collectAsMap()  
    println(resultMap)  
    resultMap("shall") shouldEqual 2  
    resultMap("copyright") shouldEqual 2  
    resultMap("License") shouldEqual 2  
//    resultMap("License") shouldEqual 1
  }  
  
}

성공 시 성공했다고만 하고
실패시 아래와 같이 에러 발생한다
!Pasted image 20231005140735.png
Pasted image 20231005140926.png

  • Intellij에서 구동하였고, build tool을 Gradle로 해서 그런지 :test ....를 못 찾는다고 에러가 발생했다.
    • 간단 해결방법 아래와 같이 setting을 설정해 주면 된다
      • Run Tests using 을 Gradle -> Intellij IDEA로 변경
    • 추가적으로 intellij IDEA로 하면 조금 더 빠르다는 이야기가 있다.
      Pasted image 20231005141130.png

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

spark-Setting  (0) 2023.11.28
spark-scheduling  (0) 2023.11.28
RDD Action  (0) 2023.10.13
RDD Transformation  (0) 2023.10.13
Spark_RDD  (0) 2023.10.05

RDD Action?

  • 간단히 설명하자면, 결과 값, 즉 return이 RDD가 아닌 다른 것으로 반환되는 연산이다.
  • RDD Transformation을 수행하게 만드는 도화선이라고 봐도 된다.
    • 즉 N개의 Transformation Method들이 쌓여도 수행하지 않다가 Action Method가 호출되면 그 때 순차적으로 수행한다

Output Function

기본적으로 Collection Type으로 반환하는 경우가 많기 때문에 너무 많은 개수를 반환하면, OOM 이 발생할 수 있다.

데이터 추출

// collect
characterRDD.collect.mkString("Array(", ", ", ")")
//Array(good,bad, good,bad, best,worst, plus,minus)

// first
numericRDD.first()
// 1

// take
numericRDD.take(2).mkString("Array(", ", ", ")")
// Array(1, 2)

// takeSample
numericRDD.takeSample(withReplacement = false, 3).mkString("Array(", ", ", ")")
numericRDD.takeSample(withReplacement = true, 3).mkString("Array(", ", ", ")")
// Array(1, 2, 3)
// Array(4, 2, 1)
  • collect
    • RDD를 Array[T]형태로 반환
    • 굳이 Mysql로 따지면 select라고 봐도 될듯하다.
  • first
    • 최초 1개의 값을 반환
    • mysql에서 limit 1이라고 생각해도 무방
  • take
    • 최초 값 부터 N개를 반환
    • mysql에서 limit n이라고 생각해도 무방
  • takeSample
    • sample과 동일하지만, 다른것은 RDD가 아닌 Collection Type으로 반환

Count

characterRDD.count  
// 4
characterRDD.countByValue
// Map(best,worst -> 1, good,bad -> 2, plus,minus -> 1)
  • count
    • 모든 요소 개수 반환
  • countByValue
    • 요소 별 개수

Reduce / Fold / aggregate

numericRDD.reduce(_ * _)
//reduce : 24
numericRDD.fold(1)(_ * _)
//fold : 24
numericRDD.fold(0)(_ * _)
//fold : 0
numericRDD.aggregate(zeroValue = Fruit(0, 0))(seqOp = (f: Fruit, v: Int) => f.add(v), combOp = (f1: Fruit, f2: Fruit) => f1.add(f2))
numericRDD.aggregate(zeroValue = Fruit(0, 0))(seqOp = _.add(_), combOp = _.add(_))
numericRDD.aggregate(zeroValue = Fruit(0, 0))(seqOp = _ add _, combOp = _ add _)
numericRDD.aggregate(Fruit(0, 0))(_ add _, _ add _)
// Fruit(10,4)
  • RDD의 요소에 관해 연산 후 결과값을 반환
    • reduce , fold 모두 병렬처리 하기 때문에 결합법칙을 만족해야한다
  • 결과 도출 사유
    • reduce
      • 초기 값 없이 보유한 요소 만으로 연산
      • 1 * 2 * 3 * 4 = 24
    • fold
      • 초기 값이 존재함
      • 1 * 1 * 2 * 3 * 4 = 24
      • 0 * 1 * 2 * 3 * 4 = 0
    • 위와 같은 이유로 코드 구현 시 잘 고민하고 구현해야 한다
  • aggregate
    • Fruit은 RDD Transformation 의 combineByKey / aggregateByKey 구현해 놓았다.
      • amount : Long => Int로 변경함
    • 인자별 의미
      • aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
      • zeroValue
        • 초기 값
      • seqOp
        • (U, T) => U
          • T : RDD의 요소들이 갖는 타입
        • 각 Partition에서 병합하는 함수
      • combOp
        • (U, U) => U
          • 인자 타입과 반환타입이 동일함
        • 모든 Partition들을 최종적으로 병합하는 함수
    • aggregate 은 scala에서 위 코드 순서대로 요약하여 사용할 수 있다.

Foreach / foreachPartition

numericRDD.foreach {  
  v => println(s"foreach values = $v")  
}  
  
numericRDD.foreachPartition {  
  println("foreachPartition partition")  
  values => for (v <- values) println(s"foreachPartition $v")
  • 모든 RDD요소에 특정 함수를 적용시키는 것.
    • 모든 요소를 순회
  • Map과 다른점은 Map은 반환값이 존재하지만, foreach는 반환값이 존재하지 않고 행위만 존재한다
  • foreach - foreachPartition 차이점
    • foreach는 요소단위로 수행
    • foreachPartition은 Partition단위로 수행
  • Driver에서 수행하는 것이 아닌 개별 노드에서 실행된다
    • 분산 처리 하기 때문에 의도한 것과 구현한 것이 동일한지 잘 고민 해봐야 한다

cache / persist / unpersist

numericRDD.persist()  
numericRDD.unpersist()

numericRDD.cache()  
numericRDD.persist(StorageLevel.MEMORY_ONLY)  

Spark RDD는 수행할 때 마다 연산을 통해 새로은 RDD를 사용한다
이 때 RDD를 재사용한다면, 해당 연산을 통새 생성된 RDD를 저장하는 것을 위 메소드로 구현한다

  • persist / unpersist
    • 메모리 / 디스크에 저장하는 메소드
    • persist 로 저장하고 unpersist 로 제거한다
    • 옵션으로 아래와 같은것들을 통해 메모리, 디스크, 메모리 + 디스크 에 적재 할지 고를 수 있다.
      • NONE , DISK_ONLY , DISK_ONLY_2 , MEMORY_ONLY , MEMORY_ONLY_2 , MEMORY_ONLY_SER , MEMORY_ONLY_SER_2 , MEMORY_AND_DISK , MEMORY_AND_DISK_2 , MEMORY_AND_DISK_SER , MEMORY_AND_DISK_SER_2 , OFF_HEAP
  • cache
    • 메모리에 RDD를 적재하는 메소드
    • persist(StorageLevel.MEMORY_ONLY) 와 동일하다
      • API따라 들어가다 보면 위와 같음

File IO

characterRDD.saveAsTextFile("file://<File_Path>/test01")  
characterRDD.saveAsTextFile("file://<File_Path>/test02", classOf[org.apache.hadoop.io.compress.BZip2Codec])  
  
characterRDD2.saveAsObjectFile("file://<File_Path>/obj01")  
val rdd = sc.objectFile[String]("file://<File_Path>/obj01")  
rdd.collect().foreach (x => println(x))  
  
pairRDD.saveAsSequenceFile("file://<File_Path>/seq01")  
val rdd2 = sc.sequenceFile[String, Long]("file://<File_Path>/seq01")  
println(rdd2.collect().mkString(","))
  • saveAsTextFile
    • 기본적으로 TestFile 형태로 적재하는 방식
    • 위에 작성한 것과 같이 saveAsTextFile(경로 , 코덱)
      • 코덱을 통해 원하는 압축 형식을 지정할 수 있다.
      • bzip2,lz4,snappy,zlib
  • saveAsObjectFile
    • Object형태로 적재하는 방식
    • Java/ Scala에서는 자바 표준 직렬화를 사용한다.
    • SequenceFile와 비교하여 상대적으로 느리다
  • saveAsSequenceFile
    • objectFile 과 동일하게 Binary 파일 포멧으로 저장
      • 차이점
        • Hadoop자체에서 정의한 직렬화 프레임워크 사용
        • Hadoop Writeable 인터페이스를 구현하고 있어야 한다.
        • Scala의 경우 saveAsObjectFile와 차이 없이 간단하게 사용 가능

공유 변수

Broadcast Variables

val broadcastChar = sc.broadcast(Set("good,bad", "plus,minus"))
  • Spark Job이 실행되는 동안 모든 노드에 공유하는 읽기 전용 자원
    • 모든 노드에 공유 즉 복제 되어 사용한다
      • Network 비용이 절감하여 성능 향상을 기대할 수 있다.
    • 해당 변수는 Heap영역에 저장된다
      • 사용하는 동안 메모리를 지속적으로 사용한다.
      • unpersist를 이용하여 제거할 수 있다.

Accumulators

val error_cnt = sc.longAccumulator("invalidFormat")  
val error_data = sc.collectionAccumulator[String]("invalidFormat2")  
val data = List("Log01:haha", "Log02:hoho", "Log03::", "Log04:aaa", "Log05||Addr5", "Log06::Log7")  
sc.parallelize(data, 3).foreach {  
  v =>  
    if (v.split(":").length != 2) {  
      error_cnt.add(1L)  
      error_data.add(v)  
    }  
}  
println("잘못된 데이터 수:" + error_cnt.value)  
println("잘못된 데이터:" + error_data.value)
// 잘못된 데이터 수:3
// 잘못된 데이터:[Log05||Addr5, Log06::Log7, Log03::]
  • 실제로 사용 해본 적이 없어 완벽하게 이해가 되지 않아 조금 더 이해해야 할 것 같다
  • Broadcast 와 반대로 쓰기 동작을 위한 작업
  • 여러 노드에서 수행되는 동작을 한곳에 적재하는 방법
    • 생성,초기화는 드라이버 프로그램에서 동작한다
    • 클러스터의 여러 Task에서 병렬로 쓰여질 수 있다.
    • Accumulator의 값을 증가시킬 수 있지만, 값을 참조해서 사용하는 것은 불가능하다.
  • 특별한 목적이 없으면, Action Function을 수행할 때 사용해야한다
    • 불필요한 반복이 발생할 가능성이 높기 때문
    • eg. map, flatmap과 같은 것을 동작하면 불필요하게 값을 증가시킬 가능성이 높다
  • 기본적으로 제공하는 Accumulators이 존재한다
    • long,collection,double, 등이 존재한다
  • 위 코드와 같이 만들면, 각 노드에서 병렬로 실행 시 문제가 발생한 Log를 확인할 수 있다.
  • 사용자 지정으로 생성 할 수 있다.
    • 아래와 같이 case class를 생성하여 사용할 수 있다.
    • Python에서는 register를 하지 않고 바로 사용할 수 있다
    • 주요 메소드
      1. isZero
        1. 초기 상태 여부 판단
        2. 아래와 같이 조건문 등을 이용하여 판단할 수 있다.
      2. reset
        1. 이름 그대로 초기 상태로 되돌리는 메소드
      3. add
        1. 값을 추가할 때 사용하는 메소드. 사용자 정의시 아래와 같이 case class의 메소드를 이용하여 동작할 수 있다.
        2. 값 추가는 Accumulator 내부에서 동작한다.
      4. copy
        1. 해당 Accumulator를 복제하는 메소드
        2. 미리 데이터를 복제하여 안전하게 데이터를 읽을 때 혹은 병렬로 수정할 때 사용
      5. merge
        1. n+1개의 Accumulator를 병합하는 메소드
          1. n+1개의 테스크에서 생성된 Accumulato를 하나의 Accumulator로 병합할 때 사용
      6. value
        1. Accumulator의 값을 반환하는 메소드
package main.RDD_action  
  
import io.netty.handler.logging.LogLevel  
import org.apache.spark.util.AccumulatorV2  
import org.apache.spark.{SparkConf, SparkContext}  
  
case class Fruit(var price: Long, var amount: Long = 1) {  
  def add(p: Long): Fruit = add(Fruit(p))  
  def add(other: Fruit): Fruit = {  
    this.price += other.price  
    this.amount += other.amount  
    this  
  }  
}  
  
class fruitAccumulator extends AccumulatorV2[Fruit, (Long,Long)] {  
  private var acc = Fruit(0)  
  override def isZero: Boolean = acc.price == 0L && acc.amount == 1L  
  override def copy(): AccumulatorV2[Fruit, (Long,Long)] = {  
    val newAcc = new fruitAccumulator  
    newAcc.acc = Fruit(acc.price, acc.amount)  
    newAcc  
  }  
  override def reset(): Unit = {  
    acc.price = 0L  
    acc.amount = 1L  
  }  
  override def add(other: Fruit): Unit = acc.add(other)  
  override def merge(other: AccumulatorV2[Fruit, (Long,Long)]): Unit = other match{  
    case o: fruitAccumulator => acc.add(o.acc)  
    case _                   => throw new RuntimeException()  
  }  
  override def value: (Long,Long) = {  
    (acc.price, acc.amount)  
  }  
}  
  
object fruitAccumulator {  
  def main(args: Array[String]): Unit = {  
    // Spark 설정  
    val conf = new SparkConf()  
      .setAppName("fruitAccumulator")  
      .setMaster("local[*]")  
      .set("spark.local.ip", "localhost")  
      .set("spark.driver.host", "localhost")  
  
    // SparkContext생성  
    val sc = new SparkContext(conf)  
    sc.setLogLevel(LogLevel.WARN.name())  
  
    val acc = new fruitAccumulator  
    sc.register(acc , "fruitAccumulator")  
    val data =List("Fruit:2000", "Fruit02:3000", "Fruit03::", "Fruit04:2000", "Fruit05||4000", "Fruit06::Log7")  
    sc.parallelize(data,3).foreach {  
      v =>  
//        println(s"v : $v ")  
        if(v.split(":").length != 2){  
          println(s"added v : $v ")  
          acc.add(Fruit(2))  
        }  
    }  
    println(acc.value)  
  }  
}

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

spark-scheduling  (0) 2023.11.28
WordCount  (0) 2023.10.13
RDD Transformation  (0) 2023.10.13
Spark_RDD  (0) 2023.10.05
Spark  (0) 2023.10.04

RDD Transformation?

  • 간단히 설명하자면, 결과 값, 즉 return이 RDD로 반환되는 연산이다.
  • RDD들은 지속적으로 기록만 하다가, RDD Action을 수행할 때 연쇄적으로 수행된다.
    • lazy evaluation이다.

사용 변수들

val numericRDD     = sc.parallelize(1 to 4, 3)  
val characterRDD   = sc.parallelize(Seq("good,bad", "good,bad", "best,worst", "plus,minus"), 3)  
val characterRDD2  = sc.parallelize(Seq("good,bad", "good,bad", "plus,minus"), 3)  
val pairRDD        = characterRDD.zipWithIndex.map {case (key, value) => (key, value + 2) }  
val pairRDD2       = characterRDD.zipWithIndex.map {case (key, value) => (key, value + 10) }  
val pairRDD3       = characterRDD.zipWithIndex.map {case (key, value) => (value + 1, key) }  
val numericPairRDD = sc.parallelize(for (i <- 1 to 10) yield (scala.util.Random.nextInt(1000), "-"))
  • 기본적으로 위와 같이 RDD생성을 함

Map

Map

/*map*/
println(characterRDD.map(_.split(",")).collect().map(_.mkString("{", ",", "}")).mkString("(", ",", ")"))  
//  ({good,bad},{best,worst},{plus,minus})

/*mapValues*/
println(pairRDD.mapValues(num => num + 1).collect().mkString(", "))  
// (good,bad,3), (good,bad,4), (best,worst,5), (plus,minus,6)
  • map 실행 순서
    1. characterRDD.map(_.split(","))
      1. “good,bad”, “good,bad”, “best,worst”, “plus,minus” 를 “,” 로 분할
      2. seq((“good”,“bad”), (“good”,“bad”), (“best”,“worst”), (“plus”,“minus”))
    2. map(_.mkString("{", ",", "}")) 를 이용하여 문자열생성
      1. seq({“good”,“bad”}, {“good”,“bad”}, {“best”,“worst”}, {“plus”,“minus”})
    3. .mkString("(", ",", ")"))를 이용하여 최종 출력 모양 생성
      1. ({“good”,“bad”}, {“good”,“bad”}, {“best”,“worst”}, {“plus”,“minus”})
  • mapValues
    • Map과 동일하게 처리하는데, Value의 값을 조작하고 싶을 때 사용.
    • 위 코드는 각 value에 1을 더하는 연산이다

Flatmap

Monad 개념인데, 이것은 추후 공부할 예정

/*flatMap*/
println(characterRDD.flatMap(_.split(",")).collect().mkString("(", ",", ")"))  
//  (good,bad,best,worst,plus,minus)  

/*flatMapValues*/
val pairRDD3 = characterRDD.zipWithIndex.map {case (key, value) => (value + 1, key) }  
println(pairRDD2.flatMapValues(_.split(",")).collect().mkString(", "))  
// (1,good), (1,bad), (2,good), (2,bad), (3,best), (3,worst), (4,plus), (4,minus)
  • flatmap 실행 순서
    1. characterRDD.flatMap(_.split(","))
      1. 모든 값들을 풀어서 ","로 분할
      2. seq(“good”,“bad”,“good”,“bad”,“best”,“worst”,“plus”,“minus”)
    2. .mkString("(", ",", ")"))를 이용하여 최종 출력 모양 생성
      1. (“good”,“bad”,“good”,“bad”,“best”,“worst”,“plus”,“minus”)
  • flatMapValues
    • flatMap과 동일하게 처리하는데, Value의 값을 조작하고 싶을 때 사용.
    • key를 기준으로 value를 나누는 연산
      • Seq(1,“good,bad”), (2,“good,bad”), (3,“best,worst”), (4,“plus,minus”) 를
      • (1,good), (1,bad), (2,good), (2,bad), (3,best), (3,worst), (4,plus), (4,minus)로 변환

mapPartitions

println(  
  numericRDD.mapPartitions(nums => {  
	println("MapPartition loop")  
	nums.map(num => num * 2)  
  }).collect().mkString(", ")  
  )  
// MapPartition loop
// MapPartition loop
// MapPartition loop
// mapPartitions
// 2, 4, 6, 8

// 짝수만 출력
println(  
  numericRDD.mapPartitionsWithIndex((idx, nums) => {  
	println("mapPartitionsWithIndex loop")  
	nums.flatMap {  
	  case num if idx % 2 == 0 => Option(num * 2)  
	  case _                   => None  
	}  
  }).collect().mkString(", ")  
  )  
// mapPartitionsWithIndex loop
// mapPartitionsWithIndex loop
// mapPartitionsWithIndex loop
// 2, 6, 8
  • Partition를 기준으로 Map을 수행함
    • 3개의 Partition이 존재하면 3번 순회한다.
  • mapPartitions vs mapPartitionsWithIndex
    • 간단히 말하면 둘다 Partition 기준으로 순회한다
    • 차이점 : mapPartitionsWithIndex은 각 Row에 Index를 붙여서 반환한다.

Group

Zip

println(characterRDD.zip(numericRDD).collect().mkString("(", ",", ")"))  
//   ((good,bad,1),(good,bad,2),(best,worst,3),(plus,minus,4))  
  
println(characterRDD.zipPartitions(numericRDD, numericRDD) {  
  (key, num, num2) => for {  
    key_result <- key  
    num_result <- num  
    num_result2 <- num2.map(_ + 1)  
  } yield (key_result, num_result, num_result2)  
}.collect().mkString("(", ",", ")"))  
//  ((good,bad,1,2),(good,bad,2,3),(best,worst,3,4),(best,worst,3,5))  
  • zip
    • 2개의 RDD를 Key : value 로 묶어줌
    • 1 : 1 로 같은 인덱스 위치에 있는 것끼리 묶이기 때문에 길이가 및 파티션 수가 동일해야 함
  • zipPartitions
    • Partition을 요소로 사용하여 새로운 파티션을 생성
    • spark 2.4에서는 인자로 RDD 4개까지 가능
    • Partition 수가 동일해야함
numericRDD.groupBy {  
  case num: Int if num % 2 == 0 => "even"  
  case _                        => "odd"  
}.collect() foreach {  
  row => println(s"${row._1}, [${row._2.mkString(",")}]")  
}  
//      even, [2,4]  
//      odd, [1,3]  
  
pairRDD.groupByKey().collect() foreach {  
  row => println(s"${row._1}, [${row._2.mkString(",")}]")  
}  
//    best,worst, [3]  
//    good,bad, [1,2]  
//    plus,minus, [4]  
  
val rdd1   = sc.parallelize(Seq(("apple", 100), ("banana", 500), ("apple", 200)))  
val rdd2   = sc.parallelize(Seq(("banana", 400), ("apple", 300)))  
val result = rdd1.cogroup(rdd2)  
result.collect.foreach {  
  case (k, (rdd1_val, rdd2_val)) =>  
    println(s"($k, [${rdd1_val.mkString(",")}], [${rdd2_val.mkString(",")}])")  
}  
//    (apple, [100,200], [300])  
//    (banana, [500], [400])
  • groupBy
    • 조건에 의한 Grouping
  • groupByKey
    • Key값을 기준으로 grouping
  • cogroup
    • Key를 기준으로 각 RDD에 값을 묶음
    • key , rdd1의 values, rdd2의 values

Set

Distinct

println(  
  s"""distinct  
     |${characterRDD.distinct.collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    best,worst, good,bad, plus,minus  
  • 이름 그대로 중복을 제거하는 역할

cartesian

println(  
  s"""cartesian  
     |${characterRDD.cartesian(numericRDD).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    (good,bad,1), (good,bad,2), (good,bad,3), (good,bad,4),(good,bad,1), (good,bad,2), (good,bad,3), (good,bad,4),(best,worst,1), (plus,minus,1), (best,worst,2), (plus,minus,2), (best,worst,3), (best,worst,4), (plus,minus,3), (plus,minus,4)  
  • 카르테시안 곱
  • 모든 경우의 수를 조합하는 것이라고 보면된다.
  • Sql에서 Join 조건을 주지 않은 경우, Cross Join

subtract

println(  
  s"""subtract  
     |${characterRDD.subtract(characterRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    best,worst  
  • rdd1에 존재하지만, rdd2에 존재하지 않은 값들을 RDD로 생성하여 반환
  • 예시
    • characterRDD : "good,bad", "good,bad", "best,worst", "plus,minus"
    • characterRDD2 : "good,bad", "good,bad", "plus,minus"
    • characterRDD에 “best,worst” 가 존재하지만 characterRDD2 에는 존재하지 않음
      • best,worst 반환

Union

println(  
  s"""union  
     |${characterRDD.union(characterRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    good,bad, good,bad, best,worst, plus,minus, good,bad, good,bad, plus,minus  
  • 2개의 RDD를 병합하는 것
  • 자료형이 동일해야함

intersection

// 교집합을 구하는것  
println(  
  s"""intersection  
     |${characterRDD.intersection(characterRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    good,bad, plus,minus 

Join

println(  
  s"""join  
     |${pairRDD.join(pairRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//(best,worst,(3,12)), (good,bad,(1,10)), (good,bad,(1,11)), (good,bad,(2,10)), (good,bad,(2,11)), (plus,minus,(4,13))  
  

println(  
  s"""leftOuterJoin  
     |${pairRDD.leftOuterJoin(pairRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
// (best,worst,(3,Some(12))), (good,bad,(1,Some(10))), (good,bad,(1,Some(11))), (good,bad,(2,Some(10))), (good,bad,(2,Some(11))), (plus,minus,(4,Some(13)))  
  

println(  
  s"""leftOuterJoin  
     |${pairRDD.subtractByKey(pairRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//(best,worst,3)
  • Join
    • Key값을 기준으로 Join연산
    • (key , (rdd01 value, rdd2 value)) 조합으로 모든 값 Join
  • leftOuterJoin / rightOuterJoin
    • left join 연산을 한다. join결과값이 없을 수 있음으로 Option 타입으로 반환한다.
  • subtractByKey
    • rdd01.subtractByKey(rdd2)
    • rdd01가 rdd2에 없는 key만 추출

Aggregate

reduceByKey / foldByKey

    println(  
      s"""reduceByKey  
         |${pairRDD.reduceByKey(_ * _).collect().mkString(", ")}  
         |""".stripMargin  
      )  
    // (best,worst,4), (good,bad,6), (plus,minus,5)  

    println(  
      s"""foldByKey  
         |${pairRDD.foldByKey(1)(_ * _).collect().mkString(", ")}  
         |""".stripMargin  
      )  
    // (best,worst,4), (good,bad,6), (plus,minus,5)  
  • reduceByKey
    • Key값을 기준으로 value에 주어진 연산 수행
    • 기본 값이 존재하지 않기 때문에 교환 법칙 및 결합 법칙을 만족해야한다.
      • 교환 법칙 : 상수의 위치가 변경돼도 가능한 것.
        • 1 + 2 == 2 + 1 ( OK )
        • 1 / 2 != 2 / 1 ( X )
      • 결합 법칙 : 연산의 순서가 변경돼도 가능한 것.
        • 간단히 말하면 임의로 괄호로 묶어 연산해도 동일한 결과
        • 1 + 2 + 3 == 1 + (2 + 3) ( OK )
        • 1 + 2 * 2 != ( 1 + 2 ) * 2 ( X )
    • 위 결과 도출 방법
      • best,worst : 4
      • good,bad : 2 * 3
      • plus,minus : 5
        foldByKey
    • Key값 기준으로 Value연산
    • reducebykey와 동일하지만, 차이점은 기본 값이 있는 것이다.
    • 위 결과 도출 방법
      • best,worst : 1 * 4
      • good,bad : 1 * 2 * 3
      • plus,minus : 1 * 5
종류 교환법칙 결합법칙 초기값
reduceByKey o o x
foldByKey x o o

combineByKey / aggregateByKey

case class Fruit(var price: Long, var amount: Long = 1) {  
  def add(p: Long): Fruit = {  
	add(Fruit(p))  
  }  
  def add(other: Fruit): Fruit = {  
	this.price += other.price  
	this.amount += other.amount  
	this  
  }  
}  

val fruit_rdd = sc.parallelize(Seq(("Apple", 1000L), ("banana", 2000L), ("Apple", 1500L), ("banana", 2600L), ("banana", 1300L), ("Apple", 3000L)))  
println(  
  s"""combineByKey  
	 |${  
	fruit_rdd.combineByKey(  
	  createCombiner = (p: Long) => Fruit(p)  
	  , mergeValue = (f: Fruit, v: Long) => f.add(v)  
	  , mergeCombiners = (f1: Fruit, f2: Fruit) => f1.add(f2)  
	  ).collect().mkString(", ")  
  }  
	 |""".stripMargin  
  )
println(  
  s"""combineByKey  
	 |${  
	fruit_rdd.aggregateByKey(Fruit(3000, 0))(  
	  (f: Fruit, v: Long) => f.add(v)  
	  , (f1: Fruit, f2: Fruit) => f1.add(f2)  
	  ).collect().mkString(", ")  
  }  
	 |""".stripMargin  
  )  
  • combineByKey
    • 새로운 타입으로 반환 가능하게 만듦
    • 아래 예시를 통해 Fruit 이라는 Case Class로 반환하는 예시
    • createCombiner
      • 연산을 위해 컴파이너를 생성하는 것.
      • 개인적으로 이해한 바는 타입 변경하여 초기 값을 생성하는 작업
    • mergeValue
      • Combiner가 존재하면 추후 실행할 연산, 이 연산을 통해 값이 누적되어 쌓인다
      • 동을한 key를 가진 값들을 병합
    • mergeCombiners
      • 병합이 끝난 Combiner들을 최종적으로 병합한 Combiner를 생성
      • 다른 Partition에 존재하는 결과 값을 병합
    • 조금 더 알아봐야겠다.
    • 위 예시 결과 값 도출 사유
      • 결과 값 : (banana,Fruit(5900,3)), (Apple,Fruit(5500,3))
      • banana : 2000L + 2600L + 1300L , count = 3
      • Apple : 1000L + 1500L + 3000L , count = 3
  • aggregateByKey
    • 초기값이 존재하는 combineByKey
      • 즉 createCombiner를 생성 후 수행하는 CombineByKey
    • 위 예시 결과 값 도출 사유
      • 결과 값 : (banana,Fruit(14900,3)), (Apple,Fruit(14500,3))
      • banana : (3000 + 2000L) + (3000 + 2600L) +(3000 + 1300L) , count = 3
      • Apple : (3000 + 1000L) + (3000 + 1500L) +(3000 + 3000L) , count = 3

Pipe

println(  
  s"""pipe
	 |${characterRDD.pipe("cut -d , -f 2").collect().mkString(", ")}
	 |""".stripMargin      )
// bad, bad, worst, minus  
  • 결과 값을 bshell 등 외부 프로세스의 인자 값으로 실행시킴

값추출

println(  
  s"""  
	 |keys : ${numericPairRDD.keys.collect.mkString(", ")}  
	 |value: ${numericPairRDD.values.collect.mkString(", ")}  
	 |""".stripMargin  
)  
//    keys : 776, 377, 559, 275, 818, 457, 173, 248, 135, 679  
//    value: -, -, -, -, -, -, -, -, -, -  
println(  
  s"""  
	 |중복 불가 : ${numericPairRDD.sample(withReplacement = false,fraction = 0.7).collect.mkString(", ")}  
	 |중복 허용 : ${numericPairRDD.sample(withReplacement = true,fraction = 0.7).collect.mkString(", ")}  
	 |""".stripMargin  
)  
//    중복 불가 (306,-), (376,-), (630,-), (417,-), (439,-), (738,-), (691,-), (435,-)
//    중복 허용 (210,-), (376,-), (630,-), (417,-), (417,-), (439,-), (435,-)
  • keys
    • key값 반환
  • values
    • value값을 반환
  • sample
    • 데이터 샘플링.
      • withReplacement : 데이터 중복 가능 여부
      • fraction : 전체 중 n% 추출 (0 ~ 1 사이 값)

filter

// 이름 그대로 filter조건에 부합하는 것을 걸러줌  
println(  
  s"""filter  
	 |${numericRDD.filter(_ > 2).collect.mkString(", ")}  
	 |""".stripMargin)  
// 3, 4  
// 1,2,3,4 중 3,4만 출력  
  • 값 필터링
    • SQL의 WHERE절이라고 생각하면 편하다

SortByKey

// Key값을 기준으로 정렬시켜줌  
println(  
  s"""sortByKey  
	 |${numericPairRDD.sortByKey().collect.mkString(", ")}  
	 |""".stripMargin  
)  
// (135, -), (173, -), (248, -), (275, -), (377, -), (457, -), (559, -), (679, -), (776, -), (818, -)  
  • Key값을 기준으로 정렬

Partition / Coalesce

println(  
  s"""repartition  
	 |default = ${characterRDD.getNumPartitions}  
	 |coalesce = ${characterRDD.coalesce(1).getNumPartitions}  
	 |repartition = ${characterRDD.repartition(5).getNumPartitions}  
	 |""".stripMargin  
	 // default = 3
     // coalesce = 1
     // repartition = 5
  )  
  • Partition 수를 증감시킴
    • repartition 는 Shuffle이 발생하면서 partition 재분배
      • Shuffle이 발생하지 않아 Network 통신을 하여 상대적으로 느리다
        • 데이터의 Parition 분배가 될 수 있다.
      • partition 수 증감 모두 가능
    • coalesce 는 shuffle이 발생하지 않고 partition 재분배
      • Shuffle이 발생하지 않아 Network 통신을 하지 않아 상대적으로 빠르다
        • 데이터의 Parition 분배는 일어나지 않는다
        • 간단히 생각하면 동일 Node에 있는 Partition들을 Union 한다고 생각하면 편하다
      • partition 수 감소만 가능

partitionBy / repartitionAndSortWithinPartitions

numericPairRDD.partitionBy(new HashPartitioner(3))  
			  .foreachPartition(it => {  
				println(s"-------")  
				it.foreach(v => println(v))  
			  })  
 
numericPairRDD.repartitionAndSortWithinPartitions(new HashPartitioner(3))  
			  .foreachPartition(it => {  
				println(s"==========")  
				it.foreach(v => println(v))  
			  })  
  • 공통
    • RDD를 N+1개의 Partition으로 특정 기준(Partitioner)에 따라 분배 해준다
      • Partitioner : Partition을 나누는 역할을 하는 객체
        • HASH Partitioner
          • Hash값을 이용하여 분할
        • RANGE Partitioner
          • 범위 혹은 순서로 분할
    • (key - value)로 구성되어 있어야 한다.
      • key 값을 기준으로 분리한다
    • 지정한 Partitioner를 기준으로 분배하기 때문에, Skew현상이 발생할 수 있다.
      • Skew : 데이터 쏠림 현상
      • 위 예시를 실행할 때 아래와 같이 데이터 Skew현상이 발생할 수 있다.
partition partitionBy repartitionAndSortWithinPartitions
partition 3 (89,-) (150,-)
partition 3 (780,-) (89,-)
partition 3 (622,-) (175,-)
partition 3 (548,-) (134,-)
partition 3 (150,-) (530,-)
partition 3 (646,-) (548,-)
partition 3 (134,-) (780,-)
partition 3 (175,-) (622,-)
partition 3 (966,-) (966,-)
partition 3 (530,-) (646,-)

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

spark-scheduling  (0) 2023.11.28
WordCount  (0) 2023.10.13
RDD Action  (0) 2023.10.13
Spark_RDD  (0) 2023.10.05
Spark  (0) 2023.10.04

+ Recent posts