2022. 3. 24. 23:07ㆍ개발/Kafka
고승범님의 [실전 카프카 개발부터 운영까지]에는 Producer의 동작을 Java로 구현한 예제가 있다. 다만 여기서 실제 동작하는 모습은 보여주지 않아 직접 실행해보기로 했다. Producer의 구조나 작동원리는 책에 잘 설명되어 있으니 생략!
테스트를 위한 Topic과 Consumer 생성하기
아래 명령어를 통해 peter-basic01이라는 이름을 가지는 Topic을 생성한다.
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server peter-kafka01.foo.bar:9092 --create --topic peter-basic01 --partitions 1 --replication-factor 3
Topic은 partition 1개로 구성되어 있으며, 클러스터에 같은 Topic을 3개 만든다. (원본1개 + 복제 2개)
peter-basic01을 바라보는 Consumer를 생성한다.
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server peter-kafka01.foo.bar:9092 --topic peter-basic01
Topic의 응답을 받지 않는 Producer 구현하기
코드를 작성하기 위해 Gradle Project를 하나 만든다. Kafka 사용을 위한 Dependency를 추가한다.
여기서 한가지 일을 더 해줘야하는데 Build시 아래의 jar 작업을 추가로 수행할 수 있도록 한다. 작업을 추가한 이유는 Gradle Build시 Fat Jar(Dependency까지 포함된 Jar)를 만들고, Jar파일 실행을위한 Main 클래스의 위치를 알려주기 위함이다.
프로젝트 설정은 여기까지하고 이제 Producer 코드를 작성한다.
아래 코드는 Producer가 Topic으로 Record를 전송하고 난뒤 Topic이 정상적으로 받았는지 확인하지 않는다. Kafka가 신뢰성이 높다지만 이런 방식은 운영에서 사용하기엔 위험하다.
Properties 타입의 인스턴스에 Broker의 정보와 Key,Value에 대한 Serializer를 정의한다.
KafkaProducer의 생성자로 설정을 주입하므로서 Producer를 생성한다. 이후에 3개의 Record를 Topic으로 전송한다.
package com.soojong.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ProducerFireForgot {
public static void main(String[] args) {
Properties props = new Properties();
// Broker 정보를 정의한다.
props.put("bootstrap.servers","peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka02.foo.bar:9092");
// Record의 key와 value는 문자이기 때문에 전송시 byte로 변환해야한다.
// 각각 serializer로 kafka에서 제공하는 StringSerializer를 사용하도록 한다.
props.put("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer" , "org.apache.kafka.common.serialization.StringSerializer");
// 위의 설정값을 가지는 Producer를 생성한다.
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
for(int i=0; i<3; i++){
// Producer가 보낼 Record를 생성한다.
ProducerRecord<String, String> record = new ProducerRecord<>("peter-basic01",
"Hello This is Kafka Record - " + i);
// Producer가 Record를 보낸다.
producer.send(record);
}
} catch (Exception e){
e.printStackTrace();
}finally {
producer.close();
}
}
}
소스코드 작성을 완료했다면 Gradle Build를 수행한다. Git Bash를 사용해 Jar파일을 실행서버로 전송한다. Git Bash는 윈도우 환경에서 Unix기반의 Bash명령어를 사용할 수 있도록해주는 Shell이다. (아무래도 Window Prompt는 명령어 자체가 익숙하지 않다보니 사용하지 않게된다.)
Git Bash의 scp 명령어를 사용해서 실행 서버로 Jar 파일을 전송했다.
scp -i [공개키] [보낼파일] [타겟서버계정]@[타겟서버주소]:[저장위치]
Jar파일을 실행해서 peter-basic01로 3개의 Record를 전송한다.
Consumer에서 성공적으로 3건의 데이터를 pull한다.
Topic의 응답을 받는 Producer (Synchronous 방식) 구현하기
Producer가 Topic으로 Record를 송신하고 성공여부를 확인하는 Java 코드를 작성한다.
코드는 첫번째 예제와 같은데 producer의 get 메소드를 통해 Topic으로부터 응답을 기다리는 로직이 추가되었다. 전송이 실패한 Record가 있으면 확인후 재전송을 하면되기 때문에 이전 방식 보다는 훨씬 더 신뢰성 있는 운영이 가능하다.
get메소드는 RecordMetadata 타입의 데이터를 반환하는데 여기에는 Topic의 Partition 번호, Offset 등이 저장되어 있다.
package com.soojong.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class ProducerSync {
public static void main(String[] args) {
Properties props = new Properties();
// Broker 정보를 정의한다.
props.put("bootstrap.servers","peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka02.foo.bar:9092");
// Record의 key와 value는 문자이기 때문에 전송시 byte로 변환해야한다.
// 각각 serializer로 kafka에서 제공하는 StringSerializer를 사용하도록 한다.
props.put("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer" , "org.apache.kafka.common.serialization.StringSerializer");
// 위의 설정값을 가지는 Producer를 생성한다.
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
for(int i=0; i<3; i++){
// Producer가 보낼 Record를 생성한다.
ProducerRecord<String, String> record = new ProducerRecord<>("peter-basic01",
"Hello This is Kafka Record - " + i);
// Producer는 Record를 전송하고 Broker로부터 응답을 기다린다. Broker에서 에러가 발생하지 않으면 metadata를 얻는다.
RecordMetadata metadata = producer.send(record).get();
// 출력
System.out.printf("Topic : %s , Partition : %d , Offset : %d, Key : %s, Received Message :%s\n",
metadata.topic(),metadata.partition(),metadata.offset(),record.key(),record.value());
}
} catch (Exception e){
e.printStackTrace();
}finally {
producer.close();
}
}
}
위 코드를 실행하면 아래와 같은 결과를 볼 수 있다.
Topic의 응답을 받는 Producer (Asynchronous 방식) 구현하기
Topic의 응답을 받되 이를 비동기 방식으로 처리하는 코드를 작성한다. 전체적인 코드는 이전과 크게 차이가 없는데 send메소드의 2번째 인자로 콜백 Class의 인스턴스를 넘겨주기만 하면된다.
Producer는 Topic으로 Record를 전송하고 응답이 올때까지 기다리지 않고 바로 다음 Record를 Topic으로 보낸다. 첫번째 Record에 대한 Topic의 응답 처리는 콜백 Class인 PeterProducerCallback에 맡긴다.
package com.soojong.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class ProducerAsync {
public static void main(String[] args) {
Properties props = new Properties();
// Broker 정보를 정의한다.
props.put("bootstrap.servers","peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka02.foo.bar:9092");
// Record의 key와 value는 문자이기 때문에 전송시 byte로 변환해야한다.
// 각각 serializer로 kafka에서 제공하는 StringSerializer를 사용하도록 한다.
props.put("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer" , "org.apache.kafka.common.serialization.StringSerializer");
// 위의 설정값을 가지는 Producer를 생성한다.
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
for(int i=0; i<3; i++){
// Producer가 보낼 Record를 생성한다.
ProducerRecord<String, String> record = new ProducerRecord<>("peter-basic01","Hello This is Async Kafka Record - " + i);
// Producer는 Record를 전송하고 결과값을 콜백 인스턴스가 처리하도록 한다. (비동기방식)
producer.send(record, new PeterProducerCallback(record));
}
} catch (Exception e){
e.printStackTrace();
}finally {
producer.close();
}
}
}
Producer가 사용할 Callback 클래스를 만들기 위해선 org.apache.kafka.clients.producer.Callback 인터페이스를 구현하면된다. Callback 인터페이스에는 onCompletion 메소드가 있는데 여기서 동기방식에서 봤던 RecordMetadata를 그대로 사용할 수 있다.
package com.soojong.kafka;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class PeterProducerCallback implements Callback {
private ProducerRecord<String,String> record;
public PeterProducerCallback(ProducerRecord<String, String> record) {
this.record = record;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null){
e.printStackTrace();
} else {
// 출력
System.out.printf("Topic : %s , Partition : %d , Offset : %d, Key : %s, Received Message :%s\n",metadata.topic(),metadata.partition(),metadata.offset(),record.key(),record.value());
}
}
}
코드 실행시 정삭적으로 동작하는 모습을 확인할 수 있다. Topic의 응답을 기다리지 않기때문에 동기방식 보다 빠르게 Record 송신이 가능하다. ( 순서가 중요한 Record라면 비동기 방식 사용시 Exception이 발생했을때 후처리가 조금 피곤하지 않을까 하는 생각도 든다. )
참고자료
Gradle Build시 Dependency를 포함하여 Jar 파일 생성하기.
Producer의 내부 동작방식을 설명한 Article
'개발 > Kafka' 카테고리의 다른 글
[Kafka] Consumer의 3가지 수신 방법 + Segment 삭제 (0) | 2022.03.28 |
---|---|
Source Connector Offset 초기화 하기 (0) | 2021.05.27 |
Source Connector 생성 하기 ( feat. Topic 자동생성 ) (0) | 2021.05.22 |
Confluent Hub Client설치, JDBC Connector 구동 (0) | 2021.05.22 |
Confluent의 Kafka Connect Concept (0) | 2021.05.17 |