본문 바로가기
데이터/Spark

Spark Partition 최적화

by Sunyoung95 2024. 6. 23.
spark의 병렬처리는 task 단위로 처리가 된다.
각 task는 spark의 parition단위와 일치하므로 partition 설정을 어떻게 하느냐에 따라 병렬처리의 효율성이 달라지게 된다.

 

Spark에서 task란

  • Spark Application이 제출되면 job → Stage → Task 순으로 쪼개지게 되며 실질적으로 처리되는 최소 연산 단위는 Task
  • 따라서 각 처리 단위는 1 Task = 1 Core = 1 Partition 이 된다.
  • 따라서 각 코어 당 얼마만큼의 메모리가 할당될 수 있느냐에 따라 Partition의 크기를 정할 수 있다.

Partition 이란?

  • RDDs나 Dataset을 구성하고 있는 최소 단위 객체
  • 각 Partition은 서로 다른 노드에서 분산처리된다.
  • 하나의 Task에서 하나의 Partition이 처리되며, 하나의 Task는 하나의 Core가 연산처리한다.

  • 위 이미지의 경우 전체 파티션이 1800개 이며, 전체 Core 수를 300개로 세팅해둔 상태(= 현재 동시 처리중인 Task 300개)
  • 따라서 Partition의 수는 Core의 수, Partition의 크기는 Memory 크기에 따라 결정 된다.
    • 적은 Partition = 크기가 큰 Partition
    • 많은 Partition = 크기가 작은 Partition 

Partition의 종류

Input Partiton

  • 관련 설정 : spark.sql.files.maxpartitionBytes
  • 기본 값 : 128MB (134217728)
  • 처음 파일을 읽을 때 생성하는 Partition
  • spark가 읽어올 파일의 크기가 128MB보다 크다면 Spark에서 128MB만큼 쪼개면서 파일을 읽는다.
  • 파일의 크기가 128MB보다 작다면 그대로 읽어들여, 파일 하나당 하나의 Partition이 된다. 
  • 보통 필요한 컬럼만 골라서 뽑아 쓰기 때문에 파일이 128MB보다 작으므로 기본설정을 유지한다.
  • 만약에 파일 하나의 크기가 매우 크고 수 도 많다면 설정값의 크기를 늘리고 자원도 늘려야한다. 

Output Partition

  • 관련 설정 : df.repartition(cnt), df.coalesce(cnt)
  • 파일을 저장할 때 생성하는 Partition
  • 해당 Partition의 수(cnt)가 마지막에 저장되는 파일의 수를 지정.
  • 만약 Hadoop과 같은 block storage를 사용하고 있다면 blocksize에 맞게 크기를 맞춰서 설정해주면 된다.
  • Tips
    • groupBy 집계 후 저장할 때 데이터의 크기가 작아진다. 그 후 spark.sql.shuffle.partitions 설정에 따라 파일 수가 지정되는데 이때 파일의 크기를 늘리거나 줄이기 위해 repartition / coalesce를 사용해 parition 수를 바꿀 수 있다.
    • df.where()를 통해 필터링을 하고 나서 그대로 저장한다면 파편화가 생기므로 repartition(cnt)를 한 후 저장한다.
    • repartition은 셔플을 기반으로 작동하므로 Partition을 늘릴때만 사용하고,
      Partition을 줄일 때는 셔플을 수행하지 않는 coalesce사용을 권장한다. 

Shuffle Partition

  • 관련 설정 : spark.sql.shuffle.partitions
  • spark 성능에 가장 크게 영향을 미치는 Partition으로, join / groupBy 등의 연산을 수행할 때 Shuffle Partition이 쓰인다
  • Join, groupBy 수행 시 Partition의 수(Task의 수)가 결정된다.
  • 해당 설정값은 Core의 수가 아닌 각 Core의 Memory에 따라 결정해야한다. 
    • Why? 만약에 Partition의 크기가 커서 연산에 쓰이는 메모리가 부족하다면 Shuffle Spill(데이터를 직렬화하고 스토리에 저장, 처리 이후에는 역 직렬화하고 연산 재개)이 일어나기 때문.
    • Shuffle Spill이 일어나면 Task가 지연되고 에러가 발생할 수 있다.
  • 일반적으로 하나의 Shuffle Partition 크기가 100~200MB 정도 나올 수 있도록 spark.sql.shuffle.partitions 수를 조절하는 것이 최적이다.
  • Tips
    • Memory Limit Over, Memory Spill 등 자원 문제가 생길 경우, Shuffle Partition 크기를 우선적으로 고려해야 함.

참고

'데이터 > Spark' 카테고리의 다른 글

Spark 병렬처리 성능 테스트 사례  (0) 2024.07.14
RDD vs. Dataframe (In Python)  (1) 2024.06.16
Spark Cluster 구조 및 작동원리  (0) 2024.06.09

댓글