목차

  1. Apache Spark란?
  2. 왜 Spark를 사용하는가?
  3. Spark 아키텍처
  4. Transformation vs Action
  5. DataFrame & Spark SQL
  6. GroupBy 내부 작동 방식

1. Apache Spark란?

Apache Spark는 대규모 데이터를 분산 처리하기 위한 통합 분석 엔진입니다.

한 줄 요약: “여러 대의 컴퓨터를 하나의 컴퓨터처럼 사용하여 TB~PB 규모 데이터를 처리하는 엔진”

핵심 특성

특성설명
분산 처리데이터를 여러 노드에 나눠서 병렬 처리
인메모리중간 결과를 메모리에 보관 → 디스크 I/O 최소화
통합 엔진배치, 스트리밍, ML, 그래프 처리를 하나의 API로
다국어Python, Scala, Java, R, SQL 지원
Lazy Evaluation실행 계획을 먼저 최적화한 뒤 한번에 실행

역사

2009  UC Berkeley AMPLab에서 시작
2010  오픈소스 공개
2014  Apache 최상위 프로젝트
2016  Spark 2.0 — DataFrame API, Catalyst Optimizer
2020  Spark 3.0 — Adaptive Query Execution
2023  Spark 3.5 — Spark Connect, 향상된 PySpark

Spark 생태계

2. 왜 Spark를 사용하는가?

Pandas, Polars, Dask 등이 있는데 왜?

"단일 머신에서 처리할 수 있으면 Pandas/Polars로 충분하다.
 그런데 데이터가 단일 머신 메모리를 초과하면?"

도구별 위치 비교

상세 비교

항목PandasPolarsDaskSpark
처리 규모~수 GB~수십 GB~수백 GBTB ~ PB
실행 환경단일 머신단일 머신단일/분산진정한 분산
언어PythonPython/RustPythonPython/Scala/Java/SQL
메모리데이터 전체 로드지연 실행청크 단위파티션 단위 분산
생태계풍부성장 중중간매우 풍부
클러스터불가불가가능(제한적)네이티브
SQL 지원없음있음(제한)DaskSQLSpark SQL (완전)
스트리밍없음없음제한적Structured Streaming
MLscikit-learn없음dask-mlMLlib

Spark가 지속적으로 쓰이는 5가지 이유

① 진정한 수평 확장 (Horizontal Scaling)

Pandas:  서버 1대 (메모리 64GB) → 64GB 이상 데이터 처리 불가

Spark:   서버 100대 (메모리 64GB × 100) = 6.4TB
         필요하면 200대로 늘리면 됨

단순히 노드를 추가하면 처리 능력이 선형으로 증가합니다.

② 클라우드 네이티브 통합

AWS:  EMR (Elastic MapReduce)
GCP:  Dataproc
Azure: HDInsight / Synapse Analytics

→ 클릭 몇 번으로 Spark 클러스터 생성/삭제
→ 처리할 때만 클러스터 띄우고, 끝나면 삭제 → 비용 절약

모든 주요 클라우드가 매니지드 Spark 서비스를 제공합니다.

③ 배치 + 스트리밍 통합

# 같은 API로 배치도, 스트리밍도 처리
# 배치
df = spark.read.parquet("s3://data/trips/")

# 스트리밍
df = spark.readStream.format("kafka").load()

# 변환 로직은 동일!
result = df.groupBy("zone").count()

④ 검증된 안정성과 생태계

10년 이상 프로덕션 검증
Fortune 500 기업 80% 이상 사용
커뮤니티: 2000+ contributor, 38000+ GitHub stars
연동: Delta Lake, Iceberg, Hudi, Kafka, Cassandra, ...

⑤ SQL 사용자도 접근 가능

-- 데이터 엔지니어가 아니어도 SQL로 TB 데이터 분석 가능
SELECT
    zone_name,
    COUNT(*) as trips,
    AVG(duration) as avg_duration
FROM trips
JOIN zones ON trips.zone_id = zones.id
GROUP BY zone_name
ORDER BY trips DESC

왜 연산이 빠를까?

핵심 1: 인메모리 처리

MapReduce (Hadoop):
  Read Disk → Process → Write Disk → Read Disk → Process → Write Disk
  ═══════════════════════════════════════════════════════════════════
  매 단계마다 디스크 I/O → 느림

