.
DataFrame Transformation
앞에서 DataFrame의 핵심 영역을 알아봤다면, 이번 장에서는 DataFrame을 다루는 방법을 다룬다.
- Add Row, Column
- Delete Row, Column
- Transform row to column, vice versa
- Change sequences depending on column value
1. DataFrame 생성하기
- 원시 데이터 소스에서 DataFrame을 생성할 수도 있음.
1 |
|
- Row 객체를 가진 Seq타입을 직접 변환해 DataFrame생성
1 |
|
2. select, selectExpr
- select, selectExpr를 사용하면 데티어 테이블에 SQL을 실행하는 것처럼 DataFrame에서도 SQL를 사용할 수 있음.
1 |
|
- 위와 같이 다양한 방법으로 컬럼을 참조할 수 있음.
1 |
|
- 이렇게 select와 expr을 합쳐서 자주 사용하다보니 스파크에서는 selectExpr 메서드를 제공.
1 |
|
3. 스파크 데이터 타입으로 변환하기
- 새로운 컬럼이 아닌 명시적인 값을 스파크에 전달해야 할 때
1 |
|
4. 컬럼 추가하기
withColumn
메서드를 사용해서 컬럼 추가 가능
1 |
|
withColumn
메서드로 컬럼명을 변경하는 것도 가능1
2
3
4
5
6
7
8
9df.withColumn("Destination", expr("DEST_COUNTRY_NAME")).show(2) /* +-----------------+-------------------+-----+-------------+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| Destination| +-----------------+-------------------+-----+-------------+ | United States| Romania| 15|United States| | United States| Croatia| 1|United States| +-----------------+-------------------+-----+-------------+ */
5. 커럼명 변경하기
withColumn
메서드 대신withColumnRenamed
메서드로 컬럼명 변경 가능 ```scala df.withColumnRenamed(“DEST_COUNTRY_NAME”, “dest”).columns
// res7: Array[String] = Array(DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count) // res8: Array[String] = Array(dest, ORIGIN_COUNTRY_NAME, count)
1 |
|
7. 대소문자 구분
- 기본적으로 스파크는 대소문자를 가리지 않음
- 구분하게 만드는 방법
1 |
|
8. 컬럼 제거하기
1 |
|
9. 컬럼의 데이터 타입 변경하기
- 예를 들어 StringType을 정수형으로 변환해야 하는 것과 같은 형평환 필요 케이스가 발생 ```scala df.withColumn(“count2”, col(“count”).cast(“string”))
df.withColumn(“count2”, col(“count”).cast(“string”)).schema /* res17: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,true), StructField(count2,StringType,true)) */
1 |
|
- 스파크는 자동으로 필터의 순서와 상관없이 모든 필터링 작업을 수행
- 어떤 조건이 먼저 걸릴지는 스파크가 알아서 판단하므로 유의해야 함.
1 |
|
11. 고유한 Row 얻기
distinct
메서드를 이용해 DataFrame의 모든 Row에서 중복 데이터를 제거할 수 있음.
1 |
|
12. 무작위 샘플 만들기
sample
메서드를 이용해 무작위로 샘플링된 데이터를 얻을 수 있음.- Sample with/without replacement 지정 가능
1 |
|
13. 임의 분할하기
- DataFrame을 임의 크기로 분할할 때 유용
- 학습셋, 테스트셋을 나누는 것과 같은 경우에 사용 가능. ```scala val seed = 777
val dataFrames = df.randomSplit(Array(0.8, 0.2), seed) dataFrames(0).count() > dataFrames(1).count() // true
1 |
|
- spark에서는 반드시
=!=
를 사용해야 함=!=
와===
는 스파크의 Column 클래스에 정의된 함수
15. Row 정렬하기
sort
와orderBy
메서드를 사용해 DataFrame의 최댓값 혹은 최솟값이 상단에 위치하도록 할 수 있음.- 두 메서드는 완벽히 동일하게 동작
1 |
|
- asc_nulls_first, desc_nulls_first, asc_nulls_last, desc_nulls_last 메서드를 사용하여 정렬된 DataFrame에서 null값이 표시되는 기준을 지정 가능
1 |
|
16. Row수 제한하기
1 |
|
17. repartition, coalesce
- 자주 필터링하는 컬럼을 기준으로 데이터를 분할하는 방식으로 최적화 진행 가능
- repartition → 무조건 전체 데이터 셔플링
- 사용을 지양해야 함.
- 향후에 사용할 파티션 수가 현재 파티션 수보다 많거나 컬럼을 기준으로 파티션을 만드는 경우에만 사용해야 함.
1 |
|
- coalesce → 전체 데이터를 셔플하지 않고 파티션을 병합하려는 경우
- 파티션을 줄이려면 repartition 대신 coalesce를 사용해야 함.
1 |
|
18. 드라이버로 Row 데이터 수집하기
- 스파크는 드라이버에서 클러스터 상태정보를 유지
1 |
|
.
6부에서 계속.