띵유로그

[데이터파이프라인] Logstash 구성 및 twitter 연결 본문

DataEngineering

[데이터파이프라인] Logstash 구성 및 twitter 연결

띵유 2022. 1. 17. 01:03
반응형

wget 명령어를 통해 logstash 를 다운로드 받습니다.

logstash 도 링크를 걸어줍니다.

그 후에 .bash_profile을 수정해서 어떤 경로에서도 logstash 명령어를 수행할 수 있도록 경로를 잡아줍니다.

.bash_profile

그 후 수정된 값을 적용시키기 위해 source명령어를 실행해줍니다.

 

2. twitter 연동

먼저 트위터 계정을 만들고 개발자 app 신청을 합니다.

https://www.citopes.com/entry/%ED%8A%B8%EC%9C%84%ED%84%B0-%EA%B0%9C%EB%B0%9C%EC%9E%90-%EA%B3%84%EC%A0%95-%EC%B7%A8%EB%93%9D%ED%95%98%EA%B8%B0

 

트위터 개발자 계정 신청하기

트위터의 API를 이용하기 위해서는 Access Token을 발급받아야 하는데, 그것을 위해 가장 기본적인 작업이 트위터 개발자 계정을 신청하는 일입니다. 개발자 계정만 신청해서 승인이 나면 Twitter Manag

www.citopes.com

 

그 후에 consermer_key, oauth_token을 발급받습니다.

 

producer 폴더를 하나 생성 한 후에 producer_test.conf파일을 만들어 정보를 입력합니다.

input {

twitter {
consumer_key => "~"
consumer_secret => "~"
oauth_token => "~"
oauth_token_secret => "~"
keywords => ["news","game","bigdata","nft"]
full_tweet => true
}
}
output{
stdout{
codec => rubydebug
}
}

keywords를 트위터에서 수집했고 정상 작동하는지 확인하기 위해 우선 output 형식이 stdout로 지정해서 콘솔에 뿌려줍니다.

아래 명령어를 통해 logstash를 실행합니다.

아래와 같이 수집되는것을 볼 수 있습니다.

정상동작하는것을 확인했으니 이제 카프카 서버로 보내도록 하겠습니다.

 

3. 카프카 브로커로 보내기

좀전에 생성했던 conf 파일에서 output 부분만 수정합니다.

stdout으로 되어있던 것을 kafka로 수정해줍니다.

kafka {
bootstrap_servers => "카프카브로커 ip주소:9092"
codec => json{}
acks => "1"
topic_id => "토픽명"
}

다시 logstash를 실행해서 프로듀서를 실행 한 후 컨슈머를 확인하면 잘 수집되는것을 볼 수 있다.

좌 - logstash(producer) / 우- consumer

참고) consumer도 logstash로 구현했고, consumer.conf는 다음과 같다. (콘솔에 출력)

input {
  kafka {
    bootstrap_servers => "브로커id:9092"
    topics => ["토픽명"]
    consumer_threads => 1
    decorate_events => true
    }
}

output {
stdout {
codec=> rubydebug
}
}
반응형
Comments