Spark가 대체 뭐야? (4)

.

구조적 API의 기본 연산

  • DataFrame은 Row 타입의 레코드와 각 레코드에 수행할 연산 표현식을 나타내는 여러 컬럼으로 구성
  • 스키마 = 각 컬럼명과 데이터 타입을 정의
  • 파티셔닝 → DataFrame이나 Dataset이 클러스터에서 물리적으로 배치되는 형태
    • 파티셔닝 스키마 → 파티션을 배치하는 방법 정의
    • 파티셔닝의 분할 기준은 특정 컬럼이나 비결정론적 값을 기반으로 설정 가능
1
2
3
4
5
6
7
8
9
10
11
// DataFrame 생성
val df = spark.read.format("json")
                    .load("./data/flight-data/json/2015-summary.json")

df.printSchema()
/*
root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)
*/

5.1 스키마

  • Schema-on-Read 잘 동작,그러나 주의해야 할 점들 존재

ad-hoc 분석에서는 schema-on-read가 잘 동작. 그렇지만,csv나 json과 같은 일반 텍스트 파일을 사용하면 다소 느림.

Long 데이터 타입을 Integer로 추론하는 것과 같은 정밀도 문제 발생 가능. 따라서 ETL 작업 같은 곳에는 스키마를 미리 정의하는 것을 권장. (데이터 타입을 알기 힘든 csv, json을 사용할 시에 샘플 데이터 타입에 따라 데이터 타입을 결정해버릴 수 있기 때문)

1
2
3
4
5
6
7
spark.read.format("json").load("./data/flight-data/json/2015-summary.json").schema
/*
res1: org.apache.spark.sql.types.StructType =
StructType(StructField(DEST_COUNTRY_NAME,StringType,true),
StructField(ORIGIN_COUNTRY_NAME,StringType,true),
StructField(count,LongType,true))
*/
  • 스키마는 여러 개의 StructField 타입 필드로 구성된 StructType 객체
  • 스파크는 런타임에 데이터 타입이 스키마의 데이터 타입과 일치하지 않으면 오류 발생
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
import org.apache.spark.sql.types.Metadata

val myManualSchema = StructType(
    Array(
        StructField("DEST_COUNTRY_NAME", StringType, true),
        StructField("ORIGIN_COUNTRY_NAME", StringType,true),
        StructField("count", LongType, false,
            Metadata.fromJson("{\"hello\":\"world\"}"))
    )
)

val df = spark.read.format("json").schema(myManualSchema)
                        .load("./data/flight-data/json/2015-summary.json")

5.2 컬럼과 표현식

  • 사용자는 표현식으로 DataFrame의 컬럼을 선택, 조작, 제거할 수 있습니다.
  • 스파크의 컬럼은 표현식을 사용해 레코드 단위로 계산한 값을 단순하게 나타내는 논리적인 구조.
  • 컬럼 내용을 수정하려면 반드시 DataFrame의 스파크 트랜스포메이션을 사용해야 함.

5.2.1 컬럼

1
2
3
4
5
6
import org.apache.spark.sql.functions.{col, column}

col("someColumnName")
column("someColumnName")
$"myColum" // scala 고유의 방법
'myColum // scala 고유의 방법
  • 컬럼이 DataFrame에 있을지 없을지는 알 수 없음.
    • 컬럼은 컬럼명을 카탈로그에 저장된 정보와 비교하기 전까지 미확인 상태

명시적 컬럼 참조

1
df.col("count")  // DataFrame의 컬럼은 col 메서드로 참조

5.2.2 표현식

  • 표현식이란?
    • DataFrame 레코드의 여러 값에 대한 Transformation 집합을 의미
    • expr("someCol")col("someCol")과 동일
  • 컬럼은 단지 표현식
  • 컬럼과 컬럼의 트랜스포메이션은 파싱된 표현식과 동일한 논리적 실행 계획으로 컴파일

    import org.apache.spark.sql.functions.expr

    expr(“(((someCol + 5)*200)-6) < otherCol”)

  • SQL 표현식과 DataFrame코드는 실행 시점에 동일한 논리 트리로 컴파일

DataFrame 컬럼에 접근하기

1
2
spark.read.format("json").load("./data/flight-data/json/2015-summary.json")
    .columns

5.3 Record와 Row

  • DataFrame의 각 row는 하나의 레코드
  • 스파크는 레코드를 Row 객체로 표현
  • 스파크는 값을 생성하기 위해 컬럼 표현식으로 Row 객체를 다룸.
  • Row 객체는 내부에 바이트 배열을 가짐.
    • 바이트 배열 인터페이스는 컬럼 표현식으로만 다룰 수 있으므로, 사용자에게 노출되지 않음.
1
df.first()

5.3.1 Row 생성하기

  • Row 객체는 스키마 정보를 가지고 있지 않음 → DataFrame만 스키마를 가짐.
1
2
3
4
5
6
7
8
9
import org.apache.spark.sql.Row

val myRow = Row("Hello, null, 1, false)

// 데이터에 접근하는 방법
myRow(0)  // Any type
myRow(0).asInstanceOf[String]  // String type
myRow.getString(0)  // String type
myRow.getInt(2)  // Int type

.

5부에서 계속..