Docker를 이용하여 airflow설치

airflow 에서 docker-compose.yaml을 가져온다

  • curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.7.2/docker-compose.yaml' 를 이용하여 가져오든 다운로드 받든 상관 없다.
  • 사용할 공유 디렉토리 생성
    • mkdir -p ./dags ./logs ./plugins ./config 를 이용하여 생성하든, 하나 씩 만들든 상관 없다
  • 사용할 env 파일 생성해준다
    • 공홈에서 다운 받은 yaml을 보면 설정할 수 있는 변수 값들이 있다. 필요에 따라 설정하자
  • 초기 설정
    • docker compose up airflow-init 를 이용하여 작성한 변수 값 등을 적용한 airflow 초기화
  • 설치 검증
    • docker compose run airflow-worker airflow info 를 이용하여 설치 검증
  • 실행
    • docker compose up -d 를 이용하여 도커 실행
      • -d를 붙인 이유는 background로 실행시키기 위해 추가함

직접설치

설치하고자 하는 airflow의 버전에 맞춰 python 설치 후 아래 명령어 실행
sqlite도 필요하니 그것도 설치해준다.
Quick Start를 따라하면 된다.

# sqlite도 경로 설정해준다.
export LD_LIBRARY_PATH=$sqlite_HOME/lib:$LD_LIBRARY_PATH
export AIRFLOW_HOME=~/airflow
AIRFLOW_VERSION=2.7.2

# Python 버전 설정
# 아래와 같이 shell명령어 사용하지 말고, 직접 삽입해도  됨.
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"

# 아래와 같이 constraint명령어를 성성
# E.g
# airflow 2.7.2 버전은 Python 3.8, 3.9, 3.10, 3.11 을 지원한
# https://raw.githubusercontent.com/apache/airflow/constraints-2.7.2/constraints-3.8.txt
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# 유저 생성, 초기
airflow db migrate

airflow users create \
    --username admin \ # 접속 id라고 보면된다
    --firstname Peter \ 
    --lastname Parker \
    --role Admin \ # 역할 
    --password admin \ # 접속 비밀번호
    --email spiderman@superhero.org

# 실행
airflow webserver --port 8080 -D

airflow scheduler -D

에러

