Docker?

서비스를 패키징 및 배포를 하는데 유용한 오픈소스 프로그램
즉 , 컨테이너를 모듈식 가상 머신처럼 사용 가능하게 만들어주는 프로그램
카카오 컨퍼런스 이것을 보면 너무 컨테이너 관련 설명을 잘해주신다

기존 Linux Container과 차이점

Pasted image 20231122151223.png
Red Hat

위 사진을 보면 LXC는 간단히 설명하면 VM과 같은 머신을 이용하여 가상 환경을 띄우는 것 인데,
VM은 각 환경마다 OS 를 구축을 해야하지만, Docker은 Docker Engine 상에 프로세스를 띄워 사용하는 방식이기 때문에 보다 가벼워 졌다고 볼 수 있다.
과거에 Vagrant로 Hadoop EchoSystem을 구축해본 경험이 있는데 상당이 느리게 구축하였다, 하지만 Docker를 이용하니 상당히 빠른시간에 Build 및 구축이되서 상당히 당황스러웠다.

장점

  • 위와 같이 각각의 App, Service? 단위로 독립된 공간에 존재하여 서로 충돌이 발생하지 않게 구성하기 용이하다.
    • 마이크로 아키텍쳐 구성을 하기 용이하다.
  • 배포 시 이미지를 그대로 운영 서버에 배포하면 된다.
  • 같은 이미지를 사용하면, 같은 환경이라 봐도 무방하기 때문에 환경에 대한 제약이 적어져 배포, 확장이 용이해진다.

기본 준비 사항

window 기본적으로 Docker는 linux kernel 기반으로 동작을 하는 것으로 알고 있다.
Window에서 wsl 설정을 하여 우분투 환경을 구축 하고 docker desktop을 설치 혹은 docker를 설치하여 사용할 수 있다.
Docker Doc 혹은 MS Doc에 들어가서 따라하면 쉽게 할 수 있을 것 이다.

Docker 를 설치하였다면, Dockerfile혹은 docker-compose을 이용하여 이미지 빌드, 배포를 하여 사용할 수 있다.

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > docker' 카테고리의 다른 글

docker-compose  (1) 2023.11.27

Docker를 이용하여 airflow설치

airflow 에서 docker-compose.yaml을 가져온다

  • curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.7.2/docker-compose.yaml' 를 이용하여 가져오든 다운로드 받든 상관 없다.
  • 사용할 공유 디렉토리 생성
    • mkdir -p ./dags ./logs ./plugins ./config 를 이용하여 생성하든, 하나 씩 만들든 상관 없다
  • 사용할 env 파일 생성해준다
    • 공홈에서 다운 받은 yaml을 보면 설정할 수 있는 변수 값들이 있다. 필요에 따라 설정하자
  • 초기 설정
    • docker compose up airflow-init 를 이용하여 작성한 변수 값 등을 적용한 airflow 초기화
  • 설치 검증
    • docker compose run airflow-worker airflow info 를 이용하여 설치 검증
  • 실행
    • docker compose up -d 를 이용하여 도커 실행
      • -d를 붙인 이유는 background로 실행시키기 위해 추가함

직접설치

설치하고자 하는 airflow의 버전에 맞춰 python 설치 후 아래 명령어 실행
sqlite도 필요하니 그것도 설치해준다.
Quick Start를 따라하면 된다.

# sqlite도 경로 설정해준다.
export LD_LIBRARY_PATH=$sqlite_HOME/lib:$LD_LIBRARY_PATH
export AIRFLOW_HOME=~/airflow
AIRFLOW_VERSION=2.7.2

# Python 버전 설정
# 아래와 같이 shell명령어 사용하지 말고, 직접 삽입해도  됨.
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"

# 아래와 같이 constraint명령어를 성성
# E.g
# airflow 2.7.2 버전은 Python 3.8, 3.9, 3.10, 3.11 을 지원한
# https://raw.githubusercontent.com/apache/airflow/constraints-2.7.2/constraints-3.8.txt
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# 유저 생성, 초기
airflow db migrate

airflow users create \
    --username admin \ # 접속 id라고 보면된다
    --firstname Peter \ 
    --lastname Parker \
    --role Admin \ # 역할 
    --password admin \ # 접속 비밀번호
    --email spiderman@superhero.org

# 실행
airflow webserver --port 8080 -D

airflow scheduler -D

에러

