실시간 데이터를 원본그대로 수집하는 것 뿐만 아니라 가공하는 방법을 익히기 위해 Spark Streaming을 선택했다.
실시간 데이터 가공에는 대표적으로 Spark Streaming과 Flink가 있는것으로 알고있지만
이번 기회에는 익숙한 Spark를 활용하여 데이터를 가공하고
추후에 Spark Streaming과 Flink를 비교하는 게시물을 작성하는 것을 목표로 한다.
Spark 설치
파일 다운로드
- Apache Spark 사이트에서 파일 다운로드
$ wget https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
설치 경로 설정 & 압축해제
- 압축해제 및 환경 변수 설정
$ tar -xvf spark-3.5.1-bin-hadoop3.tgz
$ vi ~/.bashrc
==============================================================
# 가장 마지막줄에 추가
export SPARK_HOME=/home/ubuntu/spark/spark-3.5.1-bin-hadoop3
export PATH=$PATH:$SPARK_HOME/bin
wq!
$ source ~/.bashrc
- 설정 변경
$ cp $SPARK_HOME/conf/spark-defaults.conf.template $SPARK_HOME/conf/spark-defaults.conf
$ cp $SPARK_HOME/conf/spark-env.sh.template $SPARK_HOME/conf/spark-env.sh
$ cp $SPARK_HOME/conf/spark-env.sh $SPARK_HOME/conf/log4j.properties
$ vi $SPARK_HOME/conf/log4j.properties
==============================================================
...
# 가장 마지막 줄에 추가
log4j.rootCategory=WARN, console
wq!
- spark 실행 테스트
ubuntu@ip-172-31-9-181:~$ pyspark
Python 3.8.10 (default, Nov 22 2023, 10:22:35)
[GCC 9.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/07 10:41:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.5.1
/_/
Using Python version 3.8.10 (default, Nov 22 2023 10:22:35)
Spark context Web UI available at http://ip-172-31-9-181.ap-northeast-2.compute.internal:4040
Spark context available as 'sc' (master = local[*], app id = local-1712486488674).
SparkSession available as 'spark'.
>>>
Spark Streaming 사용방법
Spark Streaming은 따로 설치하는 것이 아닌 Spark설치시 내부 모듈로 함께 설치되므로 간단하게 import하여 사용이 가능하다.
사용예시
- kafka 특정 topic의 데이터를 읽어와서 Elasticsearch의 index에 적재
- 사용한 csv files : https://github.com/tjsdud594/Confluent/tree/main/spark/files
- pyspark source code (원본)
더보기
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession \
.builder \
.appName("readfromkafka") \
.master("local[*]") \
.config("es.index.auto.create", "true") \
.getOrCreate()
kafka_bootstrap_servers = "172.31.9.181:9092"
topic_name = "worldmoney.filebeat.spark"
print("=================================Start Read Kafka Topic=================================")
df = spark \
.readStream\
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
.option("subscribe", topic_name) \
.load()
print("=================================End Read Kafka Topic=================================")
print(df.printSchema())
query = df.writeStream \
.outputMode("append") \
.queryName("writing_to_es") \
.format("org.elasticsearch.spark.sql") \
.option("checkpointLocation", "/tmp/") \
.option("es.resource", "index/worldmoney.filebeat.spark") \
.option("es.nodes", "localhost:9200") \
.start()
- 실행 코드
- kafka dependency : org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1
- elasticsearch dependency : org.elasticsearch:elasticsearch-spark-20_2.12:8.13.4
$ nc -lk 9999
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,org.elasticsearch:elasticsearch-spark-20_2.12:8.13.4 kafka_to_es.py
'실시간 데이터 수집' 카테고리의 다른 글
실시간 데이터 수집-6 (Elasticsearch & kibana 설치) (0) | 2024.03.24 |
---|---|
실시간 데이터 수집-5 (Filebeat 설치) (0) | 2024.03.10 |
실시간 데이터 수집-4 (Grafana 설치 및 Prometheus 연동) (1) | 2024.02.25 |
실시간 데이터 수집-3 (Prometheus 설치) (0) | 2024.02.20 |
실시간 데이터 수집-2 (AWS에서 Confluent 설치) (1) | 2024.02.11 |
댓글