- 인쇄
- PDF
Cloud Data Streaming Service 활용
- 인쇄
- PDF
CMAK에 접속하여 Topic을 생성하고 Topic에 데이터를 저장하는 방법, Topic에 저장된 데이터를 Cloud Data Streaming Service Cluster에 전송하는 방법과 Apache Kafka 클라이언트와 Apache Kafka의 Broker 노드 간 통신 구간을 암호화하는 방법을 설명합니다.
CMAK 접속
CMAK는 Apache Kafka 클러스터를 관리하는 플랫폼입니다. CMAK에 접속하여 Topic을 생성하고 Topic의 Partition 개수를 변경하거나 Topic의 데이터 보관 주기를 변경할 수 있습니다.
CMAK에 접속하기 전에 Public 도메인을 활성화해야 합니다.
CMAK에 접속하는 방법은 다음과 같습니다.
- 네이버 클라우드 플랫폼 콘솔에서 Services > Big Data & Analytics > Cloud Data Streaming Service > Cluster 메뉴를 차례대로 클릭해 주십시오.
- 클러스터의 체크 박스를 클릭한 후 클러스터 관리 > CMAK 접속 도메인 설정 변경 버튼을 클릭해 주십시오.
- 팝업 창이 나타나면 정보를 확인한 후 [확인] 버튼을 클릭해 주십시오.
- Public 도메인이 활성화됩니다.
- 클러스터의 체크 박스를 클릭한 후 클러스터 관리 > CMAK 접속을 클릭해 주십시오.
- CMAK 접속 메뉴가 활성화되지 않으면 Public 도메인이 활성화되지 않은 것입니다. 1~3의 절차를 다시 수행해 주십시오.
- CMAK 접속을 위한 사전 작업 창이 나타나면 CMAK 바로가기 버튼을 클릭해 주십시오.
- 로그인 팝업 창이 나타나면 클러스터 생성 시 설정한 사용자 ID와 Password를 입력해 주십시오.
- ID를 잊어버린 경우, 클러스터 관리 > CMAK 접속 비밀번호 초기화를 클릭하면 ID를 확인할 수 있습니다.
- 클러스터 계정 이름을 클릭해 주십시오.
- Cluster Summary에서 Topic과 Broker 노드에 대한 정보를 확인해 주십시오.
Topic에 대한 정보
Broker 노드에 대한 정보
Topic 생성
- CMAK 페이지에 접속해 주십시오.
- CMAK 페이지에서 Topic > Create 메뉴를 클릭해 주십시오.
- 토픽 정보를 입력해 주십시오.
- Topic: Topic의 이름 입력
- Partitions: Topic의 Partitions 개수 입력
- Replication Factor: Topic의 Replication 개수 입력
- [Create] 버튼을 클릭해 주십시오.
상세 설정 값에 대한 설명은 Apach Kafka Documentation을 참조해 주십시오.
- Topic을 생성한 이후에는 Partitions 개수는 늘리는 것만 가능하며 이미 추가된 Partitions 개수를 줄이는 것을 불가능합니다.
- 데이터 Consume 시 순서 보장이 필요한 경우, Partitions 개수는 1로 설정해야 합니다. 단, Partitions이 1개인 경우 하나의 브로커 노드에 모든 데이터가 저장되므로 브로커 노드의 Disk 사용량 관리에 주의해야 합니다.
- Topic을 생성한 이후에는 Replication 개수를 변경할 수 없습니다. 안정적인 서비스를 운영하기 위해 최소 2개 이상의 Replication를 적용하는 것을 권장합니다.
Topic 정보 확인
① Topic Summary: Topic의 정보 확인
② Operations: Topic 삭제, Partition 추가, 재분배, 설정 변경
③ Partitions by Broker: 각 브로커 노드에 대한 생성된 Partition 정보 확인
④ Partition Information: 각 Partition의 Leader와 Replication이 어떤 브로커 노드에 위치하는지 확인
Topic의 데이터 보관 주기 변경
Topic의 데이터 보관 주기를 조정하는 방법은 다음과 같습니다.
- CMAK 접속을 참고하여 CMAK에 접속해 주십시오.
- 클러스터 이름을 클릭해 주십시오.
- Topic > List를 클릭해 주십시오.
- Topic 목록에서 데이터 보관주기를 변경할 Topic의 이름을 클릭해 주십시오.
- Operations > [Update Config] 버튼을 클릭해 주십시오.
- retention.ms 값을 수정해 주십시오.
- 1시간으로 설정하고 싶은 경우, 3600000 값을 입력합니다.
- n시간으로 설정하고 싶은 경우, 3600000 * n의 결과값을 입력합니다.
- [Update Config] 버튼을 클릭해 주십시오.
Topic의 Partition 개수 변경
CMAK에서 생성된 Topic의 Partition 개수를 늘릴 수 있습니다. Partition은 추가하는 것만 가능하며 기존에 할당된 Partition은 삭제할 수 없습니다.
Topic의 Partition 개수를 늘리는 방법은 다음과 같습니다.
- CMAK 접속을 참고하여 CMAK에 접속해 주십시오.
- 클러스터 이름을 클릭해 주십시오.
- Topic > List를 클릭해 주십시오.
- Topic 목록에서 데이터 보관주기를 변경할 Topic의 이름을 클릭해 주십시오.
- Operations > [Add Partitions] 버튼을 클릭해 주십시오.
- Partitions 항목에 Partition 개수를 입력해 주십시오.
- 입력된 수보다 큰 수만 입력할 수 있습니다.
- 입력된 수보다 큰 수만 입력할 수 있습니다.
- [Add Paritions] 버튼을 클릭해 주십시오.
Producer/Consumer 구현
다음과 같은 구조로 Producer VM, Consumer VM을 생성하여 Topic에 데이터를 저장하고 전송하는 방법을 설명합니다.
Apache Kafka, Java, Python을 활용하여 데이터를 전송하고 저장하는 방법을 설명합니다.
이 가이드는 CentOS 7.3을 기준으로 설명합니다.
Producer VM, Consumer VM 생성
Producer VM과 Consumer VM을 생성하는 방법은 다음과 같습니다.
- Server 시작을 참조하여 서버를 생성해 주십시오.
- 서버 생성 시 Cloud Data Streaming Service 클러스터를 생성한 VPC와 동일한 VPC를 선택해 주십시오.
- Subnet은 Public Subnet을 선택해 주십시오.
- 해당 VM에 공인 IP를 할당해야 접속할 수 있습니다.
Java 설치
다음 명령어를 입력하여 Java를 설치해 주십시오.
yum install java -y
Apache Kafka 설치
다음 명령어를 입력하여 Apache Kafka를 설치해 주십시오.
wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
# 압축을 풀어줍니다.
tar -zxvf kafka_2.12-2.4.0.tgz
Broker 노드 정보 확인
Broker 노드 정보를 확인하는 방법은 다음과 같습니다.
- 네이버 클라우드 플랫폼 콘솔에서 Services > Big Data & Analytics > Cloud Data Streaming Service > Cluster 메뉴를 차례대로 클릭해 주십시오.
- 클러스터 상세 정보에서 Broker 노드 정보의 [상세 보기] 버튼을 클릭해 주십시오.
- Broker 노드 정보 창이 나타나면 정보를 확인해 주십시오.
- PlainText: Broker 노드와 암호화 없이 통신하기 위한 정보
- TLS: Broker 노드와 암호화 통신하기 위한 정보
- hosts 파일 정보: 암호화 통신에 이용되는 hosts 파일 수정 시 필요한 정보
Broker 노드의 ACG 설정
Broker 노드의 ACG 규칙을 설정하는 방법은 다음과 같습니다.
- 네이버 클라우드 플랫폼 콘솔에서 Services > Big Data & Analytics > Cloud Data Streaming Service > Cluster 메뉴를 차례대로 클릭해 주십시오.
- 클러스터 상세 정보에서 Broker 노드 ACG의 을 클릭해 주십시오.
- ACG 목록에서 Broker 노드의 ACG를 선택한 후 [설정] 버튼을 클릭해 주십시오.
- ACG 규칙을 입력한 후 [추가] 버튼을 클릭해 주십시오.
- 프로토콜: TCP
- 접근 소스: Producer VM, Consumer VM의 비공인 IP 입력
- 허용 포트: 9092-9093 입력
- ACG에 대한 메모 입력
- 규칙이 추가되었는지 확인한 후 [적용] 버튼을 클릭해 주십시오.
- 해당 규칙이 ACG에 적용됩니다.
Apache Kafka 활용하여 데이터 전송
이전 단계에서 생성한 Producer VM에 접속한 후 다음 명령어를 입력해 주십시오.
cd kafka_2.12-2.4.0 ./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic] # [broker.list]에 앞에서 확인했던 브로커 노드 정보의 PlainText 복사본을 입력합니다. # [topic]에 CMAK에서 생성한 Topic을 입력합니다. # 예시) ./bin/kafka-console-producer.sh --broker-list 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 --topic test
전송하고 싶은 메시지를 입력해 주십시오.
- Broker 노드에 해당 메시지가 저장됩니다.
- 종료를 원할 경우, [Ctrl + C] 키를 눌러 주십시오.
이전 단계에서 생성한 Consumer VM에 접속한 후 아래 명령어를 실행해 주십시오.
- --from-beginning 명령어를 사용할 경우, 해당 Topic에 대한 데이터를 처음부터 모두 조회합니다.
- --from-beginning 명령어를 사용하지 않을 경우, 데이터를 조회한 순간부터 입력되는 데이터만 조회합니다.
cd kafka_2.12-2.4.0 ./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --from-beginning # [bootstrap.server]에 앞에서 확인했던 브로커 노드 정보의 PlainText 복사본을 입력합니다. # [topic]에 앞 단계 Producer VM에서 입력한 Topic을 입력합니다. # 예시) ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 --topic test --from-beginning
Java 활용하여 데이터 전송
Java를 활용하여 데이터를 전송하는 방법을 설명합니다.
이 가이드는 IntelliJ IDEA를 사용하는 것을 기준으로 설명합니다.
프로젝트 생성
프로젝트를 생성하는 방법은 다음과 같습니다.
- IntelliJ IDEA를 실행한 후, File > New > Project를 클릭해 주십시오.
- Maven Archetype을 선택한 후 Project 정보를 입력하고 [Create] 버튼을 클릭하여 프로젝트를 생성해 주십시오.
pom.xml 파일 수정
프로젝트 종속성, Java 버전, 패키징 메서드를 정의하는 pom.xml
파일을 수정해 주십시오.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<groupId>org.example</groupId>
<artifactId>maventest</artifactId>
<version>1.0-SNAPSHOT</version>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<!-- Apache Kafka version in Cloud Data Streaming Service -->
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>KafkaMain</mainClass>
</transformer>
</transformers>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
사용자 환경에 따라 pom.xml 파일이 상이할 수 있습니다.
KafkaMain.java 작성
Java Application 실행 시 argument로 produce/consume 여부, Topic, Broker lists를 전달합니다.
public class KafkaMain {
public static void main(String[] args) throws IOException {
String topicName = args[1];
String brokers = args[2];
switch(args[0]){
case "produce":
Producer.produce(brokers, topicName);
break;
case "consume":
Consumer.consume(brokers, topicName);
break;
default:
System.out.println("Wrong arguments");
break;
}
System.exit(0);
}
}
Producer.java 작성
Producer.java
파일을 작성해 주십시오. 0~99의 숫자를 전송하는 예시는 다음과 같습니다.
public class Producer {
public static void produce(String brokers, String topicName) throws IOException {
// Create Producer
KafkaProducer<String, String> producer;
// Configure
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", brokers);
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(properties);
for(int i=0;i<100;i++){
ProducerRecord record = new ProducerRecord<String, String>(topicName, Integer.toString(i));
producer.send(record);
}
producer.close();
}
}
Consumer.java 작성
Consumer.java
파일을 작성해 주십시오.
public class Consumer {
public static int consume(String brokers, String topicName) {
// Create Consumer
KafkaConsumer<String, String> consumer;
// Configure
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", brokers);
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("group.id", "consumer_group");
properties.setProperty("auto.offset.reset", "earliest");
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList(topicName));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500); // wait for 500ms
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
System.out.println(record.value());
}
}
}
}
jar 파일 빌드 및 실행
- 작성한 Java code를 git에 저장한 후, VM에서 git clone 명령어를 입력하여 해당 code를 다운로드해 주십시오.
git clone 자신의 git repository
- 해당 Java Application을 빌드하려면 Maven을 설치해 주십시오.
yum install maven -y
- 다운로드한 Java code가 있는 폴더로 이동한 후, jar 파일을 빌드해 주십시오.
jar 파일을 빌드하면 target 폴더와 target 폴더 내에 jar 파일이 생성됩니다.cd kafkatest mvn clean package
- target 폴더로 이동한 후, jar 파일을 실행해 주십시오.
cd target # 데이터 전송하기 java -jar kafkatest-1.0-SNAPSHOT.jar produce [topic] [broker.list] # [topic]에 CMAK에서 만든 Topic을 입력합니다. # [broker.list]에 앞에서 확인했던 브로커 노드 정보의 PlainText 복사본을 입력합니다. 예시) java -jar kafkatest-1.0-SNAPSHOT.jar produce test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 # 데이터 조회하기 java -jar kafkatest-1.0-SNAPSHOT.jar consume [topic] [broker.list] # [topic]에 CMAK에서 만든 Topic을 입력합니다. # [broker.list]에 앞에서 확인했던 브로커 노드 정보의 PlainText 복사본을 입력합니다. 예시) java -jar kafkatest-1.0-SNAPSHOT.jar consume test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
Python 활용하여 데이터 전송
이 가이드에서는 Python 2.7.5 버전을 기준으로 설명합니다.
Python에서 Kafka를 활용하려면 kafka-python package
를 설치해 주십시오.
# pip 설치
curl -LO https://bootstrap.pypa.io/pip/2.7/get-pip.py
python get-pip.py
# kafka-python package 설치
pip install kafka-python
KafkaMain.py 작성
KafkaMain.py
파일을 작성해 주십시오
import sys
from kafka import KafkaProducer, KafkaConsumer
from json import dumps
import time
def produce(topicName, brokerLists):
producer = KafkaProducer(bootstrap_servers=brokerLists,
value_serializer=lambda x:
dumps(x).encode('utf-8'))
for i in range(100):
producer.send(topicName, i)
def consume(topicName, brokerLists):
consumer = KafkaConsumer(topicName, bootstrap_servers=brokerLists,
group_id="test")
for msg in consumer:
print(msg)
action=sys.argv[1]
topicName=sys.argv[2]
brokerLists=sys.argv[3].split(',')
if action == 'produce':
produce(topicName, brokerLists)
elif action == 'consume':
consume(topicName, brokerLists)
else:
print('wrong arguments')
KafkaMain.py 파일 실행하여 produce 수행
KafkaMain.py
파일을 실행해 주십시오. 0~99의 숫자를 전송하는 예시입니다.
python KafkaMain.py produce [topic] [broker.list]
# [topic]에 CMAK에서 생성한 Topic을 입력합니다.
# [broker.list]에 앞에서 확인했던 Broker 노드 정보의 PlainText 복사본을 입력합니다.
# 예시) python KafkaMain.py produce test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
KafkaMain.py 파일을 실행하여 consume 수행
KafkaMain.py
파일을 실행하여
python KafkaMain.py consume [topic] [broker.list]
# [topic]에 CMAK에서 생성한 Topic을 입력합니다.
# [broker.list]에 앞에서 확인했던 Broker 노드 정보의 PlainText 복사본을 입력합니다.
# 예시) python KafkaMain.py consume test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
통신 구간 암호화
Apache Kafka 클라이언트와 Apache Kafka의 Broker 노드 간 통신 구간을 암호화하는 방법을 설명합니다.
전체 프로세스는 다음과 같습니다. 1~3의 과정은 클러스터 생성 시 자동으로 실행됩니다.
- 매니저 노드에서 자체 인증서를 생성합니다.
- 각 Broker 노드에서 인증서를 생성한 후 인증서 서명 요청을 생성합니다.
- 인증서 서명 요청에 대해 매니저 노드가 서명합니다.
- 클라이언트에서는 인증서를 다운로드한 후 인증서에 대한 정보를 갖고 있는 TrustStore를 생성합니다.
- 암호화 통신에 대한 설정 파일을 작성합니다.
- hosts 파일을 수정합니다.
인증서 다운로드
인증서를 다운로드하는 방법은 다음과 같습니다.
- 네이버 클라우드 플랫폼 콘솔에서 Services > Big Data & Analytics > Cloud Data Streaming Service > Cluster 메뉴를 차례대로 클릭해 주십시오.
- 클러스터 상세 정보에서 인증서 관리 항목의 [다운로드] 버튼을 클릭해 주십시오.
- 팝업 창이 나타나면 [확인] 버튼을 클릭해 주십시오.
- 인증서가 다운로드됩니다.
- 다운로드한 인증서 파일을 Producer, Consumer VM으로 복사해 주십시오.
- Producer, Consumer VM의
/root
경로에 ca-cert라는 이름으로 저장합니다.
- Producer, Consumer VM의
Truststore 생성
인증서 정보를 저장하는 TrustStore을 생성하는 방법은 다음과 같습니다.
- Truststore를 생성하려면 다음 명령어를 입력해 주십시오.
keytool -keystore kafka.client.truststore.jks -alias mytruststore -import -file ca-cert
- keystore 비밀번호를 입력해 주십시오.
- Enter keystore password: Password 입력
- Re-enter new password: Password 다시 입력
- Trust this certificate? [no]: 문구가 나타나면 'yes'를 입력해 주십시오.
- ls 명령어를 입력하여
kafka.client.truststore.jks
파일이 생성되었는지 확인해 주십시오.
암호화 설정 파일 작성
client-auth.properties
파일을 다음과 같이 작성해 주십시오.
[password]에는kafka.client.truststore.jks
파일 생성 시 설정한 비밀번호를 입력합니다.
# /root/kafka_2.12-2.4.0 폴더에 암호화 설정 파일을 생성합니다.
cd kafka_2.12-2.4.0
vi client-auth.properties
security.protocol=SSL
ssl.truststore.location=/root/kafka.client.truststore.jks
ssl.truststore.password=[password]
hosts 파일 수정
host 파일은 리눅스에서 DNS 서버보다 먼저 호스트명을 IP로 변환해 주는 파일입니다. 암호화 통신을 하려면 hosts 파일(/etc/hosts)을 수정해야 합니다.
/etc/hosts
파일을 실행해 주십시오.
vi /etc/hosts
Broker 노드 정보의 hosts 파일 정보 복사본을 추가해 주십시오.
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.2.24 yeakafka2-d-2am
192.168.2.25 yeakafka2-d-2an
192.168.2.26 yeakafka2-d-2ao
통신 구간 암호화하여 데이터 전송
Producer VM에서 다음과 같은 명령어를 실행해 주십시오.
전송하고 싶은 메시지를 입력하면 브로커 노드에 해당 메시지가 저장됩니다. 종료하려면 [Ctrl] + [C] 키를 눌러 주십시오.
./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic] --producer.config client-auth.properties
# [broker.list]에 앞에서 확인했던 브로커 노드 정보의 TLS 복사본을 입력합니다.
# [topic]에 CMAK에서 생성한 Topic을 입력합니다.
# 예시) ./bin/kafka-console-producer.sh --broker-list yeakafka2-d-2am:9093,yeakafka2-d-2an:9093,yeakafka2-d-2ao:9093 --topic test --producer.config client-auth.properties
통신 구간 암호화하여 저장된 데이터 조회
Consumer VM에서 다음과 같은 명령어를 실행해 주십시오.
cd kafka_2.12-2.4.0
./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --consumer.config client-auth.properties
# [bootstrap.server]에 앞에서 확인했던 브로커 노드 정보의 TLS 복사본을 입력합니다.
# [topic]에 앞 단계 Producer VM에서 입력한 Topic을 입력합니다.
# 예시) ./bin/kafka-console-consumer.sh --bootstrap-server yeakafka2-d-2am:9093,yeakafka2-d-2an:9093,yeakafka2-d-2ao:9093 --topic test --consumer.config client-auth.properties
Kafka Connect로 데이터 파이프라인 구축
Kafka Connect로 데이터 파이프라인을 구축하는 방법은 Kafka Connect로 데이터 파이프라인 구축을 참조해 주십시오.