Spark가 대체 뭐야? (1)

.

스파크 알아보기

설치

1
2
3
4
# unpack tgz file archive for installing spark

$ wget http://apache.mirror.cdnetworks.com/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
$ tar xvzf spark-2.4.4-bin-hadoop2.7.tgz
  • c: create
  • x: extract
  • v: verbose
  • z: compress
  • f: target tar/tar gz file argument should be placed last
1
2
3
4
5
# java installation
$ sudo -E apt-get update
$ sudo -E apt-get install openjdk-8-jdk
$ alias python='/usr/bin/python3'
$ export PYSPARK_PYTHON=python3
  • spark-2.4.4-bin-hadoop2.7/bin 을 PATH에 등록해놓고, pyspark ,spark-shell 등의 커맨드로 python, scala를 사용해 스파크 콘솔을 실행할 수 있다.
1
2
3
4
5
6
7
# pyspark
>> spark
#<pyspark.sql.session.SparkSession object at 0x7f6884934518>

# spark-shell
>> spark
#res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@5d03df76

2.1. 스파크의 기본 아키텍쳐

  • 단일 머신으로는 대용량 데이터에 대한 연산을 진행하기 힘듬.
  • 클러스터링을 통해 다수의 컴퓨터 자원을 모아 하나의 컴퓨터처럼 사용할 수 있음.
  • 단순 모으는 것 뿐만 아니라 작업 조율이 대단히 중요해지는데, 이에 통용되는 프레임워크가 스파크.
  • 결과적으로 스파크는 클러스터의 데이터 처리 작업을 관리하고 조율하는 프레임워크

  • 클러스터는 기본적으로 클러스터 매니저가 관리
    • Spark Standalone
    • 하둡 YARN
    • Mesos
  • 사용자는 클러스터 매니저(CM)에 스파크 어플리케이션을 제출 → CM은 애플리케이션 실행에 필요한 자원을 할당 → 유저는 할당받은 자원으로 작업을 처리.

2.1.1. 스파크 어플리케이션

  • Driver process + N*Executor process
  • Driver 프로세스
    • 클러스터 노드 중 하나에서 실행
    • main() 함수를 실행
    • 어플리케이션의 정보 유지관리, 사용자 프로그램이나 입력에 대한 응답, executor 프로세스 작업과 관련된 분석, 배포, 스케쥴링
    • 애플리케이션의 수명 주기동안 관련 정보를 모두 유지
  • Executor 프로세스
    • Driver가 할당한 작업을 수행
    • 할당한 코드를 실행하고, 진행상황을 Driver에게 보고
  • 로컬 모드로 실행 시, 드라이버와 익스큐터를 단일 머신에서 Thread 형태로 실행

  • 스파크는 사용 가능한 자원을 파악하기 위해 CM을 사용
  • Driver 프로세스는 주어진 작업을 완료하기 위해 드라이버 프로그램의 명령을 Executor에서 실행할 책임
    • Executor는 대부분 스파크 코드를 실행하는 역할
    • Driver는 스파크 언어 API를 통해 다양한 언어로 실행

2.2 스파크의 다양한 언어 API

  • 구조적 API로 작성된 코드는 언어에 상관없이 유사한 성능을 발휘
    • 스칼라 (default)
    • 자바
    • python
    • SQL
    • R
  • SparkSession 객체를 진입점으로 사용 → 스파크가 사용자를 대신해 파이썬으로 작성된 코드를 Executor의 JVM에서 실행할 수 있는 코드로 변환

2.3 스파크 API

  • Unstructured API
  • Structured API (최근에 미는 방향성)

2.5 SparkSession

  • 대화형 모드로 스파크를 시작하면 스파크 어플리케이션을 관리하는 SparkSession이 자동으로 생성
  • SparkSession = 드라이버 프로세스
  • 하나의 SparkSession은 하나의 스파크 어플리케이션에 대응
1
2
3
val myRange = spark.range(1000).toDF("number")  // 분산 컬렉션
// 클러스터 모드에서 해당 코드를 실행하면 숫자 범위의 각 부분이 서로 다른 익스큐터에 할당됨.
// -> 스파크의 DataFrame

2.6 DataFrame

  • 가장 대표적인 구조적 API
  • 테이블의 데이터를 row와 column으로 단순하게 표현
  • 컬럼, 컬럼타입 정의목록 → 스키마
  • DataFrame ⇒ 컬럼에 이름을 붙인 spreadsheet꽈 유사
    • 그렇지만 클러스터에 분산되어 저장된다는 점에서 차이를 보임

2.6.1 파티션

  • 스파크는 모든 executor가 병렬로 작업을 수행할 수 있도록 partition이라 불리는 Chunk 단위로 데이터를 분할
    • 파티션은 클러스터의 물리적 머신에 존재하는 row의 집합
  • DataFrame의 파티션은 실행 중에 데이터가 클러스터에서 물리적으로 분산되는 방식을 나타냄
  • DataFrame 사용 시 파티션을 수동 혹은 개별적으로 처리할 필요없음.
    • 스파크가 실제 처리방법을 결정

2.7 Transformation

  • 스파크의 핵심 데이터 구조 → 불변성
1
2
3
val divisBy2 = myRange.where("number %2 = 0")
// 추상적인 Transformation만 지정한 상태이기 때문에 Action을 호출하지 않으면
// 실제 Transformation을 수행하지 않음 -> lazy execution
  • Transformation → 스파크에서 비즈니스 로직을 표현하는 핵심 개념
  • 의존성
    • 좁은 의존성 (narrow dependency)
      • 입력 파티션이 하나의 출력 파티션에만 영향을 미침
    • 넓은 의존성 (wide dependency)
      • 하나의 입력파티션이 여러 출력 파티션에 영향을 미침
  • 셔플? 파이프라이닝?

