- 인쇄
- PDF
Kafka Connect로 Cloud Data Streaming Cluster 교체
- 인쇄
- PDF
별도의 VPC Server를 생성하고 Confluent Replicator를 이용하여 Kafka Cluster간 데이터를 실시간으로 복제하여 Kafka Cluster를 교체하는 방법을 다루고 있습니다.
이 과정은 Cent Linux로 생성된 Cluster를 Rocky Linux가 적용된 Cluster로 변경하는데 도움을 줄 수 있습니다.
사전 작업
이 가이드를 수행하기 전에 확인이 필요한 작업은 다음과 같습니다.
- 기존 Cloud Data Streaming Service Cluster의 BrokerNode 접근정보
- 기존과 동일한 Kafka 버전을 갖는 신규 Cloud Data Streaming Service Cluster의 BrokerNode 접근정보
- 문서의 내용중 기존Cluster 는 A, 신규로 생성한 Cluster는 B라고 정의합니다.
Cluster 전환 순서
신규로 생성한 Cluster로 이전하는 순서를 가이드 합니다.
- 하단의 가이드에 따라 VPC Server생성, Confluent Replicator설치/적용, ACG설정을 완료 합니다.
- 신규로 생성한 Cluster에 기존 Cluster 의 Topic 과 데이터가 정상적으로 복제되는지 확인/검증 합니다.
- Consumer를 신규로 생성한 Cluster로 BootstrapUrl 을 변경합니다.
- Producer를 신규로 생성한 Cluster로 BootstrapUrl 을 변경합니다.
- 기존의 Cluster의 Kafka Topic 에 Queue가 완전히 처리되었는지 확인/검증합니다.
- Confluent Replicator가 설치된 VPC Server와 기존 클러스터는 정지 후 반납합니다.
VPC Server 생성
STEP 1. 서버 생성
Confluent Replicator를 설치할 서버를 생성합니다.
- 네이버 클라우드 플랫폼 콘솔에서 Compute > Server 메뉴를 차례대로 클릭합니다.
- [서버생성] 버튼을 클릭하여 서버를 생성합니다. (서버생성 가이드)
- 서버생성시 Subnet 의 종류는 Cluster의 BrokerNode와 동일하거나 같은 타입(Private)의 Subnet으로 생성합니다.
- 생성된 서버의 SSH로 접속하여 Confluent Replicator 설치를 준비합니다. (서버접속 가이드)
- private subnet에 생성된 VPC Server의 경우 NATG/W가 추가로 필요하며, 라우팅 설정이 필요 합니다.
네트워크 ACG 설정
STEP 1. ACG 설정
신규로 생성된 VPC Server는 Cloud Data Streaming Service Cluster의 BrokerNode의 9092Port로 접근할 수 있어야 합니다.
이를 위해 ACG를 추가 하는 방법을 설명합니다.
- 네이버 클라우드 플랫폼 콘솔에서 Service > BigData&Analytics > Cloud Data Streaming Service 메뉴를 차례대로 클릭합니다.
- Cluster 를 선택 후 브로커노드ACG 옆의 새창열기 버튼을 클릭합니다.
- 브로커노드ACG 의 이름과 또는 ACGID가 동일한 ACG를 선택 후, 상단의 [ACG 설정] 버튼을 클릭합니다.
- inboud 탭에서 아래의 정보를 입력 후 [추가] 버튼을 클릭합니다.
- 프로토콜: TCP
- 접근소스: VPC Server의 IP 또는 VPC Server가 속한 ACG의 이름
- 허용포트: 9092
- 하단의 [적용] 버튼을 클릭합니다.
동일한 방법으로 현재 이용중인 클러스터와 변경할 클러스터 모두 동일하게 ACG를 등록합니다.
Confluent Replicator 설치
Server에 Confluent Replicator를 설치하는 예제를 소개합니다.
설치과정에서 Kakfa Broker Node에 대한 접속정보 확인이 필요합니다.
Confluent Replicator는 라이센스에 따라 이용 가능여부에 대한 확인이 필요할 수 있으며, 네이버클라우드에서는 이용방법만을 가이드 합니다.
STEP 1. java 설치
- 다음 명령어를 입력하여 jdk를 설치해 주십시오
최소 1.8또는 java11이상을 권장 합니다.
sudo yum install java-devel -y
STEP 2. Confluent Replicator설치
Confluent Replicator는 Confluent Platform의 일부로 제공됩니다.
생성한 VPC Server에 Confluent Platform을 설치하고, Replicator 커넥터를 다운로드합니다.
Confluent Platform 다운로드
Confluent Platform은 Confluent 다운로드 페이지에서 다운로드할 수 있습니다.
curl -O https://packages.confluent.io/archive/7.5/confluent-7.5.0.tar.gz
다운로드한 파일의 압축을 해제 후, 디렉토리로 이동합니다.
tar -xzf confluent-7.5.0.tar.gz
cd confluent-7.5.0
Confluent Platform 설치 디렉토리 경로를 환경 변수로 설정합니다. (~/.bashrc 또는 ~/.bash_profile 수정)
export CONFLUENT_HOME=/path/to/confluent-7.5.0
export PATH=$CONFLUENT_HOME/bin:$PATH
변경 사항을 적용합니다
source ~/.bashrc
또는
source ~/.bash_profile
Confluent Hub를 통한 Replicator 설치
Replicator의 최신버전 커넥터를 설치합니다
confluent-hub install confluentinc/kafka-connect-replicator:latest
STEP 3. Kafka 클러스터 구성 확인 및 설정
VPC Server에 설치된 Replicator가 두 클러스터의 브로커에 접근할 수 있는지 확인합니다.
Replicator에서 Kafka 브로커가 실행 중인 포트에 접근할 수 있어야 하며, ACG설정으로 접근이 허용되어 있어야 합니다.
Kafka 토픽 확인 명령어 예시
kafka-topics --bootstrap-server A_CLUSTER_BROKER_1_IP:9092,A_CLUSTER_BROKER_2_IP:9092 --list
kafka-topics --bootstrap-server B_CLUSTER_BROKER_1_IP:9092,B_CLUSTER_BROKER_2_IP:9092 --list
Topic 지정하여 옮기는 경우 B Cluster는 A Cluster 와 동일한 Topic을 가지고 있어야 합니다.
아래 예제는 A Cluster에 "example-topic" 이라는 이름을 가진 Topic이 있는 경우는 예제로 합니다.
- 만약 자동으로 생성하여 전체를 옮기자 하는 경우 아래 STEP 4. Replicator 설정 파일 작성을 참고 하시기 바랍니다.
kafka-topics --bootstrap-server B_CLUSTER_BROKER_1_IP:9092 --create --topic example-topic --partitions 3 --replication-factor 2
STEP 4. Replicator 설정 파일 작성
중간 서버에서 Kafka Replicator 설정 파일(replicator.properties)을 작성합니다. 이 파일은 두 클러스터 간 데이터를 복제하는데 필요한 설정을 포함합니다.
/path/to/confluent-7.5.0/etc/kafka-connect-replicator/quickstart-replicator.properties 파일을 아래의 내용으로 수정 합니다.
# 커넥터의 이름 설정
name=replicator-connector
connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector
key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
value.converter=io.confluent.connect.replicator.util.ByteArrayConverter
header.converter=io.confluent.connect.replicator.util.ByteArrayConverter
tasks.max=4
# 소스 클러스터의 연결 정보
src.kafka.bootstrap.servers=A_CLUSTER_BROKER_1_IP:9092,A_CLUSTER_BROKER_2_IP:9092
# 대상 클러스터의 연결 정보
dest.kafka.bootstrap.servers=B_CLUSTER_BROKER_1_IP:9092,B_CLUSTER_BROKER_2_IP:9092
# 대상 클러스터에 토픽이 존재하지 않는 경우 자동생성할지 여부
topic.auto.create=true
# 복제할 토픽명(여러개일 경우 topic1|topic2|topic3 가능)
topic.whitelist=example-topic
# 복제하지 않고 제외할 토픽명
#topic.blacklist=
# 복제되는 토픽의 토픽명 재정의가 필요한 경우 (기존토픽명.replica 로 생성됨)
topic.rename.format=${topic}.replica
# 토픽 복제 실패시 재시도 간격 설정(10초)
topic.create.backoff.ms=10000
# 오프셋 관리에 대한 설정
offset.storage.topic=replicator-offsets
config.storage.topic=replicator-configs
status.storage.topic=replicator-status
STEP 5. connector 및 Replicator 설정 파일 작성
설정 파일을 사용하여 Kafka Connect를 standalone모드로 실행하기 위해 connect-standalone.properties 파일을 수정합니다.
bootstrap.servers=B_CLUSTER_BROKER_1_IP:9092,B_CLUSTER_BROKER_2_IP:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/share/java,/root/confluent-7.5.0/share/confluent-hub-components
STEP 6. Replicator 실행
변경된 설정 파일을 적용한 kafka Connect를 이용하여 Replicator를 standalone 모드로 실행합니다.
** 실행 즉시 복제가 시작됩니다.**
connect-standalone $CONFLUENT_HOME/etc/kafka/connect-standalone.properties $CONFLUENT_HOME/etc/kafka-connect-replicator/quickstart-replicator.properties
로그파일은 $CONFLUENT_HOME/logs 디렉토리에 저장됩니다.
STEP 7. 문제 해결 및 추가 설정
- 접근 오류 : VPC Server 와 Cluster의 브로커노드ACG에 9092 Port가 허용되어 있는지 확인합니다.
- 데이터 지연 문제 : 복제 속도가 느린 경우 consumer.threads 값을 늘리거나 브로커의 성능을 점검합니다.
- 여러 토픽 복제 : topic.whitelist 설정에 정규 표현식을 사용하여 여러 토픽을 복제하거나, 특정 토픽을 제외하려면 topic.blacklist 옵션을 사용할 수 있습니다
topic.whitelist=topic1|topic2|topic3
topic.blacklist=topic4|topic5|topic6