띵유로그

[스파크] RDD연산자 본문

DataEngineering/SPARK

[스파크] RDD연산자

띵유 2021. 3. 8. 12:43
반응형

Transformation 과 Action

RDD연산자는 크게 transformation(변환) 과 action(행동)으로 나뉜다.

  Transformation Acition
정의 RDD를 return RDD를 통해 계산 수행
(return type 은 연산마다 다름)
특징 Lazy eager
예시 filter, map, flatMap, distinct count, foreach, collect, reduce, take

[예시]

 list를 만든 후에 이전 글에서 설명했던 parallelize함수를 통해 RDD를 만들었다.
(spark2.0부터는 sparksession을 통해 호출해야한다. 따라서 spark.sparkcontext.parallelize 를 호출하면됨)

그 이후 글자별로 map을 해주었다.
하지만! 위 코드를 실행해도 클러스터에는 아무런 변화가 일어 나지 않는다. ( return될 RDD의 refrence가 반환되기는 함)

왜일까?
이유는 아래서 설명!

지연연산이란?

스파크에서는 변환연산자를 바로 실행하지 않는다.
transaction이 호출되고 나면 RDD계보(변환연산의 순서)를 살펴보고
연산그래프를 작성한 뒤에 action을 실행한다.
다시말하면, transation은 연산을 어떤 순서로 실행하는지 알려주는 설계도 라고 할 수 있다

[]

 

유용한 RDD Action

takeSample : return Array[T]. random sample을 return 

takeOrdered : return Array[T].처음 n개의 element를 return 

saveAsTextFile : return Unit. textfile 로 저장
saveAsSequenceFile : return Unit. Hadoop Sequence File로 저장


추가 예시) 모든 로그가 담겨있는  RDD에서 2021년 8월의 error 로그만 모아서 count

 val lastYearsLogs:  RDD[String] = ...
 val numDecErrorLogs = lastYearsLogs.filter(lg => lg.contains("2021-08")&&lg.contains("error")).count()

=>  count가 실행되기 전까지 filter가 실행되지 않는다!

 

 

[연습]

1. map 변환 연산자
또 다른 함수를 인자로 받아 RDD하나를 반환한다. (다른 타입의 RDD를 반환할 수 있음)
1) 숫자 변형
scala > val numbers = sc.parallelize(10 to 50 by 10)
scala > numbers.foreach(x => println(x)) 
 -> 10 부터 50 까지 출력
scala > val Squarednum = numbers.map(num => num* num)

scala > Squarednum.foreach(x => println(x))
-> 100,400,....,2500 출력
2) 타입 변경 & reverse
scala > reversed = Squarednum.map( x=>x.toString.reverse)
scala > reversed.foreach(x=>println(x))
001

004

009

0061

0052
2-1) 타입 변경 & reverse
scala > val alsoReversed = Squarednum.map(_.toString.reverse)
scala > alsoReversed.first
scala > alsoReversed.top(4) 
-> 문자열순 정렬하여 가장 큰 4개 반환 
009
0061
0052
004

2. distinct, flatMap 변환 연산자
15,16,20,20
77,80,94
94,98,16,31
31,15,20
가 적힌 client-ids.log 를 가지고 아래 작업들을 수행해본다. (일자별 주문 고객 id)

scala > val lines=sc.textFile("/home/spark/client-ids.log")
scala > val idsStr=lines.map(line=>line.split(","))

scala > idsStr.foreach(println(_)) #java의 String을 반환한다. @930defa 등...
idsStr는 각 줄 별로 , 로 split하여 새로운 rdd(각요소로 배열을 가지는, 즉 배열의 배열)를 만든것이다.
따라서 
scala > idsStr.first 를 실행하면 Array[String] = Array(15, 16, 20, 20)를 반환한다. 
idsStr RDD의 실제 내용을 출력하려면 RDD의 collect 행동연산자를 사용한다. 
collect 행동연산자는 새로운 배열을 생성하고 RDD의 모둔 요소를 이 배열에 모은다.
scala > idsStr.collect
res5: Array[Array[String]] = Array(Array(15, 16, 20, 20), Array(77, 80, 94), Array(94, 98, 16, 31), Array(31, 15, 20))

이렇게 변환 연산자가 반환한 여러 배열의 모든 요소를 단일 배열로 통합하기 위해서는 flatMap연산자를사용할 수 있다.
scala > val ids = lines.flatMap(_.split(","))
scala > ids.collect
res6: Array[String] = Array(15, 16, 20, 20, 77, 80, 94, 94, 98, 16, 31, 31, 15, 20)
scala > ids.collect.mkString("; ")
res8: String = 15; 16; 20; 20; 77; 80; 94; 94; 98; 16; 31; 31; 15; 20

String으로 저장된 RDD를 Int로 변환하자.

scala > val intids=ids.map(_.toInt)
scala > intIds.collect
Array[Int] = Array(15, 16, 20, 20, 77, 80, 94, 94, 98, 16, 31, 31, 15, 20)

고객 ID의 고유 목록을 만들자.
scala > val uniqueIds = intIds.distinct

scala > uniqueIds.collect
Array[Int] = Array(16, 80, 98, 20, 94, 15, 77, 31)
scala > val finalCount = uniqueIds.count
finalCount : Long = 8

전체 구매 횟수를 집계해보자
scala >val transactionCount = ids.count
transactionCount : Long = 14

반응형
Comments