Spark가 대체 뭐야? (5)

.

DataFrame Transformation

앞에서 DataFrame의 핵심 영역을 알아봤다면, 이번 장에서는 DataFrame을 다루는 방법을 다룬다.

  • Add Row, Column
  • Delete Row, Column
  • Transform row to column, vice versa
  • Change sequences depending on column value

1. DataFrame 생성하기

  • 원시 데이터 소스에서 DataFrame을 생성할 수도 있음.
1
2
3
4
val df = spark.read.format("json")
                    .load("./data/flight-data/json/2015-summary.json")

df.createOrReplaceTempView("dfTable")
  • Row 객체를 가진 Seq타입을 직접 변환해 DataFrame생성
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}

val myManualSchema = new StructType(
    Array(
        new StructField("some", StringType, true),
        new StructField("col", StringType, true),
        new StructField("names", LongType, false)
    )
)

val myRows = Seq(Row("Hello", null, 1L))
val myRDD = spark.sparkContext.parallelize(myRows)
val myDf = spark.createDataFrame(myRDD, myManualSchema)

myDf.show()

val myDf = Seq(("Hello", 2, 1L)).toDF("col1", "col2", "col3")

2. select, selectExpr

  • select, selectExpr를 사용하면 데티어 테이블에 SQL을 실행하는 것처럼 DataFrame에서도 SQL를 사용할 수 있음.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
df.select("DEST_COUNTRY_NAME").show(2)

// SQL
// SELECT DEST_COUNTRY_NAME FROM dfTable LIMIT 2

df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)

// SQL
// SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME FROM dfTable LIMIT 2

import org.apache.spark.sql.functions.{expr, col, column}

df.select(
    df.col("DEST_COUNTRY_NAME"),
    col("DEST_COUNTRY_NAME"),
    column("DEST_COUNTRY_NAME"),
    'DEST_COUNTRY_NAME,
    $"DEST_COUNTRY_NAME",
    expr("DEST_COUNTRY_NAME")
).show(2)
  • 위와 같이 다양한 방법으로 컬럼을 참조할 수 있음.
1
2
3
4
5
6
7
8
9
10
11
df.select(col("DEST_COUNTRY_NAME"), "DEST_COUNTRY_NAME")  //컴파일에러

// expr은 가장 유연한 참조 방법

df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

// SQL
// SELECT DEST_COUNTRY_NAME as destination FROM dfTable LIMIT 2

df.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME"))
    .show(2)
  • 이렇게 select와 expr을 합쳐서 자주 사용하다보니 스파크에서는 selectExpr 메서드를 제공.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)

df.selectExpr(
    "*",  // 모든 원본 컬럼 포함
    "(DEST_COUNTRY_NAME = ORITIN_COUNTRY_NAME) as withinCountry"
).show(2)

// SQL
// SELECT *, (DEST_COUNTRY_NAME=ORIGIN_COUNTRY_NAME) as withinCountry
// FROM dfTable LIMIT 2

// 집계함수 지정
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

// SQL
// SELECT avg(count), count(distinct(DEST_COUNTRY_NAME)) FROM dfTable LIMIT 2

3. 스파크 데이터 타입으로 변환하기

  • 새로운 컬럼이 아닌 명시적인 값을 스파크에 전달해야 할 때
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.apache.spark.sql.functions.lit

df.select(expr("*"), lit(1).as("One")).show(2)

/*
+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
*/

// SQL version
spark.sql("SELECT *, 1 as One FROM dfTable limit 2").show()

4. 컬럼 추가하기

  • withColumn 메서드를 사용해서 컬럼 추가 가능
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
df.withColumn("numberOne", lit(1)).show(2)
/*
+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
*/

// SQL version
spark.sql("SELECT *, 1 as numberOne FROM dfTable LIMIT 2").show()

