Elasticsearch?

대량의 데이터를 NRT ( Near Real Time )으로 저장, 검색, 분석 할 수 있는 루씬 기반 Java 오픈소스 분산 검색 엔진

특징

Scale Out

Shard를 이용하여 수평적으로 늘어날 수 있다.

HA

Replica를 통해 데이터 안전성 보장

Schema Free

Json을 통해 데이터 검색

Restful

CRUD 작업을 HTTP Restful API를 통해 수행

Inverted Index

텍스트를 파싱하여 검색어 사전 생성

  • 속도가 빠르다.

요소별 역할

Node

Master

  • ES의 Instance 단위
    • Instance는 각자의 역할 존재
  • Node.roles를 이용하여 적절한 역할을 사용하는 것이 좋다.
    1. Tribe Node
      1. Cluster간 연결을 해주는 역할
        1. 요청에 따라 서로 다른 Cluster에 데이터를 분리해서 보냄
    2. Remote-eligible Node
      1. remote cluster clinet 역할
        1. 원격 cluster과 연결시켜주는 역할
      2. cross cluster를 remote client로 사용 가능한 노드
        1. croess cluster : 원격 cluster간 백업/검색 등을 하는 작업
    3. Machine Learning Node
      1. ML역할
        1. ES로 ML작업 시 필요

Data

  • Data를 적재/연산하는 Node
    • node.data=true를 설정에 삽입해주면 된다.
    • 부하 분배를 위해 Master과 분리하는 것을 권장
  • Content Datan Node
    • CRUD, 집계 등의 기능을 가짐
      • 기본적인 Data
이름 압축 성능 비고
Hot Data Node 4 1 사용 빈도가 많고 및 성능이 우선이기 때문에 SSD 권장
Warm Data Node 3 2 Hot보다 적은 빈도로 조회하는 노드, HDD권장
Cold Data Node 2 3 읽기 빈도가 적은 인덱스, 읽기 전용
Frozen Data Node 1 4 Shared_cache 옵션만으로 마운트 된 검색 가능한 Snapshot 을 저장

Ingest

  • Text 변환 및 전처리 담당.
  • Ingest pipeline을 Text에 적용하여 인덱싱 하기 전에 사용
    • indexing 이전에 Document를 변환하는 작업
  • Master Node에서 수행하는 Ingest를 대신 수행
    • Master Node의 부하 분산
    • 색인 작업이 많을경우 사용

Coordinating Only

  • Client와 통신만 하는 역할
    • Load Balancing 수행
    • 만약 Coordinate Node가 없으면 Datanode가 해당 역할 수행
    • 너무 많으면 서버에 부하
    • 발생
      • Coordinating Only 또한 Master Node가 알림을 기다리기 때문
    1. client의 cluster관련 요청을 Master Node에 전달
    2. client의 Data관련 요청을 data Node에 전달

Server

물리적 서버에는 N개이 Node를 사용할 수 있다.

  • Server 기준 Port
  • Node - Client
    • Port범위 : 9200 ~9299
    • 순차적으로 부여
  • Node - Node
    • Port 범위 : 9300 ~ 9399
    • 순차적으로 부여
  • Cluster 묶기
    • cluster.name을 동일한 이름으로 묶어준다
      • 이름이 다르면 같은 Network여도 논리적으로 분리됨
    • Cluster : 1개의 시스템이라고 보면됨.
      • 데이터가 공유되는 서버의 묶음

Index

  • Document의 집단
  • Index는 색인과 명칭과 혼동될 수 있어 Indices라고도 부른다.
  • 주로 REST API를 통해 CRUD 작업 수행
  • 실질적으로 Index는 Shard를 가리키는 논리적 Namespace
  • index는 1+N개의 Shard로 구성
  • Mapping 및 State 정보를 Cluster State에 저장
    • Mapping : Index 내부의 Document가 어떻게 저장될지 정의하는 것
      • Index별 설정 가능
    • cluster State : 정보를 추적하는 내부 데이터 구조
      • 분산된 Node간 데이터 일관성을 위해 Single-Thread로 업데이트
      • Single-Thread이기 때문에 규모가 커지면 성능 저하

