Spark가 대체 뭐야? (2)

.

기본 개념 익히기

스파크의 기본요소

  • 구조적 API
    • Dataset
    • DataFrame
    • SQL
  • 저수준 API
    • RDD
    • 분산형 변수

3.1 운영용 어플리케이션 실행

  • spark-submit 을 사용해서 대화형 쉘에서 개발하던 프로그램을 운영용 어플리케이션으로 쉽게 전환 가능
    • spark-submit은 어플리케이션 코드를 클러스터에 전송해 실행시키는 역할
  • 스파크 어플리케이션 → StandAlone, Mesos, YARN 클러스터 매니저를 통해 실행
1
2
3
4
# Spark를 다운 받은 폴더에서 실행
$ spark-submit --class org.apache.spark.examples.SparkPi --master local ./examples/jars/spark-examples_2.11-2.4.4.jar 10

$ spark-submit --master local ./examples/src/main/python/pi.py 10

3.2 Dataset: 타입 안정성을 제공하는 구조적 API

  • 자바와 스칼라의 정적 데이터 타입에 맞는 정적 타입코드를 지원하기 위해 고안된 스파크의 구조적 API
  • 타입 안정성 지원
  • 파이썬과 R에서는 사용 불가능

  • DataFrame은 다양한 데이터 타입의 테이블형 데이터를 보관할 수 있는 Row타입의 객체로 구성된 분산 컬렉션
  • Dataset → DataFrame의 레코드를 사용자가 자바나 스칼라로 정의한 클래스에 할당
  • 자바의 ArrayList나 스칼라의 Seq 객체 등의 고정 타입형 컬렉션으로 다룰 수 있는 기능
  • 타입 안정성을 지원 → 초기화에 사용한 클래스 대신 다른 클래스를 사용해 접근 불가능
  • 대규모 어플리케이션 개발에 효율적
1
Dataset[Person] // Person 클래스 객체만 가질 수 있음
  • Dataset은 필요한 경우에 선택적으로 사용할 수 있다는 장점
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
case class Flight(DEST_COUNTRY_NAME: String,
                                 ORIGIN_COUNTRY_NAME: String,
                                 count: BigInt)
val flightDF = spark.read.parquet("./data/flight-data/parquet/2010-summary.parquet/")
/*
res0: org.apache.spark.sql.DataFrame =
[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]
*/

val flight = flightDF.as[Flight]
/*
res1: org.apache.spark.sql.Dataset[Flight] =
[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]
*/

flight
    .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
    .map(flight_row => flight_row)
    .take(5)

flight
    .take(5)
    .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
    .map(fr => Flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME, fr.count+5))

3.3 구조적 스트리밍

  • 스파크 2.2 버전에서 안정화된 스트림 처리용 고수준 API
  • 구조적 API로 개발된 배치모드의 연산을 스트리밍 방식으로 실행 가능.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
val staticDataFrame = spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("./data/retail-data/by-day/*.csv")

/*
staticDataFrame: org.apache.spark.sql.DataFrame =
[InvoiceNo: string, StockCode: string ... 6 more fields]
*/

staticDataFrame.createOrReplaceTempView("retail_data")

val staticSchema = staticDataFrame.schema
/*
staticSchema:
org.apache.spark.sql.types.StructType =
StructType(StructField(InvoiceNo,StringType,true),
StructField(StockCode,StringType,true),
StructField(Description,StringType,true),
StructField(Quantity,IntegerType,true),
StructField(InvoiceDate,TimestampType,true),
StructField(UnitPrice,DoubleType,true),
StructField(CustomerID,DoubleType,true),
StructField(Country,StringType,true))
*/
  • 윈도우 함수는 집계 시에 시계열 컬럼을 기준으로 각 날짜에 대한 전체 데이터를 가지는 윈도우를 구성.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import org.apache.spark.sql.functions.{window, col}

staticDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice*Quantity) as total_cost",
        "InvoiceDate"
    )
    .groupBy(
        col("CustomerId"), window(col("InvoiceDate"), "1 day")
    )
    .sum("total_cost")
    .show(5)

/*
+----------+--------------------+-----------------+
|CustomerId|              window|  sum(total_cost)|
+----------+--------------------+-----------------+
|   16057.0|[2011-12-05 09:00...|            -37.6|
|   14126.0|[2011-11-29 09:00...|643.6300000000001|
|   13500.0|[2011-11-16 09:00...|497.9700000000001|
|   17160.0|[2011-11-08 09:00...|516.8499999999999|
|   15608.0|[2011-11-11 09:00...|            122.4|
+----------+--------------------+-----------------+
*/
  • 스트리밍은 나머지는 동일하지만 read대신 readStream을 사용
  • maxFilesPerTrigger → 한번에 읽을 파일 수 설정
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
staticDataFrame.isStreaming
// res4: Boolean = false

