본문 바로가기
실시간 데이터 수집

실시간 데이터 수집-7 (Spark 설치 및 Spark Streaming 사용)

by Sunyoung95 2024. 4. 7.
실시간 데이터를 원본그대로 수집하는 것 뿐만 아니라 가공하는 방법을 익히기 위해 Spark Streaming을 선택했다.
실시간 데이터 가공에는 대표적으로 Spark Streaming과 Flink가 있는것으로 알고있지만
이번 기회에는 익숙한 Spark를 활용하여 데이터를 가공하고 
추후에 Spark Streaming과 Flink를 비교하는 게시물을 작성하는 것을 목표로 한다.

 

Spark 설치

파일 다운로드

  • Apache Spark 사이트에서 파일 다운로드
$ wget https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz

설치 경로 설정 & 압축해제

  • 압축해제 및 환경 변수 설정
$ tar -xvf spark-3.5.1-bin-hadoop3.tgz
$ vi ~/.bashrc
==============================================================
# 가장 마지막줄에 추가
export SPARK_HOME=/home/ubuntu/spark/spark-3.5.1-bin-hadoop3
export PATH=$PATH:$SPARK_HOME/bin
wq!
$ source ~/.bashrc
  • 설정 변경
$ cp $SPARK_HOME/conf/spark-defaults.conf.template $SPARK_HOME/conf/spark-defaults.conf
$ cp $SPARK_HOME/conf/spark-env.sh.template $SPARK_HOME/conf/spark-env.sh
$ cp $SPARK_HOME/conf/spark-env.sh $SPARK_HOME/conf/log4j.properties

$ vi $SPARK_HOME/conf/log4j.properties
==============================================================
...
# 가장 마지막 줄에 추가
log4j.rootCategory=WARN, console
wq!
  • spark 실행 테스트
ubuntu@ip-172-31-9-181:~$ pyspark
Python 3.8.10 (default, Nov 22 2023, 10:22:35)
[GCC 9.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/07 10:41:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.1
      /_/

Using Python version 3.8.10 (default, Nov 22 2023 10:22:35)
Spark context Web UI available at http://ip-172-31-9-181.ap-northeast-2.compute.internal:4040
Spark context available as 'sc' (master = local[*], app id = local-1712486488674).
SparkSession available as 'spark'.
>>>

Spark Streaming 사용방법

Spark Streaming은 따로 설치하는 것이 아닌 Spark설치시 내부 모듈로 함께 설치되므로 간단하게 import하여 사용이 가능하다.

사용예시

더보기
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession \
        .builder \
        .appName("readfromkafka") \
        .master("local[*]") \
        .config("es.index.auto.create", "true") \
        .getOrCreate()

kafka_bootstrap_servers = "172.31.9.181:9092"
topic_name = "worldmoney.filebeat.spark"

print("=================================Start Read Kafka Topic=================================")
df = spark \
  .readStream\
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
  .option("subscribe", topic_name) \
  .load()
print("=================================End Read Kafka Topic=================================")

print(df.printSchema())

query = df.writeStream \
        .outputMode("append") \
        .queryName("writing_to_es") \
        .format("org.elasticsearch.spark.sql") \
        .option("checkpointLocation", "/tmp/") \
        .option("es.resource", "index/worldmoney.filebeat.spark") \
        .option("es.nodes", "localhost:9200") \
        .start()
  • 실행 코드
    • kafka dependency :  org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1
    • elasticsearch dependency : org.elasticsearch:elasticsearch-spark-20_2.12:8.13.4
$ nc -lk 9999
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,org.elasticsearch:elasticsearch-spark-20_2.12:8.13.4 kafka_to_es.py

 

댓글