df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME")).show(2)
/*
+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
*/
  • withColumn 메서드로 컬럼명을 변경하는 것도 가능
    1
    2
    3
    4
    5
    6
    7
    8
    9
    df.withColumn("Destination", expr("DEST_COUNTRY_NAME")).show(2)
    /*
    +-----------------+-------------------+-----+-------------+
    |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|  Destination|
    +-----------------+-------------------+-----+-------------+
    |    United States|            Romania|   15|United States|
    |    United States|            Croatia|    1|United States|
    +-----------------+-------------------+-----+-------------+
    */
    

5. 커럼명 변경하기

  • withColumn 메서드 대신 withColumnRenamed 메서드로 컬럼명 변경 가능 ```scala df.withColumnRenamed(“DEST_COUNTRY_NAME”, “dest”).columns

// res7: Array[String] = Array(DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count) // res8: Array[String] = Array(dest, ORIGIN_COUNTRY_NAME, count)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

## 6. 예약 문자와 키워드

- 공백이나 하이푼은 컬럼명에 사용할 수 없음
- 굳이 사용하기를 원한다면 backtick(`)을 이용해서 escaping해야 함.
```scala
import org.apache.spark.sql.functions.expr

val dfWithLongColName = df.withColumn(
      "This is Long ColName", expr("ORIGIN_COUNTRY_NAME"))
// 문자열로 입력했기 때문에 공백을 escaping할 필요 없음.

// 아래 경우는 표현식으로 컬럼을 참조하기 때문에 escaping이 필요
dfWithLongColName.selectExpr(
    "`This is Long ColName`",
    "`This is Long ColName` as `new col`").show(2)

7. 대소문자 구분

  • 기본적으로 스파크는 대소문자를 가리지 않음
  • 구분하게 만드는 방법
1
2
// SQL
set spark.sql.caseSensitive true

8. 컬럼 제거하기

1
2
3
df.drop("ORIGIN_COUNTRY_NAME").columns

dfWithLongColName.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME")

9. 컬럼의 데이터 타입 변경하기

  • 예를 들어 StringType을 정수형으로 변환해야 하는 것과 같은 형평환 필요 케이스가 발생 ```scala df.withColumn(“count2”, col(“count”).cast(“string”))

df.withColumn(“count2”, col(“count”).cast(“string”)).schema /* res17: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,true), StructField(count2,StringType,true)) */

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

## 10. Row 필터링

- Row를 필터링하기 위해서는 true, false를 판별하는 표현식이 필요
- filter, where 메서드 사용 가능
```scala
df.filter(col("count")<2).show()
/*
+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Croatia|    1|
|       United States|          Singapore|    1|
|             Moldova|      United States|    1|
|               Malta|      United States|    1|
|       United States|          Gibraltar|    1|
|Saint Vincent and...|      United States|    1|
|            Suriname|      United States|    1|
|       United States|             Cyprus|    1|
|        Burkina Faso|      United States|    1|
|            Djibouti|      United States|    1|
|       United States|            Estonia|    1|
|              Zambia|      United States|    1|
|              Cyprus|      United States|    1|
|       United States|          Lithuania|    1|
|       United States|           Bulgaria|    1|
|       United States|            Georgia|    1|
|       United States|            Bahrain|    1|
|       Cote d'Ivoire|      United States|    1|
|       United States|   Papua New Guinea|    1|
|              Kosovo|      United States|    1|
+--------------------+-------------------+-----+
*/

df.where("count < 2").show(2)
/*
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
*/
  • 스파크는 자동으로 필터의 순서와 상관없이 모든 필터링 작업을 수행
  • 어떤 조건이 먼저 걸릴지는 스파크가 알아서 판단하므로 유의해야 함.
1
2
3
4
5
6
7
8
9
df.where(col("count")<2).where(col("ORIGIN_COUNTRY_NAME") =!= "Croatia").show(2)
/*
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
*/

11. 고유한 Row 얻기

  • distinct 메서드를 이용해 DataFrame의 모든 Row에서 중복 데이터를 제거할 수 있음.
1
2
3
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().show(5)

12. 무작위 샘플 만들기

  • sample 메서드를 이용해 무작위로 샘플링된 데이터를 얻을 수 있음.
  • Sample with/without replacement 지정 가능
1
2
3
4
5
6
val seed = 5
val withReplacement = false
val fraction = 0.5

df.sample(withReplacement, fraction, seed).count()
// 126

13. 임의 분할하기

  • DataFrame을 임의 크기로 분할할 때 유용
  • 학습셋, 테스트셋을 나누는 것과 같은 경우에 사용 가능. ```scala val seed = 777