Shard

  • Cluster 내의 데이터를 분산 저장하기 위한 단위
  • Lucine Index( Inverted Index / 역색인 )이다.
    • Inverted Index : Key : value 가 아닌 value : key
    • 속도 / 메모리 측면에서 좋다
      • Update 시 reindex을 해야해서 Cost가 증가한다.
  • Index는 N개의 Primary/Replica Shard로 구성된다.
    • Replica : Failover을 위해 사용하는 것이 좋다
      • 너무 많아지면 성능 저하됨
      • replica 또한 write를 하기 때문
  • Shard 추가 시 Reindex를 해줘야한다.
    • segment가 정렬되어 있기 때문
  • 설정
    • Failover을 위해 50GB 이상의 Shard는 권장하지 않는다.
    • Heap은 최대 32GB를 권장
      • System Memory / 2를 권장한다.
      • 32GB를 넘기면 32bit -> 64bit으로 변경되기 때문
    • Heap 1GB당 최대 20개의 Shard를 넘기지 않는 것이 좋다.
    • Shard가 작으면 작은 Segment가 생성된다.
    • 용양 및 설정 출처

Segment

  • 문서의 빠른 검색을 위해 설계된 자료구조
  • N개의 Segment로 Shard를 구성한다.
  • 데이터 색인 시 메모리상에 존재하다, Segment에 기록하여 검색 가능한 Segment 생성
  • Shard에서 검색 시 Segment를 순차 검색 후 결과를 Shard에 전달
    • segment가 많아지면 검색 속도 저하
    • 순차 검색 및 오버헤드 발생
  • update/delete 시 삭제 표기 후 다른 포인터 지칭
    • 데이터는 지연 삭제
  • 특정 주기/ 용양 임계치에 다다르면 데이터 정리 및 병합
    • 이 과정에 실질적 데이터 삭제
    • Merge은 새로운 Segment를 생성하는 것
      • 많은 Resource 소요
      • 부하가 적을 때 수행하는 것을 권장
        • Optimize API를 사용하여 수행

개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

Hive?

MapReduce 코드를 HiveQl을 이용하여 간편화 시킨 쿼리 엔진

  • 배치성 작업을 실행시킬 때 주로 사용
    • MapReduce,Tez, Spark등을 Engine으로 사용 가능
  • HiveQl
    • SQL과 유사한 Query 제공
    • 0.14.0 이후 버전부터 Update/Delete 지원
    • Insert는 덮어쓰기, 빈 테이블에만 사용 가능
    • Select문에서 Having절은 사용 불가하다

버전별 차이

  1. Hive 1.X
    • SQL을 이용하여 MapReduce 가능
    • 파일 데이터를 논리적 표현
  2. Hive 2.X
    • LLAP구조 추가
      • LLAP : 작업을 실행한 데몬을 유지하여 핫 데이터 케싱을 통해 빠른 데이터 처리
    • HPLSQL 추가
      • HPL/SQL를 보면 사용법이 나온다.
      • Hive의 PL/SQL이라고 보면 된다.
    • 기본 실행엔진 MapReduce -> TEZ
  3. Hive 3.X
    • MapReduce엔진, Hive CLI,Hcat CLI, WebHCat삭제
      • TEZ엔진, BeeLine 사용

요소

Pasted image 20230925110547.png

Metastore

  • Hive 테이블 및 파티션의 모든 Meta 정보 및 HDFS 맵핑 정보 저장
    • Hive는 RDB처럼 Schema에 맞춰 데이터를 적재하는 것이 아닌 데이터를 적재 후 Schema를 입히는 방식
      Pasted image 20230925112308.png

DataBase

  • Embedded
    • Derby를 이용하여 사용
    • 1번에 1개의 프로세스만 접근가능
      • 단위 테스트 용도로만 사용!
  • Local / Remote
    • Mysql과 같은 외부 RDBMS를 사용

