본문 바로가기

KAFKA3

실시간 데이터 수집-5 (Filebeat 설치) 환율 데이터를 API로 받아보니 json array 형태로 되어있어서 kafka로 직접적으로 넣기가 힘들었다. filestream connector를 사용하면 json array를 하나하나 풀어서 파일에 write 한 후 읽어갈 수 있지만 실제 운영환경에서는 사용을 권하지 않는다고 공식문서에 적혀있었다. 그렇다면 log 파일 형태로 관리했을때 어떻게 kafka로 수집할 수 있을지 찾아보다 ELK stack에서 자주 사용되는 filebeat를 사용하기로 결정했다. connector의 역할을 충분히 대체할 수 있고 제한적인 환경에서 logstash보다 적은 리소스를 차지한다. 환율 log 파일 생성 현재 환율 api를 통해 가져오는 데이터가 json형태의 데이터가 여러개 들어있는 array 형태이기 때문에 .. 2024. 3. 10.
실시간 데이터 수집-2 (AWS에서 Confluent 설치) 실제로 Confluent를 서버에 설치해본다. 리소스 한계상 Standalone 버전으로 진행했다. 설치 환경 서버 aws ec2 : t2.xlarge 유형 OS / 소프트웨어 버전 S/W Version Ubuntu 20.04.6 JAVA 11 Confluent - Community 7.6.0 JAVA 설치 java 11버전 설치 script : https://github.com/tjsdud594/Confluent/blob/main/ShellScript/install_java.sh java 17 버전 설치시 아래 에러발생 Error: the Confluent CLI requires Java version 1.8 or 1.11. See https://docs.confluent.io/current/insta.. 2024. 2. 11.
실시간 데이터 수집-1 (시나리오) 실시간 데이터 수집을 위한 시나리오를 구축한다. 어떤 데이터를 수집할 지, 가공을 어떻게 할 것인지, 어디에 저장할 것인지, 시각화는 어떻게 할 것인지. Architecture 수집대상 실시간 환율 api : 실시간 환율 정보를 가져온다. (링크) 수집기 Filebeat : json log file 형태로 기록되고 있는 실시간 환율 데이터를 수집하여 kafka topic에 저장한다. Kafka : Topic에 실시간 데이터를 queue 방식으로 저장한다. Sink Connector : Topic에 저장된 데이터를 Sink Connector를 활용하여 Target DB(Elasticearch)에 저장한다. Prometheus : node와 Kafka의 Metric 수집에 사용. 가공 Spark Stream.. 2024. 2. 11.