Spark:
  Read Disk → Process (Memory) → Process (Memory) → Write Disk
  ═════════════════════════════════════════════════════════════
  중간 결과를 메모리에 유지 → 디스크 I/O 최소화

→ MapReduce 대비 최대 100배 빠름 (Spark 공식 벤치마크)

핵심 2: Catalyst Optimizer (쿼리 최적화)

사용자가 작성한 코드:
  df.filter(col("age") > 30).select("name", "age").filter(col("name") != "unknown")

Catalyst가 최적화한 코드:
  1. Predicate Pushdown: 필터를 데이터 읽기 단계로 밀어넣음
  2. Filter 합치기: 두 filter를 하나로 병합
  3. Column Pruning: 필요한 컬럼만 읽기

  → 실제 실행: 파일에서 name, age만 읽되 age>30 AND name!="unknown"인 것만

핵심 3: Tungsten 엔진 (메모리 최적화)

일반 JVM 객체:
  객체 헤더 (16B) + 필드 포인터 + 실제 데이터 → 오버헤드 큼

Tungsten:
  Off-heap 메모리에 바이너리 포맷으로 직접 저장
  → GC 부담 감소, 캐시 히트율 향상, 메모리 효율 2~5배

핵심 4: Adaptive Query Execution (AQE) — Spark 3.0+

기존: 실행 전에 통계 기반으로 계획 수립 → 실행

AQE:  실행 도중 실제 데이터 통계를 보고 계획 수정

      예: Join 시 한쪽이 예상보다 작으면
          Sort Merge Join → Broadcast Join으로 전환

      예: 셔플 후 파티션이 너무 작으면
          작은 파티션들을 자동으로 합침 (Coalesce)

데이터 크기별 추천 도구

3. Spark 아키텍처

클러스터 구조

각 구성 요소

구성 요소역할비유
Driver프로그램 진입점. DAG 생성, 태스크 분배, 결과 수집지휘자
SparkSessionSpark 기능 통합 진입점 (Spark 2.0+)리모컨
Cluster Manager리소스(CPU, 메모리) 할당. YARN, K8s, Standalone인사팀
Worker Node실제 계산을 수행하는 물리 머신공장
ExecutorWorker 위에서 동작하는 JVM 프로세스작업 라인
Task하나의 파티션을 처리하는 최소 작업 단위작업자

실행 흐름

① spark.read.parquet("...")               # Driver: 논리 계획 생성
② .filter(col("age") > 30)               # Driver: 논리 계획에 추가
③ .groupBy("dept").count()                # Driver: 논리 계획에 추가
④ .show()                                 # Action! Catalyst 최적화 시작
     │
     ▼
⑤ Catalyst Optimizer                      # 논리 → 물리 계획 변환
     │
     ▼
⑥ DAG Scheduler                           # Stage 분할 (셔플 경계)
     │
     ▼
⑦ Task Scheduler                          # 각 Executor에 태스크 배정
     │
     ▼
⑧ Executor에서 병렬 실행                    # 결과 → Driver로 반환

Job → Stage → Task 계층

Job (Action 하나 = Job 하나)
│
├─ Stage 0 (Shuffle 이전)
│   ├─ Task 0  (Partition 0: read + filter)
│   ├─ Task 1  (Partition 1: read + filter)
│   └─ Task 2  (Partition 2: read + filter)
│
│   ── Shuffle (데이터 재분배) ──
│
└─ Stage 1 (Shuffle 이후)
    ├─ Task 0  (부서A 집계)
    ├─ Task 1  (부서B 집계)
    └─ Task 2  (부서C 집계)

Stage 분할 기준: 셔플(데이터 재분배)이 필요한 시점에서 끊김

  • groupBy, join, repartition, distinct → 새 Stage

Deploy Mode: Client vs Cluster

Client Mode (개발/테스트):
  Driver가 제출한 머신에서 실행 → 로그를 바로 볼 수 있음

  [내 노트북] ← Driver 여기서 실행
      │
      ▼
  [Cluster]
  ├─ Executor 1
  ├─ Executor 2
  └─ Executor 3

