Kafka Connect로 데이터 파이프라인 구축
    • PDF

    Kafka Connect로 데이터 파이프라인 구축

    • PDF

    기사 요약

    이 가이드는 서버에 Kafka Connect를 설치한 후 Cloud Data Streaming Service를 활용하여 MySQL의 변경 사항을 Elasticsearch에 적용하는 방법을 설명합니다.

    참고

    이 가이드는 사용자의 이해를 돕기 위해 작성된 예제입니다. 이 가이드에서 사용된 connector 및 confluent-hub에 대한 라이선스는 confluent 사에 있으며 confluent 사의 라이선스 정책을 따릅니다. 네이버클라우드는 해당 connector 및 confluent-hub를 직접 제공하는 사업자가 아니며, 사용자는 connector 및 confluent-hub 사용 여부를 직접 결정할 수 있습니다.

    사전 작업

    이 가이드를 수행하기 전에 이용 신청을 완료해야 하는 작업은 다음과 같습니다.

    • 서버 및 VPC 생성
    • Cloud DB for MySQL 서버 생성
    • Cloud Data Streaming Service 클러스터 생성
    • Search Engine Service 클러스터 생성

    네트워크 설정

    Step 1. Cloud DB for MySQL 설정

    Cloud DB for MySQL의 DB 계정을 추가하는 방법은 다음과 같습니다.

    1. 네이버 클라우드 플랫폼 콘솔에서 Services > Database > Cloud DB for MySQL > DB Server 메뉴를 차례대로 클릭해 주십시오.
    2. DB Server를 선택한 후 DB 관리 > DB User 관리를 클릭해 주십시오.
    3. 필요한 정보를 입력한 후 [DB User 추가] 버튼을 클릭해 주십시오.
    4. DB 계정이 추가되면 [저장] 버튼을 클릭해 주십시오
      cdss-5-2_ko
    참고

    Cloud DB for MySQL에 대한 자세한 설명은 Cloud DB for MySQL 사용 가이드를 참고해 주십시오.

    Step 2. ACG 설정

    Cloud Data Streaming Service 브로커 노드의 9092번 포트로 접근할 수 있도록 ACG를 설정하는 방법은 다음과 같습니다.

    1. 네이버 클라우드 플랫폼 콘솔에서 Services > Compute > Server > ACG 메뉴를 차례대로 클릭해 주십시오.
    2. ACG 목록에서 'cdss-b-xxxxx'을 선택한 후 [ACG 설정] 버튼을 클릭해 주십시오.
    3. ACG 규칙을 입력한 후 [추가] 버튼을 클릭해 주십시오.
      cdss-5-4_ko
      • 프로토콜: TCP
      • 접근 소스: Kafka Connect가 실행될 서버의 IP
      • 허용 포트: 9092
    4. [적용] 버튼을 클릭해 주십시오.

    Search Engine Service 매니저 노드의 9200번 포트로 접근할 수 있도록 ACG를 설정하는 방법은 다음과 같습니다.

    1. 네이버 클라우드 플랫폼 콘솔에서 Services > Compute > Server > ACG 메뉴를 차례대로 클릭해 주십시오.
    2. ACG 목록에서 'searchengine-m-xxxxx'을 선택한 후 [ACG 설정] 버튼을 클릭해 주십시오.
    3. ACG 규칙을 입력한 후 [추가] 버튼을 클릭해 주십시오.
      cdss-5-6_ko
      • 프로토콜: TCP
      • 접근 소스: Kafka Connect가 실행될 서버의 IP
      • 허용 포트: 9200

    서버에 Kafka Connect 설치

    서버에 Kafka Connect를 설치하는 방법을 설명합니다.

    Step 1. Java 설치

    Java를 설치하는 방법은 다음과 같습니다.

    1. 다음 명령어를 입력하여 yum 업데이트를 진행해 주십시오.

      yum update
      
    2. 다음 명령어를 입력하여 java-1.8.0-openjdk-devel.x86_64를 설치해 주십시오.

      yum install java-1.8.0-openjdk-devel.x86_64
      
    3. 다음 명령어를 입력하여 정상적으로 설치가 되었는지 확인해 주십시오.

      java -version
      javac -version
      

    Step 2. Kafka Connect 설치

    Kafka Connect를 설치하는 방법은 다음과 같습니다.

    1. 다음 명령어를 입력하여 서버의 /root 경로에 Kafka Connect를 다운로드해 주십시오.

      curl -O http://packages.confluent.io/archive/7.0/confluent-community-7.0.1.tar.gz
      
    2. 다음 명령어를 입력하여 다운로드한 파일의 압축을 해제해 주십시오.

      tar -zxvf confluent-community-7.0.1.tar.gz
      
    3. 다음 명령어를 입력하여 구동 전 properties 파일을 수정해 주십시오.

      • Cloud Data Streaming Service의 브로커 노드 정보를 참고하여 properties 파일의 ‘bootstrap.servers’에 ip 목록을 추가합니다.
      vi /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
      
      bootstrap.servers=10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092
      

    Step 3. Confluent Hub 설치

    Confluent Hub는 Kafka Connect에서 사용되는 다양한 플러그인을 간편하게 다운로드할 수 있는 저장소입니다. 지원되는 플러그인의 전체 목록은 Confluent에서 제공하는 Confluent Connector Portfolio를 확인해 주십시오.

    1. 다음 명령어를 입력하여 /root 경로에 새로운 폴더를 만든 후 해당 폴더로 이동해 주십시오.

      • 예제에서는 'confluent-hub'라는 이름의 폴더를 생성합니다.
      mkdir confluent-hub
      cd confluent-hub
      
    2. 다음 명령어를 입력하여 현재 경로(/root/confluent-hub)에 Confluent Hub를 다운로드해 주십시오.

      curl -O http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
      
    3. 다음 명령어를 입력하여 다운로드한 파일의 압축을 해제해 주십시오.

      tar -zxvf confluent-hub-client-latest.tar.gz
      
    4. 다음 명령어를 입력하여 현재 경로(/root/confluent-hub)에 추후 플러그인이 저장될 폴더를 생성해 주십시오.

      • 예제에서는 'plugins'라는 이름의 폴더를 생성합니다.
      mkdir plugins
      
    5. 다음 명령어를 차례대로 입력하여 PATH 환경변수에 압축 해제한 bin 폴더의 경로를 추가해 주십시오.

      vi ~/.bashrc
      
      export CONFLUENT_HOME='~/confluent-hub'
      export PATH=$PATH:$CONFLUENT_HOME/bin
      
      source ~/.bashrc
      

    Step 4. MySQL Connector 설치

    다음 명령어를 입력하여 debezium-connector-mysql을 설치해 주십시오.

    • --component-dir은 실제 플러그인이 설치되는 폴더 경로입니다. STEP 3에서 생성한 /root/confluent-hub/plugins로 설정해 주십시오.
    • --worker-configs는 플러그인 설치 후 적용된 properties 파일의 경로입니다. Step 2에서 수정했던 /root/confluent-7.0.1/etc/kafka/connect-distributed.properties로 설정해 주십시오.
    confluent-hub install debezium/debezium-connector-mysql:1.7.0 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
    

    Step 5. Elasticsearch Connector 설치

    다음 명령어를 입력하여 kafka-connect-elasticsearch을 설치해 주십시오.

    • --component-dir--worker-configs는 STEP 4와 동일하게 적용해 주십시오.
    confluent-hub install confluentinc/kafka-connect-elasticsearch:11.1.3 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
    

    Step 6. Kafka Connect 프로세스 실행

    Kafka Connect 프로세스를 실행하는 방법은 다음과 같습니다.

    1. 다음 명령어를 입력하여 Kafka Connect 프로세스를 백그라운드로 실행해 주십시오.

      /root/confluent-7.0.1/bin/connect-distributed -daemon /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
      
    2. 다음 명령어를 입력하여 프로세스가 정상적으로 작동하는지 확인해 주십시오.

      curl localhost:8083
      
      {"version":"7.0.1-ccs","commit":"b7e52413e7cb3e8b","kafka_cluster_id":"m1hLK0L6Qra5TLVy7b_A4A"}
      
    3. 다음 명령어를 입력하여 앞에서 설치한 Connector가 모두 정상적으로 노출되는지 확인해 주십시오.

      curl localhost:8083/connector-plugins
      
      [
         {"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"11.1.3"}, 
         {"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.7.0.Final"},
         {"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},
         {"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},
         {"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}
      ]
      

    Kafka Connect 사용

    Kafka Connect를 사용하는 방법을 설명합니다.

    Step 1. MySQL 테이블 생성 및 데이터 추가

    MySQL 테이블을 생성하고 데이터를 추가하는 방법은 다음과 같습니다.

    1. 사용 중인 Cloud DB for MySQL 서버에 접속해 주십시오.
    2. 다음 명령어를 입력하여 테이블을 생성한 후 데이터를 추가해 주십시오.
      • 예제에서는 member라는 이름의 테이블을 생성합니다.
      CREATE TABLE IF NOT EXISTS member (
       id int NOT NULL PRIMARY KEY,
       name varchar(100),
       email varchar(200),
       department varchar(200)
      );
      
      INSERT INTO member(id, name, email, department) values (1, 'messi', 'messi@gmail.com', 'A');
      INSERT INTO member(id, name, email, department) values (2, 'ronaldo', 'ronaldo@naver.com', 'B');
      INSERT INTO member(id, name, email, department) values (3, 'son', 'son@ncloud.com', 'B');
      INSERT INTO member(id, name, email, department) values (4, 'park', 'park@yahoo.com', 'B');
      

    Step 2. MySQL Connector 등록

    MySQL Connector를 등록하는 방법은 다음과 같습니다.

    1. Kafka Connect가 설치된 서버로 접속해 주십시오.
    2. 요청 시 보낼 JSON body를 개인 환경에 맞게 아래 형식대로 입력해 주십시오.
      {
         "name": "mysql-connector", // connector의 이름
         "config": {
           "connector.class": "io.debezium.connector.mysql.MySqlConnector", // 사용할 플러그인의 종류
           "database.hostname": "db-9c242.vpc-cdb.ntruss.com", // MySQL 서버의 엔드포인트
           "database.port": "3306", // MySQL 서버의 포트
           "database.user": "kimdong", // MySQL 서버의 user
           "database.password": "1234", // MySQL 서버 user의 password
           "database.server.id": "184054", // kafka connect에서 사용되는 MySQL 서버의 uuid
           "database.server.name": "NCP_MYSQL", // kafka connect에서 사용할 MySQL 서버에 부여하는 이름, 추후 kafka topic의 접두사로 사용됨 
           "database.whitelist": "test", // kafka connect에서 접근할 MySQL 서버의 데이터베이스 지정
           "database.history.kafka.bootstrap.servers": "10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092", // Broker 노드 정보
           "database.history.kafka.topic": "this_is_topic", // MySQL 히스토리 변경 내역을 저장할 topic 이름 
           "snapshot.locking.mode": "none",
           "value.converter": "org.apache.kafka.connect.json.JsonConverter",
           "key.converter": "org.apache.kafka.connect.json.JsonConverter"
         }
      }
      
    3. 다음 명령어를 입력하여 MySQL Connector를 등록해 주십시오.
      curl -X POST localhost:8083/connectors \
         -H "Content-Type: application/json" \
         -d '{"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"db-9c242.vpc-cdb.ntruss.com","database.port":"3306","database.user":"kimdong","database.password":"1234","database.server.id":"184054","database.server.name":"NCP_MYSQL","database.whitelist":"test","database.history.kafka.bootstrap.servers":"10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092","database.history.kafka.topic":"this_is_topic","snapshot.locking.mode":"none","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter"}}' 
      
      
      

    Step 3. Topic 확인

    Topic이 생성되었는지 확인하는 방법은 다음과 같습니다.

    1. Cloud Data Streaming Service에서 제공하는 CMAK에 접속해 주십시오.
    2. STEP 2에서 설정한 body의 정보대로 Topic이 생성된 것을 확인해 주십시오.
      • STEP 1에서 생성한 MySQL의 member 테이블의 변경 정보가 담길 Topic은 'NCP_MYSQL.test.member'입니다.
        cdss-5-7_ko

    Step 4. Elasticsearch Connector 등록

    Elasticsearch Connector를 등록하는 방법은 다음과 같습니다.

    1. Kafka Connect가 설치된 서버로 접속해 주십시오.
    2. 요청 시 보낼 JSON body를 개인 환경에 맞게 아래 형식대로 입력해 주십시오.
      {
         "name": "es-connector", // connector의 이름
         "config": {
           "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", // 사용할 플러그인의 종류
           "connection.url": "http://10.0.100.9:9200", // topic을 가져올 kafka 브로커 노드
           "tasks.max": "1",
           "topics": "NCP_MYSQL.test.member", // 컨슈밍할 topic 이름
           "type.name": "_doc",
           "value.converter": "org.apache.kafka.connect.json.JsonConverter",
           "key.converter": "org.apache.kafka.connect.json.JsonConverter",
           "key.converter.schemas.enable": "true",
           "value.converter.schemas.enable": "true",
           "transforms": "extractKey",
           "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
           "transforms.extractKey.field": "id", // MySQL 테이블에서 사용하는 pk명
           "behavior.on.null.values": "IGNORE"
         }
      }
      
    3. 아래 명령어를 입력하여 Elasticsearch Connector를 등록해 주십시오.
      curl -X POST localhost:8083/connectors \
         -H "Content-Type: application/json" \
         -d '{"name":"es-connector","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","connection.url":"http://10.0.100.9:9200","tasks.max":"1","topics":"NCP_MYSQL.test.member","type.name":"_doc","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"true","value.converter.schemas.enable":"true","transforms":"extractKey","transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key","transforms.extractKey.field":"id","behavior.on.null.values":"IGNORE"}}'
      

    Step 5. Elasticsearch 데이터 확인

    Elasticsearch 데이터를 확인하는 방법은 다음과 같습니다.

    1. Search Engine Service에서 제공하는 Kibana의 엔드포인트로 접속해 주십시오.
    2. 인덱스 목록을 조회하여 ncp_mysql.test.member라는 이름의 인덱스가 생성된 것을 확인해 주십시오.
      cdss-5-8_ko
    3. ncp_mysql.test.member 인덱스의 내용을 조회해 주십시오.
      cdss-5-9_ko
    4. ncp_mysql.test.member 인덱스의 특정 도큐먼트를 조회해 주십시오.
      • MySQL에서 추가한 데이터는 _source.after에서 확인할 수 있습니다.
        cdss-5-10_ko

    이 문서가 도움이 되었습니까?

    Changing your password will log you out immediately. Use the new password to log back in.
    First name must have atleast 2 characters. Numbers and special characters are not allowed.
    Last name must have atleast 1 characters. Numbers and special characters are not allowed.
    Enter a valid email
    Enter a valid password
    Your profile has been successfully updated.