PermissionError: [Errno 13] Permission denied: 'airflow'
    self._execute_child(args, executable, preexec_fn, close_fds,
  File "/home/airflow/python_env/lib/python3.9/subprocess.py", line 1837, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
PermissionError: [Errno 13] Permission denied: 'airflow'
    raise child_exception_type(errno_num, err_msg, err_filename)
PermissionError: [Errno 13] Permission denied: 'airflow'

만약 위와 같은 에러가 발생하면 권한 문제긴 하지만 ln -s airflow경로 /usr/bin/airflow 와 같이 링크를 걸지 않아 발생한 것 일 수 있다.

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

Elasticsearch?

대량의 데이터를 NRT ( Near Real Time )으로 저장, 검색, 분석 할 수 있는 루씬 기반 Java 오픈소스 분산 검색 엔진

특징

Scale Out

Shard를 이용하여 수평적으로 늘어날 수 있다.

HA

Replica를 통해 데이터 안전성 보장

Schema Free

Json을 통해 데이터 검색

Restful

CRUD 작업을 HTTP Restful API를 통해 수행

Inverted Index

텍스트를 파싱하여 검색어 사전 생성

  • 속도가 빠르다.

요소별 역할

Node

Master

  • ES의 Instance 단위
    • Instance는 각자의 역할 존재
  • Node.roles를 이용하여 적절한 역할을 사용하는 것이 좋다.
    1. Tribe Node
      1. Cluster간 연결을 해주는 역할
        1. 요청에 따라 서로 다른 Cluster에 데이터를 분리해서 보냄
    2. Remote-eligible Node
      1. remote cluster clinet 역할
        1. 원격 cluster과 연결시켜주는 역할
      2. cross cluster를 remote client로 사용 가능한 노드
        1. croess cluster : 원격 cluster간 백업/검색 등을 하는 작업
    3. Machine Learning Node
      1. ML역할
        1. ES로 ML작업 시 필요

Data

  • Data를 적재/연산하는 Node
    • node.data=true를 설정에 삽입해주면 된다.
    • 부하 분배를 위해 Master과 분리하는 것을 권장
  • Content Datan Node
    • CRUD, 집계 등의 기능을 가짐
      • 기본적인 Data
이름 압축 성능 비고
Hot Data Node 4 1 사용 빈도가 많고 및 성능이 우선이기 때문에 SSD 권장
Warm Data Node 3 2 Hot보다 적은 빈도로 조회하는 노드, HDD권장
Cold Data Node 2 3 읽기 빈도가 적은 인덱스, 읽기 전용
Frozen Data Node 1 4 Shared_cache 옵션만으로 마운트 된 검색 가능한 Snapshot 을 저장

Ingest

  • Text 변환 및 전처리 담당.
  • Ingest pipeline을 Text에 적용하여 인덱싱 하기 전에 사용
    • indexing 이전에 Document를 변환하는 작업
  • Master Node에서 수행하는 Ingest를 대신 수행
    • Master Node의 부하 분산
    • 색인 작업이 많을경우 사용

Coordinating Only

  • Client와 통신만 하는 역할
    • Load Balancing 수행
    • 만약 Coordinate Node가 없으면 Datanode가 해당 역할 수행
    • 너무 많으면 서버에 부하
    • 발생
      • Coordinating Only 또한 Master Node가 알림을 기다리기 때문
    1. client의 cluster관련 요청을 Master Node에 전달
    2. client의 Data관련 요청을 data Node에 전달

Server

물리적 서버에는 N개이 Node를 사용할 수 있다.

  • Server 기준 Port
  • Node - Client
    • Port범위 : 9200 ~9299
    • 순차적으로 부여
  • Node - Node
    • Port 범위 : 9300 ~ 9399
    • 순차적으로 부여
  • Cluster 묶기
    • cluster.name을 동일한 이름으로 묶어준다
      • 이름이 다르면 같은 Network여도 논리적으로 분리됨
    • Cluster : 1개의 시스템이라고 보면됨.
      • 데이터가 공유되는 서버의 묶음

Index

  • Document의 집단
  • Index는 색인과 명칭과 혼동될 수 있어 Indices라고도 부른다.
  • 주로 REST API를 통해 CRUD 작업 수행
  • 실질적으로 Index는 Shard를 가리키는 논리적 Namespace
  • index는 1+N개의 Shard로 구성
  • Mapping 및 State 정보를 Cluster State에 저장
    • Mapping : Index 내부의 Document가 어떻게 저장될지 정의하는 것
      • Index별 설정 가능
    • cluster State : 정보를 추적하는 내부 데이터 구조
      • 분산된 Node간 데이터 일관성을 위해 Single-Thread로 업데이트
      • Single-Thread이기 때문에 규모가 커지면 성능 저하

Shard

  • Cluster 내의 데이터를 분산 저장하기 위한 단위
  • Lucine Index( Inverted Index / 역색인 )이다.
    • Inverted Index : Key : value 가 아닌 value : key
    • 속도 / 메모리 측면에서 좋다
      • Update 시 reindex을 해야해서 Cost가 증가한다.
  • Index는 N개의 Primary/Replica Shard로 구성된다.
    • Replica : Failover을 위해 사용하는 것이 좋다
      • 너무 많아지면 성능 저하됨
      • replica 또한 write를 하기 때문
  • Shard 추가 시 Reindex를 해줘야한다.
    • segment가 정렬되어 있기 때문
  • 설정
    • Failover을 위해 50GB 이상의 Shard는 권장하지 않는다.
    • Heap은 최대 32GB를 권장
      • System Memory / 2를 권장한다.
      • 32GB를 넘기면 32bit -> 64bit으로 변경되기 때문
    • Heap 1GB당 최대 20개의 Shard를 넘기지 않는 것이 좋다.
    • Shard가 작으면 작은 Segment가 생성된다.
    • 용양 및 설정 출처

Segment

  • 문서의 빠른 검색을 위해 설계된 자료구조
  • N개의 Segment로 Shard를 구성한다.
  • 데이터 색인 시 메모리상에 존재하다, Segment에 기록하여 검색 가능한 Segment 생성
  • Shard에서 검색 시 Segment를 순차 검색 후 결과를 Shard에 전달
    • segment가 많아지면 검색 속도 저하
    • 순차 검색 및 오버헤드 발생
  • update/delete 시 삭제 표기 후 다른 포인터 지칭
    • 데이터는 지연 삭제
  • 특정 주기/ 용양 임계치에 다다르면 데이터 정리 및 병합
    • 이 과정에 실질적 데이터 삭제
    • Merge은 새로운 Segment를 생성하는 것
      • 많은 Resource 소요
      • 부하가 적을 때 수행하는 것을 권장
        • Optimize API를 사용하여 수행

개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

Hive?

MapReduce 코드를 HiveQl을 이용하여 간편화 시킨 쿼리 엔진

  • 배치성 작업을 실행시킬 때 주로 사용
    • MapReduce,Tez, Spark등을 Engine으로 사용 가능
  • HiveQl
    • SQL과 유사한 Query 제공
    • 0.14.0 이후 버전부터 Update/Delete 지원
    • Insert는 덮어쓰기, 빈 테이블에만 사용 가능
    • Select문에서 Having절은 사용 불가하다

버전별 차이

  1. Hive 1.X
    • SQL을 이용하여 MapReduce 가능
    • 파일 데이터를 논리적 표현
  2. Hive 2.X
    • LLAP구조 추가
      • LLAP : 작업을 실행한 데몬을 유지하여 핫 데이터 케싱을 통해 빠른 데이터 처리
    • HPLSQL 추가
      • HPL/SQL를 보면 사용법이 나온다.
      • Hive의 PL/SQL이라고 보면 된다.
    • 기본 실행엔진 MapReduce -> TEZ
  3. Hive 3.X
    • MapReduce엔진, Hive CLI,Hcat CLI, WebHCat삭제
      • TEZ엔진, BeeLine 사용

요소

Pasted image 20230925110547.png

Metastore

  • Hive 테이블 및 파티션의 모든 Meta 정보 및 HDFS 맵핑 정보 저장
    • Hive는 RDB처럼 Schema에 맞춰 데이터를 적재하는 것이 아닌 데이터를 적재 후 Schema를 입히는 방식
      Pasted image 20230925112308.png

DataBase

  • Embedded
    • Derby를 이용하여 사용
    • 1번에 1개의 프로세스만 접근가능
      • 단위 테스트 용도로만 사용!
  • Local / Remote
    • Mysql과 같은 외부 RDBMS를 사용

Server

  • Embeded / Local
    • Hive Client내에 Library처럼 사용되는 서버
    • 주로 HiveServer2와 함께 사용
  • Remote
    • Metastore가 독립 Server로 구동
      • Client는 Thrift 통신을 이용하여 접근
        • Thrift : 서버에  있는 서로 다른 언어로 작성된 함수 혹은 프로시저를 호출해서 사용
      • 위와 같이 구동되는 서버를 Hcat 서버라 부름

HCatalog

  • Grid Computing 상에서 다양한 Processing tools를 이용하여 Read/Write를 보다 쉽게 할 수 있게 만들어주는 관리 계층
    • Grid Computing : 원격 병렬 컴퓨팅. Cluster Compung을 WAN으로 연결시킨 상태
    • Hive 0.11.0부터 Hcatalog와 Hive가 통합됨
  • SerDe( Serializer-Deserializer )할 수 있는 모든 Read/Write를 지원
    • e.g csv, Json …
    • SerDe : Hive가 데이터를 해석하는 방법 row data -> hdfs , hdfs-> row data
  • WebHCat
    • HCataLog를 REST API로 제공하는 것

HiveServer2

  • Client가 HiveQl을 수행할 수 있게 해주는 서비스
  • Thrift ,JDBC,ODBC등의 API 제공
  • Thrift 기반으로 동작하여 Thrift Server라고도 부름

Beeline

  • JDBC를 사용하여 HiveServer2에 접근하는 SQLline 기반도구
    • SQLLine : Java기반으로 DB와 연결하여 SQL을 실행하는 명령줄 유틸리티
  • Thrift통신으로 접속하고 , Comman Line을 이용
  • XML, CSV, TSV로 출력할 수 있다.

Table

Hive에는 크게 2종류의 Table 형식이 있다.

  1. Managed Table( Internal Table )
    1. Location을 따로 설정하지 않은 Table
    2. 외부 위치가 아닌 서버 내에 Table저장
    3. Drop table/partition시 메타스토어 정보 및 데이터가 삭제됨
    4. 주로 임시테이블, 생명 주기 관리에 사용
  2. External Table
    1. Location을 따로 설정하는 Table
    2. 외부 위치에 정의하는 테이블
    3. 파일이 이미 존재 혹은 원격 위치에 사용 시 적용
      1. HDFS, S3…
    4. Drop table/partition시 메타스토어 정보 만 제거됨

Partition

  • Hive의 Partition은 HDFS에 Tree구조로 적재된다.
  • Partition은 순서에 따라 디렉토리가 구성되기 때문에 cardinality에 따라 순서를 잘 정해야함.
  • 테이블의 크기가 너무 커지면 모든 것을 Row를 읽는 것과 같이 불필요한 I/O가 발생할 수 있어 필수적으로 넣는 것을 추천
  1. 동적 Partition
    • 내부 Column을 기준으로 자동 생성하는 Partition
    • 기본값 : 200개, 최대값 : 10000+ 가능 참조
      • 너무 많은 동적 Partition은 성능 저하 야기
        • 2K이상이면 30초 쿼리가 발생 할 수 있다함.
    • e.g. : insert into table tb_sample2 partition ( year,month,day ) select userid,viewtime,year,month,day from tb_sample;
  2. 정적 Partition
    • Partition 정보를 직접 삽입
    • eg. insert into table tb_sample partition(year = 2021, month = 05, day =04) values(555,444);
-- 테이블 생성 ( 정적 파티선 )
Create table tb_sample(
	userid bigint ,
	viewtime int
)
partition by ( year int , month int , date int)

-- 레코드 삽입 < 정적 Partition >
insert into table tb_sample
partition(year = 2021, month = 05, day =07)
values(123,456);

-- 레코드 삽입 < 동적 Partition >
insert into table tb_sample2
partition ( year,month,day )
select userid,viewtime,year,month,day from tb_sample;

Pasted image 20230925125709.png

  • HDFS에서 위와 같은 형태로 생성된다.
  • Partition의 위치 정보는 Metastore 에 적재됨
  • PARTITIONS
    • 파티션 기본 정보
    • Pasted image 20230925125840.png
  • PARTITION_KEYS
    • Partition에서 사용된 Column리스트
    • Pasted image 20230925130003.png
  • PART_KEY_VALS
    • partition의 필드값
    • Pasted image 20230925130124.png
  • PART_PARAMS
    • partition의 속성 정보
    • Pasted image 20230925130214.png

Bucketing

  • Partition처럼 테이블의 데이터를 분할하는 방식
  • N개의 파일로 나누어 저장하는 것.
  • Bucketing을 하면 Join시 SMB Join으로 수행되어 속도 향상을 야기
    • SMB Join(Sort Merge Bucket Join): Bucketing된 키의 정보를 이용한 Join
CREATE TABLE tbl1(
  col1 STRING,
  col2 STRING
) CLUSTERED BY (col2) INTO 5 BUCKETS  
stored as orc  
location 'Path'  
tblproperties('orc.compress' = 'SNAPPY')

CREATE TABLE tbl2(
  col1 STRING,
  col2 STRING
) CLUSTERED BY (col2) SORTED BY (col1) INTO 5 BUCKETS
  • 위와 같이 CLUSTERED BY (칼럼) [SORTED BY (칼럼)] INTO 버켓수 BUCKETS 을 추가하여 설정

Skew

  • Hive에서 Skew는 데이터 쏠림 현상이라고 볼 수 있다.
    • Partition을 나눠놨는데, 특정 Partition에만 데이터가 몰려 있으면 특정 Partition만 비약적으로 커지는 현상이라고 보면 된다.
  • Skew는 Partition과 유사한 기능을 하지만, 특정 데이터만 나눠주는 효과를 낼 수 있다.
    • e.g 알파벳을 적재하는 테이블이 존재
      1. a,b가 다른 알파벳에 비해 많이 유입됨
      2. a,b를 Skew로 등록
      3. 테이블은 a , b , c-z로 적재 시작
CREATE TABLE tbl (
	col1 STRING,
	col2 STRING
) SKEWED BY (col1)on ('a', 'b' )
[ STORED as DIRECTORIES ]
location 'Path'  
;
  • 위와 같은 형식으로 사용 가능
    • STORED as DIRECTORIES : 디렉토리 구분 여부

개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Hive' 카테고리의 다른 글

Hive 레코드 HDFS 이전  (0) 2023.08.09

HadoopSpark를 공부하기 시작하면 가장 처음 나오는 것이 Map /Reduce이다
- Map /Reduce 설명은 Hadoop링크로 들어가 보면 설명을 해놓았다.
이것을 가장 이해하기 편하게 예시를 드는 것이 WordCount이다.

간단 예시

구현

아래와 같이 구현을 하고 , args 를 local[*] 텍스트파일 경로 적재 위치 순으로 넣어서 실행해보면 된다.

import org.apache.spark.rdd.RDD  
import org.apache.spark.{SparkConf, SparkContext}  
  
object wordCount {  
  def main(args: Array[String]): Unit = {  
    // Args 3개 필수  
    require(args.length == 3, "Usage : <Master> <Input> <Output>")  
  
    // Spark 설정  
    val conf = new SparkConf()  
      .setAppName("WorkCount")  
      .setMaster(args(0))  
      .set("spark.local.ip", "localhost")  
      .set("spark.driver.host", "localhost")  
  
    // SparkContext생성  
    val sc   = new SparkContext(conf)  
    val input  = readFile(sc, args(1))  
    val result = process(input)  
    saveFile(result, args(2))  
  
    sc.stop()  
  }  
  
  def readFile(sc: SparkContext, path: String): RDD[String] = {  
    sc.textFile(path) // 파일을 읽고  
  }  
  
  def process(input: RDD[String]): RDD[(String, Int)] = {  
    input.flatMap(str => str.split(" ")) //공백으로 분류  
         .map((_, 1)) // 각각 key : 1 로 만듦  
         .reduceByKey(_ + _) // value들을 key를 기준으로 합산  
  }  
  
  def saveFile(output: RDD[(String, Int)], path: String): Unit = {  
    output.saveAsTextFile(path) // 적재  
  }  
}
  • 위 코드를 실행시키면 디렉토리가 생성되고 안에는 여러 파일들이 존재한다
    • 그 중 part-00000를 출력해보면 WordCount가 잘 동작했는지 확인 가능하다
    • tail,head,cat 원하는 것으로 확인해 보면 된다.
C:\Apps\hadoop-3.1.2\output>cat part-00000
(Unless,4)
(works,,4)
...

테스트 코드

  • JUnit을 이용하면 아래와같이
import org.apache.spark.{SparkConf, SparkContext}  
import org.junit.jupiter.api.Test  
  
import scala.collection.mutable.ListBuffer  
  
class wordCountTest {  
  
  @Test  
  def test(): Unit = {  
    val conf = new SparkConf()  
      .setMaster("local[*]")  
      .setAppName("wordCountTest")  
      .set("spark.local.ip", "localhost")  
      .set("spark.driver.host", "localhost")  
    val sc   = new SparkContext(conf)  
    val input = new ListBuffer[String]  
    input += "License shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document."  
    input += "Licensor shall mean the copyright owner or entity authorized by the copyright owner that is granting the License"  
    input.toList  
  
    val inputRDD = sc.parallelize(input)  
    val result = wordCount.process(inputRDD)  
    val resultMap = result.collectAsMap()  
  
    assert(resultMap("shall") == 2)  
    assert(resultMap("copyright") == 2)  
    assert(resultMap("License") == 2)  
    println(resultMap)
  
    sc.stop()  
  }  
}
  • FlatSpec을 이용하면 아래와 같이 구현하면 된다.
import org.apache.spark.{SparkConf, SparkContext}  
import org.scalatest.flatspec.FlatSpec  
import org.scalatest.matchers.should.Matchers  
  
  
class workCountTest_Spec extends FlatSpec with Matchers {  
  val conf = new SparkConf()  
    .setMaster("local[*]")  
    .setAppName("wordCountTest")  
    .set("spark.local.ip", "localhost")  
    .set("spark.driver.host", "localhost")  
  val sc   = new SparkContext(conf)  
  
  "WorkCount Process" should "Correctly count words in the input" in {  
    val inputRDD = sc.parallelize(Seq(  
      "License shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document."  
      , "Licensor shall mean the copyright owner or entity authorized by the copyright owner that is granting the License"  
      ))  
    val result = wordCount.process(inputRDD)  
  
    val resultMap = result.collectAsMap()  
    println(resultMap)  
    resultMap("shall") shouldEqual 2  
    resultMap("copyright") shouldEqual 2  
    resultMap("License") shouldEqual 2  
//    resultMap("License") shouldEqual 1
  }  
  
}

성공 시 성공했다고만 하고
실패시 아래와 같이 에러 발생한다
!Pasted image 20231005140735.png
Pasted image 20231005140926.png

  • Intellij에서 구동하였고, build tool을 Gradle로 해서 그런지 :test ....를 못 찾는다고 에러가 발생했다.
    • 간단 해결방법 아래와 같이 setting을 설정해 주면 된다
      • Run Tests using 을 Gradle -> Intellij IDEA로 변경
    • 추가적으로 intellij IDEA로 하면 조금 더 빠르다는 이야기가 있다.
      Pasted image 20231005141130.png

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

spark-Setting  (0) 2023.11.28
spark-scheduling  (0) 2023.11.28
RDD Action  (0) 2023.10.13
RDD Transformation  (0) 2023.10.13
Spark_RDD  (0) 2023.10.05

RDD Action?

  • 간단히 설명하자면, 결과 값, 즉 return이 RDD가 아닌 다른 것으로 반환되는 연산이다.
  • RDD Transformation을 수행하게 만드는 도화선이라고 봐도 된다.
    • 즉 N개의 Transformation Method들이 쌓여도 수행하지 않다가 Action Method가 호출되면 그 때 순차적으로 수행한다

Output Function

기본적으로 Collection Type으로 반환하는 경우가 많기 때문에 너무 많은 개수를 반환하면, OOM 이 발생할 수 있다.

데이터 추출

// collect
characterRDD.collect.mkString("Array(", ", ", ")")
//Array(good,bad, good,bad, best,worst, plus,minus)

// first
numericRDD.first()
// 1

// take
numericRDD.take(2).mkString("Array(", ", ", ")")
// Array(1, 2)

// takeSample
numericRDD.takeSample(withReplacement = false, 3).mkString("Array(", ", ", ")")
numericRDD.takeSample(withReplacement = true, 3).mkString("Array(", ", ", ")")
// Array(1, 2, 3)
// Array(4, 2, 1)
  • collect
    • RDD를 Array[T]형태로 반환
    • 굳이 Mysql로 따지면 select라고 봐도 될듯하다.
  • first
    • 최초 1개의 값을 반환
    • mysql에서 limit 1이라고 생각해도 무방
  • take
    • 최초 값 부터 N개를 반환
    • mysql에서 limit n이라고 생각해도 무방
  • takeSample
    • sample과 동일하지만, 다른것은 RDD가 아닌 Collection Type으로 반환

Count

characterRDD.count  
// 4
characterRDD.countByValue
// Map(best,worst -> 1, good,bad -> 2, plus,minus -> 1)
  • count
    • 모든 요소 개수 반환
  • countByValue
    • 요소 별 개수

Reduce / Fold / aggregate

numericRDD.reduce(_ * _)
//reduce : 24
numericRDD.fold(1)(_ * _)
//fold : 24
numericRDD.fold(0)(_ * _)
//fold : 0
numericRDD.aggregate(zeroValue = Fruit(0, 0))(seqOp = (f: Fruit, v: Int) => f.add(v), combOp = (f1: Fruit, f2: Fruit) => f1.add(f2))
numericRDD.aggregate(zeroValue = Fruit(0, 0))(seqOp = _.add(_), combOp = _.add(_))
numericRDD.aggregate(zeroValue = Fruit(0, 0))(seqOp = _ add _, combOp = _ add _)
numericRDD.aggregate(Fruit(0, 0))(_ add _, _ add _)
// Fruit(10,4)
  • RDD의 요소에 관해 연산 후 결과값을 반환
    • reduce , fold 모두 병렬처리 하기 때문에 결합법칙을 만족해야한다
  • 결과 도출 사유
    • reduce
      • 초기 값 없이 보유한 요소 만으로 연산
      • 1 * 2 * 3 * 4 = 24
    • fold
      • 초기 값이 존재함
      • 1 * 1 * 2 * 3 * 4 = 24
      • 0 * 1 * 2 * 3 * 4 = 0
    • 위와 같은 이유로 코드 구현 시 잘 고민하고 구현해야 한다
  • aggregate
    • Fruit은 RDD Transformation 의 combineByKey / aggregateByKey 구현해 놓았다.
      • amount : Long => Int로 변경함
    • 인자별 의미
      • aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
      • zeroValue
        • 초기 값
      • seqOp
        • (U, T) => U
          • T : RDD의 요소들이 갖는 타입
        • 각 Partition에서 병합하는 함수
      • combOp
        • (U, U) => U
          • 인자 타입과 반환타입이 동일함
        • 모든 Partition들을 최종적으로 병합하는 함수
    • aggregate 은 scala에서 위 코드 순서대로 요약하여 사용할 수 있다.

Foreach / foreachPartition

numericRDD.foreach {  
  v => println(s"foreach values = $v")  
}  
  
numericRDD.foreachPartition {  
  println("foreachPartition partition")  
  values => for (v <- values) println(s"foreachPartition $v")
  • 모든 RDD요소에 특정 함수를 적용시키는 것.
    • 모든 요소를 순회
  • Map과 다른점은 Map은 반환값이 존재하지만, foreach는 반환값이 존재하지 않고 행위만 존재한다
  • foreach - foreachPartition 차이점
    • foreach는 요소단위로 수행
    • foreachPartition은 Partition단위로 수행
  • Driver에서 수행하는 것이 아닌 개별 노드에서 실행된다
    • 분산 처리 하기 때문에 의도한 것과 구현한 것이 동일한지 잘 고민 해봐야 한다

cache / persist / unpersist

numericRDD.persist()  
numericRDD.unpersist()

numericRDD.cache()  
numericRDD.persist(StorageLevel.MEMORY_ONLY)  

Spark RDD는 수행할 때 마다 연산을 통해 새로은 RDD를 사용한다
이 때 RDD를 재사용한다면, 해당 연산을 통새 생성된 RDD를 저장하는 것을 위 메소드로 구현한다

  • persist / unpersist
    • 메모리 / 디스크에 저장하는 메소드
    • persist 로 저장하고 unpersist 로 제거한다
    • 옵션으로 아래와 같은것들을 통해 메모리, 디스크, 메모리 + 디스크 에 적재 할지 고를 수 있다.
      • NONE , DISK_ONLY , DISK_ONLY_2 , MEMORY_ONLY , MEMORY_ONLY_2 , MEMORY_ONLY_SER , MEMORY_ONLY_SER_2 , MEMORY_AND_DISK , MEMORY_AND_DISK_2 , MEMORY_AND_DISK_SER , MEMORY_AND_DISK_SER_2 , OFF_HEAP
  • cache
    • 메모리에 RDD를 적재하는 메소드
    • persist(StorageLevel.MEMORY_ONLY) 와 동일하다
      • API따라 들어가다 보면 위와 같음

File IO

characterRDD.saveAsTextFile("file://<File_Path>/test01")  
characterRDD.saveAsTextFile("file://<File_Path>/test02", classOf[org.apache.hadoop.io.compress.BZip2Codec])  
  
characterRDD2.saveAsObjectFile("file://<File_Path>/obj01")  
val rdd = sc.objectFile[String]("file://<File_Path>/obj01")  
rdd.collect().foreach (x => println(x))  
  
pairRDD.saveAsSequenceFile("file://<File_Path>/seq01")  
val rdd2 = sc.sequenceFile[String, Long]("file://<File_Path>/seq01")  
println(rdd2.collect().mkString(","))
  • saveAsTextFile
    • 기본적으로 TestFile 형태로 적재하는 방식
    • 위에 작성한 것과 같이 saveAsTextFile(경로 , 코덱)
      • 코덱을 통해 원하는 압축 형식을 지정할 수 있다.
      • bzip2,lz4,snappy,zlib
  • saveAsObjectFile
    • Object형태로 적재하는 방식
    • Java/ Scala에서는 자바 표준 직렬화를 사용한다.
    • SequenceFile와 비교하여 상대적으로 느리다
  • saveAsSequenceFile
    • objectFile 과 동일하게 Binary 파일 포멧으로 저장
      • 차이점
        • Hadoop자체에서 정의한 직렬화 프레임워크 사용
        • Hadoop Writeable 인터페이스를 구현하고 있어야 한다.
        • Scala의 경우 saveAsObjectFile와 차이 없이 간단하게 사용 가능

공유 변수

Broadcast Variables

val broadcastChar = sc.broadcast(Set("good,bad", "plus,minus"))
  • Spark Job이 실행되는 동안 모든 노드에 공유하는 읽기 전용 자원
    • 모든 노드에 공유 즉 복제 되어 사용한다
      • Network 비용이 절감하여 성능 향상을 기대할 수 있다.
    • 해당 변수는 Heap영역에 저장된다
      • 사용하는 동안 메모리를 지속적으로 사용한다.
      • unpersist를 이용하여 제거할 수 있다.

Accumulators

val error_cnt = sc.longAccumulator("invalidFormat")  
val error_data = sc.collectionAccumulator[String]("invalidFormat2")  
val data = List("Log01:haha", "Log02:hoho", "Log03::", "Log04:aaa", "Log05||Addr5", "Log06::Log7")  
sc.parallelize(data, 3).foreach {  
  v =>  
    if (v.split(":").length != 2) {  
      error_cnt.add(1L)  
      error_data.add(v)  
    }  
}  
println("잘못된 데이터 수:" + error_cnt.value)  
println("잘못된 데이터:" + error_data.value)
// 잘못된 데이터 수:3
// 잘못된 데이터:[Log05||Addr5, Log06::Log7, Log03::]
  • 실제로 사용 해본 적이 없어 완벽하게 이해가 되지 않아 조금 더 이해해야 할 것 같다
  • Broadcast 와 반대로 쓰기 동작을 위한 작업
  • 여러 노드에서 수행되는 동작을 한곳에 적재하는 방법
    • 생성,초기화는 드라이버 프로그램에서 동작한다
    • 클러스터의 여러 Task에서 병렬로 쓰여질 수 있다.
    • Accumulator의 값을 증가시킬 수 있지만, 값을 참조해서 사용하는 것은 불가능하다.
  • 특별한 목적이 없으면, Action Function을 수행할 때 사용해야한다
    • 불필요한 반복이 발생할 가능성이 높기 때문
    • eg. map, flatmap과 같은 것을 동작하면 불필요하게 값을 증가시킬 가능성이 높다
  • 기본적으로 제공하는 Accumulators이 존재한다
    • long,collection,double, 등이 존재한다
  • 위 코드와 같이 만들면, 각 노드에서 병렬로 실행 시 문제가 발생한 Log를 확인할 수 있다.
  • 사용자 지정으로 생성 할 수 있다.
    • 아래와 같이 case class를 생성하여 사용할 수 있다.
    • Python에서는 register를 하지 않고 바로 사용할 수 있다
    • 주요 메소드
      1. isZero
        1. 초기 상태 여부 판단
        2. 아래와 같이 조건문 등을 이용하여 판단할 수 있다.
      2. reset
        1. 이름 그대로 초기 상태로 되돌리는 메소드
      3. add
        1. 값을 추가할 때 사용하는 메소드. 사용자 정의시 아래와 같이 case class의 메소드를 이용하여 동작할 수 있다.
        2. 값 추가는 Accumulator 내부에서 동작한다.
      4. copy
        1. 해당 Accumulator를 복제하는 메소드
        2. 미리 데이터를 복제하여 안전하게 데이터를 읽을 때 혹은 병렬로 수정할 때 사용
      5. merge
        1. n+1개의 Accumulator를 병합하는 메소드
          1. n+1개의 테스크에서 생성된 Accumulato를 하나의 Accumulator로 병합할 때 사용
      6. value
        1. Accumulator의 값을 반환하는 메소드
package main.RDD_action  
  
import io.netty.handler.logging.LogLevel  
import org.apache.spark.util.AccumulatorV2  
import org.apache.spark.{SparkConf, SparkContext}  
  
case class Fruit(var price: Long, var amount: Long = 1) {  
  def add(p: Long): Fruit = add(Fruit(p))  
  def add(other: Fruit): Fruit = {  
    this.price += other.price  
    this.amount += other.amount  
    this  
  }  
}  
  
class fruitAccumulator extends AccumulatorV2[Fruit, (Long,Long)] {  
  private var acc = Fruit(0)  
  override def isZero: Boolean = acc.price == 0L && acc.amount == 1L  
  override def copy(): AccumulatorV2[Fruit, (Long,Long)] = {  
    val newAcc = new fruitAccumulator  
    newAcc.acc = Fruit(acc.price, acc.amount)  
    newAcc  
  }  
  override def reset(): Unit = {  
    acc.price = 0L  
    acc.amount = 1L  
  }  
  override def add(other: Fruit): Unit = acc.add(other)  
  override def merge(other: AccumulatorV2[Fruit, (Long,Long)]): Unit = other match{  
    case o: fruitAccumulator => acc.add(o.acc)  
    case _                   => throw new RuntimeException()  
  }  
  override def value: (Long,Long) = {  
    (acc.price, acc.amount)  
  }  
}  
  
object fruitAccumulator {  
  def main(args: Array[String]): Unit = {  
    // Spark 설정  
    val conf = new SparkConf()  
      .setAppName("fruitAccumulator")  
      .setMaster("local[*]")  
      .set("spark.local.ip", "localhost")  
      .set("spark.driver.host", "localhost")  
  
    // SparkContext생성  
    val sc = new SparkContext(conf)  
    sc.setLogLevel(LogLevel.WARN.name())  
  
    val acc = new fruitAccumulator  
    sc.register(acc , "fruitAccumulator")  
    val data =List("Fruit:2000", "Fruit02:3000", "Fruit03::", "Fruit04:2000", "Fruit05||4000", "Fruit06::Log7")  
    sc.parallelize(data,3).foreach {  
      v =>  
//        println(s"v : $v ")  
        if(v.split(":").length != 2){  
          println(s"added v : $v ")  
          acc.add(Fruit(2))  
        }  
    }  
    println(acc.value)  
  }  
}

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

spark-scheduling  (0) 2023.11.28
WordCount  (0) 2023.10.13
RDD Transformation  (0) 2023.10.13
Spark_RDD  (0) 2023.10.05
Spark  (0) 2023.10.04

RDD Transformation?

  • 간단히 설명하자면, 결과 값, 즉 return이 RDD로 반환되는 연산이다.
  • RDD들은 지속적으로 기록만 하다가, RDD Action을 수행할 때 연쇄적으로 수행된다.
    • lazy evaluation이다.

사용 변수들

val numericRDD     = sc.parallelize(1 to 4, 3)  
val characterRDD   = sc.parallelize(Seq("good,bad", "good,bad", "best,worst", "plus,minus"), 3)  
val characterRDD2  = sc.parallelize(Seq("good,bad", "good,bad", "plus,minus"), 3)  
val pairRDD        = characterRDD.zipWithIndex.map {case (key, value) => (key, value + 2) }  
val pairRDD2       = characterRDD.zipWithIndex.map {case (key, value) => (key, value + 10) }  
val pairRDD3       = characterRDD.zipWithIndex.map {case (key, value) => (value + 1, key) }  
val numericPairRDD = sc.parallelize(for (i <- 1 to 10) yield (scala.util.Random.nextInt(1000), "-"))
  • 기본적으로 위와 같이 RDD생성을 함

Map

Map

/*map*/
println(characterRDD.map(_.split(",")).collect().map(_.mkString("{", ",", "}")).mkString("(", ",", ")"))  
//  ({good,bad},{best,worst},{plus,minus})

/*mapValues*/
println(pairRDD.mapValues(num => num + 1).collect().mkString(", "))  
// (good,bad,3), (good,bad,4), (best,worst,5), (plus,minus,6)
  • map 실행 순서
    1. characterRDD.map(_.split(","))
      1. “good,bad”, “good,bad”, “best,worst”, “plus,minus” 를 “,” 로 분할
      2. seq((“good”,“bad”), (“good”,“bad”), (“best”,“worst”), (“plus”,“minus”))
    2. map(_.mkString("{", ",", "}")) 를 이용하여 문자열생성
      1. seq({“good”,“bad”}, {“good”,“bad”}, {“best”,“worst”}, {“plus”,“minus”})
    3. .mkString("(", ",", ")"))를 이용하여 최종 출력 모양 생성
      1. ({“good”,“bad”}, {“good”,“bad”}, {“best”,“worst”}, {“plus”,“minus”})
  • mapValues
    • Map과 동일하게 처리하는데, Value의 값을 조작하고 싶을 때 사용.
    • 위 코드는 각 value에 1을 더하는 연산이다

Flatmap

Monad 개념인데, 이것은 추후 공부할 예정

/*flatMap*/
println(characterRDD.flatMap(_.split(",")).collect().mkString("(", ",", ")"))  
//  (good,bad,best,worst,plus,minus)  

/*flatMapValues*/
val pairRDD3 = characterRDD.zipWithIndex.map {case (key, value) => (value + 1, key) }  
println(pairRDD2.flatMapValues(_.split(",")).collect().mkString(", "))  
// (1,good), (1,bad), (2,good), (2,bad), (3,best), (3,worst), (4,plus), (4,minus)
  • flatmap 실행 순서
    1. characterRDD.flatMap(_.split(","))
      1. 모든 값들을 풀어서 ","로 분할
      2. seq(“good”,“bad”,“good”,“bad”,“best”,“worst”,“plus”,“minus”)
    2. .mkString("(", ",", ")"))를 이용하여 최종 출력 모양 생성
      1. (“good”,“bad”,“good”,“bad”,“best”,“worst”,“plus”,“minus”)
  • flatMapValues
    • flatMap과 동일하게 처리하는데, Value의 값을 조작하고 싶을 때 사용.
    • key를 기준으로 value를 나누는 연산
      • Seq(1,“good,bad”), (2,“good,bad”), (3,“best,worst”), (4,“plus,minus”) 를
      • (1,good), (1,bad), (2,good), (2,bad), (3,best), (3,worst), (4,plus), (4,minus)로 변환