Cluster Mode (프로덕션):
  Driver가 클러스터 내부에서 실행 → 안정적, 제출 머신과 독립

  [내 노트북] → spark-submit 제출 후 끊어도 OK
      │
      ▼
  [Cluster]
  ├─ Driver (클러스터 노드에서 실행)
  ├─ Executor 1
  ├─ Executor 2
  └─ Executor 3

RDD vs DataFrame

RDD (Resilient Distributed Dataset)

# RDD 방식 (저수준)
rdd = sc.textFile("data.csv")
rdd = rdd.map(lambda line: line.split(","))
rdd = rdd.filter(lambda x: int(x[2]) > 30)
rdd = rdd.map(lambda x: (x[3], 1))
rdd = rdd.reduceByKey(lambda a, b: a + b)
rdd.collect()
  • Spark의 원래 데이터 추상화 (2011~)
  • Java 객체로 저장 → GC 부담, 메모리 비효율
  • 최적화 불가 — Spark가 내부 구조를 모름

DataFrame (구조화된 데이터)

# DataFrame 방식 (고수준)
df = spark.read.option("header", "true").csv("data.csv")
df = df.filter(col("age") > 30)
result = df.groupBy("department").count()
result.show()
  • 스키마가 있는 분산 테이블 (2015~, Spark 1.3)
  • Catalyst Optimizer가 자동 최적화
  • Tungsten 엔진으로 바이너리 메모리 관리
  • SQL로도 동일하게 표현 가능

비교

항목RDDDataFrame
추상화 수준낮음 (람다 함수)높음 (컬럼 연산)
스키마없음 (비구조화)있음 (구조화)
최적화수동 (개발자 책임)Catalyst 자동 최적화
메모리Java 객체 → 비효율Tungsten 바이너리 → 효율
타입 안전컴파일 타임런타임
언어 성능Scala >> PythonScala ≈ Python (같은 실행 계획)
SQL 통합불가완벽 통합

요즘 RDD를 잘 안 쓰는 이유

이유 1: DataFrame이 더 빠름
  ─────────────────────────────────────
  같은 작업을 RDD vs DataFrame으로 하면
  DataFrame이 2~10배 빠름 (Catalyst + Tungsten 덕분)

이유 2: Python에서 성능 차이
  ─────────────────────────────────────
  RDD:       Python 객체 → 직렬화 → JVM → 역직렬화 → Python
             (매 연산마다 Python ↔ JVM 왕복)

  DataFrame:  JVM 내부에서 최적화된 코드로 실행
             (Python은 실행 계획만 전달, 실제 연산은 JVM)

  → PySpark에서 RDD는 극도로 느림

이유 3: 코드 가독성
  ─────────────────────────────────────
  # RDD (읽기 어려움)
  rdd.map(lambda x: (x[3], int(x[2]))) \
     .reduceByKey(lambda a,b: a+b)

  # DataFrame (읽기 쉬움)
  df.groupBy("department").agg(F.sum("salary"))

  # SQL (누구나 읽을 수 있음)
  SELECT department, SUM(salary) FROM employees GROUP BY department

이유 4: API 통합
  ─────────────────────────────────────
  DataFrame API = Spark SQL = Dataset API (Scala)
  모두 같은 Catalyst Optimizer를 거침
  → 어떤 API든 같은 실행 계획, 같은 성능

이유 5: 생태계 지원
  ─────────────────────────────────────
  Delta Lake, Iceberg, Hudi → DataFrame 기반
  MLlib → DataFrame 기반 (ML Pipeline)
  Structured Streaming → DataFrame 기반
  새로운 기능은 모두 DataFrame/Dataset 위에 구축됨

RDD를 아직 쓰는 경우:

  • 비구조화 데이터 (바이너리 파일 등) 저수준 처리
  • accumulator, broadcast 등 저수준 제어 필요
  • 레거시 코드 유지보수

4. Transformation vs Action

Lazy Evaluation (지연 평가)

Spark의 가장 중요한 개념:

  "Transformation은 실행 계획만 쌓고, Action이 호출될 때 비로소 실행한다"
# ① Transformation — 아무것도 실행되지 않음 (계획만 쌓임)
df = spark.read.parquet("trips/")           # 계획: 파일 읽기
df2 = df.filter(col("age") > 30)            # 계획: 필터 추가
df3 = df2.groupBy("dept").count()           # 계획: 집계 추가

