Confluent의 Kafka Connect Concept

2021. 5. 17. 19:11개발/Kafka

Confluent사에서 제공하는 카프카 커넥트 컨셉 문서를 통해서 Connect에 대해서 배운 내용을 기록합니다.

 

Kafka Connect Concepts | Confluent Documentation

Kafka Connect Concepts Kafka Connect is a framework to stream data into and out of Apache Kafka®. The Confluent Platform ships with several built-in connectors that can be used to stream data to or from commonly used systems such as relational databases o

docs.confluent.io

부족한 부분이나 잘못 설명된 부분이 있다면 댓글로 피드백 해주세요.

 

 

 

Kafka Connect Concept


카프카 커넥트는 Kafka 서버로부터 스트림 데이터를 꺼내거나 넣을 수 있도록 해주는 프레임워크다.

Confluent에서 제공하는 플랫폼에는 관계형DB나 HDFS에서 사용가능한 여러 커넥터를 내장하고 있다. 

 

지금부터 설명하는 내용들은 카프카 커넥트의 핵심이 되는 컨셉이다.

 

이건 다른 문서에 있던 그림이긴한데, Connector와 Task의 관계를 파악하기 좋았다. Connector는 Task를 관리하며 Task는 실제로 데이터를 Source로부터 복사해서 Kafka로 보내거나, Kafka로부터 데이터를 가져오는 역할을한다. 아래 그림은 Source Connector로서 Stream에서 받아온 데이터를 Kafka로 보내는 모습을 나타낸다.

출처 : https://docs.confluent.io/platform/current/connect/devguide.html#connect-devguide

 

 

 

Connectors


커넥터는 Task를 통해 데이터스트리밍을 관리한다. 커넥터는 어디에서 데이터를 복사해서 가져올지 또는 데이터를 어디로 보낼지를 정의한다. 

 

 

커넥터 인스턴스는 카프카와 외부 시스템간의 데이터 복제를 관리하는 역할을 한다. 커넥터 인스턴스의 클래스는 커넥터 플러그인에 정의되어 있다. 

 

위 Document에서 용어 사용에 대해 명확하게 해야한다고 설명하고 있었다. 

현재 대부분의 개발자들이 커넥터 인스턴스와 커넥터 플러그인을 모두 커넥터라고 칭하고 있다는 점이다.

예를들어 "커넥터 설치하기" 같은 경우 여기서 말하는 커넥터는 커넥터 플러그인을 의미한다. 

"커넥터 상태 확인하기" 같은 경우에 말하는 커넥터는 커넥터 인스턴스를 의미한다.

 

커넥터 인스턴스를 사용하기 위해선 클래스가 당연히 필요할것이다. 커넥터 인스턴스 생성을 위한 클래스가 정의된 곳이 커넥터 플러그인. 사용자는 특정 명령어를 통해서 커넥터 인스턴스를 생성한다. 

 

사용자는 Connector에 대해 코드를 작성할 필요가 없지만, 원한다면 새로운 커넥터 플러그인 코드를 작성할 순 있다. 새로운 커넥터 플러그인 코드 작성을 원하는 개발자라면 아래의 workflow를 참고하자. ( 커넥터 클래스와 테스크 클래스를 구축해서 Packaging하면 Connector Plugin이 된다. )

출처 : Kafka Connect Concepts Confluent Documentation


Task


( 이제부터 말하는 커넥터는 모두 커넥터 인스턴스를 의미한다 )

Task는 Kafka로부터 데이터를 어떻게 복사해 올 지 또는 어떻게 데이터를 복사해서 내보낼지를 구현한다. 

실제로 커넥터 내부에서 데이터를 복사하는 작업을 하는것은 Task이다. 각각의 커넥터는 task의 묶음을 관리한다. 

 

커넥터는 하나의 작업을 쪼개 여러 Task로 보낼 수 있기 때문에 몇가지 설정을 통해 병렬처리, 확장가능한 데이터복제 처리들이 가능하다. 

 

Task는 내부적으로 상태를 저장하지 않는다. Task의 상태는 별도의 Topic에 따로 저장한다. Task상태 저장을 위한 토픽은 config.storage.topic과 status.storage.topic이며 설정 파일을 통해 지정이 가능하다.

 

Task는 탄력적이고 확장가능한 파이프라인을 제공하기위해 언제든지 정지,실행, 또는 재실행이 가능하도록한다. ( 아마도 상태를 저장하지 않기 때문에 가능한게 아닐까?? )

 

아래 그림은 source data가 task를 통해 Kafka Server로 들어가는 모습을 나타낸다. 여기서 봐야할것은 internal offsets 정보가 Task내부가 아니라 Local Disk 또는 Kafka의 Topic에 저장되는 모습이다.