mapPartitions

println(  
  numericRDD.mapPartitions(nums => {  
	println("MapPartition loop")  
	nums.map(num => num * 2)  
  }).collect().mkString(", ")  
  )  
// MapPartition loop
// MapPartition loop
// MapPartition loop
// mapPartitions
// 2, 4, 6, 8

// 짝수만 출력
println(  
  numericRDD.mapPartitionsWithIndex((idx, nums) => {  
	println("mapPartitionsWithIndex loop")  
	nums.flatMap {  
	  case num if idx % 2 == 0 => Option(num * 2)  
	  case _                   => None  
	}  
  }).collect().mkString(", ")  
  )  
// mapPartitionsWithIndex loop
// mapPartitionsWithIndex loop
// mapPartitionsWithIndex loop
// 2, 6, 8
  • Partition를 기준으로 Map을 수행함
    • 3개의 Partition이 존재하면 3번 순회한다.
  • mapPartitions vs mapPartitionsWithIndex
    • 간단히 말하면 둘다 Partition 기준으로 순회한다
    • 차이점 : mapPartitionsWithIndex은 각 Row에 Index를 붙여서 반환한다.

Group

Zip

println(characterRDD.zip(numericRDD).collect().mkString("(", ",", ")"))  
//   ((good,bad,1),(good,bad,2),(best,worst,3),(plus,minus,4))  
  