# 여기까지 0줄도 읽지 않음!

# ② Action — 이 순간 전체 실행 시작!
df3.show()                                  # → Catalyst 최적화 → 실행

왜 Lazy?

이유 1: 최적화 기회
  ─────────────────
  모든 연산을 한번에 보고 최적화할 수 있음

  예: filter → select → filter
      → Catalyst가 filter 2개를 합치고, 필요한 컬럼만 읽도록 최적화

이유 2: 불필요한 계산 방지
  ─────────────────────────
  df = heavy_computation()
  df.take(5)  # 5개만 필요한데 전체를 계산할 필요 없음
              # Spark가 알아서 5개만 계산하고 멈춤

Transformation (변환) — Lazy

Narrow Transformation (셔플 없음 — 파티션 내부에서만 처리)

연산설명예시
select()컬럼 선택df.select("name", "age")
filter() / where()행 필터링df.filter(col("age") > 30)
withColumn()컬럼 추가/변경df.withColumn("age2", col("age")*2)
drop()컬럼 삭제df.drop("temp_col")
map()행 단위 변환 (RDD)rdd.map(lambda x: x*2)
flatMap()행 → 여러 행rdd.flatMap(lambda x: x.split())
union()두 DF 합치기df1.union(df2)
Narrow: 각 파티션이 독립적으로 처리 (네트워크 통신 없음)

  Partition 0 ──filter──▶ Partition 0'
  Partition 1 ──filter──▶ Partition 1'
  Partition 2 ──filter──▶ Partition 2'

Wide Transformation (셔플 발생 — 데이터 재분배 필요)

연산설명예시
groupBy()그룹별 집계df.groupBy("dept").count()
join()두 DF 조인df1.join(df2, "id")
repartition()파티션 재분배df.repartition(24)
distinct()중복 제거df.distinct()
orderBy() / sort()정렬df.orderBy("age")
reduceByKey()키별 축소 (RDD)rdd.reduceByKey(lambda a,b: a+b)
Wide: 파티션 간 데이터 이동 필요 (네트워크 셔플)

  Partition 0 ──┐
  Partition 1 ──┼── Shuffle ──▶ Partition 0' (dept=Engineering)
  Partition 2 ──┘              Partition 1' (dept=Marketing)
                               Partition 2' (dept=Sales)

Action (실행) — 즉시 실행 트리거

연산설명반환
show() / display()상위 N개 출력없음 (콘솔 출력)
count()행 수정수
collect()전체 데이터를 Driver로리스트 (⚠️ 대용량 주의)
take(n) / head(n)상위 N개리스트
first()첫 번째 행Row
write.parquet()파일 저장없음
foreach()각 행에 함수 적용없음
reduce()축소 연산
toPandas()Pandas DF로 변환pandas.DataFrame

전체 흐름 예시

# ── Transformations (Lazy — 계획만 쌓임) ──────────────

df = spark.read.parquet("trips/")             # T: 파일 읽기 계획

df2 = df.filter(col("duration") > 0)          # T: Narrow (필터)

df3 = df2.withColumn(                          # T: Narrow (컬럼 추가)
    "hour", F.hour("pickup_datetime")
)

df4 = df3.groupBy("hour").agg(                 # T: Wide (셔플!)
    F.count("*").alias("trips"),
    F.avg("duration").alias("avg_duration")
)

df5 = df4.orderBy("hour")                     # T: Wide (정렬!)

# ── Action (실행 트리거!) ─────────────────────────────

df5.show()                                     # A: 지금 실행!

# 실행 순서:
# Stage 0: read → filter → withColumn (Narrow, 셔플 없음)
#   ── Shuffle ──
# Stage 1: groupBy 집계
#   ── Shuffle ──
# Stage 2: orderBy 정렬 → show 출력

실행 계획 확인

# 논리 + 물리 실행 계획 확인
df5.explain(True)

# 출력 예시:
# == Parsed Logical Plan ==
# Sort [hour ASC]
# +- Aggregate [hour], [hour, count(1) AS trips, avg(duration) AS avg_duration]
#    +- Project [*, hour(pickup_datetime) AS hour]
#       +- Filter (duration > 0)
#          +- Relation [parquet] trips/
#
# == Optimized Logical Plan ==
# Sort [hour ASC]
# +- Aggregate [hour], [...]
#    +- Project [pickup_datetime, duration, hour(pickup_datetime) AS hour]  ← Column Pruning!
#       +- Filter (isnotnull(duration) AND (duration > 0))                ← Predicate Pushdown!
#          +- Relation [parquet] trips/

