이 글은 Spark의 Data Skew 문제를 해결하기 위해 시도했던 방법과 이후 Suffyan Asad님의 글을 기반으로 학습한 방법을 적용해 그 결과를 비교 및 분석한 포스팅입니다.
한국어 번역 및 재구성에 대해 저자의 허락을 받았습니다.
1. Introduction
이번 포스팅은 Apache Spark에서 Data Skew를 감지하고 처리하는 방법에 대해 설명하며 다음의 주제에 대해 살펴본다.
- Data Skew란 무엇이고, Spark 작업의 성능에 어떤 영향을 미치는지, 특히 조인 작업의 데이터 스큐를 중심으로 설명한다.
- Spark UI를 사용하여 Data Skew 확인하는 방법
- 최신 버전의 Spark에서 제공되는 기본 최적화 기능과 데이터 변경을 사용하여 Data skew를 처리하는 몇 가지 예제
Data Skew가 뭔데?
Spark에서 데이터 스큐(data skew)는 데이터가 파티션들 간에 고르게 분산되지 않는 상황을 말한다.
이상적인 경우라면 데이터는 모든 파티션에 고르게 분배되어 최대 병렬 처리가 가능하고, 이를 통해 처리 속도가 빨라져야 한다.
하지만 대부분의 현실 세계의 데이터는 주로 불균형하기 때문에, 일부 파티션에 데이터가 몰리면서 이 데이터 스큐 현상이 발생한다.
이러한 데이터 불균형은 Spark 애플리케이션 성능에 부정적인 영향을 미치며 처리 시간 증가, 리소스 비효율적 사용, 심지어 메모리 부족에러(out-of-memory errors)까지 유발할 수 있다. 최악의 경우, 데이터가 편향된 하나의 파티션이 전체 작업 속도를 크게 늦출 수 있다.
Spark 애플리케이션에서 Data Skew가 발생할 수 있는 잠재적 원인은 다음과 같다.
- 편향된 데이터 분포: 현실 세계의 데이터는 대체로 고르게 분포되지 않고 특정 키가 많은 빈도를 갖는다. 이러한 Hot keys(특정 키가 매우 자주 등장하거나 많은 데이터를 포함하는 키)가 생기면 특정 파티션에 데이터가 몰려 편향(Skew)이 발생하게 된다.
- 부적절한 파티셔닝 전략: Spark의 기본 파티셔닝 전략이 항상 모든 데이터셋에 최적화돼있지 않다. 예를 들어, 기본 해시 파티셔닝 전략은 특정 키들이 동일한 파티션에 할당될 경우 skew를 유발할 수 있다.
- Join 연산: 조인 연산을 할 때, 조인 대상 데이터셋의 키들이 고르게 분포되지 않으면 편향이 발생할 수 있다. 특히, 큰 데이터셋과 작은 데이터셋을 유니크 키가 아닌 키로 조인할 경우 skew 현상이 더 두드러지게 나타난다.
- GroupBy 연산: 조인과 마찬가지로, GroupBy 연산에서도 특정 키에 값이 몰리면 데이터 편향이 생길 수 있다. 특정 키에 값이 많을수록 해당 파티션에 부하가 집중되기 때문이다.
Skew Detection
예제 데이터셋
Data Skew를 감지하는 방법을 확인하기 앞서 사용할 데이터셋은 다음과 같다.
이 데이터셋은 2022년 1월부터 6월까지의 뉴욕 옐로우 택시 운행 기록과 택시존 정보를 포함하며, 특정 픽업 위치(237번)에 데이터가 집중되도록 조작하여 Data Skew(데이터 스큐)를 의도적으로 유발한 데이터이다.
사실, Spark에서 Data Skew가 발생했을 때 Query Plan(쿼리 계획)만으로 이를 감지하기는 어렵다.
Query Plan(쿼리 계획)은 Spark 작업의 각 단계에서 어떤 연산이 수행되는지 설명해 주지만, 각 작업 이후의 데이터 분포에 대한 정보를 제공하지는 않기 때문이다. 따라서 Query Plan만으로는 데이터가 특정 파티션에 얼마나 몰렸는지, 즉 데이터 스큐가 발생했는지를 파악하기 어렵다.
데이터 분포를 자세히 확인하려면 Spark UI를 사용해야 한다. Spark UI는 작업 진행 상황과 파티션별 데이터 분포를 시각적으로 보여주기 때문에 각 단계 이후 데이터가 어떻게 분산되는지 직접 확인할 수 있다. 기본적으로 http://localhost:4040에서 접근이 가능하다.
아래의 예제에서는 픽업 지역(pickup zone)과 자치구(borough)별로 이동 거리 평균을 계산하는 작업을 수행하는데 이 작업을 통해 각 지역의 이동 거리 평균을 계산할 때 Spark UI를 활용해 작업 단계별 데이터 분포를 확인하고, 특정 지역에 데이터가 몰려있는지(스큐가 발생했는지) 파악할 수 있다.
데이터 스큐와 그로 인한 영향을 관찰하려면 먼저 Adaptive Query Execution (AQE)을 비활성화해야 한다.
AQE는 Spark에서 쿼리 실행 중간에 스큐를 감지해 자동으로 최적화하기 때문이다.
또한, 브로드캐스트 조인도 비활성화해줘야 한다. 이 글에서는 두 데이터셋이 모두 커서 브로드캐스트 조인을 사용할 수 없는 상황을 가정하고, Sort-Merge 조인을 사용하도록 설정하기 위함이다.
from pyspark.sql import SparkSession
from pyspark import SparkConf
def create_spark_session_with_aqe_disabled() -> SparkSession:
# SparkConf 설정 생성
conf = SparkConf() \
# 드라이버 메모리 설정 (4GB)
.set("spark.driver.memory", "4G") \
# Broadcast Join 비활성화 (-1로 설정)
.set("spark.sql.autoBroadcastJoinThreshold", "-1") \
# 셔플 파티션 수 설정 (201)
.set("spark.sql.shuffle.partitions", "201") \
# AQE (Adaptive Query Execution) 비활성화
.set("spark.sql.adaptive.enabled", "false")
# SparkSession 생성
spark_session = SparkSession\
.builder\
# 로컬 모드에서 실행 (8개의 CPU 코어 사용)
.master("local[8]")\
# 설정(conf)을 적용한 세션 구성
.config(conf=conf)\
# 애플리케이션 이름 설정
.appName("Read from JDBC tutorial") \
# 세션 생성 또는 기존 세션 재사용
.getOrCreate()
return spark_session # 생성된 SparkSession 반환
def join_on_skewed_data(spark: SparkSession):
trips_data = prepare_trips_data(spark=spark)
location_details_data = spark.read.option("header", True).csv("data/taxi+_zone_lookup.csv")
trips_with_pickup_location_details = trips_data\
.join(location_details_data, F.col("PULocationID") == F.col("LocationID"), "inner")
trips_with_pickup_location_details \
.groupBy("Zone") \
.agg(F.avg("trip_distance").alias("avg_trip_distance")) \
.sort(F.col("avg_trip_distance").desc()) \
.show(truncate=False, n=1000)
trips_with_pickup_location_details \
.groupBy("Borough") \
.agg(F.avg("trip_distance").alias("avg_trip_distance")) \
.sort(F.col("avg_trip_distance").desc()) \
.show(truncate=False, n=1000)
다음의 코드를 실행하여 로컬 기준으로 16~18초 정도의 시간이 걸렸다.
Job은 총 4개의 stage로 나뉘어졌고, 마지막 두 개의 작업이 각각 5초, 6초의 시간이 걸렸다. job 2를 눌러 stage를 살펴보면 다음과 같다.
총 4개의 stage중 stage 4가 3초로 가장 오래 걸렸고, 다음으로 stage 3가 2초로 두 번째로 오래 걸렸다.
stage 4를 자세히 살펴보면, 각 작업의 세부 정보, 작업 시간과 201개의 모든 작업(Task)에 대한 요약 통계를 확인할 수 있다.
또한, Event Timeline을 열면 데이터 스큐 역시 확인할 수 있다.
일반적으로, 만약 중앙값(Median)과 75번째 백분위수(75th Percentile) 또는 최댓값(Max) 간에 큰 차이가 있다면, 이는 데이터 스큐가 존재하는 지표로 볼 수 있다. 이상적으로, 파티션들은 최대한 균형 있게 분배돼야 한다.
또한, 위 작업은 다른 작업에 비해 처리한 레코드 수가 훨씬 많아 데이터 스큐로 인해 다른 작업보다 훨씬 오래 걸리고 있다. 이로 인해 스테이지가 해당 작업이 완료될 때까지 대기해야 하는 상황이 발생하는데, 이러한 작업을 straggling task라고 한다.
2. 시도했던 방법
회사에서 근무할 당시에는 Salt Repartitioning 기법을 사용해 데이터 스큐 문제를 해결하고자 했다.
(사실 그다지 효과적이진 않았다. 그 이유는 아래에서 구체적으로 설명한다.)
물론 실제로 사내 데이터셋과는 다르지만 예제 데이터를 통해 Salting 기법을 사용했을 때의 전후 차이를 비교해 보자.
Salting Technique
Salting은 특정 키(Hot key)의 빈도가 높아 데이터 스큐가 발생할 때 유용한 기법이다.
동일한 키에 랜덤 숫자(salt)를 추가하여, 같은 키를 가진 레코드들이 여러 키로 분산되도록 하는 방식이다.
from pyspark.sql.functions import expr
def apply_salt_repartitioning(trips_data: DataFrame, salt_range=10) -> DataFrame:
# "salt"라는 임의의 값을 추가하여 PULocationID를 분산
salted_trips_data = trips_data.withColumn(
"salted_PULocationID",
F.concat(F.col("PULocationID"), F.lit("_"), expr(f"cast(floor(rand() * {salt_range}) as int)"))
)
return salted_trips_data
def join_with_salting(spark: SparkSession):
trips_data = prepare_trips_data(spark=spark)
location_details_data = spark.read.option("header", True).csv("data/taxi+_zone_lookup.csv")
# Salt Repartitioning 적용
salted_trips_data = apply_salt_repartitioning(trips_data)
location_details_data = location_details_data.withColumn("salted_LocationID", F.concat(F.col("LocationID"), F.lit("_"), F.lit(0))) # 예시로 salting applied
trips_with_pickup_location_details = salted_trips_data.join(
location_details_data,
F.expr(f"substring_index(salted_PULocationID, '_', 1)") == F.col("LocationID"),
"inner"
)
# Salt를 제거하고 원래 PULocationID로 복원한 후 집계 수행
trips_with_pickup_location_details = trips_with_pickup_location_details.withColumn(
"PULocationID", F.expr("split(salted_PULocationID, '_')[0]")
)
# 원래 PULocationID를 기준으로 집계
trips_with_pickup_location_details \
.groupBy("Zone") \
.agg(F.avg("trip_distance").alias("avg_trip_distance")) \
.sort(F.col("avg_trip_distance").desc()) \
.show(truncate=False, n=1000)
trips_with_pickup_location_details \
.groupBy("Borough") \
.agg(F.avg("trip_distance").alias("avg_trip_distance")) \
.sort(F.col("avg_trip_distance").desc()) \
.show(truncate=False, n=1000)
예제 데이터에서는 다음의 방법으로 Salting 기법을 적용했다.
1. Salted Key 생성
- trips_data와 location_details_data 모두에 salting을 적용한다.
- location_details_data에는 crossJoin과 sequence를 사용하여 LocationID와 각 salt 값 조합으로 된 salted_LocationID를 만든다.
2. 조인 수행
- salted_PULocationID와 salted_LocationID 간에 조인을 수행한다.
3. 원래 키로 복원 후 집계
- 조인 후 원래 PULocationID로 복원하여 집계를 수행한다.
위와 같이 Salt Repartitioning을 적용했을 때, 실행시간은 약 20초로 4초 정도 더 증가했고 다음과 같은 결과를 확인할 수 있다.
하지만, 전체 실행시간이 오히려 약 4초가 증가했으며 groupBy 연산의 경우 특정 키(예: Zone이나 Borough) 별로 집계를 수행하기 때문에 결국 특정 값이 많은 경우에는 그 값이 특정 파티션에 집중되면서 다시 데이터 스큐가 발생할 수 있다. 이는 groupBy 연산 자체가 원래의 데이터를 다시 한 곳에 모으는 작업을 필요로 하기 때문이다. 즉, groupBy 연산에 Salting을 적용하면 Salt가 추가된 상태에서는 데이터가 분산되더라도 최종 집계 단계에서 다시 데이터를 원래 키로 복원해 합산해야 하기 때문에 실질적으로 데이터 스큐 문제를 해결하기 어렵다.
3. 학습 후 시도한 방법
Dividing large partitions — using a derived column
데이터를 더 고르게 분배하는 한 가지 방법은 조인 양쪽에 새로운 컬럼을 추가하는 방법이다.
- 작은 데이터셋인 zone lookup 데이터에서 각 행을 n번 반복하여 새로운 컬럼의 값으로 0부터 n-1까지를 할당한다. 이 새로운 컬럼을 location_id_alt라고 한다.
- 데이터 스큐가 발생한 큰 데이터셋인 rides 데이터에서는 pickup timestamp (tpep_pickup_datetime) 컬럼을 사용해 추가적인 값을 n-1까지 할당한다. 여기서는 tpep_pickup_datetime 컬럼을 dayofyear Spark SQL 함수로 변환하여 해당 날짜를 얻고, 이 값을 n으로 나눈 나머지를 구하여 새로운 컬럼을 생성한다. 이제 양쪽 데이터셋 모두 0부터 n-1까지의 값을 가지는 추가적인 컬럼을 갖고 있다.
- 조인에 이 새로운 컬럼을 추가하면, 데이터가 많은 파티션을 포함하여 모든 파티션을 더 작은 여러 파티션으로 분산시킬 수 있다. 또한, 이 새로운 컬럼의 계산은 셔플이 필요하지 않기 때문에 윈도우 함수에서 순위나 행 번호 연산을 수행할 필요도 없다.
def join_on_skewed_data_with_subsplit(spark: SparkSession):
subsplit_partitions = 25
trips_data = prepare_trips_data(spark=spark)\
.withColumn("dom_mod", F.dayofyear(F.col("tpep_pickup_datetime"))).withColumn("dom_mod", F.col("dom_mod") % subsplit_partitions)
location_details_data = spark\
.read\
.option("header", True)\
.csv("data/taxi+_zone_lookup.csv")\
.withColumn("location_id_alt", F.array([F.lit(num) for num in range(0,subsplit_partitions)])) \
.withColumn("location_id_alt", F.explode(F.col("location_id_alt")))
trips_with_pickup_location_details = trips_data\
.join(
location_details_data,
(F.col("PULocationID") == F.col("LocationID")) & (F.col("location_id_alt") == F.col("dom_mod")),
"inner"
)
trips_with_pickup_location_details \
.groupBy("Zone") \
.agg(F.avg("trip_distance").alias("avg_trip_distance")) \
.sort(F.col("avg_trip_distance").desc()) \
.show(truncate=False, n=1000)
trips_with_pickup_location_details \
.groupBy("Borough") \
.agg(F.avg("trip_distance").alias("avg_trip_distance")) \
.sort(F.col("avg_trip_distance").desc()) \
.show(truncate=False, n=1000)
다음의 방식을 적용한 결과 총 실행시간은 13초로 기존의 방식보다 3초가 줄어들었으며 다음과 같이 job과 stage의 실행 시간 역시 줄어들었음을 확인할 수 있다.
이전과 달리, Stage 4가 전체적으로 더 빠르게 실행되고 있으며, 실행 시간이 매우 긴 개별 작업 (straggling task)이 없어 다른 작업들이 완료될 때까지 기다리는 일이 줄어든 상태이다. 이는 데이터 스큐가 일부 완화되면서 대체로 작업이 균일하게 완료되고 있는 것을 확인할 수 있다.
요약 통계에 따르면 가장 긴 태스크는 이제 1초이며, 중앙값은 17ms로 나타났다. 이는 데이터가 더 잘 분산되었음을 의미한다.
전체 실행 시간은 총 3초가 감소했고 큰 차이로 보이지 않을 수 있지만, 더 큰 데이터와 클러스터에서는 유의미한 차이가 될 수 있다.
하지만, 여전히 완전히 균일하게 분산되지 않고 있고 이는 아래의 방법들로 더욱 개선할 수 있다.
- DataFrame에 대해 요약 통계를 실행할 수 있거나 시스템에 다른 방식으로 통계가 제공되는 경우, 단일 수치 대신 각 그룹화 열 값에 대한 행 수에 따라 데이터를 분할할 수 있다.
- Spark 최신 버전(3.0 이상, 이상적으로는 3.2 이상)을 사용하는 경우 또는 사용할 수 있다면, Adaptive Query Execution (AQE)를 활용하여 데이터 분포를 더욱 개선하고 스큐 문제를 처리할 수 있다.
Using Adaptive Query Execution (AQE)
Adaptive Query Execution(AQE)은 Spark 3.0 이상에서 사용할 수 있으며, Spark 3.2.0 이상 버전에서는 기본적으로 활성화되어 있다. AQE는 "통계를 사용하여 더 효율적인 쿼리 실행 계획을 선택"한다고 설명되어 있으며, 관련 문서는 아래에 링크되어 있다.
AQE에는 긴 작업을 분할하여 스큐를 처리하는 기능인 skewJoin이 있다. 이 기능은 속성 spark.sql.adaptive.skewJoin.enabled를 True로 설정하여 활성화할 수 있다. 또한, AQE에서 skewJoin을 조정하기 위한 두 가지 추가 매개변수가 있다.
- spark.sql.adaptive.skewJoin.skewedPartitionFactor (기본값: 5): 이 값은 중간 파티션 크기에 곱해지는 계수를 조정한다. 파티션이 이 값보다 크면 스큐된 파티션으로 간주된다.
- spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (기본값: 256MB): 스큐된 파티션의 최소 크기를 설정하는 매개변수로, 이 값보다 크고 이전 매개변수(spark.sql.adaptive.skewJoin.skewedPartitionFactor)에 의해 스큐된 것으로 표시된 파티션은 스큐된 것으로 간주된다.
보통 Spark에서는 기본적으로 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes를 256MB로 설정해서, 파티션 크기가 256MB를 넘을 때 스큐로 간주하지만 예제 데이터셋은 상대적으로 작아 파티션 크기가 81KB 미만이기 때문에, 기준을 256KB로 낮춰야 스큐된 파티션을 더 잘 감지할 수 있다.
또한, factor를 3으로 설정하면 파티션 크기가 평균 크기의 3배 이상일 때 스큐로 간주하기 때문에 아래와 같이 설정해 준다.
def create_spark_session_with_aqe_skew_join_enabled() -> SparkSession:
conf = SparkConf() \
.set("spark.driver.memory", "4G") \
.set("spark.sql.autoBroadcastJoinThreshold", "-1") \
.set("spark.sql.shuffle.partitions", "201") \
.set("spark.sql.adaptive.enabled", "true") \
.set("spark.sql.adaptive.coalescePartitions.enabled", "false") \
.set("spark.sql.adaptive.skewJoin.enabled", "true") \
.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "3") \
.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256K")
spark_session = SparkSession\
.builder\
.master("local[8]")\
.config(conf=conf)\
.appName("Read from JDBC tutorial") \
.getOrCreate()
return spark_session
- join_on_skewed_data 함수(추가 컬럼을 추가하지 않는 기본 조인 함수)를 실행했을 때, 실행 시간이 15초 정도 걸렸다.
- Spark의 SQL Query Plan을 확인하기 위해 상단 네비게이션 바에서 SQL/DataFrame 아이콘을 선택하고, Query ID 1을 선택하여 쿼리 플랜을 확인할 수 있다. 쿼리 플랜에서 shuffle 단계 이후 AQEShuffleRead 단계가 추가되었고, 이 단계에서 하나의 스큐된 파티션을 감지하여 두 개의 파티션으로 나누어진 것을 볼 수 있다.
처리 과정에서 전체 작업 수는 더 많아졌지만, job 4의 stage 6을 확인해 보면, 이제 큰 파티션이 두 개의 파티션으로 분할된 것을 볼 수 있다.
여전히 이상적이지는 않지만, 가장 긴 두 개의 태스크가 완료되는 데 3초가 걸렸다.
다음으로, AQE의 또 다른 기능인 Coalesce Partitions (spark.sql.adaptive.coalescePartitions.enabled)이 있다. 이 기능은 기본적으로 활성화되어 있으며, 이 기능을 활성화하면 Spark는 데이터 통계와 처리 자원을 기반으로 셔플 파티션 수를 조정하고, 작은 파티션들을 큰 파티션으로 병합하여 셔플 시간과 데이터 전송을 줄인다. 이제 AQE를 활성화하여 이를 확인해 보자.
def create_spark_session_with_aqe_enabled() -> SparkSession:
conf = SparkConf() \
.set("spark.driver.memory", "4G") \
.set("spark.sql.autoBroadcastJoinThreshold", "-1") \
.set("spark.sql.shuffle.partitions", "201") \
.set("spark.sql.adaptive.enabled", "true") \
.set("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.set("spark.sql.adaptive.skewJoin.enabled", "true") \
.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "3") \
.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256K")
spark_session = SparkSession\
.builder\
.master("local[8]")\
.config(conf=conf)\
.appName("Read from JDBC tutorial") \
.getOrCreate()
return spark_session
동일한 함수를 실행하면 처리 시간이 약 13-15초가 걸린다. 작업을 살펴보면 모든 단계에서 태스크 수가 줄어든 것을 즉시 확인할 수 있다. 이전에는 약 400개였던 태스크 수가 이제 4개로 줄었다(셔플 파티션 수가 201로 설정됨). 이는 AQE가 데이터를 더 적은 수의 파티션으로 병합했음을 나타낸다.
SQL 쿼리 계획을 보면 AQE 단계에 이제 분할과 병합 단계가 모두 포함된 것을 확인할 수 있다.
AGE 단계는 이전과 마찬가지로 더 큰 파티션을 두 개로 나누고 더 작은 파티션을 두 개의 파티션으로 나누었다. 총 4개의 파티션을 확인할 수 있다.
완료된 작업 중에서 Job 4의 Stage 6을 살펴보면, 2개의 작업이 0.4ms, 0.7ms가 걸렸고, 2개의 작업은 2, 3초가 걸렸다.
일부 작업은 아주 짧은 시간(0.4ms, 0.7ms)에 완료되고, 몇몇 작업은 상대적으로 오래(2초, 3초) 걸린 점을 통해 작은 파티션이 병합되어 효율적으로 처리된 반면, 여전히 큰 파티션이 존재하여 비교적 긴 시간이 걸린 작업도 있다는 것을 확인할 수 있다.
따라서 이제 큰 파티션을 분할해서 AQE의 작업 효율을 더욱 개선할 수 있다.
join_on_skewed_data_with_subsplit 함수를 AQE(Adaptive Query Execution)를 활성화한 상태로 실행한 결과, 처리 시간이 약 12-14초로 개선됐다. 쿼리 계획을 확인해 보면, 파티션이 9개로 병합되었고 분할된 파티션은 없다.
이는 DataFrame과 조인에 파생 열(derived column)을 추가함으로써 각 셔플 파티션이 더 작아졌기 때문이다.
- 파티션 병합과 분할: 원래 데이터 처리에서는 파티션이 많고, 특정 파티션에 데이터가 몰리면 스큐가 발생해 실행 시간이 느려진다. AQE는 데이터를 처리하면서 파티션의 크기를 분석하고, 필요에 따라 파티션을 병합하거나 분할하여 균형을 맞춘다.
- 현재 상황: join_on_skewed_data_with_subsplit 함수 실행 후, 파티션이 9개로 병합되었고, 스큐된 큰 파티션이 없어서 추가적인 파티션 분할이 필요하지 않다.
- 파생 열(derived column): 여기서 말하는 파생 열은, 조인 시에 데이터의 분산을 개선하기 위해 추가한 열이다. 이 열을 통해 데이터를 더 고르게 나누어 조인할 수 있게 되었기 때문에, AQE가 각 셔플 파티션을 효율적으로 작게 분할하여 데이터 스큐를 완화할 수 있었다.
현재 파티션 간 데이터 분배에 있어서 거의 이상적인 상태에 가깝고, 특히 Spark가 8개의 코어와 총 4GB의 RAM을 사용하고 있다는 점을 고려했을 때 가장 좋은 데이터 분배라고 할 수 있다.
결론적으로, Spark의 AQE는 특히 오래 실행되는 작업의 성능을 분석할 때 자원 활용을 최적화하고 전체 성능을 개선하기 위해 데이터 스큐를 식별하고 완화할 수 있다.
4. 결론 및 핵심 내용
- Data Skew(데이터 스큐)는 일부 파티션이 나머지 파티션에 비해 훨씬 더 많은 데이터를 가지고 있는 상태를 말한다. 이로 인해 작업이 지연되고 전체 성능이 저하되며, 디스크에 데이터를 저장하는 상황(스필)이 발생하거나 데이터가 너무 많을 경우 메모리 부족 에러가 발생할 수 있다.
- 데이터 스큐는 주로 조인 시 한두 개 이상의 열에 따라 데이터를 리파티셔닝할 때 발생할 수 있으며, 경우에 따라 groupBy 작업에서도 발생할 수 있다.
- 데이터 스큐는 Query Plan(쿼리 계획)을 통해 식별하기 어려우며, Spark UI를 사용하여 단계별 작업 통계를 확인하고 지연된 작업을 식별함으로써 감지할 수 있다.
- 데이터 스큐는 더 큰 파티션을 하위 분할하여 해결할 수 있으며, 이는 기존 또는 파생된 열을 추가함으로써 해결할 수 있다. 조인 작업에 열을 추가하여 큰 파티션을 더 작게 나눌 수 있다. 데이터셋과 열 값에 대한 충분한 이해는 데이터 스큐 문제 해결에 좋은 인사이트를 제공할 수 있다. 마찬가지로, Grouping 연산에서도 여러 단계에 걸쳐 점진적으로 열 수를 줄여가며 최종 결과를 얻을 수 있다.
- 내장된 Spark 기능도 데이터 스큐 문제 해결에 유용하다. 가능한 경우 브로드캐스트 조인을 사용하고, Adaptive Query Optimization(AQE)을 활성화하는 것도 데이터 스큐 문제를 해결하는 데 도움이 될 수 있다.
회사에서 Data Skew 문제를 해결하기 위해 Salt Repartitioning과 Broadcast 조인 기법을 적용해 보았지만, 특히 GroupBy 연산에서 Salt Repartitioning이 스큐 문제를 완벽히 해결하지 못해 아쉬움이 남았었다. 이번에 이 한계를 분명히 깨닫게 되었고, Spark의 Skew 문제 해결 방법에 대해 추가 학습을 진행했다.
이 과정에서 Suffyan Asad의 글 Handling Data Skew in Apache Spark: Techniques, Tips and Tricks to Improve Performance를 통해 Derived column을 활용해 큰 파티션을 분할하는 방법과 AQE를 통한 데이터 스큐 완화 방법을 학습했다.
원저자의 허락을 받아 글을 번역 및 재구성하며, Spark 환경에서 발생하는 데이터 스큐 문제를 해결할 수 있는 다양한 접근법을 체계적으로 정리하고 실무에서 보다 효과적으로 활용할 수 있는 기반을 다질 수 있었다.
'Data Engineering > Spark' 카테고리의 다른 글
[Spark 완벽 가이드] 3. 스파크 기능 둘러보기 (0) | 2024.05.26 |
---|---|
[Spark 완벽 가이드] 2. 스파크 간단히 살펴보기 (0) | 2024.05.24 |