일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- AWS Crawler
- freenom
- 프로그래머스
- Catalyst Optimizer
- 데이터베이스복사
- 실행엔진
- Spark
- 데이터엔지니어링
- 지연연산
- lazy evaluation
- 데이터파이프라인
- 하둡
- 하둡1.0
- 문맥교환
- 런타임데이터영역
- 스파크
- Databricks
- 서버간 복사
- Spark 최적화
- 데이터 수집
- 프로그래머스힙
- 카프카
- 빌드도구
- EMR 구조
- 프로그래머스 큰 수 만들기
- 하둡에코시스템
- kafka 설치
- ORACLE MSSQL차이
- ORACLE문법
- 하둡2.0
- Today
- Total
띵유로그
[SPARK] 스파크 튜닝 방안 본문
이번 글에서는 효율적인 코드 작성을 위한 스파크 튜닝 방법에 대해서 정리해보겠습니다.
1. 파티셔닝 사용
적절한 파티션의 개수를 두어 데이터의 분포를 적절하게 하는것이 중요합니다.
한 파티션에 너무 많은 데이터가 들어간다면, 처리하는데 비용이 너무 많이 들것이고
적은 데이터를 가진 파티션들이 너무 많다면 스케쥴링하는데에 비용이 많이 들것입니다. 또 모든 메타데이터를 저장하고 있어야합니다.
예를들어 RDD를 filter 하고 난 후에는 파티션별로 데이터의 양이 상이할 수 있습니다.
이때 repartition을 해주어서 데이터를 재분배해주는 것이 좋습니다.
2. 캐싱
RDD는 액션함수가 실행될 때마다 데이터로드부터 다시 계산됩니다. 그래서 동일한 RDD를 가지고 여러번의 연산을 해야할 때는 중복실행을 줄이기 위해 캐싱을 해둘 수 있습니다.
결과를 저장해두었다가 필요할 떄 다시 불러와서 사용하는 것입니다.
캐시함수 종류
- cache() : 저장장소 = MEMORY_ONLY
- persist() : 저장수준을 여러가지로 설정 가능
JVM 힙에 직렬화되지 않은 객체형태로 저장
Level | Space used | cpu time | In memory | On disk | Serialized |
MEMORY_ONLY | 높음 | 낮음 | Y | N | N |
MEMORY_ONLY_SER | 낮음 | 높음 | Y | N | Y |
MEMORY_AND_DISK | 높음 | 중간 | 일부 | 일부 | 일부 |
MEMORY_AND_DISK_SER | 높음 | 높음 | 일부 | 일부 | Y |
DISK_ONLY | 낮음 | 높음 | N | Y | Y |
SER = serialized
3. 브로드캐스팅
브로드캐스트 변수는 모든 작업노드가 접근할 수 있는 공유변수 입니다. 예를들어서 데이터양이 작은 Dataframe과 큰 Dataframe을 조인하려고할때, 비용이 많이들어가는 셔플연산이 필요합니다.
이때 작은 Dataframe을 브로드캐스팅을 하면 셔플연산을 할 필요가 없기때문에 매우 효율적입니다.
당연히 큰 Dataframe을 join 할 때는 브로드캐스팅이 비효율적입니다.
largeDF.join(
broadcast(smallDF),
largeDf("id")<=> smallDf("id")
).show()
4. collect()함수 피하기
RDD.collect 함수를 호출하면 분산되어있던 데이터들이 마스터노드(driver 프로세스)로 보내집니다.
이 경우에 메모리 에러가 발생할 수도 있고 분산처리가 불가능합니다.
따라서 take 나 takeSample 함수를 사용하여 일부만 가져오는 것이 좋습니다.
5. YARN 파라미터 설정하기
yarn.nodemanager.resource.momory-md : 1개의 노드에 있는 모든 Excecutor 컨테이너들이 사용하는 메모리 총합
yarn.nodemanager.resource.cpu-vcores : 1개의 노드에 있는 모든 Excecutor 컨테이너들이 사용하는 코어 수 총합
* 노드마다 OS, 하둡 데몬이 돌아야하기 때문에 실제 물리적 크기보다 작게 할당해야합니다.
예를들면 다음과 같습니다.
한 노드가 16개의 코어, 64GB의 메모리를 가진다면
yarn.nodemanager.resource.memory-md = (64-1) x 1024 = 64512
yarn.nodemanager.resource.cpu-vcores = (16-1) = 15
* yarn.nodemanager.resource.memory-md > spark.yarn.executor.MemoryOverhead + spark.executor.memory
6. SPARK 파라미터 설정하기
spark.executor.memory : 하나의 executor에서 Task를 실행하는데 사용하는 메모리
spark.executor.cores : 하나의 executor에서 Task를 실행하는데 사용하는 (가상)코어 개수
spark.driver.memory : 하나의 driver에서 Task를 실행하는데 사용하는 메모리
spark.driver.cores : 하나의 driver에서 Task를 실행하는데 사용하는 (가상)코어 개수
spark.executor.instances : Job 실행을 위한 executor 개수 설정
spark.default.paralleism : join이나 reduceByKey같은 넓은 transformation 을 하거나 파티션을 지정하지 않은 경우 RDD 디폴트 파티션 개수
.. 계속 보완예정
'DataEngineering > SPARK' 카테고리의 다른 글
SPARK - DATAFRAME, DATASET (0) | 2022.07.27 |
---|---|
[SPARK] 스파크 기본 아키텍쳐 (0) | 2022.03.03 |
[오류]java.lang.NoSuchMethodError: scala.reflect.internal... spark library 설정 (0) | 2022.01.12 |
[SPAKR] note (0) | 2021.11.04 |
[SPARK] SPARK 지연연산의 이점 (0) | 2021.08.18 |