5. DataFrame & Spark SQL

Spark SQL이란?

DataFrame 연산을 SQL 문법으로 작성할 수 있게 하는 모듈입니다.
DataFrame API와 100% 동일한 Catalyst Optimizer를 거치므로 성능 차이가 없습니다.

# DataFrame API
result = df.filter(col("age") > 30) \
           .groupBy("department") \
           .agg(F.avg("salary").alias("avg_salary")) \
           .orderBy(F.desc("avg_salary"))

# Spark SQL — 완전히 동일한 실행 계획, 동일한 성능
df.createOrReplaceTempView("employees")

result = spark.sql("""
    SELECT department, AVG(salary) AS avg_salary
    FROM employees
    WHERE age > 30
    GROUP BY department
    ORDER BY avg_salary DESC
""")

어떨 때 Spark SQL을 쓰는가?

상황추천이유
SQL에 익숙한 분석가와 협업Spark SQLSQL은 보편적 언어
복잡한 조인/서브쿼리Spark SQLSQL이 더 읽기 쉬운 경우 多
동적 컬럼 처리, UDFDataFrame APIPython 코드가 유연
ML 파이프라인DataFrame APIMLlib가 DataFrame 기반
기존 SQL 자산 활용Spark SQL기존 쿼리 재사용
ETL 파이프라인 (혼합)둘 다 혼용상황에 맞게 선택

현업에서 Spark SQL 쓰는 파이프라인 사례

Case 1: 데이터 웨어하우스 ETL

# Bronze → Silver → Gold 를 Spark SQL로 처리
# 장점: SQL을 아는 누구나 파이프라인 이해/수정 가능

# Silver 정제
spark.sql("""
    CREATE OR REPLACE TABLE silver.trips AS
    SELECT
        CAST(pickup_datetime AS TIMESTAMP) AS pickup_dt,
        CAST(dropoff_datetime AS TIMESTAMP) AS dropoff_dt,
        CAST(PULocationID AS INT) AS pickup_zone_id,
        CAST(DOLocationID AS INT) AS dropoff_zone_id,
        TIMESTAMPDIFF(MINUTE, pickup_datetime, dropoff_datetime) AS duration_min
    FROM bronze.raw_trips
    WHERE pickup_datetime IS NOT NULL
""")

# Gold 집계
spark.sql("""
    CREATE OR REPLACE TABLE gold.daily_summary AS
    SELECT
        DATE(pickup_dt) AS trip_date,
        z.zone_name,
        COUNT(*) AS total_trips,
        AVG(duration_min) AS avg_duration
    FROM silver.trips t
    JOIN silver.zones z ON t.pickup_zone_id = z.zone_id
    GROUP BY 1, 2
""")

Case 2: Databricks / EMR 팀 협업

데이터 엔지니어: Python + DataFrame API로 파이프라인 구축
데이터 분석가:   Spark SQL로 노트북에서 쿼리
ML 엔지니어:    DataFrame API + MLlib

→ 같은 데이터, 같은 엔진, 다른 인터페이스

Case 3: SQL 기반 스케줄링 파이프라인

# Airflow에서 Spark SQL 실행
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator

task = SparkSqlOperator(
    task_id="daily_aggregation",
    sql="""
        INSERT OVERWRITE TABLE gold.daily_revenue
        SELECT DATE(order_date), SUM(amount)
        FROM silver.orders
        WHERE order_date = '{{ ds }}'
        GROUP BY 1
    """,
    master="yarn",
    conn_id="spark_default"
)

DataFrame API vs Spark SQL 혼합 사용 (실전 패턴)

# 읽기: DataFrame API
raw_df = spark.read.parquet("s3://bucket/raw/trips/")

# 뷰 등록
raw_df.createOrReplaceTempView("raw_trips")

# 정제: Spark SQL (복잡한 조인/윈도우 함수는 SQL이 편함)
cleaned = spark.sql("""
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY trip_id ORDER BY updated_at DESC) AS rn
    FROM raw_trips
    QUALIFY rn = 1  -- 중복 제거
""")