println(characterRDD.zipPartitions(numericRDD, numericRDD) {  
  (key, num, num2) => for {  
    key_result <- key  
    num_result <- num  
    num_result2 <- num2.map(_ + 1)  
  } yield (key_result, num_result, num_result2)  
}.collect().mkString("(", ",", ")"))  
//  ((good,bad,1,2),(good,bad,2,3),(best,worst,3,4),(best,worst,3,5))  
  • zip
    • 2개의 RDD를 Key : value 로 묶어줌
    • 1 : 1 로 같은 인덱스 위치에 있는 것끼리 묶이기 때문에 길이가 및 파티션 수가 동일해야 함
  • zipPartitions
    • Partition을 요소로 사용하여 새로운 파티션을 생성
    • spark 2.4에서는 인자로 RDD 4개까지 가능
    • Partition 수가 동일해야함
numericRDD.groupBy {  
  case num: Int if num % 2 == 0 => "even"  
  case _                        => "odd"  
}.collect() foreach {  
  row => println(s"${row._1}, [${row._2.mkString(",")}]")  
}  
//      even, [2,4]  
//      odd, [1,3]  
  
pairRDD.groupByKey().collect() foreach {  
  row => println(s"${row._1}, [${row._2.mkString(",")}]")  
}  
//    best,worst, [3]  
//    good,bad, [1,2]  
//    plus,minus, [4]  
  
