RDD Transformation?

  • 간단히 설명하자면, 결과 값, 즉 return이 RDD로 반환되는 연산이다.
  • RDD들은 지속적으로 기록만 하다가, RDD Action을 수행할 때 연쇄적으로 수행된다.
    • lazy evaluation이다.

사용 변수들

val numericRDD     = sc.parallelize(1 to 4, 3)  
val characterRDD   = sc.parallelize(Seq("good,bad", "good,bad", "best,worst", "plus,minus"), 3)  
val characterRDD2  = sc.parallelize(Seq("good,bad", "good,bad", "plus,minus"), 3)  
val pairRDD        = characterRDD.zipWithIndex.map {case (key, value) => (key, value + 2) }  
val pairRDD2       = characterRDD.zipWithIndex.map {case (key, value) => (key, value + 10) }  
val pairRDD3       = characterRDD.zipWithIndex.map {case (key, value) => (value + 1, key) }  
val numericPairRDD = sc.parallelize(for (i <- 1 to 10) yield (scala.util.Random.nextInt(1000), "-"))
  • 기본적으로 위와 같이 RDD생성을 함

Map

Map

/*map*/
println(characterRDD.map(_.split(",")).collect().map(_.mkString("{", ",", "}")).mkString("(", ",", ")"))  
//  ({good,bad},{best,worst},{plus,minus})

/*mapValues*/
println(pairRDD.mapValues(num => num + 1).collect().mkString(", "))  
// (good,bad,3), (good,bad,4), (best,worst,5), (plus,minus,6)
  • map 실행 순서
    1. characterRDD.map(_.split(","))
      1. “good,bad”, “good,bad”, “best,worst”, “plus,minus” 를 “,” 로 분할
      2. seq((“good”,“bad”), (“good”,“bad”), (“best”,“worst”), (“plus”,“minus”))
    2. map(_.mkString("{", ",", "}")) 를 이용하여 문자열생성
      1. seq({“good”,“bad”}, {“good”,“bad”}, {“best”,“worst”}, {“plus”,“minus”})
    3. .mkString("(", ",", ")"))를 이용하여 최종 출력 모양 생성
      1. ({“good”,“bad”}, {“good”,“bad”}, {“best”,“worst”}, {“plus”,“minus”})
  • mapValues
    • Map과 동일하게 처리하는데, Value의 값을 조작하고 싶을 때 사용.
    • 위 코드는 각 value에 1을 더하는 연산이다

Flatmap

Monad 개념인데, 이것은 추후 공부할 예정

/*flatMap*/
println(characterRDD.flatMap(_.split(",")).collect().mkString("(", ",", ")"))  
//  (good,bad,best,worst,plus,minus)  

/*flatMapValues*/
val pairRDD3 = characterRDD.zipWithIndex.map {case (key, value) => (value + 1, key) }  
println(pairRDD2.flatMapValues(_.split(",")).collect().mkString(", "))  
// (1,good), (1,bad), (2,good), (2,bad), (3,best), (3,worst), (4,plus), (4,minus)
  • flatmap 실행 순서
    1. characterRDD.flatMap(_.split(","))
      1. 모든 값들을 풀어서 ","로 분할
      2. seq(“good”,“bad”,“good”,“bad”,“best”,“worst”,“plus”,“minus”)
    2. .mkString("(", ",", ")"))를 이용하여 최종 출력 모양 생성
      1. (“good”,“bad”,“good”,“bad”,“best”,“worst”,“plus”,“minus”)
  • flatMapValues
    • flatMap과 동일하게 처리하는데, Value의 값을 조작하고 싶을 때 사용.
    • key를 기준으로 value를 나누는 연산
      • Seq(1,“good,bad”), (2,“good,bad”), (3,“best,worst”), (4,“plus,minus”) 를
      • (1,good), (1,bad), (2,good), (2,bad), (3,best), (3,worst), (4,plus), (4,minus)로 변환

mapPartitions

println(  
  numericRDD.mapPartitions(nums => {  
	println("MapPartition loop")  
	nums.map(num => num * 2)  
  }).collect().mkString(", ")  
  )  
// MapPartition loop
// MapPartition loop
// MapPartition loop
// mapPartitions
// 2, 4, 6, 8

// 짝수만 출력
println(  
  numericRDD.mapPartitionsWithIndex((idx, nums) => {  
	println("mapPartitionsWithIndex loop")  
	nums.flatMap {  
	  case num if idx % 2 == 0 => Option(num * 2)  
	  case _                   => None  
	}  
  }).collect().mkString(", ")  
  )  
// mapPartitionsWithIndex loop
// mapPartitionsWithIndex loop
// mapPartitionsWithIndex loop
// 2, 6, 8
  • Partition를 기준으로 Map을 수행함
    • 3개의 Partition이 존재하면 3번 순회한다.
  • mapPartitions vs mapPartitionsWithIndex
    • 간단히 말하면 둘다 Partition 기준으로 순회한다
    • 차이점 : mapPartitionsWithIndex은 각 Row에 Index를 붙여서 반환한다.

Group

Zip

println(characterRDD.zip(numericRDD).collect().mkString("(", ",", ")"))  
//   ((good,bad,1),(good,bad,2),(best,worst,3),(plus,minus,4))  
  
