Kafka 를 사용하다보면 __TypeId__ 이라는 Header 를 마주하는 경우가 있다.
이 Header 를 보았다면 아마도 JsonSerializer 를 사용하는 Kafka Producer 에서 생성된 메세지의 Header 를 보았을 가능성이 매우 높다.
이 __TypeId__ 라는 헤더는 메세지를 보낼 때의 데이터 객체 타입 이름을 가지고 있다. 아래처럼 Value 타입이 Object 인 JsonSerializer 를 사용하는 Kafka Producer 에서 send 메서드를 통해 메세지를 보낼 때, 메세지 파라미터의 데이터 타입에 따라 해당 데이터의 타입 이름이 들어가게 된다.
@Override
public ListenableFuture<SendResult<String, Object>> sendQueue(@NonNull final PostKafkaProxyParam params) {
return kafkaObjectToJsonTemplate.send(params.getTopic(), "asdfvasdfv"); // java.lang.String
}
@Override
public ListenableFuture<SendResult<String, Object>> sendQueue(@NonNull final PostKafkaProxyParam params) {
HashMap<String, Object> obj = new Gson().fromJson(params.getMessage(), new TypeToken<HashMap<String, Object>>(){}.getType());
return kafkaObjectToJsonTemplate.send(params.getTopic(), obj); // java.util.HashMap
}
메세지를 보내는 과정에서 DefaultJackson2JavaTypeMapper 클래스의 fromJavaType 이라는 메서드를 호출하게 되는데 이를 잘 살펴보면 해당 Header 에 값을 넣는 코드를 볼 수 있다.
그리고 아래와 같이 바이트로 Header 에 저장한 후 메세지를 보내게 된다.
{
"headers": {
"__ContentTypeId__": "amF2YS5sYW5nLk9iamVjdA==",
"b3": "MWNhODkwY2I0MDZkZjAzNC04NTEzNzI0MmMzNzhkMDQ0LTE=",
"__KeyTypeId__": "amF2YS5sYW5nLk9iamVjdA==",
"__TypeId__": "amF2YS51dGlsLkhhc2hNYXA="
}
}
메세지가 나중에 Consumer 에서 JsonDeserializer 를 통해 Header 를 읽을 때는 AbstractJavaTypeMapper 라는 클래스에서 __TypeId__ 이라는 Header 가 있는지를 확인한다.
만약 여기서 위의 Header 가 존재하면 바이트를 String 으로 변환해서 어떤 데이터 타입의 클래스인지 확인하게 된다.
그 후, Consumer 으로부터 전달받은 메세지에서 위에서 확인한 데이터 타입의 클래스로 Deserialize 를 시도하게 된다.
'프로그래밍 > Spring' 카테고리의 다른 글
[JPA] H2 DDL 초기 테스트 데이터 설정 (0) | 2022.10.01 |
---|---|
[Spring] Spring에서 H2 데이터베이스 조회 시 한글 깨짐 수정 (0) | 2022.10.01 |
[Spring] @ModelAttribute 파라미터에서 사용 방법 및 원리 (0) | 2022.07.30 |
[Spring] JPA 에서 Oracle DB 사용할 때 DB 함수 사용하기 (0) | 2022.04.04 |
[Spring] JPA 와 Mybatis 동시 사용 시 Connection Deadlock 벗어난 이슈 정리 (0) | 2022.04.04 |