띵유로그

[SPARK] 스파크 튜닝 방안 본문

DataEngineering/SPARK

[SPARK] 스파크 튜닝 방안

띵유 2022. 3. 21. 12:47
반응형

이번 글에서는 효율적인 코드 작성을 위한 스파크 튜닝 방법에 대해서 정리해보겠습니다.

 

1. 파티셔닝 사용

적절한 파티션의 개수를 두어 데이터의 분포를 적절하게 하는것이 중요합니다.

한 파티션에 너무 많은 데이터가 들어간다면, 처리하는데 비용이 너무 많이 들것이고
적은 데이터를 가진 파티션들이 너무 많다면 스케쥴링하는데에 비용이 많이 들것입니다. 또 모든 메타데이터를 저장하고 있어야합니다.

 

예를들어 RDD를 filter 하고 난 후에는 파티션별로 데이터의 양이 상이할 수 있습니다.

이때 repartition을 해주어서 데이터를 재분배해주는 것이 좋습니다.

2. 캐싱

RDD는 액션함수가 실행될 때마다 데이터로드부터 다시 계산됩니다. 그래서 동일한 RDD를 가지고 여러번의 연산을 해야할 때는 중복실행을 줄이기 위해 캐싱을 해둘 수 있습니다.

결과를 저장해두었다가 필요할 떄 다시 불러와서 사용하는 것입니다.

캐시함수 종류

  • cache() : 저장장소 = MEMORY_ONLY
  • persist() : 저장수준을 여러가지로 설정 가능
                 JVM 힙에 직렬화되지 않은 객체형태로 저장
Level Space used cpu time  In memory   On disk Serialized 
MEMORY_ONLY  높음 낮음 Y  N
MEMORY_ONLY_SER 낮음 높음  N Y
MEMORY_AND_DISK 높음 중간 일부 일부 일부
MEMORY_AND_DISK_SER  높음 높음 일부  일부 Y
DISK_ONLY  낮음 높음  Y  Y

SER = serialized

 

3. 브로드캐스팅

브로드캐스트 변수는 모든 작업노드가 접근할 수 있는 공유변수 입니다. 예를들어서 데이터양이 작은 Dataframe과 큰 Dataframe을 조인하려고할때, 비용이 많이들어가는 셔플연산이 필요합니다. 
이때 작은 Dataframe을 브로드캐스팅을 하면 셔플연산을 할 필요가 없기때문에 매우 효율적입니다.
당연히 큰 Dataframe을 join 할 때는 브로드캐스팅이 비효율적입니다.

largeDF.join(
	broadcast(smallDF),
    largeDf("id")<=> smallDf("id")
).show()

 

4. collect()함수 피하기

RDD.collect 함수를 호출하면 분산되어있던 데이터들이 마스터노드(driver 프로세스)로 보내집니다.
이 경우에 메모리 에러가 발생할 수도 있고 분산처리가 불가능합니다.
따라서 take 나 takeSample 함수를 사용하여 일부만 가져오는 것이 좋습니다.

 

 

5. YARN 파라미터 설정하기

yarn.nodemanager.resource.momory-md : 1개의 노드에 있는 모든 Excecutor 컨테이너들이 사용하는 메모리 총합

yarn.nodemanager.resource.cpu-vcores : 1개의 노드에 있는 모든 Excecutor 컨테이너들이 사용하는 코어 수 총합


* 노드마다 OS, 하둡 데몬이 돌아야하기 때문에 실제 물리적 크기보다 작게 할당해야합니다.

예를들면 다음과 같습니다.

한 노드가 16개의 코어, 64GB의 메모리를 가진다면
yarn.nodemanager.resource.memory-md = (64-1) x 1024 = 64512
yarn.nodemanager.resource.cpu-vcores = (16-1) = 15

* yarn.nodemanager.resource.memory-md > spark.yarn.executor.MemoryOverhead + spark.executor.memory

 

6. SPARK 파라미터 설정하기

spark.executor.memory : 하나의 executor에서 Task를 실행하는데 사용하는 메모리

spark.executor.cores : 하나의 executor에서 Task를 실행하는데 사용하는 (가상)코어 개수

spark.driver.memory : 하나의 driver에서 Task를 실행하는데 사용하는 메모리

spark.driver.cores : 하나의 driver에서 Task를 실행하는데 사용하는 (가상)코어 개수

spark.executor.instances : Job 실행을 위한 executor 개수 설정

spark.default.paralleism : join이나 reduceByKey같은 넓은 transformation 을 하거나 파티션을 지정하지 않은 경우 RDD 디폴트 파티션 개수

 

.. 계속 보완예정

반응형
Comments