Server

  • Embeded / Local
    • Hive Client내에 Library처럼 사용되는 서버
    • 주로 HiveServer2와 함께 사용
  • Remote
    • Metastore가 독립 Server로 구동
      • Client는 Thrift 통신을 이용하여 접근
        • Thrift : 서버에  있는 서로 다른 언어로 작성된 함수 혹은 프로시저를 호출해서 사용
      • 위와 같이 구동되는 서버를 Hcat 서버라 부름

HCatalog

  • Grid Computing 상에서 다양한 Processing tools를 이용하여 Read/Write를 보다 쉽게 할 수 있게 만들어주는 관리 계층
    • Grid Computing : 원격 병렬 컴퓨팅. Cluster Compung을 WAN으로 연결시킨 상태
    • Hive 0.11.0부터 Hcatalog와 Hive가 통합됨
  • SerDe( Serializer-Deserializer )할 수 있는 모든 Read/Write를 지원
    • e.g csv, Json …
    • SerDe : Hive가 데이터를 해석하는 방법 row data -> hdfs , hdfs-> row data
  • WebHCat
    • HCataLog를 REST API로 제공하는 것

HiveServer2

  • Client가 HiveQl을 수행할 수 있게 해주는 서비스
  • Thrift ,JDBC,ODBC등의 API 제공
  • Thrift 기반으로 동작하여 Thrift Server라고도 부름

Beeline

  • JDBC를 사용하여 HiveServer2에 접근하는 SQLline 기반도구
    • SQLLine : Java기반으로 DB와 연결하여 SQL을 실행하는 명령줄 유틸리티
  • Thrift통신으로 접속하고 , Comman Line을 이용
  • XML, CSV, TSV로 출력할 수 있다.

Table

Hive에는 크게 2종류의 Table 형식이 있다.

  1. Managed Table( Internal Table )
    1. Location을 따로 설정하지 않은 Table
    2. 외부 위치가 아닌 서버 내에 Table저장
    3. Drop table/partition시 메타스토어 정보 및 데이터가 삭제됨
    4. 주로 임시테이블, 생명 주기 관리에 사용
  2. External Table
    1. Location을 따로 설정하는 Table
    2. 외부 위치에 정의하는 테이블
    3. 파일이 이미 존재 혹은 원격 위치에 사용 시 적용
      1. HDFS, S3…
    4. Drop table/partition시 메타스토어 정보 만 제거됨

Partition

  • Hive의 Partition은 HDFS에 Tree구조로 적재된다.
  • Partition은 순서에 따라 디렉토리가 구성되기 때문에 cardinality에 따라 순서를 잘 정해야함.
  • 테이블의 크기가 너무 커지면 모든 것을 Row를 읽는 것과 같이 불필요한 I/O가 발생할 수 있어 필수적으로 넣는 것을 추천
  1. 동적 Partition
    • 내부 Column을 기준으로 자동 생성하는 Partition
    • 기본값 : 200개, 최대값 : 10000+ 가능 참조
      • 너무 많은 동적 Partition은 성능 저하 야기
        • 2K이상이면 30초 쿼리가 발생 할 수 있다함.
    • e.g. : insert into table tb_sample2 partition ( year,month,day ) select userid,viewtime,year,month,day from tb_sample;
  2. 정적 Partition
    • Partition 정보를 직접 삽입
    • eg. insert into table tb_sample partition(year = 2021, month = 05, day =04) values(555,444);
-- 테이블 생성 ( 정적 파티선 )
Create table tb_sample(
	userid bigint ,
	viewtime int
)
partition by ( year int , month int , date int)

-- 레코드 삽입 < 정적 Partition >
insert into table tb_sample
partition(year = 2021, month = 05, day =07)
values(123,456);

-- 레코드 삽입 < 동적 Partition >
insert into table tb_sample2
partition ( year,month,day )
select userid,viewtime,year,month,day from tb_sample;