2.7.1 지연 연산

  • 스파크가 연산 그래프를 처리하기 직전까지 기다리는 동작 방식
  • 스파크는 실행 계획을 작성
    • 코드를 실행하는 마지막 순간까지 대기하다가 Transformation을 간결한 물리적 실행계획으로 컴파일
  • 조건절 푸시다운?

2.8 Action

  • 실제 연산을 수행하기 위해서 액션 명령을 내려야 함.
  • 일련의 Transformation들로부터 결과를 실행하라는 뜻
    • 가장 간단한 예시로 count
  • 액션에는 3가지 존재
    • 콘솔에서 데이터를 보는 액션
    • 각 언어로 된 네이티브 객체에 데이터를 모으는 액션
    • 출력 데이터소스에 저장하는 액션

2.10 종합 예제

  • 데이터를 읽는 방식 → SparkSession의 DataFrameReader 클래스를 사용해서 읽음.
  • 스파크는 스키마 정보를 얻기 위해 데이터를 조금 읽어 들임 → 해당 row의 데이터 타입을 스파크 데이터 타입에 맞게 분석
1
2
3
4
5
6
7
8
9
10
11
12
13
val flightData2015 = spark.read.option("inferSchema", "true").option("header", "true").csv("./data/flight-data/csv/2015-summary.csv")

flightData2015.take(3)  // show와 유사하지만, 값들을 임시 variable에 담아서 리턴
flightData2015.show(3)  // formatted 된 형태로 3개의 row를 보여줌

flightData2015.sort("count").explain()

/*
== Physical Plan ==
*(2) Sort [count#12 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#12 ASC NULLS FIRST, 200)
   +- *(1) FileScan csv [DEST_COUNTRY_NAME#10,ORIGIN_COUNTRY_NAME#11,count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/deploy/Spark-The-Definitive-Guide/data/flight-data/csv/2015-summary...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>
*/
  • sort 메서드는 DataFrame을 변경하는 것이 아님. 새로운 DataFrame을 생성해 리턴할 뿐.
  • 결론적으로 csv file → read → new DataFrame → sort (넓은 transformation) → new DataFrame → take(3) → Array…
  • 실행계획에서 data source는 가장 아래, 최종 결과는 가장 위에
1
2
3
4
5
6
7
// 스파크는 셔플 수행 시 기본 200개의 셔플 파티션 생성
spark.conf.set("spark.sql.shuffle.partitions", "5")  // 5개로 제한

// 설정 확인을 위해서는 아마도 이렇게 하겠지 싶겠지만 실제로 맞다
spark.conf.get("spark.sql.shuffle.partitions")

flightData2015.sort("count").take(2)
  • Transformation의 논리적 실행 계획은 DataFrame의 계보를 정의
  • 사용자는 물리적 데이터를 직접 다루는 것이 아니라 위와 같이 속성 컨트롤을 통해 물리적 실행 특성을 제어.

2.10.1 DataFrame과 SQL

  • 스파크는 언어에 관계없이 같은 방식으로 Transformation을 수행 가능.
  • 사용자가 SQL이나 DataFrame으로 비즈니스 로직을 표현하면, 스파크에서 실제 코드를 실행하기 전에 그 로직을 기본실행계획으로 컴파일
  • 스파크는 SQL쿼리를 DataFrame 코드와 같은 실행계획으로 컴파일 하므로 성능차이 없음.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// DataFrame -> 테이블, 뷰
flightData2015.createOrReplaceTempView("flight_data_2015")
// 이렇게 하면 SQL로 데이터를 조회할 수 있음.

val sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

val dataFrameWay = flightData2015.groupBy('DEST_COUNTRY_NAME).count()

sqlWay.explain()
/*
== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#10], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#10, 5)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#10], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#10] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/deploy/Spark-The-Definitive-Guide/data/flight-data/csv/2015-summary...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
*/

dataFrameWay.explain()
/*
== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#10], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#10, 5)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#10], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#10] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/deploy/Spark-The-Definitive-Guide/data/flight-data/csv/2015-summary...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
*/
  • 둘 다 동일한 실행계획으로 나타남
  • Spark DataFrame과 SQL은 다양한 데이터 처리 기능을 제공
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
spark.sql("SELECT max(count) FROM flight_data_2015").take(1)  // SQL style

import org.apache.spark.sql.functions.max

flightData2015.select(max("count")).take(1)  // Using DataFrame

val maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) AS destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")

// maxSql.show()
/*
+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// DataFrame Style
// :paste 후 붙여넣는 방식으로 멀티라인을 손쉽게 복붙가능
import org.apache.spark.sql.functions.desc

flightData2015.groupBy("DEST_COUNTRY_NAME")
                            .sum("count")
                            .withColumnRenamed("sum(count)", "destination_total")
                            .sort(desc("destination_total"))
                            .limit(5)
                            .show()

/*
+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+
*/

//실행 계획
/*
== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#65L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#10,destination_total#65L])
+- *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#10], functions=[sum(cast(count#12 as bigint))])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#10, 200)
      +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#10], functions=[partial_sum(cast(count#12 as bigint))])
         +- *(1) FileScan csv [DEST_COUNTRY_NAME#10,count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/deploy/Spark-The-Definitive-Guide/data/flight-data/csv/2015-summary...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>
*/
  • 스파크는 해당 DataFrame이나 자신의 원본 DataFrame에 action이 호출되기 전까지 데이터를 읽지 않음.

2편에서 계속..