PermissionError: [Errno 13] Permission denied: 'airflow'
    self._execute_child(args, executable, preexec_fn, close_fds,
  File "/home/airflow/python_env/lib/python3.9/subprocess.py", line 1837, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
PermissionError: [Errno 13] Permission denied: 'airflow'
    raise child_exception_type(errno_num, err_msg, err_filename)
PermissionError: [Errno 13] Permission denied: 'airflow'

만약 위와 같은 에러가 발생하면 권한 문제긴 하지만 ln -s airflow경로 /usr/bin/airflow 와 같이 링크를 걸지 않아 발생한 것 일 수 있다.

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

Elasticsearch?

대량의 데이터를 NRT ( Near Real Time )으로 저장, 검색, 분석 할 수 있는 루씬 기반 Java 오픈소스 분산 검색 엔진

특징

Scale Out

Shard를 이용하여 수평적으로 늘어날 수 있다.

HA

Replica를 통해 데이터 안전성 보장

Schema Free

Json을 통해 데이터 검색

Restful

CRUD 작업을 HTTP Restful API를 통해 수행

Inverted Index

텍스트를 파싱하여 검색어 사전 생성

  • 속도가 빠르다.

요소별 역할

Node

Master

  • ES의 Instance 단위
    • Instance는 각자의 역할 존재
  • Node.roles를 이용하여 적절한 역할을 사용하는 것이 좋다.
    1. Tribe Node
      1. Cluster간 연결을 해주는 역할
        1. 요청에 따라 서로 다른 Cluster에 데이터를 분리해서 보냄
    2. Remote-eligible Node
      1. remote cluster clinet 역할
        1. 원격 cluster과 연결시켜주는 역할
      2. cross cluster를 remote client로 사용 가능한 노드
        1. croess cluster : 원격 cluster간 백업/검색 등을 하는 작업
    3. Machine Learning Node
      1. ML역할
        1. ES로 ML작업 시 필요

Data

  • Data를 적재/연산하는 Node
    • node.data=true를 설정에 삽입해주면 된다.
    • 부하 분배를 위해 Master과 분리하는 것을 권장
  • Content Datan Node
    • CRUD, 집계 등의 기능을 가짐
      • 기본적인 Data
이름 압축 성능 비고
Hot Data Node 4 1 사용 빈도가 많고 및 성능이 우선이기 때문에 SSD 권장
Warm Data Node 3 2 Hot보다 적은 빈도로 조회하는 노드, HDD권장
Cold Data Node 2 3 읽기 빈도가 적은 인덱스, 읽기 전용
Frozen Data Node 1 4 Shared_cache 옵션만으로 마운트 된 검색 가능한 Snapshot 을 저장

Ingest

  • Text 변환 및 전처리 담당.
  • Ingest pipeline을 Text에 적용하여 인덱싱 하기 전에 사용
    • indexing 이전에 Document를 변환하는 작업
  • Master Node에서 수행하는 Ingest를 대신 수행
    • Master Node의 부하 분산
    • 색인 작업이 많을경우 사용

Coordinating Only

  • Client와 통신만 하는 역할
    • Load Balancing 수행
    • 만약 Coordinate Node가 없으면 Datanode가 해당 역할 수행
    • 너무 많으면 서버에 부하
    • 발생
      • Coordinating Only 또한 Master Node가 알림을 기다리기 때문
    1. client의 cluster관련 요청을 Master Node에 전달
    2. client의 Data관련 요청을 data Node에 전달

Server

물리적 서버에는 N개이 Node를 사용할 수 있다.

  • Server 기준 Port
  • Node - Client
    • Port범위 : 9200 ~9299
    • 순차적으로 부여
  • Node - Node
    • Port 범위 : 9300 ~ 9399
    • 순차적으로 부여
  • Cluster 묶기
    • cluster.name을 동일한 이름으로 묶어준다
      • 이름이 다르면 같은 Network여도 논리적으로 분리됨
    • Cluster : 1개의 시스템이라고 보면됨.
      • 데이터가 공유되는 서버의 묶음

Index

  • Document의 집단
  • Index는 색인과 명칭과 혼동될 수 있어 Indices라고도 부른다.
  • 주로 REST API를 통해 CRUD 작업 수행
  • 실질적으로 Index는 Shard를 가리키는 논리적 Namespace
  • index는 1+N개의 Shard로 구성
  • Mapping 및 State 정보를 Cluster State에 저장
    • Mapping : Index 내부의 Document가 어떻게 저장될지 정의하는 것
      • Index별 설정 가능
    • cluster State : 정보를 추적하는 내부 데이터 구조
      • 분산된 Node간 데이터 일관성을 위해 Single-Thread로 업데이트
      • Single-Thread이기 때문에 규모가 커지면 성능 저하

Shard

  • Cluster 내의 데이터를 분산 저장하기 위한 단위
  • Lucine Index( Inverted Index / 역색인 )이다.
    • Inverted Index : Key : value 가 아닌 value : key
    • 속도 / 메모리 측면에서 좋다
      • Update 시 reindex을 해야해서 Cost가 증가한다.
  • Index는 N개의 Primary/Replica Shard로 구성된다.
    • Replica : Failover을 위해 사용하는 것이 좋다
      • 너무 많아지면 성능 저하됨
      • replica 또한 write를 하기 때문
  • Shard 추가 시 Reindex를 해줘야한다.
    • segment가 정렬되어 있기 때문
  • 설정
    • Failover을 위해 50GB 이상의 Shard는 권장하지 않는다.
    • Heap은 최대 32GB를 권장
      • System Memory / 2를 권장한다.
      • 32GB를 넘기면 32bit -> 64bit으로 변경되기 때문
    • Heap 1GB당 최대 20개의 Shard를 넘기지 않는 것이 좋다.
    • Shard가 작으면 작은 Segment가 생성된다.
    • 용양 및 설정 출처

Segment

  • 문서의 빠른 검색을 위해 설계된 자료구조
  • N개의 Segment로 Shard를 구성한다.
  • 데이터 색인 시 메모리상에 존재하다, Segment에 기록하여 검색 가능한 Segment 생성
  • Shard에서 검색 시 Segment를 순차 검색 후 결과를 Shard에 전달
    • segment가 많아지면 검색 속도 저하
    • 순차 검색 및 오버헤드 발생
  • update/delete 시 삭제 표기 후 다른 포인터 지칭
    • 데이터는 지연 삭제
  • 특정 주기/ 용양 임계치에 다다르면 데이터 정리 및 병합
    • 이 과정에 실질적 데이터 삭제
    • Merge은 새로운 Segment를 생성하는 것
      • 많은 Resource 소요
      • 부하가 적을 때 수행하는 것을 권장
        • Optimize API를 사용하여 수행

개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

Hive?

MapReduce 코드를 HiveQl을 이용하여 간편화 시킨 쿼리 엔진

  • 배치성 작업을 실행시킬 때 주로 사용
    • MapReduce,Tez, Spark등을 Engine으로 사용 가능
  • HiveQl
    • SQL과 유사한 Query 제공
    • 0.14.0 이후 버전부터 Update/Delete 지원
    • Insert는 덮어쓰기, 빈 테이블에만 사용 가능
    • Select문에서 Having절은 사용 불가하다

버전별 차이

  1. Hive 1.X
    • SQL을 이용하여 MapReduce 가능
    • 파일 데이터를 논리적 표현
  2. Hive 2.X
    • LLAP구조 추가
      • LLAP : 작업을 실행한 데몬을 유지하여 핫 데이터 케싱을 통해 빠른 데이터 처리
    • HPLSQL 추가
      • HPL/SQL를 보면 사용법이 나온다.
      • Hive의 PL/SQL이라고 보면 된다.
    • 기본 실행엔진 MapReduce -> TEZ
  3. Hive 3.X
    • MapReduce엔진, Hive CLI,Hcat CLI, WebHCat삭제
      • TEZ엔진, BeeLine 사용

요소

Pasted image 20230925110547.png

Metastore

  • Hive 테이블 및 파티션의 모든 Meta 정보 및 HDFS 맵핑 정보 저장
    • Hive는 RDB처럼 Schema에 맞춰 데이터를 적재하는 것이 아닌 데이터를 적재 후 Schema를 입히는 방식
      Pasted image 20230925112308.png

DataBase

  • Embedded
    • Derby를 이용하여 사용
    • 1번에 1개의 프로세스만 접근가능
      • 단위 테스트 용도로만 사용!
  • Local / Remote
    • Mysql과 같은 외부 RDBMS를 사용

Server

  • Embeded / Local
    • Hive Client내에 Library처럼 사용되는 서버
    • 주로 HiveServer2와 함께 사용
  • Remote
    • Metastore가 독립 Server로 구동
      • Client는 Thrift 통신을 이용하여 접근
        • Thrift : 서버에  있는 서로 다른 언어로 작성된 함수 혹은 프로시저를 호출해서 사용
      • 위와 같이 구동되는 서버를 Hcat 서버라 부름

HCatalog

  • Grid Computing 상에서 다양한 Processing tools를 이용하여 Read/Write를 보다 쉽게 할 수 있게 만들어주는 관리 계층
    • Grid Computing : 원격 병렬 컴퓨팅. Cluster Compung을 WAN으로 연결시킨 상태
    • Hive 0.11.0부터 Hcatalog와 Hive가 통합됨
  • SerDe( Serializer-Deserializer )할 수 있는 모든 Read/Write를 지원
    • e.g csv, Json …
    • SerDe : Hive가 데이터를 해석하는 방법 row data -> hdfs , hdfs-> row data
  • WebHCat
    • HCataLog를 REST API로 제공하는 것

HiveServer2

  • Client가 HiveQl을 수행할 수 있게 해주는 서비스
  • Thrift ,JDBC,ODBC등의 API 제공
  • Thrift 기반으로 동작하여 Thrift Server라고도 부름

Beeline

  • JDBC를 사용하여 HiveServer2에 접근하는 SQLline 기반도구
    • SQLLine : Java기반으로 DB와 연결하여 SQL을 실행하는 명령줄 유틸리티
  • Thrift통신으로 접속하고 , Comman Line을 이용
  • XML, CSV, TSV로 출력할 수 있다.

Table

Hive에는 크게 2종류의 Table 형식이 있다.

  1. Managed Table( Internal Table )
    1. Location을 따로 설정하지 않은 Table
    2. 외부 위치가 아닌 서버 내에 Table저장
    3. Drop table/partition시 메타스토어 정보 및 데이터가 삭제됨
    4. 주로 임시테이블, 생명 주기 관리에 사용
  2. External Table
    1. Location을 따로 설정하는 Table
    2. 외부 위치에 정의하는 테이블
    3. 파일이 이미 존재 혹은 원격 위치에 사용 시 적용
      1. HDFS, S3…
    4. Drop table/partition시 메타스토어 정보 만 제거됨

Partition

  • Hive의 Partition은 HDFS에 Tree구조로 적재된다.
  • Partition은 순서에 따라 디렉토리가 구성되기 때문에 cardinality에 따라 순서를 잘 정해야함.
  • 테이블의 크기가 너무 커지면 모든 것을 Row를 읽는 것과 같이 불필요한 I/O가 발생할 수 있어 필수적으로 넣는 것을 추천
  1. 동적 Partition
    • 내부 Column을 기준으로 자동 생성하는 Partition
    • 기본값 : 200개, 최대값 : 10000+ 가능 참조
      • 너무 많은 동적 Partition은 성능 저하 야기
        • 2K이상이면 30초 쿼리가 발생 할 수 있다함.
    • e.g. : insert into table tb_sample2 partition ( year,month,day ) select userid,viewtime,year,month,day from tb_sample;
  2. 정적 Partition
    • Partition 정보를 직접 삽입
    • eg. insert into table tb_sample partition(year = 2021, month = 05, day =04) values(555,444);
-- 테이블 생성 ( 정적 파티선 )
Create table tb_sample(
	userid bigint ,
	viewtime int
)
partition by ( year int , month int , date int)

-- 레코드 삽입 < 정적 Partition >
insert into table tb_sample
partition(year = 2021, month = 05, day =07)
values(123,456);

-- 레코드 삽입 < 동적 Partition >
insert into table tb_sample2
partition ( year,month,day )
select userid,viewtime,year,month,day from tb_sample;

Pasted image 20230925125709.png

  • HDFS에서 위와 같은 형태로 생성된다.
  • Partition의 위치 정보는 Metastore 에 적재됨
  • PARTITIONS
    • 파티션 기본 정보
    • Pasted image 20230925125840.png
  • PARTITION_KEYS
    • Partition에서 사용된 Column리스트
    • Pasted image 20230925130003.png
  • PART_KEY_VALS
    • partition의 필드값
    • Pasted image 20230925130124.png
  • PART_PARAMS
    • partition의 속성 정보
    • Pasted image 20230925130214.png

Bucketing

  • Partition처럼 테이블의 데이터를 분할하는 방식
  • N개의 파일로 나누어 저장하는 것.
  • Bucketing을 하면 Join시 SMB Join으로 수행되어 속도 향상을 야기
    • SMB Join(Sort Merge Bucket Join): Bucketing된 키의 정보를 이용한 Join
CREATE TABLE tbl1(
  col1 STRING,
  col2 STRING
) CLUSTERED BY (col2) INTO 5 BUCKETS  
stored as orc  
location 'Path'  
tblproperties('orc.compress' = 'SNAPPY')

CREATE TABLE tbl2(
  col1 STRING,
  col2 STRING
) CLUSTERED BY (col2) SORTED BY (col1) INTO 5 BUCKETS
  • 위와 같이 CLUSTERED BY (칼럼) [SORTED BY (칼럼)] INTO 버켓수 BUCKETS 을 추가하여 설정

Skew

  • Hive에서 Skew는 데이터 쏠림 현상이라고 볼 수 있다.
    • Partition을 나눠놨는데, 특정 Partition에만 데이터가 몰려 있으면 특정 Partition만 비약적으로 커지는 현상이라고 보면 된다.
  • Skew는 Partition과 유사한 기능을 하지만, 특정 데이터만 나눠주는 효과를 낼 수 있다.
    • e.g 알파벳을 적재하는 테이블이 존재
      1. a,b가 다른 알파벳에 비해 많이 유입됨
      2. a,b를 Skew로 등록
      3. 테이블은 a , b , c-z로 적재 시작
CREATE TABLE tbl (
	col1 STRING,
	col2 STRING
) SKEWED BY (col1)on ('a', 'b' )
[ STORED as DIRECTORIES ]
location 'Path'  
;
  • 위와 같은 형식으로 사용 가능
    • STORED as DIRECTORIES : 디렉토리 구분 여부

개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Hive' 카테고리의 다른 글

Hive 레코드 HDFS 이전  (0) 2023.08.09

HadoopSpark를 공부하기 시작하면 가장 처음 나오는 것이 Map /Reduce이다
- Map /Reduce 설명은 Hadoop링크로 들어가 보면 설명을 해놓았다.
이것을 가장 이해하기 편하게 예시를 드는 것이 WordCount이다.

간단 예시

구현

아래와 같이 구현을 하고 , args 를 local[*] 텍스트파일 경로 적재 위치 순으로 넣어서 실행해보면 된다.

import org.apache.spark.rdd.RDD  
import org.apache.spark.{SparkConf, SparkContext}  
  
object wordCount {  
  def main(args: Array[String]): Unit = {  
    // Args 3개 필수  
    require(args.length == 3, "Usage : <Master> <Input> <Output>")  
  
    // Spark 설정  
    val conf = new SparkConf()  
      .setAppName("WorkCount")  
      .setMaster(args(0))  
      .set("spark.local.ip", "localhost")  
      .set("spark.driver.host", "localhost")  
  
    // SparkContext생성  
    val sc   = new SparkContext(conf)  
    val input  = readFile(sc, args(1))  
    val result = process(input)  
    saveFile(result, args(2))  
  
    sc.stop()  
  }  
  
  def readFile(sc: SparkContext, path: String): RDD[String] = {  
    sc.textFile(path) // 파일을 읽고  
  }  
  
  def process(input: RDD[String]): RDD[(String, Int)] = {  
    input.flatMap(str => str.split(" ")) //공백으로 분류  
         .map((_, 1)) // 각각 key : 1 로 만듦  
         .reduceByKey(_ + _) // value들을 key를 기준으로 합산  
  }  
  
  def saveFile(output: RDD[(String, Int)], path: String): Unit = {  
    output.saveAsTextFile(path) // 적재  
  }  
}
  • 위 코드를 실행시키면 디렉토리가 생성되고 안에는 여러 파일들이 존재한다
    • 그 중 part-00000를 출력해보면 WordCount가 잘 동작했는지 확인 가능하다
    • tail,head,cat 원하는 것으로 확인해 보면 된다.
C:\Apps\hadoop-3.1.2\output>cat part-00000
(Unless,4)
(works,,4)
...

테스트 코드

  • JUnit을 이용하면 아래와같이
import org.apache.spark.{SparkConf, SparkContext}  
import org.junit.jupiter.api.Test  
  
import scala.collection.mutable.ListBuffer  
  
class wordCountTest {  
  
  @Test  
  def test(): Unit = {  
    val conf = new SparkConf()  
      .setMaster("local[*]")  
      .setAppName("wordCountTest")  
      .set("spark.local.ip", "localhost")  
      .set("spark.driver.host", "localhost")  
    val sc   = new SparkContext(conf)  
    val input = new ListBuffer[String]  
    input += "License shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document."  
    input += "Licensor shall mean the copyright owner or entity authorized by the copyright owner that is granting the License"  
    input.toList  
  
    val inputRDD = sc.parallelize(input)  
    val result = wordCount.process(inputRDD)  
    val resultMap = result.collectAsMap()  
  
    assert(resultMap("shall") == 2)  
    assert(resultMap("copyright") == 2)  
    assert(resultMap("License") == 2)  
    println(resultMap)
  
    sc.stop()  
  }  
}
  • FlatSpec을 이용하면 아래와 같이 구현하면 된다.
import org.apache.spark.{SparkConf, SparkContext}  
import org.scalatest.flatspec.FlatSpec  
import org.scalatest.matchers.should.Matchers  
  
  
class workCountTest_Spec extends FlatSpec with Matchers {  
  val conf = new SparkConf()  
    .setMaster("local[*]")  
    .setAppName("wordCountTest")  
    .set("spark.local.ip", "localhost")  
    .set("spark.driver.host", "localhost")  
  val sc   = new SparkContext(conf)  
  
  "WorkCount Process" should "Correctly count words in the input" in {  
    val inputRDD = sc.parallelize(Seq(  
      "License shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document."  
      , "Licensor shall mean the copyright owner or entity authorized by the copyright owner that is granting the License"  
      ))  
    val result = wordCount.process(inputRDD)  
  
    val resultMap = result.collectAsMap()  
    println(resultMap)  
    resultMap("shall") shouldEqual 2  
    resultMap("copyright") shouldEqual 2  
    resultMap("License") shouldEqual 2  
//    resultMap("License") shouldEqual 1
  }  
  
}

성공 시 성공했다고만 하고
실패시 아래와 같이 에러 발생한다
!Pasted image 20231005140735.png
Pasted image 20231005140926.png

  • Intellij에서 구동하였고, build tool을 Gradle로 해서 그런지 :test ....를 못 찾는다고 에러가 발생했다.
    • 간단 해결방법 아래와 같이 setting을 설정해 주면 된다
      • Run Tests using 을 Gradle -> Intellij IDEA로 변경
    • 추가적으로 intellij IDEA로 하면 조금 더 빠르다는 이야기가 있다.
      Pasted image 20231005141130.png

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

spark-Setting  (0) 2023.11.28
spark-scheduling  (0) 2023.11.28
RDD Action  (0) 2023.10.13
RDD Transformation  (0) 2023.10.13
Spark_RDD  (0) 2023.10.05

RDD Action?

  • 간단히 설명하자면, 결과 값, 즉 return이 RDD가 아닌 다른 것으로 반환되는 연산이다.
  • RDD Transformation을 수행하게 만드는 도화선이라고 봐도 된다.
    • 즉 N개의 Transformation Method들이 쌓여도 수행하지 않다가 Action Method가 호출되면 그 때 순차적으로 수행한다

Output Function

기본적으로 Collection Type으로 반환하는 경우가 많기 때문에 너무 많은 개수를 반환하면, OOM 이 발생할 수 있다.

데이터 추출

// collect
characterRDD.collect.mkString("Array(", ", ", ")")
//Array(good,bad, good,bad, best,worst, plus,minus)

// first
numericRDD.first()
// 1

// take
numericRDD.take(2).mkString("Array(", ", ", ")")
// Array(1, 2)

// takeSample
numericRDD.takeSample(withReplacement = false, 3).mkString("Array(", ", ", ")")
numericRDD.takeSample(withReplacement = true, 3).mkString("Array(", ", ", ")")
// Array(1, 2, 3)
// Array(4, 2, 1)
  • collect
    • RDD를 Array[T]형태로 반환
    • 굳이 Mysql로 따지면 select라고 봐도 될듯하다.
  • first
    • 최초 1개의 값을 반환
    • mysql에서 limit 1이라고 생각해도 무방
  • take
    • 최초 값 부터 N개를 반환
    • mysql에서 limit n이라고 생각해도 무방
  • takeSample
    • sample과 동일하지만, 다른것은 RDD가 아닌 Collection Type으로 반환

Count

characterRDD.count  
// 4
characterRDD.countByValue
// Map(best,worst -> 1, good,bad -> 2, plus,minus -> 1)
  • count
    • 모든 요소 개수 반환
  • countByValue
    • 요소 별 개수

Reduce / Fold / aggregate

numericRDD.reduce(_ * _)
//reduce : 24
numericRDD.fold(1)(_ * _)
//fold : 24
numericRDD.fold(0)(_ * _)
//fold : 0
numericRDD.aggregate(zeroValue = Fruit(0, 0))(seqOp = (f: Fruit, v: Int) => f.add(v), combOp = (f1: Fruit, f2: Fruit) => f1.add(f2))
numericRDD.aggregate(zeroValue = Fruit(0, 0))(seqOp = _.add(_), combOp = _.add(_))
numericRDD.aggregate(zeroValue = Fruit(0, 0))(seqOp = _ add _, combOp = _ add _)
numericRDD.aggregate(Fruit(0, 0))(_ add _, _ add _)
// Fruit(10,4)
  • RDD의 요소에 관해 연산 후 결과값을 반환
    • reduce , fold 모두 병렬처리 하기 때문에 결합법칙을 만족해야한다
  • 결과 도출 사유
    • reduce
      • 초기 값 없이 보유한 요소 만으로 연산
      • 1 * 2 * 3 * 4 = 24
    • fold
      • 초기 값이 존재함
      • 1 * 1 * 2 * 3 * 4 = 24
      • 0 * 1 * 2 * 3 * 4 = 0
    • 위와 같은 이유로 코드 구현 시 잘 고민하고 구현해야 한다
  • aggregate
    • Fruit은 RDD Transformation 의 combineByKey / aggregateByKey 구현해 놓았다.
      • amount : Long => Int로 변경함
    • 인자별 의미
      • aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
      • zeroValue
        • 초기 값
      • seqOp
        • (U, T) => U
          • T : RDD의 요소들이 갖는 타입
        • 각 Partition에서 병합하는 함수
      • combOp
        • (U, U) => U
          • 인자 타입과 반환타입이 동일함
        • 모든 Partition들을 최종적으로 병합하는 함수
    • aggregate 은 scala에서 위 코드 순서대로 요약하여 사용할 수 있다.

Foreach / foreachPartition

numericRDD.foreach {  
  v => println(s"foreach values = $v")  
}  
  
numericRDD.foreachPartition {  
  println("foreachPartition partition")  
  values => for (v <- values) println(s"foreachPartition $v")
  • 모든 RDD요소에 특정 함수를 적용시키는 것.
    • 모든 요소를 순회
  • Map과 다른점은 Map은 반환값이 존재하지만, foreach는 반환값이 존재하지 않고 행위만 존재한다
  • foreach - foreachPartition 차이점
    • foreach는 요소단위로 수행
    • foreachPartition은 Partition단위로 수행
  • Driver에서 수행하는 것이 아닌 개별 노드에서 실행된다
    • 분산 처리 하기 때문에 의도한 것과 구현한 것이 동일한지 잘 고민 해봐야 한다

cache / persist / unpersist

numericRDD.persist()  
numericRDD.unpersist()

numericRDD.cache()  
numericRDD.persist(StorageLevel.MEMORY_ONLY)  

Spark RDD는 수행할 때 마다 연산을 통해 새로은 RDD를 사용한다
이 때 RDD를 재사용한다면, 해당 연산을 통새 생성된 RDD를 저장하는 것을 위 메소드로 구현한다

  • persist / unpersist
    • 메모리 / 디스크에 저장하는 메소드
    • persist 로 저장하고 unpersist 로 제거한다
    • 옵션으로 아래와 같은것들을 통해 메모리, 디스크, 메모리 + 디스크 에 적재 할지 고를 수 있다.
      • NONE , DISK_ONLY , DISK_ONLY_2 , MEMORY_ONLY , MEMORY_ONLY_2 , MEMORY_ONLY_SER , MEMORY_ONLY_SER_2 , MEMORY_AND_DISK , MEMORY_AND_DISK_2 , MEMORY_AND_DISK_SER , MEMORY_AND_DISK_SER_2 , OFF_HEAP
  • cache
    • 메모리에 RDD를 적재하는 메소드
    • persist(StorageLevel.MEMORY_ONLY) 와 동일하다
      • API따라 들어가다 보면 위와 같음

File IO

characterRDD.saveAsTextFile("file://<File_Path>/test01")  
characterRDD.saveAsTextFile("file://<File_Path>/test02", classOf[org.apache.hadoop.io.compress.BZip2Codec])  
  
characterRDD2.saveAsObjectFile("file://<File_Path>/obj01")  
val rdd = sc.objectFile[String]("file://<File_Path>/obj01")  
rdd.collect().foreach (x => println(x))  
  
pairRDD.saveAsSequenceFile("file://<File_Path>/seq01")  
val rdd2 = sc.sequenceFile[String, Long]("file://<File_Path>/seq01")  
println(rdd2.collect().mkString(","))
  • saveAsTextFile
    • 기본적으로 TestFile 형태로 적재하는 방식
    • 위에 작성한 것과 같이 saveAsTextFile(경로 , 코덱)
      • 코덱을 통해 원하는 압축 형식을 지정할 수 있다.
      • bzip2,lz4,snappy,zlib
  • saveAsObjectFile
    • Object형태로 적재하는 방식
    • Java/ Scala에서는 자바 표준 직렬화를 사용한다.
    • SequenceFile와 비교하여 상대적으로 느리다
  • saveAsSequenceFile
    • objectFile 과 동일하게 Binary 파일 포멧으로 저장
      • 차이점
        • Hadoop자체에서 정의한 직렬화 프레임워크 사용
        • Hadoop Writeable 인터페이스를 구현하고 있어야 한다.
        • Scala의 경우 saveAsObjectFile와 차이 없이 간단하게 사용 가능

공유 변수

Broadcast Variables

val broadcastChar = sc.broadcast(Set("good,bad", "plus,minus"))
  • Spark Job이 실행되는 동안 모든 노드에 공유하는 읽기 전용 자원
    • 모든 노드에 공유 즉 복제 되어 사용한다
      • Network 비용이 절감하여 성능 향상을 기대할 수 있다.
    • 해당 변수는 Heap영역에 저장된다
      • 사용하는 동안 메모리를 지속적으로 사용한다.
      • unpersist를 이용하여 제거할 수 있다.

Accumulators

val error_cnt = sc.longAccumulator("invalidFormat")  
val error_data = sc.collectionAccumulator[String]("invalidFormat2")  
val data = List("Log01:haha", "Log02:hoho", "Log03::", "Log04:aaa", "Log05||Addr5", "Log06::Log7")  
sc.parallelize(data, 3).foreach {  
  v =>  
    if (v.split(":").length != 2) {  
      error_cnt.add(1L)  
      error_data.add(v)  
    }  
}  
println("잘못된 데이터 수:" + error_cnt.value)  
println("잘못된 데이터:" + error_data.value)
// 잘못된 데이터 수:3
// 잘못된 데이터:[Log05||Addr5, Log06::Log7, Log03::]
  • 실제로 사용 해본 적이 없어 완벽하게 이해가 되지 않아 조금 더 이해해야 할 것 같다
  • Broadcast 와 반대로 쓰기 동작을 위한 작업
  • 여러 노드에서 수행되는 동작을 한곳에 적재하는 방법
    • 생성,초기화는 드라이버 프로그램에서 동작한다
    • 클러스터의 여러 Task에서 병렬로 쓰여질 수 있다.
    • Accumulator의 값을 증가시킬 수 있지만, 값을 참조해서 사용하는 것은 불가능하다.
  • 특별한 목적이 없으면, Action Function을 수행할 때 사용해야한다
    • 불필요한 반복이 발생할 가능성이 높기 때문
    • eg. map, flatmap과 같은 것을 동작하면 불필요하게 값을 증가시킬 가능성이 높다
  • 기본적으로 제공하는 Accumulators이 존재한다
    • long,collection,double, 등이 존재한다
  • 위 코드와 같이 만들면, 각 노드에서 병렬로 실행 시 문제가 발생한 Log를 확인할 수 있다.
  • 사용자 지정으로 생성 할 수 있다.
    • 아래와 같이 case class를 생성하여 사용할 수 있다.
    • Python에서는 register를 하지 않고 바로 사용할 수 있다
    • 주요 메소드
      1. isZero
        1. 초기 상태 여부 판단
        2. 아래와 같이 조건문 등을 이용하여 판단할 수 있다.
      2. reset
        1. 이름 그대로 초기 상태로 되돌리는 메소드
      3. add
        1. 값을 추가할 때 사용하는 메소드. 사용자 정의시 아래와 같이 case class의 메소드를 이용하여 동작할 수 있다.
        2. 값 추가는 Accumulator 내부에서 동작한다.
      4. copy
        1. 해당 Accumulator를 복제하는 메소드
        2. 미리 데이터를 복제하여 안전하게 데이터를 읽을 때 혹은 병렬로 수정할 때 사용
      5. merge
        1. n+1개의 Accumulator를 병합하는 메소드
          1. n+1개의 테스크에서 생성된 Accumulato를 하나의 Accumulator로 병합할 때 사용
      6. value
        1. Accumulator의 값을 반환하는 메소드
package main.RDD_action  
  
import io.netty.handler.logging.LogLevel  
import org.apache.spark.util.AccumulatorV2  
import org.apache.spark.{SparkConf, SparkContext}  
  
case class Fruit(var price: Long, var amount: Long = 1) {  
  def add(p: Long): Fruit = add(Fruit(p))  
  def add(other: Fruit): Fruit = {  
    this.price += other.price  
    this.amount += other.amount  
    this  
  }  
}  
  
class fruitAccumulator extends AccumulatorV2[Fruit, (Long,Long)] {  
  private var acc = Fruit(0)  
  override def isZero: Boolean = acc.price == 0L && acc.amount == 1L  
  override def copy(): AccumulatorV2[Fruit, (Long,Long)] = {  
    val newAcc = new fruitAccumulator  
    newAcc.acc = Fruit(acc.price, acc.amount)  
    newAcc  
  }  
  override def reset(): Unit = {  
    acc.price = 0L  
    acc.amount = 1L  
  }  
  override def add(other: Fruit): Unit = acc.add(other)  
  override def merge(other: AccumulatorV2[Fruit, (Long,Long)]): Unit = other match{  
    case o: fruitAccumulator => acc.add(o.acc)  
    case _                   => throw new RuntimeException()  
  }  
  override def value: (Long,Long) = {  
    (acc.price, acc.amount)  
  }  
}  
  
object fruitAccumulator {  
  def main(args: Array[String]): Unit = {  
    // Spark 설정  
    val conf = new SparkConf()  
      .setAppName("fruitAccumulator")  
      .setMaster("local[*]")  
      .set("spark.local.ip", "localhost")  
      .set("spark.driver.host", "localhost")  
  
    // SparkContext생성  
    val sc = new SparkContext(conf)  
    sc.setLogLevel(LogLevel.WARN.name())  
  
    val acc = new fruitAccumulator  
    sc.register(acc , "fruitAccumulator")  
    val data =List("Fruit:2000", "Fruit02:3000", "Fruit03::", "Fruit04:2000", "Fruit05||4000", "Fruit06::Log7")  
    sc.parallelize(data,3).foreach {  
      v =>  
//        println(s"v : $v ")  
        if(v.split(":").length != 2){  
          println(s"added v : $v ")  
          acc.add(Fruit(2))  
        }  
    }  
    println(acc.value)  
  }  
}

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

spark-scheduling  (0) 2023.11.28
WordCount  (0) 2023.10.13
RDD Transformation  (0) 2023.10.13
Spark_RDD  (0) 2023.10.05
Spark  (0) 2023.10.04

+ Recent posts