Pasted image 20230925125709.png

  • HDFS에서 위와 같은 형태로 생성된다.
  • Partition의 위치 정보는 Metastore 에 적재됨
  • PARTITIONS
    • 파티션 기본 정보
    • Pasted image 20230925125840.png
  • PARTITION_KEYS
    • Partition에서 사용된 Column리스트
    • Pasted image 20230925130003.png
  • PART_KEY_VALS
    • partition의 필드값
    • Pasted image 20230925130124.png
  • PART_PARAMS
    • partition의 속성 정보
    • Pasted image 20230925130214.png

Bucketing

  • Partition처럼 테이블의 데이터를 분할하는 방식
  • N개의 파일로 나누어 저장하는 것.
  • Bucketing을 하면 Join시 SMB Join으로 수행되어 속도 향상을 야기
    • SMB Join(Sort Merge Bucket Join): Bucketing된 키의 정보를 이용한 Join
CREATE TABLE tbl1(
  col1 STRING,
  col2 STRING
) CLUSTERED BY (col2) INTO 5 BUCKETS  
stored as orc  
location 'Path'  
tblproperties('orc.compress' = 'SNAPPY')

CREATE TABLE tbl2(
  col1 STRING,
  col2 STRING
) CLUSTERED BY (col2) SORTED BY (col1) INTO 5 BUCKETS
  • 위와 같이 CLUSTERED BY (칼럼) [SORTED BY (칼럼)] INTO 버켓수 BUCKETS 을 추가하여 설정

Skew

  • Hive에서 Skew는 데이터 쏠림 현상이라고 볼 수 있다.
    • Partition을 나눠놨는데, 특정 Partition에만 데이터가 몰려 있으면 특정 Partition만 비약적으로 커지는 현상이라고 보면 된다.
  • Skew는 Partition과 유사한 기능을 하지만, 특정 데이터만 나눠주는 효과를 낼 수 있다.
    • e.g 알파벳을 적재하는 테이블이 존재
      1. a,b가 다른 알파벳에 비해 많이 유입됨
      2. a,b를 Skew로 등록
      3. 테이블은 a , b , c-z로 적재 시작
CREATE TABLE tbl (
	col1 STRING,
	col2 STRING
) SKEWED BY (col1)on ('a', 'b' )
[ STORED as DIRECTORIES ]
location 'Path'  
;
  • 위와 같은 형식으로 사용 가능
    • STORED as DIRECTORIES : 디렉토리 구분 여부

개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Hive' 카테고리의 다른 글

Hive 레코드 HDFS 이전  (0) 2023.08.09

Spark?

  • Hadoop과 유사한 클러스터 기반의 분산처리 기능을 제공하는 오픈소스 프레임워크
    • 빅데이터 분산처리 플랫폼 / 엔진
  • In-Memory기반으로 처리하기 때문에 반복적인 데이터 처리, 즉 ML 과 같은 것에 뛰어난 성능을 보인다.

Spark Application

  • 제출한 Job을 Task로 변환하여 Executor에게 분산 처리함

Driver

  • 스파크에서 Job을 수행하는 프로그램
    • Main함수를 수행하는 프로그램
    • SparkContext를 생성하고 해당 Instance를 포함한 프로그램
  • Cluster모드에 따라 다른 위치에서 수행되지만 대체로 실행 서버에서 구동.
  • Worker 노드에 작업을 시키고, 결과를 취합한다

Worker

  • 작업을 하는 노드
  • Executor가 동작하는 노드
    • 작업을 수행하는 프로세스
    • Partition 단위로 Job을 수행
    • 1개의 Worker 노드에 N개의 Executor가 동작할 수 있다

Application

  • Spark 위에서 구동되는 프로그램
    • Driver
      • Spark Application을 실행하는 프로세스
      • Spark Context를 생성
      • Spark Application Life cycle 관리
    • Executor
      • Task 실행을 담당
      • 실행 결과를 Driver에게 전달

