- 인쇄
- PDF
Cloud Data Streaming Service로 Spark Streaming 연동
- 인쇄
- PDF
이 가이드는 네이버 클라우드 플랫폼 Cloud Hadoop과 Cloud Data Streaming Service(CDSS)를 연동하는 방법을 소개합니다.
사전 작업
- Cloud Data Streaming Service를 생성해 주십시오.
- Cloud Data Streaming Service 생성에 관한 자세한 내용은 Cloud Data Streaming Service 사용 가이드를 참고해 주십시오.
- Cloud Data Streaming Service 사용하기 위해 VM 생성 및 세팅해 주십시오.
- VM 생성과 세팅에 관한 자세한 내용은 Cloud Data Streaming Service 사용 가이드를 참고해 주십시오.
- Cloud Hadoop 클러스터를 생성해 주십시오.
- Cloud Hadoop 클러스터 생성에 관한 자세한 내용은 Cloud Hadoop 시작 가이드를 참고해 주십시오.
Cloud Hadoop과 Cloud Data Streaming Service는 같은 VPC 내 통신이 가능한 동일 Subnet 으로 생성하는 것을 권장합니다.
- ACG를 설정해 주십시오.
- Cloud Hadoop에서 Cloud Data Streaming Service Broker 노드에 접속하기 위해서 9092 포트를 허용해주어야 합니다.
- Cloud Data Streaming Service의 Broker 노드 ACG 접근 소스에 Cloud Hadoop의 Subnet 대역을 추가해 주십시오.
Zeppelin Notebook에서 CDSS 연동하려면 추가적으로 Cloud Hadoop ACG에 9996 포트를 허용해줘야 합니다.
자세한 내용은 서비스별 UI 접속 및 패스워드 설정 가이드를 참고해 주십시오.
Kafka를 활용한 데이터 전송
- Cloud Data Streaming Service VM에서 Kafka를 실행해 주십시오.
[root@s17e27e0cf6c ~ ]# cd kafka_2.12-2.4.0
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-server-start.sh -daemon config/server.properties
- 토픽을 생성해 주십시오.
- bootstrap-server 뒤에는 broker-list를 넣어주십시오.
# 토픽 생성
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-topics.sh --create --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --replication-factor 1 --partitions 1 --topic [topic]
# 생성된 토픽 확인
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-topics.sh --list --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092
broker-list는 Cloud Data Streaming Service > Cluster > Broker 노드 정보에서 확인할 수 있습니다.
- 데이터를 생성해 주십시오.
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-console-producer.sh --broker-list 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --topic [topic]
Kafka 연동
이 가이드에서는 Spark Streaming으로 Kafka 연동하는 두 가지 방법을 설명합니다.
- 엣지 노드에서 CDSS 연동하기
- Zeppelin Notebook에서 CDSS 연동하기
Spark Streaming에 대한 자세한 내용은 Spark Streaming 공식홈페이지를 참고해 주십시오.
엣지노드에서 CDSS 연동
- Cloud Hadoop 엣지 노드에서 Spark를 실행해 주십시오.
# Cloud Hadoop 1.5 이상 버전
[sshuser@e-001-example-pzt-hd ~]$ sudo -u {계정명} spark-shell --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.8
# Cloud Hadoop 1.4 버전
[sshuser@e-001-example-pzt-hd ~]$ sudo -u {계정명} spark-shell --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
- Spark Streaming을 사용하여 실시간으로 데이터를 읽어주십시오.
> import org.apache.spark.sql.streaming.Trigger
> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "[topic]").option("startingOffsets", "earliest").load()
> val stream = df.writeStream.trigger(Trigger.ProcessingTime("5 seconds")).outputMode("append").format("console").start().awaitTermination()
일반적인 batch로도 데이터를 읽어올 수 있습니다.
> val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.0.6:9092,172.16.0.7:9092,172.16.0.8:9092").option("subscribe", "[topic]").option("startingOffsets","earliest").load()
> df.show
- Spark Streaming을 사용하여 Kafka에 데이터를 작성할 수도 있습니다.
Streaming을 하기 전 엣지 노드에서 먼저 checkpoint를 만들어야 합니다.
hdfs dfs -mkdir -p /streaming/checkpointLocation
test 토픽에서 데이터를 읽어오고 읽어온 데이터를 새로운 토픽에 저장해 줍니다.
> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "test").option("startingOffsets", "earliest").load()
> val ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("checkpointLocation","/streaming/checkpointLocation").option("topic", "newtopic").start()
실시간으로 데이터를 처리할 필요가 없을 경우 아래 코드로 간단하게 내용을 저장할 수 있습니다.
> val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.0.6:9092,172.16.0.7:9092,172.16.0.8:9092").option("subscribe", "test").option("startingOffsets","earliest").load()
> df.selectExpr("key","value").write.format("kafka").option("kafka.bootstrap.servers","172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("topic","newtopic").save()
이제 Kafka에서 newtopic을 확인해 보면 데이터가 들어가 있는 것을 확인할 수 있습니다.
Zeppelin Notebook에서 CDSS 연동
Zeppelin UI에 접속한 후, Interpreter를 클릭해 주십시오.
spark2 하위에 Dependencies를 추가해 주십시오.
- artifact : org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
- exclude : net.jpountz.lz4:lz4:1.3.0
[Notebook] > Create new note를 클릭한 후, 새 노트북을 생성해 주십시오.
- Default Interpreter는 spark2로 설정해 주십시오.
- Default Interpreter는 spark2로 설정해 주십시오.
Zeppelin Notebook에서 Spark Streaming을 사용하여 실시간으로 데이터를 읽고 쓸 수 있습니다. 코드는 다음과 같습니다.
> import org.apache.spark.sql.streaming.Trigger
> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "[topic]").option("startingOffsets", "earliest").load()
> val stream = df.writeStream.trigger(Trigger.ProcessingTime("5 seconds")).outputMode("append").format("console").start().awaitTermination()
UDF를 이용해서 Binary로 표현된 데이터를 string으로 변환할 수 있습니다.
아래는 예제 코드입니다.
> import org.apache.spark.sql.functions.udf
> val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.0.6:9092,172.16.0.7:9092,172.16.0.8:9092").option("subscribe", "test").load()
> val toString = udf((payload: Array[Byte]) => new String(payload))
> val chstring = df.withColumn("value", toStr(df("value")))
> df.show