Hadoop 과 Spark를 공부하기 시작하면 가장 처음 나오는 것이 Map /Reduce이다
- Map /Reduce 설명은 Hadoop링크로 들어가 보면 설명을 해놓았다.
이것을 가장 이해하기 편하게 예시를 드는 것이 WordCount이다.
간단 예시
구현
아래와 같이 구현을 하고 , args 를 local[*]
텍스트파일 경로
적재 위치
순으로 넣어서 실행해보면 된다.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object wordCount {
def main(args: Array[String]): Unit = {
// Args 3개 필수
require(args.length == 3, "Usage : <Master> <Input> <Output>")
// Spark 설정
val conf = new SparkConf()
.setAppName("WorkCount")
.setMaster(args(0))
.set("spark.local.ip", "localhost")
.set("spark.driver.host", "localhost")
// SparkContext생성
val sc = new SparkContext(conf)
val input = readFile(sc, args(1))
val result = process(input)
saveFile(result, args(2))
sc.stop()
}
def readFile(sc: SparkContext, path: String): RDD[String] = {
sc.textFile(path) // 파일을 읽고
}
def process(input: RDD[String]): RDD[(String, Int)] = {
input.flatMap(str => str.split(" ")) //공백으로 분류
.map((_, 1)) // 각각 key : 1 로 만듦
.reduceByKey(_ + _) // value들을 key를 기준으로 합산
}
def saveFile(output: RDD[(String, Int)], path: String): Unit = {
output.saveAsTextFile(path) // 적재
}
}
- 위 코드를 실행시키면 디렉토리가 생성되고 안에는 여러 파일들이 존재한다
- 그 중 part-00000를 출력해보면 WordCount가 잘 동작했는지 확인 가능하다
- tail,head,cat 원하는 것으로 확인해 보면 된다.
C:\Apps\hadoop-3.1.2\output>cat part-00000
(Unless,4)
(works,,4)
...
테스트 코드
- JUnit을 이용하면 아래와같이
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.jupiter.api.Test
import scala.collection.mutable.ListBuffer
class wordCountTest {
@Test
def test(): Unit = {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("wordCountTest")
.set("spark.local.ip", "localhost")
.set("spark.driver.host", "localhost")
val sc = new SparkContext(conf)
val input = new ListBuffer[String]
input += "License shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document."
input += "Licensor shall mean the copyright owner or entity authorized by the copyright owner that is granting the License"
input.toList
val inputRDD = sc.parallelize(input)
val result = wordCount.process(inputRDD)
val resultMap = result.collectAsMap()
assert(resultMap("shall") == 2)
assert(resultMap("copyright") == 2)
assert(resultMap("License") == 2)
println(resultMap)
sc.stop()
}
}
- FlatSpec을 이용하면 아래와 같이 구현하면 된다.
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.flatspec.FlatSpec
import org.scalatest.matchers.should.Matchers
class workCountTest_Spec extends FlatSpec with Matchers {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("wordCountTest")
.set("spark.local.ip", "localhost")
.set("spark.driver.host", "localhost")
val sc = new SparkContext(conf)
"WorkCount Process" should "Correctly count words in the input" in {
val inputRDD = sc.parallelize(Seq(
"License shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document."
, "Licensor shall mean the copyright owner or entity authorized by the copyright owner that is granting the License"
))
val result = wordCount.process(inputRDD)
val resultMap = result.collectAsMap()
println(resultMap)
resultMap("shall") shouldEqual 2
resultMap("copyright") shouldEqual 2
resultMap("License") shouldEqual 2
// resultMap("License") shouldEqual 1
}
}
성공 시 성공했다고만 하고
실패시 아래와 같이 에러 발생한다
!Pasted image 20231005140735.png
- Intellij에서 구동하였고, build tool을
Gradle
로 해서 그런지:test ....
를 못 찾는다고 에러가 발생했다.- 간단 해결방법 아래와 같이 setting을 설정해 주면 된다
- Run Tests using 을 Gradle -> Intellij IDEA로 변경
- 추가적으로 intellij IDEA로 하면 조금 더 빠르다는 이야기가 있다.
- 간단 해결방법 아래와 같이 setting을 설정해 주면 된다
[ Reference ]
개인적으로 공부한 내용 포스팅 중
잘못된 정보는 지적해주시면 좋겠습니다!
'프로그래밍 및 IT > Spark' 카테고리의 다른 글
spark-Setting (0) | 2023.11.28 |
---|---|
spark-scheduling (0) | 2023.11.28 |
RDD Action (0) | 2023.10.13 |
RDD Transformation (0) | 2023.10.13 |
Spark_RDD (0) | 2023.10.05 |