일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- 데이터파이프라인
- Spark
- 하둡
- 프로그래머스 큰 수 만들기
- freenom
- 런타임데이터영역
- Databricks
- Catalyst Optimizer
- 프로그래머스힙
- 카프카
- 실행엔진
- AWS Crawler
- kafka 설치
- 문맥교환
- 하둡1.0
- 하둡2.0
- EMR 구조
- 데이터 수집
- 스파크
- 지연연산
- 서버간 복사
- Spark 최적화
- 하둡에코시스템
- 데이터베이스복사
- 데이터엔지니어링
- lazy evaluation
- 프로그래머스
- ORACLE MSSQL차이
- 빌드도구
- ORACLE문법
- Today
- Total
띵유로그
[스파크] RDD연산자 본문
Transformation 과 Action
RDD연산자는 크게 transformation(변환) 과 action(행동)으로 나뉜다.
Transformation | Acition | |
정의 | RDD를 return | RDD를 통해 계산 수행 (return type 은 연산마다 다름) |
특징 | Lazy | eager |
예시 | filter, map, flatMap, distinct | count, foreach, collect, reduce, take |
[예시]
list를 만든 후에 이전 글에서 설명했던 parallelize함수를 통해 RDD를 만들었다.
(spark2.0부터는 sparksession을 통해 호출해야한다. 따라서 spark.sparkcontext.parallelize 를 호출하면됨)
그 이후 글자별로 map을 해주었다.
하지만! 위 코드를 실행해도 클러스터에는 아무런 변화가 일어 나지 않는다. ( return될 RDD의 refrence가 반환되기는 함)
왜일까?
이유는 아래서 설명!
지연연산이란?
스파크에서는 변환연산자를 바로 실행하지 않는다.
transaction이 호출되고 나면 RDD계보(변환연산의 순서)를 살펴보고 연산그래프를 작성한 뒤에 action을 실행한다.
다시말하면, transation은 연산을 어떤 순서로 실행하는지 알려주는 설계도 라고 할 수 있다.
[]
유용한 RDD Action
takeSample : return Array[T]. random sample을 return
takeOrdered : return Array[T].처음 n개의 element를 return
saveAsTextFile : return Unit. textfile 로 저장
saveAsSequenceFile : return Unit. Hadoop Sequence File로 저장
추가 예시) 모든 로그가 담겨있는 RDD에서 2021년 8월의 error 로그만 모아서 count
val lastYearsLogs: RDD[String] = ...
val numDecErrorLogs = lastYearsLogs.filter(lg => lg.contains("2021-08")&&lg.contains("error")).count()
=> count가 실행되기 전까지 filter가 실행되지 않는다!
[연습]
1. map 변환 연산자
또 다른 함수를 인자로 받아 RDD하나를 반환한다. (다른 타입의 RDD를 반환할 수 있음)
1) 숫자 변형
scala > val numbers = sc.parallelize(10 to 50 by 10)
scala > numbers.foreach(x => println(x))
-> 10 부터 50 까지 출력
scala > val Squarednum = numbers.map(num => num* num)
scala > Squarednum.foreach(x => println(x))
-> 100,400,....,2500 출력
2) 타입 변경 & reverse
scala > reversed = Squarednum.map( x=>x.toString.reverse)
scala > reversed.foreach(x=>println(x))
001
004
009
0061
0052
2-1) 타입 변경 & reverse
scala > val alsoReversed = Squarednum.map(_.toString.reverse)
scala > alsoReversed.first
scala > alsoReversed.top(4)
-> 문자열순 정렬하여 가장 큰 4개 반환
009
0061
0052
004
2. distinct, flatMap 변환 연산자
15,16,20,20
77,80,94
94,98,16,31
31,15,20
가 적힌 client-ids.log 를 가지고 아래 작업들을 수행해본다. (일자별 주문 고객 id)
scala > val lines=sc.textFile("/home/spark/client-ids.log")
scala > val idsStr=lines.map(line=>line.split(","))
scala > idsStr.foreach(println(_)) #java의 String을 반환한다. @930defa 등...
idsStr는 각 줄 별로 , 로 split하여 새로운 rdd(각요소로 배열을 가지는, 즉 배열의 배열)를 만든것이다.
따라서
scala > idsStr.first 를 실행하면 Array[String] = Array(15, 16, 20, 20)를 반환한다.
idsStr RDD의 실제 내용을 출력하려면 RDD의 collect 행동연산자를 사용한다.
collect 행동연산자는 새로운 배열을 생성하고 RDD의 모둔 요소를 이 배열에 모은다.
scala > idsStr.collect
res5: Array[Array[String]] = Array(Array(15, 16, 20, 20), Array(77, 80, 94), Array(94, 98, 16, 31), Array(31, 15, 20))
이렇게 변환 연산자가 반환한 여러 배열의 모든 요소를 단일 배열로 통합하기 위해서는 flatMap연산자를사용할 수 있다.
scala > val ids = lines.flatMap(_.split(","))
scala > ids.collect
res6: Array[String] = Array(15, 16, 20, 20, 77, 80, 94, 94, 98, 16, 31, 31, 15, 20)
scala > ids.collect.mkString("; ")
res8: String = 15; 16; 20; 20; 77; 80; 94; 94; 98; 16; 31; 31; 15; 20
String으로 저장된 RDD를 Int로 변환하자.
scala > val intids=ids.map(_.toInt)
scala > intIds.collect
Array[Int] = Array(15, 16, 20, 20, 77, 80, 94, 94, 98, 16, 31, 31, 15, 20)
고객 ID의 고유 목록을 만들자.
scala > val uniqueIds = intIds.distinct
scala > uniqueIds.collect
Array[Int] = Array(16, 80, 98, 20, 94, 15, 77, 31)
scala > val finalCount = uniqueIds.count
finalCount : Long = 8
전체 구매 횟수를 집계해보자
scala >val transactionCount = ids.count
transactionCount : Long = 14
'DataEngineering > SPARK' 카테고리의 다른 글
[스파크] RDD - Double RDD 전용 함수 (0) | 2021.03.08 |
---|---|
[스파크]RDD연산자2(sample, take, takeSample) (0) | 2021.03.08 |
[스파크] RDD 개요 (0) | 2021.03.08 |
[SPARK] SPARK DATAFRAME이란 - RDD와 어떻게 다른가? (0) | 2020.12.06 |
[SPARK] SPARK - ORIENTATION (0) | 2020.11.25 |