println(characterRDD.zipPartitions(numericRDD, numericRDD) {  
  (key, num, num2) => for {  
    key_result <- key  
    num_result <- num  
    num_result2 <- num2.map(_ + 1)  
  } yield (key_result, num_result, num_result2)  
}.collect().mkString("(", ",", ")"))  
//  ((good,bad,1,2),(good,bad,2,3),(best,worst,3,4),(best,worst,3,5))  
  • zip
    • 2개의 RDD를 Key : value 로 묶어줌
    • 1 : 1 로 같은 인덱스 위치에 있는 것끼리 묶이기 때문에 길이가 및 파티션 수가 동일해야 함
  • zipPartitions
    • Partition을 요소로 사용하여 새로운 파티션을 생성
    • spark 2.4에서는 인자로 RDD 4개까지 가능
    • Partition 수가 동일해야함
numericRDD.groupBy {  
  case num: Int if num % 2 == 0 => "even"  
  case _                        => "odd"  
}.collect() foreach {  
  row => println(s"${row._1}, [${row._2.mkString(",")}]")  
}  
//      even, [2,4]  
//      odd, [1,3]  
  
pairRDD.groupByKey().collect() foreach {  
  row => println(s"${row._1}, [${row._2.mkString(",")}]")  
}  
//    best,worst, [3]  
//    good,bad, [1,2]  
//    plus,minus, [4]  
  
val rdd1   = sc.parallelize(Seq(("apple", 100), ("banana", 500), ("apple", 200)))  
val rdd2   = sc.parallelize(Seq(("banana", 400), ("apple", 300)))  
val result = rdd1.cogroup(rdd2)  
result.collect.foreach {  
  case (k, (rdd1_val, rdd2_val)) =>  
    println(s"($k, [${rdd1_val.mkString(",")}], [${rdd2_val.mkString(",")}])")  
}  
//    (apple, [100,200], [300])  
//    (banana, [500], [400])
  • groupBy
    • 조건에 의한 Grouping
  • groupByKey
    • Key값을 기준으로 grouping
  • cogroup
    • Key를 기준으로 각 RDD에 값을 묶음
    • key , rdd1의 values, rdd2의 values

Set

Distinct