Cluster Manager

  • 이름 그대로 cluster를 관리하는 매니져
  • cluster mode 종류
    • Yarn
      • Hadoop Cluster Manager
      • Resource Manager / Node Manager로 구성
    • Memos
      • Apache Cluster Manager
      • Master /Slave 구성
    • StandAlone
      • Spark 자체 제공 Cluster Manager
      • 1 Node 1 Executor만 가능
  • deploy mode
    • client
      • Driver Program을 submit한 곳에서 실행
        • 즉 Client Server에서 Driver을 실행
    • cluster
      • Driver Program을 cluster중 무작위 실행
        • 즉 Client Server이외의 worker Node에서 Driver을 실행

Job

  • Spark Application에 제출된 작업
  • N개의 Task로 구성된 병렬 연산 단위
    • Spark Application은 N+1개의 Job으로 구성됨
  • 각 Job은 DAG로 변환된다
  • Dag에서 각각의 Job Node들은 하나 이상의 Spark Stage에 포함됨

Stage

  • N+1개의 Task의 집합
  • 병렬/순차 적으로 수행될 수 있는 작업을 기반으로 생성됨

Task

  • Executor에서 구동하는 최소 실행 단위

실행 순서

Pasted image 20231004170237.png

  • 간단히 보면 위와 같은 방식으로 Job 1 안에 Stage들을 순차적으로 처리 완료 후 job 2를 수 행한다.
  • 처리 순서
    1. SparkContext를 이용하여 RDD연산 정보를 DAG Scheduler에게 전달
    2. DAG를 생성하면, 해당 DAG를 Cluster Manager에게 전달
    3. 이때 Scheduler는 최대한 locality를 높여 stage를 구성
    4. 생성된 DAG를 수행

데이터 구조

  1. Spark_RDD
  2. DataFrame
  3. DataSet

사용 언어

  1. Java
  2. Scala
  3. R
  4. Python

HA

Spark 또한 Master - Slave 구성으로 되어 있기 때문에 HA구성을 만드는 것이 좋다
만약 HA구성을 하지 않으면 SPOF 위험이 생긴다.
구성 시 Zookeeper 등을 이용하여 사용한다

  • 기본 설정
    • SPARK_DAEMON_JAVA_OPTS에 아래 변수들을 추가한다
      • spark.deploy.recoveryMode=ZOOKEEPER
      • spark.deploy.zookeeper.url=
      • spark.deploy.zookeeper.dir=
        • 기본값 : /spark
    • spark-env.sh에 아래와 같이 추가
      • export SPARK_DAEMON_JAVA_OPTS=“-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url={zookeeper url} -Dspark.deploy.zookeeper.dir={zookeeper에서 저장할 경로}”
  • HA구성 하지 않고 단일 복구 모드를 사용하고 싶다면 아래와 같이 하면된다
    • spark-env.sh에 아래와 같이 추가
      - export SPARK_DAEMON_JAVA_OPTS=“-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory={복구 데이터 경로}”

History Server

Spark의 동작을 확인할 수 있는 Web Server

  • Event Log를 확인 할 수 있다
  • HDSF를 사용한다면 아래와 같이 spark-defaults.conf 에 추가하여 후 start-history-server.sh를 실행하여 구동시켜 사용하면 된다.
spark.eventLog.enabled true // 이벤트로그 저장 여부
spark.eventLog.dir hdfs:///spark-history //이벤트 로그 저장
spark.history.fs.logDirectory hdfs:///spark-history // 종료된 App 로그 저장

보유 기능

  1. SparkStreaming
  2. SparkSQL
  3. SparkMLib
  4. SparkGraphx

[ Reference ]


개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!

'프로그래밍 및 IT > Spark' 카테고리의 다른 글

spark-scheduling  (0) 2023.11.28
WordCount  (0) 2023.10.13
RDD Action  (0) 2023.10.13
RDD Transformation  (0) 2023.10.13
Spark_RDD  (0) 2023.10.05

+ Recent posts