출처 : Kafka Connect Concepts Confluent Documentation


Task Rebalancing


Worker는 클러스터내에 있는 Connector를 리밸런싱한다. 각각의 worker는 대략적으로 비슷한 개수의 Task를 가지게 된다. Worker가 죽으면 가지고 있던 Task를 정상 구동중인 Worker로 적절히 분배한다.

 

Worker2가 죽으면 T2,T3 task를 적절히 Reblancing한다.

출처 : Kafka Connect Concepts Confluent Documentation

 


Worker


Connector와 Task의 작업을 스케쥴링하며 관리하는 프로세스를 Worker라고 부른다. 

Worker에는 Standalone과 Distributed라는 2가지 타입이 존재한다.

 

Standalone Worker

하나의 Worker만 존재하는 아주 심플한 모드이다. 따라서 모든 Connector와 Task의 실행을 하나의 Worker가 담당하게 된다. 

단일 프로세스이기 때문에 최소한의 설정만 셋팅하면 된다. 호스트로부터 로그를 수집하는 작업과 같이 명확한 상황에서 사용하기에 좋다. 하지만 standalone모드는 확장이 제한되어있고, 내결함성에 대해서 약점을 가진다.

 

 

Distributed Worker

분산(Distributed) 모드는 Worker의 확장이 가능하고, 자동 내결함성을 가진다. 분산모드에선 동일한 group.id를 통해서 여러개의 worker 실행이 가능하다. 또한, 같은 group.id를 가지는 Worker끼리 커넥터 및 task 실행에 대한 코디네이션을 해주는 기능도 제공한다.

 

예기치 않은 장애에 의해 Worker가 죽더라도, 다른 Worker들이 이를 인지하고 작업을 재분배한다. ( consumer그룹의 리밸런싱과 아주 흡사하게 동작한다 ) 

 

같은 group.id를 가지는 worker들은 같은 connect 클러스트에 존재한다. 

예를들어 worker-a가 group.id가 connect-cluster-a일 때, worker-b도 똑같이 group.id를 connect-cluster-a라고 셋팅해뒀다면, worker-a와 worker-b는 connect-cluster-a라는 이름의 cluster를 구성하게 된다. 

 

분산모드를 통해 3개의 worker를 경우를 그림으로 표현했다. 해당 문서에선 Worker를 Process로 지칭한다.

출처 : https://docs.confluent.io/platform/current/connect/concepts.html


Converters


Converters는 카프카 커넥트 배포에 있어서 필수적이다. 특정 데이터 포맷 형태로 데이터를 읽어오거나 보낼때는 Converters를 사용해야한다.
Task는 Converter를 사용해서 byte 형태의 데이터를 Connect internal data포맷으로 변경할 수 있다. 물론 그 반대의 경우도 가능하다.

 

Confluent는 기본적으로 아래의 Converter를 기본제공한다.

출처 : https://docs.confluent.io/platform/current/connect/concepts.html



Converter는 Connector와 분리되어있기 때문에 재사용하기 좋다. 만약에 Source Connector와 Sink Connector가 모두 Avro Converter를 사용한다면 Avro 1개를 재사용하는 방식으로 구동된다. 

출처 : https://docs.confluent.io/platform/current/connect/concepts.html


Transforms


Connector는 Transforms를 통해 메세지를 간단히 수정하는 것이 가능하다. 
마이너한 데이터 수정이 필요할때 편리하다. 뿐만아니라 여러개의 Transforms을 연결(chain)하는 것도 Connector 설정을 통해 가능하다. ( 다만 다소 복잡한 변환을 수행해야할 경우는 ksqlDB나 Kafka Streams를 사용하라고 한다. )

Transforms는 하나의 데이터를 받아 데이터를 수정해서 내보내는 간결한 기능을 수행한다. 

 

Source Connector에서 Transforms을 사용할 경우를 생각해보자. Kafka Connect는 source data가 첫번째 변환을 수행할 수 있도록 한다. 첫번째 변환이 완료되면 그 다음 변환을 수행하며 Transforms가 끝날때까지 계속 작업을 수행하며, 마지막으로 binary 형태로 변환해서 Kafka 서버로 보낸다. 

Sink Connector에서 Kafka 서버로부터 데이터를 읽은 다음에 sink record형태로 데이터를 먼저 변환하고 셋팅한 Tranforms를 수행하도록 한다


Confluent에서 Cast, Drop, Flatten 등 몇가지 Transform을 제공해준다.

Flatten Transform을 사용하면 아래처럼 다차원 json데이터를 모두 1차원으로 만들어준다.

출처 : https://docs.confluent.io/platform/current/connect/transforms/flatten.html#flatten