println(  
  s"""distinct  
     |${characterRDD.distinct.collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    best,worst, good,bad, plus,minus  
  • 이름 그대로 중복을 제거하는 역할

cartesian

println(  
  s"""cartesian  
     |${characterRDD.cartesian(numericRDD).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    (good,bad,1), (good,bad,2), (good,bad,3), (good,bad,4),(good,bad,1), (good,bad,2), (good,bad,3), (good,bad,4),(best,worst,1), (plus,minus,1), (best,worst,2), (plus,minus,2), (best,worst,3), (best,worst,4), (plus,minus,3), (plus,minus,4)  
  • 카르테시안 곱
  • 모든 경우의 수를 조합하는 것이라고 보면된다.
  • Sql에서 Join 조건을 주지 않은 경우, Cross Join

subtract

println(  
  s"""subtract  
     |${characterRDD.subtract(characterRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    best,worst  
  • rdd1에 존재하지만, rdd2에 존재하지 않은 값들을 RDD로 생성하여 반환
  • 예시
    • characterRDD : "good,bad", "good,bad", "best,worst", "plus,minus"
    • characterRDD2 : "good,bad", "good,bad", "plus,minus"
    • characterRDD에 “best,worst” 가 존재하지만 characterRDD2 에는 존재하지 않음
      • best,worst 반환

Union

println(  
  s"""union  
     |${characterRDD.union(characterRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    good,bad, good,bad, best,worst, plus,minus, good,bad, good,bad, plus,minus  
  • 2개의 RDD를 병합하는 것
  • 자료형이 동일해야함

intersection

// 교집합을 구하는것  
println(  
  s"""intersection  
     |${characterRDD.intersection(characterRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    good,bad, plus,minus 

Join

println(  
  s"""join  
     |${pairRDD.join(pairRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//(best,worst,(3,12)), (good,bad,(1,10)), (good,bad,(1,11)), (good,bad,(2,10)), (good,bad,(2,11)), (plus,minus,(4,13))  
  

println(  
  s"""leftOuterJoin  
     |${pairRDD.leftOuterJoin(pairRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
// (best,worst,(3,Some(12))), (good,bad,(1,Some(10))), (good,bad,(1,Some(11))), (good,bad,(2,Some(10))), (good,bad,(2,Some(11))), (plus,minus,(4,Some(13)))  
  

println(  
  s"""leftOuterJoin  
     |${pairRDD.subtractByKey(pairRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//(best,worst,3)
  • Join
    • Key값을 기준으로 Join연산
    • (key , (rdd01 value, rdd2 value)) 조합으로 모든 값 Join
  • leftOuterJoin / rightOuterJoin
    • left join 연산을 한다. join결과값이 없을 수 있음으로 Option 타입으로 반환한다.
  • subtractByKey
    • rdd01.subtractByKey(rdd2)
    • rdd01가 rdd2에 없는 key만 추출

Aggregate

reduceByKey / foldByKey

    println(  
      s"""reduceByKey  
         |${pairRDD.reduceByKey(_ * _).collect().mkString(", ")}  
         |""".stripMargin  
      )  
    // (best,worst,4), (good,bad,6), (plus,minus,5)  

    println(  
      s"""foldByKey  
         |${pairRDD.foldByKey(1)(_ * _).collect().mkString(", ")}  
         |""".stripMargin  
      )  
    // (best,worst,4), (good,bad,6), (plus,minus,5)  
  • reduceByKey
    • Key값을 기준으로 value에 주어진 연산 수행
    • 기본 값이 존재하지 않기 때문에 교환 법칙 및 결합 법칙을 만족해야한다.
      • 교환 법칙 : 상수의 위치가 변경돼도 가능한 것.
        • 1 + 2 == 2 + 1 ( OK )
        • 1 / 2 != 2 / 1 ( X )
      • 결합 법칙 : 연산의 순서가 변경돼도 가능한 것.
        • 간단히 말하면 임의로 괄호로 묶어 연산해도 동일한 결과
        • 1 + 2 + 3 == 1 + (2 + 3) ( OK )
        • 1 + 2 * 2 != ( 1 + 2 ) * 2 ( X )
    • 위 결과 도출 방법
      • best,worst : 4
      • good,bad : 2 * 3
      • plus,minus : 5
        foldByKey
    • Key값 기준으로 Value연산
    • reducebykey와 동일하지만, 차이점은 기본 값이 있는 것이다.
    • 위 결과 도출 방법
      • best,worst : 1 * 4
      • good,bad : 1 * 2 * 3
      • plus,minus : 1 * 5
종류 교환법칙 결합법칙 초기값
reduceByKey o o x
foldByKey x o o

combineByKey / aggregateByKey

case class Fruit(var price: Long, var amount: Long = 1) {  
  def add(p: Long): Fruit = {  
	add(Fruit(p))  
  }  
  def add(other: Fruit): Fruit = {  
	this.price += other.price  
	this.amount += other.amount  
	this  
  }  
}  

val fruit_rdd = sc.parallelize(Seq(("Apple", 1000L), ("banana", 2000L), ("Apple", 1500L), ("banana", 2600L), ("banana", 1300L), ("Apple", 3000L)))  
println(  
  s"""combineByKey  
	 |${  
	fruit_rdd.combineByKey(  
	  createCombiner = (p: Long) => Fruit(p)  
	  , mergeValue = (f: Fruit, v: Long) => f.add(v)  
	  , mergeCombiners = (f1: Fruit, f2: Fruit) => f1.add(f2)  
	  ).collect().mkString(", ")  
  }  
	 |""".stripMargin  
  )
println(  
  s"""combineByKey  
	 |${  
	fruit_rdd.aggregateByKey(Fruit(3000, 0))(  
	  (f: Fruit, v: Long) => f.add(v)  
	  , (f1: Fruit, f2: Fruit) => f1.add(f2)  
	  ).collect().mkString(", ")  
  }  
	 |""".stripMargin  
  )  
  • combineByKey
    • 새로운 타입으로 반환 가능하게 만듦
    • 아래 예시를 통해 Fruit 이라는 Case Class로 반환하는 예시
    • createCombiner
      • 연산을 위해 컴파이너를 생성하는 것.
      • 개인적으로 이해한 바는 타입 변경하여 초기 값을 생성하는 작업
    • mergeValue
      • Combiner가 존재하면 추후 실행할 연산, 이 연산을 통해 값이 누적되어 쌓인다
      • 동을한 key를 가진 값들을 병합
    • mergeCombiners
      • 병합이 끝난 Combiner들을 최종적으로 병합한 Combiner를 생성
      • 다른 Partition에 존재하는 결과 값을 병합
    • 조금 더 알아봐야겠다.
    • 위 예시 결과 값 도출 사유
      • 결과 값 : (banana,Fruit(5900,3)), (Apple,Fruit(5500,3))
      • banana : 2000L + 2600L + 1300L , count = 3
      • Apple : 1000L + 1500L + 3000L , count = 3
  • aggregateByKey
    • 초기값이 존재하는 combineByKey
      • 즉 createCombiner를 생성 후 수행하는 CombineByKey
    • 위 예시 결과 값 도출 사유
      • 결과 값 : (banana,Fruit(14900,3)), (Apple,Fruit(14500,3))
      • banana : (3000 + 2000L) + (3000 + 2600L) +(3000 + 1300L) , count = 3
      • Apple : (3000 + 1000L) + (3000 + 1500L) +(3000 + 3000L) , count = 3

Pipe

println(  
  s"""pipe
	 |${characterRDD.pipe("cut -d , -f 2").collect().mkString(", ")}
	 |""".stripMargin      )
// bad, bad, worst, minus  
  • 결과 값을 bshell 등 외부 프로세스의 인자 값으로 실행시킴

값추출

println(  
  s"""  
	 |keys : ${numericPairRDD.keys.collect.mkString(", ")}  
	 |value: ${numericPairRDD.values.collect.mkString(", ")}  
	 |""".stripMargin  
)  
//    keys : 776, 377, 559, 275, 818, 457, 173, 248, 135, 679  
//    value: -, -, -, -, -, -, -, -, -, -  
println(  
  s"""  
	 |중복 불가 : ${numericPairRDD.sample(withReplacement = false,fraction = 0.7).collect.mkString(", ")}  
	 |중복 허용 : ${numericPairRDD.sample(withReplacement = true,fraction = 0.7).collect.mkString(", ")}  
	 |""".stripMargin  
)  
//    중복 불가 (306,-), (376,-), (630,-), (417,-), (439,-), (738,-), (691,-), (435,-)
//    중복 허용 (210,-), (376,-), (630,-), (417,-), (417,-), (439,-), (435,-)
  • keys
    • key값 반환
  • values
    • value값을 반환
  • sample
    • 데이터 샘플링.
      • withReplacement : 데이터 중복 가능 여부
      • fraction : 전체 중 n% 추출 (0 ~ 1 사이 값)

filter

// 이름 그대로 filter조건에 부합하는 것을 걸러줌  
println(  
  s"""filter  
	 |${numericRDD.filter(_ > 2).collect.mkString(", ")}  
	 |""".stripMargin)  
// 3, 4  
// 1,2,3,4 중 3,4만 출력  
  • 값 필터링
    • SQL의 WHERE절이라고 생각하면 편하다

SortByKey

// Key값을 기준으로 정렬시켜줌  
println(  
  s"""sortByKey  
	 |${numericPairRDD.sortByKey().collect.mkString(", ")}  
	 |""".stripMargin  
)  
// (135, -), (173, -), (248, -), (275, -), (377, -), (457, -), (559, -), (679, -), (776, -), (818, -)  
  • Key값을 기준으로 정렬

Partition / Coalesce

println(  
  s"""repartition  
	 |default = ${characterRDD.getNumPartitions}  
	 |coalesce = ${characterRDD.coalesce(1).getNumPartitions}  
	 |repartition = ${characterRDD.repartition(5).getNumPartitions}  
	 |""".stripMargin  
	 // default = 3
     // coalesce = 1
     // repartition = 5
  )  
  • Partition 수를 증감시킴
    • repartition 는 Shuffle이 발생하면서 partition 재분배
      • Shuffle이 발생하지 않아 Network 통신을 하여 상대적으로 느리다
        • 데이터의 Parition 분배가 될 수 있다.
      • partition 수 증감 모두 가능
    • coalesce 는 shuffle이 발생하지 않고 partition 재분배
      • Shuffle이 발생하지 않아 Network 통신을 하지 않아 상대적으로 빠르다
        • 데이터의 Parition 분배는 일어나지 않는다
        • 간단히 생각하면 동일 Node에 있는 Partition들을 Union 한다고 생각하면 편하다
      • partition 수 감소만 가능

partitionBy / repartitionAndSortWithinPartitions

numericPairRDD.partitionBy(new HashPartitioner(3))  
			  .foreachPartition(it => {  
				println(s"-------")  
				it.foreach(v => println(v))  
			  })  
 
numericPairRDD.repartitionAndSortWithinPartitions(new HashPartitioner(3))  
			  .foreachPartition(it => {  
				println(s"==========")  
				it.foreach(v => println(v))  
			  })  
  • 공통
    • RDD를 N+1개의 Partition으로 특정 기준(Partitioner)에 따라 분배 해준다
      • Partitioner : Partition을 나누는 역할을 하는 객체
        • HASH Partitioner
          • Hash값을 이용하여 분할
        • RANGE Partitioner
          • 범위 혹은 순서로 분할
    • (key - value)로 구성되어 있어야 한다.
      • key 값을 기준으로 분리한다
    • 지정한 Partitioner를 기준으로 분배하기 때문에, Skew현상이 발생할 수 있다.
      • Skew : 데이터 쏠림 현상
      • 위 예시를 실행할 때 아래와 같이 데이터 Skew현상이 발생할 수 있다.
partition partitionBy repartitionAndSortWithinPartitions
partition 3 (89,-) (150,-)
partition 3 (780,-) (89,-)
partition 3 (622,-) (175,-)
partition 3 (548,-) (134,-)
partition 3 (150,-) (530,-)
partition 3 (646,-) (548,-)
partition 3 (134,-) (780,-)
partition 3 (175,-) (622,-)
partition 3 (966,-) (966,-)
partition 3 (530,-) (646,-)

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

spark-scheduling  (0) 2023.11.28
WordCount  (0) 2023.10.13
RDD Action  (0) 2023.10.13
Spark_RDD  (0) 2023.10.05
Spark  (0) 2023.10.04

Pasted image 20230920173141.png

Zookeeper?

  • 분산 처리 어플리케이션을 위한 코디네이터 시스템
    • HA 구성 시 서비스간 데이터 공유, 서버 Health Check등을 해주는 역할

역할

  1. 설정 관리 (Configuration Management)

    • Cluster의 설정 정보를 최신화 유지를 위한 조율 시스템
  2. 클러스터 관리 (Cluster Management)

    • Cluster 추가 / 삭제 시 정보를 서버간 공유
  3. 리더 채택 (Leader Selection)

    • Multi App중 Leader Node를 선정할 로직 생성
    • 주로 복제된 여러 노드 중 연산이 이루어지는 1개의 노드를 택하는데 사용
  4. 락/동기화 서비스 ( Lock and Synchronization Service)

    • Cluster 전체를 대상으로 동기화(Lock)하여 경쟁 상태를 방지하기 위해 사용
    • 연산이 빈번한 경우 경쟁 상태가 될 수 있다
    • 경쟁 상태 여러 프로세스가 동일 자원 접근 시도 데이터 불일치 야기

구성

Server

1개의 Leader, N개의 Follwer, N개의 Observer로 구성된다.
기본적으로 Ensemble 을 구성하기 위해 3개 이상의 홀수 Node가 존재해야한다.

  1. Leader
    • Client가 제출한 모든 요청을 및 데이터 변경을 관리하는 역할
    • 다른 서버들에게 상태 정보 분산
    • 새로운 요청 처리
    • 데이터 동기화
  2. Follwer
    • Client가 제출한 요청을 Leader에게 전달 및 처리
    • Leader Node의 상태를 동기화함
      • Leader에게 문제가 발생하면 Voting을 통해 Leader로 선출됨
    • Follwer가 많아지면 Throuput은 늘어나지만, Failover시 Voting에 시간 소모가 많음
  3. Observer
    • Follwer Node의 일종
      • 차이점 : Voting 권한이 없다.
        • Leader가 될 수 없는 Node이다
    • Data Center의 Bridge, Message Bus의 Link 등의 역할로 활용

Request Processor

Client가 제출한 요청을 Chain으로 연결시켜 Sequential하게 처리하는 프로세스
일반적으로 Single-Thread로 처리함

  • Leader
    • Follower / Client에게 받은 Transaction을 처리해줌
  • Follower / Obserber
    • Client에게 받은 Transaction을 Leader에게 전달

Atomic Broadcast

Zookeeper Server의 데이터 일관성을 ZAB을 이용하여 보장

  • zab : Zookeeper Atomic Broadcast Prodocol
    Pasted image 20230920184247.png
  • 수행 과정
    1. Client에게 요청받은 Leader은 Follower에게 Propose 요청
      • Propose : 수행여부
    2. Propose 받은 Follwer은 해당 Transaction 수행 허용 여부를 Act로 Leader에게 전달
    3. Act를 받은 Leader은 Transaction 처리 명령을 Follwer들에게 Broadcast / Commit

In-Memory DB

Znode를 적재하는 DB

  • Local FS에 In-Memory DB정보를 Replication할 수 있다.
  • Transaction Log가 일정량 이상 되면 Snapshot 후 In-Memory 데이터는 삭제한다.
    • 장애 발생 시 Snapshot을 이용하여 복구

ZNode

Data / 계층을 생성하는 단위
Tree 구조로 구성되어 있다.
Pasted image 20230920184741.png

보유 데이터

  1. ACL( Access Control List )
    1. 접근 권한
  2. ACL Permissions
    1. 행위 권한 ( CRUD 등 )
  3. schemas
    1. 접근 위치 권한 (IP 등)
  4. stat
    1. Node의 정보( 생성 시간, Transaction ID…)

Node 종류

  1. Persistent Node
    1. 영구 노드 ( Non-Falsh)
  2. Ephemeral Node
    1. 휘발성 노드 (Flash )
  3. Sequence Node
    1. 순서가 있는 노드

Watcher

Znode의 변경 사항을 Client / Server에게 알려주는 역할

  • Znode에 Watcher를 등록하여 사용
    • 모니터링 / 알람 역할

Client

zookeeper의 In-Memory DB를 사용하는 Application

  • Client는 Server에게 Heart beat를 전송한다
    • timeout 발생
      • client -> server
        • server/Network Issue, 다른 서버에 연결시도
      • server -> client
        • client/Network Issue, 해당 Session종료

개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

Yarn?

Yet Another Resource Negotiator

  • Hadoop 2.0 에서 시작된 Resource 관리 / 스케줄링 하는 Hadoop 컴포넌트
  • Hadoop 1 에서 사용하던 JobTracker, TasksTracker의 역할 대체
    • 완벽하게 동일한 기능은 아니지만 대다수 대체됨
    • 등록 가능 Node수 4000 -> 클러스터 당 10,000개 이상
    • JobTracker -> ResourceManager
    • TasksTracker -> Node Manager, Container(Slot대체?)

아키텍쳐

Pasted image 20230921185436.png
Resouce Manager은 Master , Node Manager은 Slave로 구성된다.

구성

Resource Manager

  • 클러스터의 자원관리 및 Task 스케줄링이 메인 역할
    1. Client가 Application 실행 요청시 Application Manster을 실행
    2. Node Manager와 통신하여 자원상황 체크
    3. Application Master와 통신하여 필요 자원 관리
  • Hadoop 2.4 부터는 Active - stanby로 구성되어 SPOF 위험 제거

Node Manager

  • Node당 1개 존재
  • Yarn 의 Slave daemon으로서 컨테이너의 자원 관리 역할
  • 사용 중인 자원을 모니터링하고, Resource Manger에게 자원 상황 보고
  • Resource Manager의 요청에 따라 Container 생성

Application Master

  • Application당 1개 존재
  • Resource Manager로부터 Container를 할당 받음
  • Application 실행 상태를 모니터링 및 관리하는 역할

Container

  • 리소스 자원
  • cpu, Disk, Memory 등
      - Application 실행하는 Task들은 Container상에서 실행된다.

개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Hadoop' 카테고리의 다른 글

Hadoop  (0) 2023.10.05

Apache에서 오픈소스로 제공하는 분산처리 프레임워크
즉, 대용량 데이터를 여러 서버를 클러스터링 하여 분산처리 하는 것이 목적인 프레임워크
현재 hadoop1.x, hadoop2.x, hadoop3.x 로 발전해왔다
Hadoop-Echosystem으로 여러 Framework와 조합하여 사용한다
Pasted image 20230921152112.png

  • 위 사진에 있다고 필수적으로 모두 사용하는 것은 아니다.
  • 사진 출처

HDFS

Hadoop FileSystem의 약자로 Hadoop의 분산 파일 시스템.

  1. Block단위로 파일을 적재한다
    1. 블록의 수가 너무 많아지면 좋지 않다
      1. Block에 관한 Meta정보를 Namenode에서 관리하여 리소스 낭비가 발생할 수 있다.
    2. 안정성, 다중 처리를 위해 replication을 생성하여 사용한다.
    3. Scale-Out이 용이하다

Mapreduce

Pasted image 20230921151917.png

  • Map과 Reduce 2개의 메소드로 구성된 대용량 데이터 처리 분산 프로그래밍 모델
    • 정형 / 비정형 데이터를 동일한 크기로 나두고, 처리 완료 한 데이터를 병합해주는 역할

Hadoop1.x

Pasted image 20230921152300.png
Hadoop 1.x 버전의 구성은 위와 같다.

Client

NameNode를 통해 정보를 받고, DataNode와 직접 통신

NameNode

Datanode 관리, Resource 관리 등을 하는 Node

  1. MasterNode
    1. SlaveNode의 정보를 저장
    2. HDFS의 Meta data 보관
    3. Client가 데이터 요청 시 위치 정보 전달
    4. 안전성을 위해 Editlog 생성
      1. Editlog : 변경 로그
  2. secondary node
    1. MasterNode가 장애 발생시 대체
    2. MasterNode의 Editslog등을 Pull하여 BackUp/Merge를 수행하여 fsimage 생성
      1. fsimage : File System Image
      2. Editlog의 크기와 fsimage의 생성 시간은 반비례 한다.
      3. fsimage를 MasterNode에 전달하여 동기화
  3. JobTracker
    1. 실행할 task를 관리한다
    2. Job이 끝날 때까지 스케줄링/모니터링을 담당
    3. 클러스터 자원 관리

SlaveNode

데이터가 적재되어있고, 연산 작업을 수행하는 Node

  1. TaskTracker
    1. Task를 수행한다
    2. Map/Reduce Task로 나누어짐
      1. 병렬 처리 작업 단위 : Slot
      2. Map/Reduce slot은 개수가 정해져 있다.
      3. Map/Reduce은 역할이 정해지면 용도 변경이 불가
        1. 작업을 하지 않으면 대기 상태
        2. 클러스터가 100% 활용되지 않을 경우가 존재
  2. DataNode
    1. 데이터를 분산하여 가지고 있따.
    2. 데이터는 Block단위로 적재된다
    3. Fault Recovery를 위해 Duplication을 유지한다
      1. Default : 3
    4. NameNode에게 HeartBeat를 통해 파일위치 및 장애여부 전송

실행 순서 (MapReduce)

  1. client node : M/R 를 수행하기 위해 Job Client를 실행시킴
  2. Job Client : Job을 할당 및 수행하기 위해 JobTracker에게 새로운 Job ID를 수령
  3. Job Client : Jar,Config,Input Split Data을 HDFS에 Job ID디렉토리로 복사
  4. Job Client : JobTracker에게 시작 준비 완료 알림
  5. JobTracker : 실행할 Job 객체 생성
  6. JobTracker : 수행할 Task List 생성을 위해 Job Client가 계산한 Input Split Data를 기반으로 Task 생성
  7. Job,Task Tracker : 상호 Heartbeat 교환
    • task tracker : task 수행 가능여부
    • job tacker : task 가 존재하면 task 반환 후 task tracker가 task를 수행하게 만듦
  8. task tracker : Input Split (Job JAR File)를 HDFS에 복사 후 task runner instance를 생성 후 task수행
  9. task tracker : 각 task를 수행하면서 n초마다 수행 감시

MapReduce의 단일 고장점

  • JobTracker는 모든 MapReduce의 요청을 받는다
    • JobTracker에 문제 발생시 MapReduce는 수행할 수 없다 ( Fail Point )
    • JobTracker가 독점적으로 작업 수령 및 리소스 관리하여 SPOF 위험이 있다.
      • SPOF (Single Point Of Failure) : 1곳에서 장애 발생 시 전 시스템이 중단

클러스터 확장성

종류 최대치
단일 클러스터 4,000
동시 실행 테스크 40,000

리소스 관리

Slot으로 자원 관리하여 Mapper / Reducer의 자원을 효율적으로 사용하지 못 한다P
MapReduce가 아닌 다른 서비스로 인한 자원 영향 파악 불가

HADOOP2.x

Hadoop1 과 차이점

  1. JobTracker, TasksTracker의 역할을 Yarn으로 대체됨
    • 완벽하게 동일한 기능은 아니지만 대다수 대체됨
    • 등록가능 Node수 4000 -> 클러스터 당 10,000개 이상
    • JobTracker -> ResourceManager
    • TasksTracker -> Node Manager, Container(Slot대체?)
  2. Spark, Hive 등 M/R 이외의 분산 처리 모델 지원

Hadoop HA

대표적인 방법으로 2가지가 존재한다

  1. QJM
    1. Journal node를 이용한 HA구성
  2. NFS
    1. Nas를 이용하여 HA를 구성
      Pasted image 20230921185909.png

NFS

Network File System

  • 위 그림은 QJM구성인데, JN부분에 SS로 구성된다면 NFS라고 볼 수 있다.
  • SS ( Shared Storage)
    • NAS ( Network Attached Storeage) 의 묶음
      • Network를 이용한 원격 저장소
    • Namenode의 Journaling 정보를 Nas에 저장 후 동기화 시킴
      • Network, Cpu 등의 문제로 Split Brain이 발생할 수 있다.
        • Split Brain : Context Switch 시 동시에 접근하여 정보 불일치가 발생한는것
          • 이곳에서는 Active Node의 정보가 충돌하여 Node의 정보 불일치

QJM

Quorum Journal Manager

상단의 그림이 QJM이다

  • JN ( Jounal Node)
    • Active NN 과 Standby NN간 동기화를 유지하기 위해 사용되는 Daemon 그룹
      • Daemon : Background 프로그램
    • Active NN 의 Journaling 정보를 지속적으로 기록한다
      • journaling 정보 : 시스템 변경사항 (e.g Edit log)
    • Failover시 Standby NN가 jounaling 정보를 읽었는지 확인 후 Active NN로 성격
    • Quorum을 위해 홀수를 권장
      • 서버 다운을 대비해 JN은 서로 다른 서버에 구성

공통

  • NN ( Name Node)

    • Split Brain 방지를 위해 Active Node와 Standby Node중 Active Node만 동작함
    1. Active Node ( Master Node )
      1. 활동중인 NN
    2. Standby Node ( Slave Node )
      1. Failover을 위해 Active Node와 동기화
        1. Active Node 장애 발생 시 Active로 승격
      2. NFS 의 SS, QJM의 JN를 이용하여 동기화
      3. Standby Node가 존재하면 SecondName / Checkpoin / Backup Node 불필요
        1. 위 3개와 동시에 동작 시 Error 발생
    3. Observer Node ( Hadoop 3.x 이후 추가)
      1. Hadoop Cluster의 규모가 커지면서 Active Node가 관리하는 Object가 증가함
        1. OverHead가 증가하여 Active Node의 부하가 증가
      2. Active Node의 과부하 방지를 위해 Active Node의 Read역할을 대신 해준다
        1. Traffic 분산
  • DN ( Data Node )

    • Data를 분산 저장하는 Node
    • Darta는 기본값으로 3개의 replication을 가지고 있다
  • ZK( Zookeeper )

    • Coordinator로써 Server의 Leader선정시 사용
  • ZKFC( Zookeeper Failover Controller )

    • ZK - NN 사이에서 모니터링 , 세션관리, 장애 복구를 수행하는 프로세스
      • Health Monitoring
        • NN과 ping 즉 heartbeat를 주고 받으면서 상태 확인 ( Timeout 기반)
        • NN의 장애 (충돌, 정지, …) 발생시 NN비정상 표기
      • Session Management
        • NN 정상
          • Session이 열린 상태를 유지한다
            • Active NN가 임시 Lock ZNode를 사용
        • NN 비정상
          • Session 종료 후 ZK에 알람
        • NN 종료
          • 자동으로 할당한 임시 Lock ZNode를 삭제
      • zookeeper-based Election
        • ZK기반으로 Active NN 선정
        • Active NN 장애 발생 시 Standby NN는 Lock Node를 선점 시도
          • 1+N개의 Standby NN가 존재하면 최초 선점함 NN가 Active로 승격
      • Fencing
        • Standby NN가 Active NN로 승격하는 과정에서 기존 Active NN가 죽지 않아 2개의 Active NN가 되는 걸 방지하기 위해 기존 Active NN를 죽이는 기법
        • Active NN를 Standby NN로 변경하거나 Kill한다
    • NN과 같은 Host에 구성된 ZK와 상호장용하는 Controller

개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Hadoop' 카테고리의 다른 글

Yarn  (0) 2023.10.05

RDD?

  • Resilient Distributed Dataset 의 약자로써 Spark의 최초에 도입된 데이터 구조
    1. Resilient ( 회복력 있는, 불변의)
      1. 메모리 내 데이터가 손실되도 복구 가능 하고, 생성된 데이터는 불변이다.
      2. 복구는 데이터를 저장하는 것이 아닌 새로 만들어 복구한다
    2. Distributed (분산된)
      1. Spark Cluster를 통해 메모리에 분산되어 있음
    3. Dataset
      1. 데이터
    • 데이터는 여러 서버에 분산되어 저장되고, 처리할 때 동시에 병렬로 처리가능하고 일부 서버에서 장애 발생시 복구가 가능하다.
      • 데이터 요소가 모인 집합
      • 스파크 클러스터에 나눠져 저장됨

Partition

  • RDD는 Partition 단위로 처리가 이루어 진다.
    • Partition단위로 스파크 클러스터에 분산 관리됨
    • Hadoop의 FS인 HDFS를 사용하면 1 Block = 1 Partition으로 구성된다
      • 기본값이 저렇게 되고, Spark API를 이용하여 Partition 수는 조정할 수 있다.
    • Partition 크기,개수에 따라 성능에 영향을 준다.
    • 처리중 Partition 재구성 혹은 Network를 통해 다른서버로 이동되는 Shuffle이 발생할 수 있다
      • Shuffle이 많이 발생하면 성능 저하를 야기할 수 있어 주의해야 한다.
      • Partition 수를 직접 지정할 수 있어 적절히 설정하는 것을 권장

Operation

Spark는 기본적으로 Lazy evaluation으로 처리된다.
RDD는 기본적으로 2개의 큰 연산으로 구분된다.

Lazy evaluation

  • Spark는 기본적으로 지연 구동이 된다.
    • 최척화를 해주는 장점이 있지만, 잘못하면 원치 않게 동작할 수 있어 잘 고민하고 구현해야한다.
  • 최적화 ?
    • Lazy로 동작하여 이미 동작해야 할 것을 알고 있기에 최적화된 방식으로 동작한다
    • Pasted image 20231006111119.png
      • 위 그림처럼 프로그래머가 최적화 하지 않고 수행한다면 A처럼 순서에 따라 동작할 것이다
      • 하지만 Lazy하게 동작하여 자동으로 최적화 된다면 B처럼 동작할 수 있다

Transformation

  • 기존의 RDD를 새로운 RDD로 생성하는 것.
    • 이름 그대로 RDD에 변형이 발생하는 것.
      • e.g) Filter , Join …
  • Transformation은 Action Function이 호출되기 전까지는 Lineage만 생성하고 쌓아둔다
  • Return 값은 RDD 이다
    • RDD01를 이용하여 RDD02를 생성하는 것.
  • Transformation은 2개로 구분된다
    • Pasted image 20231004161104.png
    1. narrow dependency
      • 위 그림과 같이 단순히 더하는 것 과 같이 Partition 이동이 없고, Shuffle이 발생하지 않는 상황
      • 즉 다른 Partition에 존재하는 데이터를 알 필요 없는 연산
        • e.g val rdd2 = rdd2.map(case (key,value) => (key,value +1))
    2. wide dependency
      • 위 그림과 같이 단순히 더하는 것 과 같이 Partition 이동이 있거나 , Shuffle이 발생하는 상황
      • 즉 서로 다른 Partition에 존재하는 데이터를 참조해야 하는 연
        • e.g val rdd3 = rdd2.reduceByKey(_ + _ )

Action

  • 실질적으로 연산을 하는 연산자
    • 즉 , return 값이 데이터 혹은 실행 결과인 것
    • Lazy evaluation에 따라 Action Function이 호출 될 때 연산을 시작한다.
      • 한번에 실행시키기 때문에 최적화된 방법으로 처리할 수 있다.
    • collect(), count() 등이 있다.

Lineage

  • Resilient를 만족 시킬때 RDD는 실질적인 데이터를 저장하는 것이 아니라, 작업 내용을 기억하고 있는 것이다.
  • 이러한 작업 과정 읽기 전용 모델로 만든 것을 Lineage라고 부른다.
  • Lineage는 Dag의 형태로 구성되어있다.

Dag

Directed Acyclic Graph 즉 방향성을 가진 그래프
Pasted image 20231004160214.png

  • 노드간 순환이 없고, 순서가 중요하다.
  • 만약 D에서 장애가 발생하면 B부터 다시 처리하면 되기 때문에 Resilient를 만족시킨다
    • 이러한 특성으로 Spark RDD는 Fault-tolerant를 보장한다.
  • Spark는 Stage로 나누어서 처리되고 Task단위로 연산한다
    • Task는 작업 단위, Task를 묶어 한번에 병렬 처리되는 것을 Job이라고 부른다.
    • E.g위 그림을 보면 Stage 1에는 B와 D의 Task 2개가 존재한다

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

spark-scheduling  (0) 2023.11.28
WordCount  (0) 2023.10.13
RDD Action  (0) 2023.10.13
RDD Transformation  (0) 2023.10.13
Spark  (0) 2023.10.04

Bubble Sort

  • 이름 그대로 방울이 터지는 것 처럼 연쇄적으로 비교하여 정렬
  • 상당히 간단하면서, 효율은 그렇게 좋지 못 한 정렬
    • 1956년 논문 “Electronic Computer Systems” “Sort in” 에 존재 기재되어 있다 한다.
  • 최초 시작부터 최종까지 앞뒤를 비교하면서 모든 값을 비교하며 정렬

시간 복잡도

(Worst) (Avg)

간단 코드

# Java
static void bubbleSort(int arr[], int n)
{
	int i, j, temp;
	boolean swapped; // swap이 발생하지 않으면 정렬이 끝났다 판단 후 종료
	for (i = 0; i < n - 1; i++) {
		swapped = false;
		for (j = 0; j < n - i - 1; j++) {
			if (arr[j] > arr[j + 1]) {
				temp = arr[j];
				arr[j] = arr[j + 1];
				arr[j + 1] = temp;
				swapped = true;
			}
		}
		if (swapped == false)
			break;
	}
}

간단 예시

빨간색 파란색이랑 와 비교하여 정렬

시작
10 9 6 3 6 1
loop01
9 10 6 3 6 1
9 6 10 3 6 1
9 6 3 10 6 1
9 6 3 6 10 1
9 6 3 6 1 10
loop02
6 9 3 6 1 10
6 3 9 6 1 10
6 3 6 9 1 10
6 3 6 1 9 10
loop03
3 6 6 1 9 10
3 6 6 1 9 10
3 6 1 6 9 10
loop04
3 6 1 6 9 10
3 1 6 6 9 10
loop05
1 3 6 6 9 10

개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

+ Recent posts