# 후처리: DataFrame API (동적 로직은 Python이 편함)
for col_name in nullable_columns:
    cleaned = cleaned.fillna({col_name: default_values[col_name]})

# 저장: DataFrame API
cleaned.write.mode("overwrite").parquet("s3://bucket/silver/trips/")

6. GroupBy 내부 작동 방식

기본 원리

groupBy2단계로 동작합니다:

단계 1: Map-side (Partial) Aggregation — 로컬에서 먼저 집계
단계 2: Reduce-side (Final) Aggregation — 셔플 후 최종 집계

구체적 흐름

df.groupBy("department").agg(F.count("*").alias("cnt"), F.sum("salary").alias("total"))

GroupBy 최적화 포인트

① Partial Aggregation 효과

Without Partial Agg:
  12행 셔플 → 네트워크 전송 12행

With Partial Agg:
  9행 셔플 → 네트워크 전송 9행 (25% 감소)

실제 데이터 (수억 행):
  Without: 1억 행 셔플
  With:    1만 행 셔플 (그룹 수만큼만!)
  → 셔플 데이터 99.99% 감소!

② 데이터 스큐 문제

문제: 특정 키에 데이터가 몰림

  dept="Eng"  → 900만 행 → Reduce 파티션 0: 혼자서 900만 행 처리 (병목!)
  dept="Mkt"  → 50만 행
  dept="Sales"→ 50만 행

해결 방법:

  방법 1: Salting (키에 랜덤 값 추가)
  ──────────────────────────────────
  df = df.withColumn("salt", F.concat(col("dept"), F.lit("_"), (F.rand()*10).cast("int")))
  # "Eng" → "Eng_0", "Eng_1", ..., "Eng_9"
  # 900만 행이 10개 파티션으로 분산!

  agg1 = df.groupBy("salt").agg(F.sum("salary"))  # 분산 집계
  agg2 = agg1.withColumn("dept", F.split(col("salt"), "_")[0]) \
              .groupBy("dept").agg(F.sum("sum(salary)"))  # 최종 합산

  방법 2: AQE (Spark 3.0+) 자동 처리
  ──────────────────────────────────
  spark.conf.set("spark.sql.adaptive.enabled", "true")
  spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
  # Spark가 런타임에 스큐를 감지하고 자동으로 파티션 분할

③ GroupBy + Join (Shuffle 최소화)

# 안 좋은 패턴: groupBy 후 join → 셔플 2번
grouped = trips.groupBy("zone_id").count()
result = grouped.join(zones, "zone_id")  # 셔플 2번

# 좋은 패턴: broadcast join → 셔플 1번
from pyspark.sql.functions import broadcast
grouped = trips.groupBy("zone_id").count()
result = grouped.join(broadcast(zones), "zone_id")  # 셔플 1번 (zones를 각 노드에 복사)
Broadcast Join 조건:
  작은 테이블 < spark.sql.autoBroadcastJoinThreshold (기본 10MB)
  → 자동으로 Broadcast Join 선택

  수동 힌트:
  SELECT /*+ BROADCAST(zones) */ ...
  FROM trips JOIN zones ON trips.zone_id = zones.zone_id

GroupBy Sort-based vs Hash-based

Spark는 기본적으로 Sort-based Aggregation 사용:

  1. 같은 키를 가진 데이터를 셔플로 모음
  2. 키로 정렬
  3. 순차적으로 스캔하며 집계

  장점: 메모리 효율 (전체 데이터를 메모리에 올릴 필요 없음)
  단점: 정렬 비용

Hash-based는 RDD의 reduceByKey 등에서 사용:
  1. 해시 테이블에 키별로 집계
  장점: 정렬 없이 빠름
  단점: 메모리에 해시 테이블 유지 필요

부록: Spark 설정 치트시트

# 메모리 설정
spark.conf.set("spark.executor.memory", "4g")
spark.conf.set("spark.driver.memory", "2g")

# 셔플 파티션 수 (groupBy 등 셔플 후 파티션 수)
spark.conf.set("spark.sql.shuffle.partitions", "200")  # 기본 200

# AQE 활성화 (Spark 3.0+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Broadcast Join 임계값
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")  # 10MB

# 동적 파티션 pruning
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")