Source Connector 생성 하기 ( feat. Topic 자동생성 )

2021. 5. 22. 19:01개발/Kafka

Kafka Connect는 하나의 서비스 형태로 구동되기 때문에 REST방식의 Connector 관리가 가능하다. 

worker 설정파일에 별다른 셋팅을 하지 않았다면 8083포트를 통해서 Connector 생성 요청이 가능하다. 

REST방식을 사용함으로써 프로그램을 멈추지 않고도 상태확인이나, 수정 및 생성이 자유롭다는 장점을 가진다. (시작할때부터 커맨드라인에 사용할 커넥터를 미리 정의할 필요가 없다. )

문서에 따르면 현재 content-type은 application/json 방식만 지원한다.

 

 

Database 셋팅


사용한 DB는 MariaDB이며, Source Connector가 읽어올 user라는 테이블을 생성했다. PK는 id.


Source Connector 설정


REST 방식으로 Source Connector를 생성해보자.

Connector를 생성하기 위해선 아래와 같은 구문을 따르면 된다.

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d [JSON타입의 설정]

POST 방식을 통해 생성요청을 하며 /connectors로 mapping한다. Content-Type으로는 application/json을 셋팅하고 그 뒤에 Connector 생성에 대한 설정을 담은 JSON형태의 구문을 작성하면 된다.

 

Connector에 대한 설정은 아래 사이트를 참고하자.

 

JDBC Connector Source Connector Configuration Properties | Confluent Documentation

JDBC Connector Source Connector Configuration Properties To use this connector, specify the name of the connector class in the connector.class configuration property. connector.class=io.confluent.connect.jdbc.JdbcSourceConnector Connector-specific configur

docs.confluent.io

 

사용할 Connector 옵션을 살펴보자.

connector.class

Connector를 생성하기 위해서 필요한 클래스를 작성한다. 

connector.class : "io.confluent.connect.jdbc.JdbcSourceConnector"

Connection.url

데이터베이스 접근을 위한 주소를 설정한다.

connection.url : "jdbc:mysql://localhost:3306/mydatabase"

Connection.user, Connection.password

데이터베이스 접속을 위한 ID와 Password를 설정한다.

connection.user : "root"
connection.password : "root"

Topic.prefix

Topic 생성시 이름앞에 붙일 접두어. 여기서 작성한 prefix와 테이블명이 Topic의 이름이 된다. ( prefix + 테이블명 ) 

(토픽 생성에 대해선 뒤에 있는 옵션에서 설명.)

Topic.prefix : "jdbc-connector-"

mode

테이블에 변경이 발생했을때 어떤 방식으로 데이터를 poll할지 셋팅한다. bulk를 사용하면 이벤트가 발생한 테이블의 내용을 모두 poll한다. incrementing은 incrementing column을 통해서 신규로 들어온 row를 판단하고, 해당 데이터만 poll해온다. 여기서 주의해야할점은 incrementing모드의 경우에 "삭제(delete)"와 "수정(update)"에 대해선 작동하지 않는다는 점이다. 따라서 수정과 삭제정보도 poll하고 싶다면 shadow테이블을 만들어야 할것이다. 

mode : "incrementing"

incrementing.column.name

incrementing column을 셋팅한다. id라는 컬럼명을 보고 어떤 row부터 poll할지 판단한다. 설정한 column의 타입이 varchar인 경우 에러가 난다.

incrementing.column.name : "id"

poll.interval.ms

테이블에서 새로운 데이터에 대해 데이터를 폴링하는 주기를 설정한다.

poll.interval.ms : 10000

table.whitelist

데이터를 poll할 테이블의 목록을 셋팅한다. 복수개의 테이블에서 데이터를 가져오는경우 콤마(,)를 통해서 작성한다.

table.whitelist: "user"

topic.creation.default.replication.factor 

Source Connector를 실행했을때 Topic이 존재하지 않는다면 Source Connector는 자동으로 Topic을 생성할 수있다. 이때 몇가지 조건이 존재하는데 먼저 worker의 설정파일에서 topic.creation.enable=true 로 셋팅해야한다. ( 사실 default값이 true이므로 따로 설정하지 않아도 된다. ) 이렇게 하면 자동으로 default라는 이름으로 topic create group이 생성되는데 이 그룹은 topic 생성을 담당한다. 아래 작성한 옵션은 default 그룹으로 topic을 자동생성할때 replication factor옵션을 1로 셋팅하겠다는 뜻이다. Topic 자동생성을 위해서 반드시 셋팅되어야한다. 