val dataFrames = df.randomSplit(Array(0.8, 0.2), seed) dataFrames(0).count() > dataFrames(1).count() // true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

## 14. Row 합치기, 추가하기

- DataFrame은 불변성을 가짐
- 따라서, DataFrame에 레코드를 추가하려면 원본을 새로운 DataFrame과 통합해야 함.
- `union` 메서드는 현재 스키마가 아닌 컬럼 위치를 기반으로 동작하므로 유저가 생각한대로 자동 정렬이 되지 않을 수도 있음.
```scala
import org.apache.spark.sql.Row

val schema = df.schema

val newRows = Seq(
      Row("New Country", "OtherCountry", 5L),
      Row("New Country 2", "OtherCountry 2", 1L)
      )
val parallelizedRows = spark.sparkContext.parallelize(newRows)

val newDF = spark.createDataFrame(parallelizedRows, schema)

df.union(newDF).where("count=1")
    .where($"ORIGIN_COUNTRY_NAME" =!= "United States").show()
  • spark에서는 반드시 =!=를 사용해야 함
    • =!====는 스파크의 Column 클래스에 정의된 함수

15. Row 정렬하기

  • sortorderBy 메서드를 사용해 DataFrame의 최댓값 혹은 최솟값이 상단에 위치하도록 할 수 있음.
  • 두 메서드는 완벽히 동일하게 동작
1
2
3
4
5
6
7
8
9
10
df.sort("count").show(5)

df.orderBy("count", "DEST_COUNTRY_NAME").show(5)

df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show()

import org.apache.spark.sql.functions.{desc, asc}

df.orderBy(expr("count desc")).show(2)
df.orderBy(desc("count"), asc("DEST_COUNTRY_NAME")).show(2)
  • asc_nulls_first, desc_nulls_first, asc_nulls_last, desc_nulls_last 메서드를 사용하여 정렬된 DataFrame에서 null값이 표시되는 기준을 지정 가능
1
2
3
4
// 파티션별 정렬

spark.read.format("json").load("./data/flight-data/json/*-summary.json")
    .sortWithinPartition("count")

16. Row수 제한하기

1
2
3
df.limit(5).show()

df.orderBy(expr("count desc")).limit(6).show()

17. repartition, coalesce

  • 자주 필터링하는 컬럼을 기준으로 데이터를 분할하는 방식으로 최적화 진행 가능
  • repartition → 무조건 전체 데이터 셔플링
    • 사용을 지양해야 함.
    • 향후에 사용할 파티션 수가 현재 파티션 수보다 많거나 컬럼을 기준으로 파티션을 만드는 경우에만 사용해야 함.
1
2
3
4
5
6
7
8
9
10
df.rdd.getNumPartitions
// 1

df.repartition(5)

// 특정 컬럼을 기준으로 파티션을 재분배
df.repartition(col("DEST_COUNTRY_NAME"))
df.repartition(expr("ORIGIN_COUNTRY_NAME"))

df.repartition(5, col("DEST_COUNTRY_NAME"))
  • coalesce → 전체 데이터를 셔플하지 않고 파티션을 병합하려는 경우
    • 파티션을 줄이려면 repartition 대신 coalesce를 사용해야 함.
1
2
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)
// 셔플을 통해 5개의 파티션으로 나누고 전체 데이터를 셔플 없이 병합

18. 드라이버로 Row 데이터 수집하기

  • 스파크는 드라이버에서 클러스터 상태정보를 유지
1
2
3
4
5
6
7
8
val collectDF = df.limit(10)
collectDF.take(5)
collectDF.show()
collectDF.show(5, false)
collectDF.collect()

collectDF.toLocalIterator()  // 모든 파티션의 데이터를 드라이버에 전달.
// 데이터셋의 파티션을 차례로 반복처리 가능

.

6부에서 계속.