val rdd1   = sc.parallelize(Seq(("apple", 100), ("banana", 500), ("apple", 200)))  
val rdd2   = sc.parallelize(Seq(("banana", 400), ("apple", 300)))  
val result = rdd1.cogroup(rdd2)  
result.collect.foreach {  
  case (k, (rdd1_val, rdd2_val)) =>  
    println(s"($k, [${rdd1_val.mkString(",")}], [${rdd2_val.mkString(",")}])")  
}  
//    (apple, [100,200], [300])  
//    (banana, [500], [400])
  • groupBy
    • 조건에 의한 Grouping
  • groupByKey
    • Key값을 기준으로 grouping
  • cogroup
    • Key를 기준으로 각 RDD에 값을 묶음
    • key , rdd1의 values, rdd2의 values

Set

Distinct

println(  
  s"""distinct  
     |${characterRDD.distinct.collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    best,worst, good,bad, plus,minus  
  • 이름 그대로 중복을 제거하는 역할

cartesian

println(  
  s"""cartesian  
     |${characterRDD.cartesian(numericRDD).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    (good,bad,1), (good,bad,2), (good,bad,3), (good,bad,4),(good,bad,1), (good,bad,2), (good,bad,3), (good,bad,4),(best,worst,1), (plus,minus,1), (best,worst,2), (plus,minus,2), (best,worst,3), (best,worst,4), (plus,minus,3), (plus,minus,4)  
  • 카르테시안 곱
  • 모든 경우의 수를 조합하는 것이라고 보면된다.
  • Sql에서 Join 조건을 주지 않은 경우, Cross Join

subtract

println(  
  s"""subtract  
     |${characterRDD.subtract(characterRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    best,worst  
  • rdd1에 존재하지만, rdd2에 존재하지 않은 값들을 RDD로 생성하여 반환
  • 예시
    • characterRDD : "good,bad", "good,bad", "best,worst", "plus,minus"
    • characterRDD2 : "good,bad", "good,bad", "plus,minus"
    • characterRDD에 “best,worst” 가 존재하지만 characterRDD2 에는 존재하지 않음
      • best,worst 반환

Union

println(  
  s"""union  
     |${characterRDD.union(characterRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    good,bad, good,bad, best,worst, plus,minus, good,bad, good,bad, plus,minus  
  • 2개의 RDD를 병합하는 것
  • 자료형이 동일해야함

intersection

// 교집합을 구하는것  
println(  
  s"""intersection  
     |${characterRDD.intersection(characterRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//    good,bad, plus,minus 

Join

println(  
  s"""join  
     |${pairRDD.join(pairRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//(best,worst,(3,12)), (good,bad,(1,10)), (good,bad,(1,11)), (good,bad,(2,10)), (good,bad,(2,11)), (plus,minus,(4,13))  
  

println(  
  s"""leftOuterJoin  
     |${pairRDD.leftOuterJoin(pairRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
// (best,worst,(3,Some(12))), (good,bad,(1,Some(10))), (good,bad,(1,Some(11))), (good,bad,(2,Some(10))), (good,bad,(2,Some(11))), (plus,minus,(4,Some(13)))  
  

println(  
  s"""leftOuterJoin  
     |${pairRDD.subtractByKey(pairRDD2).collect.mkString(", ")}  
     |""".stripMargin  
  )  
//(best,worst,3)
  • Join
    • Key값을 기준으로 Join연산
    • (key , (rdd01 value, rdd2 value)) 조합으로 모든 값 Join
  • leftOuterJoin / rightOuterJoin
    • left join 연산을 한다. join결과값이 없을 수 있음으로 Option 타입으로 반환한다.
  • subtractByKey
    • rdd01.subtractByKey(rdd2)
    • rdd01가 rdd2에 없는 key만 추출

Aggregate

reduceByKey / foldByKey

    println(  
      s"""reduceByKey  
         |${pairRDD.reduceByKey(_ * _).collect().mkString(", ")}  
         |""".stripMargin  
      )  
    // (best,worst,4), (good,bad,6), (plus,minus,5)  

    println(  
      s"""foldByKey  
         |${pairRDD.foldByKey(1)(_ * _).collect().mkString(", ")}  
         |""".stripMargin  
      )  
    // (best,worst,4), (good,bad,6), (plus,minus,5)  
  • reduceByKey
    • Key값을 기준으로 value에 주어진 연산 수행
    • 기본 값이 존재하지 않기 때문에 교환 법칙 및 결합 법칙을 만족해야한다.
      • 교환 법칙 : 상수의 위치가 변경돼도 가능한 것.
        • 1 + 2 == 2 + 1 ( OK )
        • 1 / 2 != 2 / 1 ( X )
      • 결합 법칙 : 연산의 순서가 변경돼도 가능한 것.
        • 간단히 말하면 임의로 괄호로 묶어 연산해도 동일한 결과
        • 1 + 2 + 3 == 1 + (2 + 3) ( OK )
        • 1 + 2 * 2 != ( 1 + 2 ) * 2 ( X )
    • 위 결과 도출 방법
      • best,worst : 4
      • good,bad : 2 * 3
      • plus,minus : 5
        foldByKey
    • Key값 기준으로 Value연산
    • reducebykey와 동일하지만, 차이점은 기본 값이 있는 것이다.
    • 위 결과 도출 방법
      • best,worst : 1 * 4
      • good,bad : 1 * 2 * 3
      • plus,minus : 1 * 5
종류 교환법칙 결합법칙 초기값
reduceByKey o o x
foldByKey x o o

combineByKey / aggregateByKey

case class Fruit(var price: Long, var amount: Long = 1) {  
  def add(p: Long): Fruit = {  
	add(Fruit(p))  
  }  
  def add(other: Fruit): Fruit = {  
	this.price += other.price  
	this.amount += other.amount  
	this  
  }  
}  

val fruit_rdd = sc.parallelize(Seq(("Apple", 1000L), ("banana", 2000L), ("Apple", 1500L), ("banana", 2600L), ("banana", 1300L), ("Apple", 3000L)))  
println(  
  s"""combineByKey  
	 |${  
	fruit_rdd.combineByKey(  
	  createCombiner = (p: Long) => Fruit(p)  
	  , mergeValue = (f: Fruit, v: Long) => f.add(v)  
	  , mergeCombiners = (f1: Fruit, f2: Fruit) => f1.add(f2)  
	  ).collect().mkString(", ")  
  }  
	 |""".stripMargin  
  )
println(  
  s"""combineByKey  
	 |${  
	fruit_rdd.aggregateByKey(Fruit(3000, 0))(  
	  (f: Fruit, v: Long) => f.add(v)  
	  , (f1: Fruit, f2: Fruit) => f1.add(f2)  
	  ).collect().mkString(", ")  
  }  
	 |""".stripMargin  
  )  
  • combineByKey
    • 새로운 타입으로 반환 가능하게 만듦
    • 아래 예시를 통해 Fruit 이라는 Case Class로 반환하는 예시
    • createCombiner
      • 연산을 위해 컴파이너를 생성하는 것.
      • 개인적으로 이해한 바는 타입 변경하여 초기 값을 생성하는 작업
    • mergeValue
      • Combiner가 존재하면 추후 실행할 연산, 이 연산을 통해 값이 누적되어 쌓인다
      • 동을한 key를 가진 값들을 병합
    • mergeCombiners
      • 병합이 끝난 Combiner들을 최종적으로 병합한 Combiner를 생성
      • 다른 Partition에 존재하는 결과 값을 병합
    • 조금 더 알아봐야겠다.
    • 위 예시 결과 값 도출 사유
      • 결과 값 : (banana,Fruit(5900,3)), (Apple,Fruit(5500,3))
      • banana : 2000L + 2600L + 1300L , count = 3
      • Apple : 1000L + 1500L + 3000L , count = 3
  • aggregateByKey
    • 초기값이 존재하는 combineByKey
      • 즉 createCombiner를 생성 후 수행하는 CombineByKey
    • 위 예시 결과 값 도출 사유
      • 결과 값 : (banana,Fruit(14900,3)), (Apple,Fruit(14500,3))
      • banana : (3000 + 2000L) + (3000 + 2600L) +(3000 + 1300L) , count = 3
      • Apple : (3000 + 1000L) + (3000 + 1500L) +(3000 + 3000L) , count = 3

Pipe

println(  
  s"""pipe
	 |${characterRDD.pipe("cut -d , -f 2").collect().mkString(", ")}
	 |""".stripMargin      )
// bad, bad, worst, minus  
  • 결과 값을 bshell 등 외부 프로세스의 인자 값으로 실행시킴

값추출

println(  
  s"""  
	 |keys : ${numericPairRDD.keys.collect.mkString(", ")}  
	 |value: ${numericPairRDD.values.collect.mkString(", ")}  
	 |""".stripMargin  
)  
//    keys : 776, 377, 559, 275, 818, 457, 173, 248, 135, 679  
//    value: -, -, -, -, -, -, -, -, -, -  
println(  
  s"""  
	 |중복 불가 : ${numericPairRDD.sample(withReplacement = false,fraction = 0.7).collect.mkString(", ")}  
	 |중복 허용 : ${numericPairRDD.sample(withReplacement = true,fraction = 0.7).collect.mkString(", ")}  
	 |""".stripMargin  
)  
//    중복 불가 (306,-), (376,-), (630,-), (417,-), (439,-), (738,-), (691,-), (435,-)
//    중복 허용 (210,-), (376,-), (630,-), (417,-), (417,-), (439,-), (435,-)
  • keys
    • key값 반환
  • values
    • value값을 반환
  • sample
    • 데이터 샘플링.
      • withReplacement : 데이터 중복 가능 여부
      • fraction : 전체 중 n% 추출 (0 ~ 1 사이 값)

filter

// 이름 그대로 filter조건에 부합하는 것을 걸러줌  
println(  
  s"""filter  
	 |${numericRDD.filter(_ > 2).collect.mkString(", ")}  
	 |""".stripMargin)  
// 3, 4  
// 1,2,3,4 중 3,4만 출력  
  • 값 필터링
    • SQL의 WHERE절이라고 생각하면 편하다

SortByKey

// Key값을 기준으로 정렬시켜줌  
println(  
  s"""sortByKey  
	 |${numericPairRDD.sortByKey().collect.mkString(", ")}  
	 |""".stripMargin  
)  
// (135, -), (173, -), (248, -), (275, -), (377, -), (457, -), (559, -), (679, -), (776, -), (818, -)  
  • Key값을 기준으로 정렬

Partition / Coalesce

println(  
  s"""repartition  
	 |default = ${characterRDD.getNumPartitions}  
	 |coalesce = ${characterRDD.coalesce(1).getNumPartitions}  
	 |repartition = ${characterRDD.repartition(5).getNumPartitions}  
	 |""".stripMargin  
	 // default = 3
     // coalesce = 1
     // repartition = 5
  )  
  • Partition 수를 증감시킴
    • repartition 는 Shuffle이 발생하면서 partition 재분배
      • Shuffle이 발생하지 않아 Network 통신을 하여 상대적으로 느리다
        • 데이터의 Parition 분배가 될 수 있다.
      • partition 수 증감 모두 가능
    • coalesce 는 shuffle이 발생하지 않고 partition 재분배
      • Shuffle이 발생하지 않아 Network 통신을 하지 않아 상대적으로 빠르다
        • 데이터의 Parition 분배는 일어나지 않는다
        • 간단히 생각하면 동일 Node에 있는 Partition들을 Union 한다고 생각하면 편하다
      • partition 수 감소만 가능

partitionBy / repartitionAndSortWithinPartitions

numericPairRDD.partitionBy(new HashPartitioner(3))  
			  .foreachPartition(it => {  
				println(s"-------")  
				it.foreach(v => println(v))  
			  })  
 
numericPairRDD.repartitionAndSortWithinPartitions(new HashPartitioner(3))  
			  .foreachPartition(it => {  
				println(s"==========")  
				it.foreach(v => println(v))  
			  })  
  • 공통
    • RDD를 N+1개의 Partition으로 특정 기준(Partitioner)에 따라 분배 해준다
      • Partitioner : Partition을 나누는 역할을 하는 객체
        • HASH Partitioner
          • Hash값을 이용하여 분할
        • RANGE Partitioner
          • 범위 혹은 순서로 분할
    • (key - value)로 구성되어 있어야 한다.
      • key 값을 기준으로 분리한다
    • 지정한 Partitioner를 기준으로 분배하기 때문에, Skew현상이 발생할 수 있다.
      • Skew : 데이터 쏠림 현상
      • 위 예시를 실행할 때 아래와 같이 데이터 Skew현상이 발생할 수 있다.
partition partitionBy repartitionAndSortWithinPartitions
partition 3 (89,-) (150,-)
partition 3 (780,-) (89,-)
partition 3 (622,-) (175,-)
partition 3 (548,-) (134,-)
partition 3 (150,-) (530,-)
partition 3 (646,-) (548,-)
partition 3 (134,-) (780,-)
partition 3 (175,-) (622,-)
partition 3 (966,-) (966,-)
partition 3 (530,-) (646,-)

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

spark-scheduling  (0) 2023.11.28
WordCount  (0) 2023.10.13
RDD Action  (0) 2023.10.13
Spark_RDD  (0) 2023.10.05
Spark  (0) 2023.10.04

+ Recent posts