topic.creation.default.replication.factor : 1

topic.creation.default.partitions

위와 마찬가지로 Topic 자동 생성을 위해 반드시 셋팅되어야 하는 값이다. default 그룹을 통해 topic을 자동생성할때 파티션을 몇개로 셋팅할지 정하는 옵션이다. 

 

추가로 설명하면 사용자가 default이외의 그룹을 만들수 있으며 그룹마다 이런 replication factor나 partition옵션을 셋팅할 수 있다. 그리고 include, exclude 옵션을 통해서 사용자가 생성한 그룹으로 topic을 생성할지 제외할지 선택할 수 있다. 현재는 필요없는 설정이므로 따로 그룹을 만들진 않았다.

topic.creation.default.partitions : 1

 


Source Connector 생성하기


아래와 같이 Source Connector에 대한 설정을 마치고 POST 요청을 통해 Connector를 생성한다. 

Command Line에 Source Connector 생성정보를 모두 작성하는건 너무 귀찮은 일이니 따로 json파일에 작성한다.

 

아래 명령어를 실행한다. 

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d @/home/soojong2/connect/connector/user-source-connector.json

여기서 No suitable driver found for jdbc:mysql: ~~~ 에러가 발생해서 한참 고생했다. 

이 문제의 원인을 파악하기 위해선 Confluent에서 내려받은 JDBC-connector 폴더 하위의 jar파일들을 확인해야한다. 잘보면 mysql connector가 없다.

 

아래 사이트에서 MySQL Connector를 위한 jar파일을 다운받아야한다.

 

MySQL :: Download Connector/J

MySQL Connector/J 8.0 is highly recommended for use with MySQL Server 8.0, 5.7 and 5.6. Please upgrade to MySQL Connector/J 8.0.

dev.mysql.com

사용중인 운영체제에 맞는 deb 패키지 파일을 다운받는다.

다운 받은 deb 패키지의 압축을 풀면 하위에 mysql connector를 위한 jar파일이 있다. 

위 jar파일을 plugin.path 하위에 넣도록하자. jar파일이 모여있는 lib폴더에 넣었다.

 

다시 Source Connector 생성을 위한 요청을 해보자. (그전에 worker를 재실행했다. )

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d @/home/soojong2/connect/connector/user-source-connector.json

이전과 달리 JDBC Driver를 찾지 못한다는 문구가 사라졌으며, user table에 select 쿼리를 날리는 모습을 확인할 수 있다.


데이터 Insert와 Topic 자동생성


먼저 현재 topic 목록을 조회해본다. 아직 jdbc-connector-user topic은 자동으로 생성되지 않았다. 

connect-configs, connect-offsets, connect-status는 worker를 실행하면 자동으로 생성되는 Topic이다.

 

 

데이터를 Insert했다. 그리고 다시 topic의 목록을 확인해보자.

INSERT INTO user VALUES( 1 , 'SooJong' , '100' , 'seoul'); COMMIT;

자동으로 jdbc-connector-user라는 이름의 topic이 생성된것을 확인할 수 있다.

구동중인 worker에서 그 내용을 확인할 수 있다. 

topic이 존재하는지 확인하고 없다면 생성한다

 

Consumer를 실행해보자. Topic에서 성공적으로 데이터를 가져오는 모습을 확인할 수 있다.

 


 

참고 사이트


 

 

Connect REST Interface | Confluent Documentation

Since Kafka Connect is intended to be run as a service, it also supports a REST API for managing connectors. By default this service runs on port 8083. When executed in distributed mode, the REST API will be the primary interface to the cluster. You can ma

docs.confluent.io

 

 

How to use Kafka Connect - Getting Started | Confluent Documentation

How to use Kafka Connect - Getting Started This document provides information about how to get started with Kafka Connect. Before doing anything here, you should read and understand Kafka Connect Concepts. The following topics are covered in this document:

docs.confluent.io

 

 

Customization of Kafka Connect automatic topic creation :: Debezium Documentation

By default, the Kafka broker configuration enables the broker to create topics at runtime if the topics do not already exist. Topics created by the broker cannot be configured with custom properties. If you use a Kafka version earlier than 2.6.0, and you w

debezium.io

 

Auto-creating Debezium Change Data Topics

Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable a

debezium.io