val streamingDataFrame = spark.readStream
    .schema(staticSchema)
    .option("maxFilesPerTrigger", 1)
    .format("csv")
    .option("header", "true")
    .load("./data/retail-data/by-day/*.csv")

streamingDataFrame.isStreaming
// res5: Boolean = true

val purchaseByCustomerPerHour = streamingDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice*Quantity) as total_cost",
        "InvoiceDate"
    )
    .groupBy(
        col("CustomerId"), window(col("InvoiceDate"), "1 day")
    )
    .sum("total_cost")
  • 이 작업 역시 지연연산이므로 실행하기 위해 스트리밍 액션을 호출해야 함.
1
2
3
4
5
6

purchaseByCustomerPerHour.writeStream
    .format("memory") // 인메모리 테이블에 저장
    .queryName("customer_purchases")  // 테이블 명
    .outputMode("complete") // 모든 카운트 수행결과를 테이블에 저장
    .start()
  • 스파크는 이전 집계값보다 더 큰 값이 발생한 경우에만 인메모리 테이블을 갱신 → 항상 가장 큰 값을 얻을 수 있음.
1
2
3
4
5
6
7
8
9
10
11
spark.sql("""
    SELECT *
    FROM customer_purchases
    ORDER BY `sum(total_cost)` DESC
""").show()

purchaseByCustomerPerHour.writeStream
    .format("console")
    .queryName("customer_purchases_2")
    .outputMode("complete")
    .start()

3.4 머신러닝과 고급 분석

  • MLlib 활용에 대한 내용
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
staticDataFrame.printSchema()
/*
root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)
*/

import org.apache.spark.sql.functions.date_format

val preppedDataFrame = staticDataFrame
    .na.fill(0)
    .withColumn("day_of_week", date_format($"InvoiceDate", "EEEE"))
    .coalesce(5)

val trainDataFrame = preppedDataFrame
    .where("InvoiceDate < '2011-07-01'")
val testDataFrame = preppedDataFrame
    .where("InvoiceDate >= '2011-07-01'")

trainDataFrame.count()
//res4: Long = 245903

testDataFrame.count()
// res5: Long = 296006

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
    .setInputCol("day_of_week")
    .setOutputCol("day_of_week_index")
  • 토요일은 6, 월요일은 1로 표현이 되므로 암묵적인 대소관계가 담길 수 있으므로 옳지않음.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import org.apache.spark.ml.feature.OneHotEncoder

val encoder = new OneHotEncoder()
    .setInputCol("day_of_week_index")
    .setOutputCol("day_of_week_encoded")

import org.apache.spark.ml.feature.VectorAssembler

val vectorAssembler = new VectorAssembler()
    .setInputCols(Array("UnitPrice", "Quantity", "day_of_week_encoded"))
    .setOutputCol("features")

import org.apache.spark.ml.Pipeline

val transformationPipeline = new Pipeline()
    .setStages(Array(indexer, encoder, vectorAssembler))

val fittedPipeline = transformationPipeline.fit(trainDataFrame)

val transformedTraining = fittedPipeline.transform(train)

val transformedTraining = fittedPipeline.transform(trainDataFrame)

// 모델 학습
import org.apache.spark.ml.clustering.KMeans

val kmeans = new KMeans()
    .setK(20)
    .setSeed(1L)
  • 학습 전 알고리즘 명칭 : Algorithm
  • 학습 후 알고리즘 명칭 : AlgorithmModel
1
2
3
4
5
6
7
val kmModel = kmeans.fit(transformedTraining)

kmModel.computeCost(transformedTraining)  // 군집비용

val transformedTest = fittedPipeline.transform(testDataFrame)

kmModel.computeCost(transformedTest)

3.5 저수준 API

  • 스파크는 RDD를 통해 자바와 파이썬 객체를 다루는데 필요한 기본 기능을 제공.
  • 스파크의 거의 모든 기능은 RDD 기반
1
spark.sparkContext.parallelize(Seq(1,2,3)).toDF()
  • RDD 사용법은 파이썬과 스칼라가 다름.
  • 최신 버전의 스파크에서는 기본적으로 RDD를 사용하지 않지만, 비정형 데이터나 정제되지 않은 원시 데이터를 처리해야 한다면 RDD 사용

.

3부에서 계속