해당 내용은 스파크 완벽 가이드를 개인적인 학습 내용과 함께 정리한 내용입니다.
- spark-submit: 명령으로 운영용 애플리케이션 실행
- Dataset: 타입 안정성(type-safe)을 제공하는 구조적 API
- 구조적 스트리밍
- 머신러닝과 고급 분석
- RDD: 스파크의 저수준 API
- SparkR
3.1 운영용 애플리케이션 실행하기
spark-submit 명령은 애플리케이션 코드를 클러스터에 전송해 실행시키는 역할을 한다.
클러스터에 제출된 애플리케이션은 작업이 종료되거나 에러가 발생할때까지 실행된다.
spark 애플리케이션은 Standalone, mesos, YARN 클러스터 매니저를 이용해 실행된다.
3.2 Dataset: 타입 안정성을 제공하는 구조적 API
Dataset은 자바와 스칼라의 정적 데이터 타입에 맞는 정적 타입 코드(statically typed code)를 지원하기 위해 고안된 스파크의 구조적 API이다. (동적 타입 언어인 파이썬과 R은 사용할 수 없다)
Dataset API는 특히 타입 안정성(type safety)를 제공해, DataFrame보다 구조적이고 정형화된 데이터 작업에 적합하다.
DataFrame의 레코드를 사용자가 (자바나 스칼라로) 정의한 클래스에 할당하고, 자바의 ArrayList 또는 스칼라의 Seq 객체 등의 고정 타입형 컬렉션으로 다룰 수 있는 기능을 제공한다.
타입 안정성(type safety)
타입 안정성은 코드를 작성할 때 데이터의 타입(ex. 정수, 문자열 등)이 명시적으로 지정하고 이에 맞게 처리함으로써, 컴파일 시점에 타입 오류를 검출할 수 있다.
Dataset 클래스와 타입 매개변수
- 타입 매개변수: Dataset 클래스는 자바에서 Dataset<T> 형식, 스칼라에서는 Dataset[T] 형식으로 사용된다. 여기서 T는 데이터 타입을 의미한다. 이 타입 매개변수를 사용함으로써, Dataset은 특정한 타입의 데이터만을 저장하고 처리할 수 있다.
- 예시: 만약 Person 클래스가 있다면, Dataset[Person]은 Person 타입의 객체들만을 저장할 수 있다.
Dataset이 특정 타입 T만을 허용하는 이유는 데이터의 일관성과 안정성을 확보하기 위해서다.
Dataset의 장점은 필요한 경우에 선택적으로 사용할 수 있다는 점이다. 스파크는 map, filter 함수 등의 처리를 마치고 결과를 DataFrame으로 자동 변환해 반환한다.
또, collect 메서드나 take 메서드를 호출하면 DataFrame을 구성하는 Row 타입의 객체가 아닌 Dataset에 매개변수로 지정한 타입의 객체를 반환한다는 장점이 있다. (Row는 타입이 지정되지 않은 일반적인 구조로, 데이터의 어떤 필드가 어떤 타입인지 명확하지 않다.)
즉, Dataset을 사용하면 타입을 명확히 지정하여 데이터를 안전하게 다룰 수 있으며, collect나 take 같은 메서드를 사용할 때도 타입 정보가 유지돼, 반환되는 데이터를 직접적으로 사용자 정의 타입의 객체로 다룰 수 있다는 장점이 있다. (로컬이나 분산 클러스터 환경에서 데이터를 안전하고 정확하게 다룰 수 있다.)
3.3 구조적 스트리밍
구조적 스트리밍은 안정화된 스트림 처리용 고수준 API이다.
구조적 API로 개발된 배치모드의 연산을 스트리밍 방식으로 실행할 수 있으며, 지연 시간을 줄이고 증분 처리할 수 있다.
프로토타입을 배치 잡으로 개발한 다음, 스티리밍 잡으로 변환할 수 있다.
공식 Docs에서 네트워크 소스에서 텍스트 데이터를 읽고, 데이터를 처리하여 각 단어의 출현 횟수를 실시간으로 콘솔에 출력하는 예제 코드가 있다.
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
# SparkSession 초기화
spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()
# 스트리밍 데이터 소스로부터 데이터 읽기 설정
lines = spark.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
# 입력 데이터에서 단어 분할
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# 단어의 출현 빈도 계산
wordCounts = words.groupBy("word").count()
# 결과를 콘솔에 출력하는 스트리밍 쿼리 시작
query = wordCounts.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
위 코드를 실행하기 전에, 로컬에서 nc 명령어(nc -lk 9999)를 사용하여 네트워크 소켓 서버를 시작해줘야 한다.
nc -lk 9999는 네트워크 유틸리티인 netcat을 사용해, 특정 포트에서 네트워크 연결을 열어두고 데이터를 수신하거나 전송할 수 있게 한다. 특히, 네트워크 프로토콜을 테스트하거나 개발 중인 프로그램에서 임시적으로 데이터를 송수신할 때 유용하다.
- nc: netcat 프로그램 실행 명령어
- l: listening 모드 활성화(이 옵션을 통해 netcat은 연결을 기다리는 서버로 동작)
- k: 연결이 끊어진 후에도 계속해서 리스닝을 유지한다.
- 9999: netcat이 연결을 수신할 네트워크 포트 번호
명령어를 통해 데이터를 수신하고 스파크가 연결할 수 있는 상태로 만들어 준 후 코드를 실행한다.
실시간으로 단어를 입력받고 단어의 수를 입력받은 단어의 수를 세는 것을 확인할 수 있다.
기본 동작 방식은 스트리밍 데이터를 일반적인 데이터베이스의 테이블처럼 생각할 수 있다. 데이터가 도착할 때마다 실시간으로 unbounded table에 행이 추가되는 방식이다.
3.4 머신러닝과 고급 분석
내장된 머신러닝 알고리즘 라이브러리인 MLlib을 사용해 대규모 머신러닝을 수행할 수 있다.
MLlib을 사용하면, 대용량 데이터를 대상으로 preprocessing, munging, model traing and prediction을 할 수 있다.
(사실, 이 부분은 현재 내 관심사가 아니기 때문에 공식 문서를 따로 살펴보지 않고 넘어간다.)
3.5 저수준 API
스파크는 RDD(Resilient Distributed Dataset)를 통해 자바와 파이썬 객체를 다루는데 필요한 다양한 기본 기능(저수준 API)을 제공한다.
스파크의 거의 모든 기능은 RDD를 기반으로 만들어졌다. DataFrame 연산도 RDD를 기반으로 만들어졌으며, 효율적인 분산처리를 위해 저수준 명령으로 컴파일된다.
낮은 버전의 스파크 코드를 계속 사용해야 하는 상황이 아니라면, RDD를 사용해 스파크 코드를 작성할 필요가 없다.
대부분은 구조적 API를 사용하는 것이 좋다.
다만, 비정형 데이터나 정제되지 않은 원시 데이터를 처리해야 할 때는 RDD를 사용해야 한다.
3.6 SparkR
SparkR은 스파크를 R언어로 사용하기 위한 기능이다.
파이썬 API와 매우 유사하며, 파이썬 대신 R 문법을 사용한다는 점만 다르다.
(이 부분도 아직까지는 SparkR을 사용할 일이 없기 때문에 가볍게 개념만 확인 후 넘어가다.)
'Data Engineering > Spark' 카테고리의 다른 글
데이터가 Even하게 분산되지 않았어요, Spark에서 Data Skew 해결하기! (0) | 2024.10.27 |
---|---|
[Spark 완벽 가이드] 2. 스파크 간단히 살펴보기 (0) | 2024.05.24 |