- 인쇄
- PDF
Kafka Connect로 데이터 파이프라인 구축
- 인쇄
- PDF
이 가이드는 서버에 Kafka Connect를 설치한 후 Cloud Data Streaming Service를 활용하여 MySQL의 변경 사항을 Elasticsearch에 적용하는 방법을 설명합니다.
이 가이드는 사용자의 이해를 돕기 위해 작성된 예제입니다. 이 가이드에서 사용된 connector 및 confluent-hub에 대한 라이선스는 confluent 사에 있으며 confluent 사의 라이선스 정책을 따릅니다. 네이버클라우드는 해당 connector 및 confluent-hub를 직접 제공하는 사업자가 아니며, 사용자는 connector 및 confluent-hub 사용 여부를 직접 결정할 수 있습니다.
사전 작업
이 가이드를 수행하기 전에 이용 신청을 완료해야 하는 작업은 다음과 같습니다.
- 서버 및 VPC 생성
- Cloud DB for MySQL 서버 생성
- Cloud Data Streaming Service 클러스터 생성
- Search Engine Service 클러스터 생성
네트워크 설정
Step 1. Cloud DB for MySQL 설정
Cloud DB for MySQL의 DB 계정을 추가하는 방법은 다음과 같습니다.
- 네이버 클라우드 플랫폼 콘솔에서 Services > Database > Cloud DB for MySQL > DB Server 메뉴를 차례대로 클릭해 주십시오.
- DB Server를 선택한 후 DB 관리 > DB User 관리를 클릭해 주십시오.
- 필요한 정보를 입력한 후 [DB User 추가] 버튼을 클릭해 주십시오.
- DB 계정이 추가되면 [저장] 버튼을 클릭해 주십시오
Cloud DB for MySQL에 대한 자세한 설명은 Cloud DB for MySQL 사용 가이드를 참고해 주십시오.
Step 2. ACG 설정
Cloud Data Streaming Service 브로커 노드의 9092번 포트로 접근할 수 있도록 ACG를 설정하는 방법은 다음과 같습니다.
- 네이버 클라우드 플랫폼 콘솔에서 Services > Compute > Server > ACG 메뉴를 차례대로 클릭해 주십시오.
- ACG 목록에서 'cdss-b-xxxxx'을 선택한 후 [ACG 설정] 버튼을 클릭해 주십시오.
- ACG 규칙을 입력한 후 [추가] 버튼을 클릭해 주십시오.
- 프로토콜: TCP
- 접근 소스: Kafka Connect가 실행될 서버의 IP
- 허용 포트: 9092
- [적용] 버튼을 클릭해 주십시오.
Search Engine Service 매니저 노드의 9200번 포트로 접근할 수 있도록 ACG를 설정하는 방법은 다음과 같습니다.
- 네이버 클라우드 플랫폼 콘솔에서 Services > Compute > Server > ACG 메뉴를 차례대로 클릭해 주십시오.
- ACG 목록에서 'searchengine-m-xxxxx'을 선택한 후 [ACG 설정] 버튼을 클릭해 주십시오.
- ACG 규칙을 입력한 후 [추가] 버튼을 클릭해 주십시오.
- 프로토콜: TCP
- 접근 소스: Kafka Connect가 실행될 서버의 IP
- 허용 포트: 9200
서버에 Kafka Connect 설치
서버에 Kafka Connect를 설치하는 방법을 설명합니다.
Step 1. Java 설치
Java를 설치하는 방법은 다음과 같습니다.
다음 명령어를 입력하여 yum 업데이트를 진행해 주십시오.
yum update
다음 명령어를 입력하여 java-1.8.0-openjdk-devel.x86_64를 설치해 주십시오.
yum install java-1.8.0-openjdk-devel.x86_64
다음 명령어를 입력하여 정상적으로 설치가 되었는지 확인해 주십시오.
java -version javac -version
Step 2. Kafka Connect 설치
Kafka Connect를 설치하는 방법은 다음과 같습니다.
다음 명령어를 입력하여 서버의
/root
경로에 Kafka Connect를 다운로드해 주십시오.curl -O http://packages.confluent.io/archive/7.0/confluent-community-7.0.1.tar.gz
다음 명령어를 입력하여 다운로드한 파일의 압축을 해제해 주십시오.
tar -zxvf confluent-community-7.0.1.tar.gz
다음 명령어를 입력하여 구동 전
properties
파일을 수정해 주십시오.- Cloud Data Streaming Service의 브로커 노드 정보를 참고하여 properties 파일의 ‘bootstrap.servers’에 ip 목록을 추가합니다.
vi /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
bootstrap.servers=10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092
Step 3. Confluent Hub 설치
Confluent Hub는 Kafka Connect에서 사용되는 다양한 플러그인을 간편하게 다운로드할 수 있는 저장소입니다. 지원되는 플러그인의 전체 목록은 Confluent에서 제공하는 Confluent Connector Portfolio를 확인해 주십시오.
다음 명령어를 입력하여
/root
경로에 새로운 폴더를 만든 후 해당 폴더로 이동해 주십시오.- 예제에서는 'confluent-hub'라는 이름의 폴더를 생성합니다.
mkdir confluent-hub cd confluent-hub
다음 명령어를 입력하여 현재 경로(
/root/confluent-hub
)에 Confluent Hub를 다운로드해 주십시오.curl -O http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
다음 명령어를 입력하여 다운로드한 파일의 압축을 해제해 주십시오.
tar -zxvf confluent-hub-client-latest.tar.gz
다음 명령어를 입력하여 현재 경로(
/root/confluent-hub
)에 추후 플러그인이 저장될 폴더를 생성해 주십시오.- 예제에서는 'plugins'라는 이름의 폴더를 생성합니다.
mkdir plugins
다음 명령어를 차례대로 입력하여 PATH 환경변수에 압축 해제한 bin 폴더의 경로를 추가해 주십시오.
vi ~/.bashrc
export CONFLUENT_HOME='~/confluent-hub' export PATH=$PATH:$CONFLUENT_HOME/bin
source ~/.bashrc
Step 4. MySQL Connector 설치
다음 명령어를 입력하여 debezium-connector-mysql을 설치해 주십시오.
--component-dir
은 실제 플러그인이 설치되는 폴더 경로입니다. STEP 3에서 생성한/root/confluent-hub/plugins
로 설정해 주십시오.--worker-configs
는 플러그인 설치 후 적용된 properties 파일의 경로입니다. Step 2에서 수정했던/root/confluent-7.0.1/etc/kafka/connect-distributed.properties
로 설정해 주십시오.
confluent-hub install debezium/debezium-connector-mysql:1.7.0 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
Step 5. Elasticsearch Connector 설치
다음 명령어를 입력하여 kafka-connect-elasticsearch을 설치해 주십시오.
--component-dir
과--worker-configs
는 STEP 4와 동일하게 적용해 주십시오.
confluent-hub install confluentinc/kafka-connect-elasticsearch:11.1.3 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
Step 6. Kafka Connect 프로세스 실행
Kafka Connect 프로세스를 실행하는 방법은 다음과 같습니다.
다음 명령어를 입력하여 Kafka Connect 프로세스를 백그라운드로 실행해 주십시오.
/root/confluent-7.0.1/bin/connect-distributed -daemon /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
다음 명령어를 입력하여 프로세스가 정상적으로 작동하는지 확인해 주십시오.
curl localhost:8083
{"version":"7.0.1-ccs","commit":"b7e52413e7cb3e8b","kafka_cluster_id":"m1hLK0L6Qra5TLVy7b_A4A"}
다음 명령어를 입력하여 앞에서 설치한 Connector가 모두 정상적으로 노출되는지 확인해 주십시오.
curl localhost:8083/connector-plugins
[ {"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"11.1.3"}, {"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.7.0.Final"}, {"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"}, {"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"}, {"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"} ]
Kafka Connect 사용
Kafka Connect를 사용하는 방법을 설명합니다.
Step 1. MySQL 테이블 생성 및 데이터 추가
MySQL 테이블을 생성하고 데이터를 추가하는 방법은 다음과 같습니다.
- 사용 중인 Cloud DB for MySQL 서버에 접속해 주십시오.
- 다음 명령어를 입력하여 테이블을 생성한 후 데이터를 추가해 주십시오.
- 예제에서는 member라는 이름의 테이블을 생성합니다.
CREATE TABLE IF NOT EXISTS member ( id int NOT NULL PRIMARY KEY, name varchar(100), email varchar(200), department varchar(200) ); INSERT INTO member(id, name, email, department) values (1, 'messi', 'messi@gmail.com', 'A'); INSERT INTO member(id, name, email, department) values (2, 'ronaldo', 'ronaldo@naver.com', 'B'); INSERT INTO member(id, name, email, department) values (3, 'son', 'son@ncloud.com', 'B'); INSERT INTO member(id, name, email, department) values (4, 'park', 'park@yahoo.com', 'B');
Step 2. MySQL Connector 등록
MySQL Connector를 등록하는 방법은 다음과 같습니다.
- Kafka Connect가 설치된 서버로 접속해 주십시오.
- 요청 시 보낼 JSON body를 개인 환경에 맞게 아래 형식대로 입력해 주십시오.
{ "name": "mysql-connector", // connector의 이름 "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", // 사용할 플러그인의 종류 "database.hostname": "db-9c242.vpc-cdb.ntruss.com", // MySQL 서버의 엔드포인트 "database.port": "3306", // MySQL 서버의 포트 "database.user": "kimdong", // MySQL 서버의 user "database.password": "1234", // MySQL 서버 user의 password "database.server.id": "184054", // kafka connect에서 사용되는 MySQL 서버의 uuid "database.server.name": "NCP_MYSQL", // kafka connect에서 사용할 MySQL 서버에 부여하는 이름, 추후 kafka topic의 접두사로 사용됨 "database.whitelist": "test", // kafka connect에서 접근할 MySQL 서버의 데이터베이스 지정 "database.history.kafka.bootstrap.servers": "10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092", // Broker 노드 정보 "database.history.kafka.topic": "this_is_topic", // MySQL 히스토리 변경 내역을 저장할 topic 이름 "snapshot.locking.mode": "none", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter": "org.apache.kafka.connect.json.JsonConverter" } }
- 다음 명령어를 입력하여 MySQL Connector를 등록해 주십시오.
curl -X POST localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"db-9c242.vpc-cdb.ntruss.com","database.port":"3306","database.user":"kimdong","database.password":"1234","database.server.id":"184054","database.server.name":"NCP_MYSQL","database.whitelist":"test","database.history.kafka.bootstrap.servers":"10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092","database.history.kafka.topic":"this_is_topic","snapshot.locking.mode":"none","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter"}}'
Step 3. Topic 확인
Topic이 생성되었는지 확인하는 방법은 다음과 같습니다.
- Cloud Data Streaming Service에서 제공하는 CMAK에 접속해 주십시오.
- STEP 2에서 설정한 body의 정보대로 Topic이 생성된 것을 확인해 주십시오.
- STEP 1에서 생성한 MySQL의 member 테이블의 변경 정보가 담길 Topic은 'NCP_MYSQL.test.member'입니다.
- STEP 1에서 생성한 MySQL의 member 테이블의 변경 정보가 담길 Topic은 'NCP_MYSQL.test.member'입니다.
Step 4. Elasticsearch Connector 등록
Elasticsearch Connector를 등록하는 방법은 다음과 같습니다.
- Kafka Connect가 설치된 서버로 접속해 주십시오.
- 요청 시 보낼 JSON body를 개인 환경에 맞게 아래 형식대로 입력해 주십시오.
{ "name": "es-connector", // connector의 이름 "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", // 사용할 플러그인의 종류 "connection.url": "http://10.0.100.9:9200", // topic을 가져올 kafka 브로커 노드 "tasks.max": "1", "topics": "NCP_MYSQL.test.member", // 컨슈밍할 topic 이름 "type.name": "_doc", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "true", "value.converter.schemas.enable": "true", "transforms": "extractKey", "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key", "transforms.extractKey.field": "id", // MySQL 테이블에서 사용하는 pk명 "behavior.on.null.values": "IGNORE" } }
- 아래 명령어를 입력하여 Elasticsearch Connector를 등록해 주십시오.
curl -X POST localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{"name":"es-connector","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","connection.url":"http://10.0.100.9:9200","tasks.max":"1","topics":"NCP_MYSQL.test.member","type.name":"_doc","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"true","value.converter.schemas.enable":"true","transforms":"extractKey","transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key","transforms.extractKey.field":"id","behavior.on.null.values":"IGNORE"}}'
Step 5. Elasticsearch 데이터 확인
Elasticsearch 데이터를 확인하는 방법은 다음과 같습니다.
- Search Engine Service에서 제공하는 Kibana의 엔드포인트로 접속해 주십시오.
- 인덱스 목록을 조회하여 ncp_mysql.test.member라는 이름의 인덱스가 생성된 것을 확인해 주십시오.
- ncp_mysql.test.member 인덱스의 내용을 조회해 주십시오.
- ncp_mysql.test.member 인덱스의 특정 도큐먼트를 조회해 주십시오.
- MySQL에서 추가한 데이터는
_source.after
에서 확인할 수 있습니다.
- MySQL에서 추가한 데이터는