CDC 너두 할 수 있어(feat. B2B 알림 서비스에 Kafka CDC 적용하기)

Dec.15.2022 유원영

Backend

"어 이거 CDC 적용하면 딱이겠는데요? 한번 CDC로 해보면 어때요?"

B2B 알림서비스 기획 리뷰 도중 제안받은 의견입니다. 저는 이때까지만 해도 CDC가 무엇인지 잘 모르는 상태였지만, 저 의견 덕분에 ‘그럼 한번 해볼까?’ 하는 생각이 들어 흔쾌히 동의하였고, 그로 인해 B2B 알림 서비스에 CDC를 도입하게 되었습니다. 물론 적용 하는 게 쉽진 않았지만, 개발하면서 겪었던 경험을 여러분과 공유하면서 어떤 상황에서 CDC를 도입해야 하는지, CDC를 도입해서 애플리케이션을 개발하려면 어떻게 해야 하는지, 나아가 CDC를 사용하면서 주의해야 하는 것은 어떤 것들이 있는지에 대해 이야기해 보고자 합니다.

먼저 프로젝트 소개부터, B2B 알림 서비스는 무엇인가요?

B2B 알림 서비스 소개

일단 이 이야기의 배경이 되는 B2B 알림 서비스 프로젝트부터 알아야, 왜 CDC를 도입하게 되었는지 이해하기 쉬울 겁니다. 기존 배민 B2C 고객 서비스에서는 ‘알림센터’라는 코어 시스템을 통해 고객에게 알림을 제공하고 있었지만, 사장님에게 발송되는 알림은 플랫폼이 부재한 관계로 카카오 알림톡으로 발송하고 있었습니다.

이에 따라, 사장님에게 전달되는 전체적인 알림 경험을 파악하기 힘들었고, 알림의 중요도와 종류에 관계없이 동일한 채널로 알림톡이 발송되고 있어, 전체적인 알림톡에 대한 반응이 저해될 뿐만 아니라, 알림톡 발송으로 인한 비용적인 문제가 발생하고 있던 상황이었습니다.

그래서, 알림센터를 활용해 사장님에게 전달되는 메시지를 내부 서비스를 통해 전달함으로써, 내부 서비스 활용도와 사용자 편의성을 향상하고자 진행한 프로젝트가 바로 B2B 알림 서비스 프로젝트입니다. 그 프로젝트 중 첫 번째 단계로 세일즈 매니저에 대한 알림톡을 알림센터내 웹 푸시 알림으로 전환하는 과정에서 CDC를 도입하게 되었습니다.

알림

B2B 알림서비스에 CDC를 도입하게 된 이유

세일즈 매니저에게 발송되는 알림은 다음 경우에 발생합니다.

  • 세일즈 매니저 본인이 만든 업무 요청 건의 상태가 변경되는 경우
  • 세일즈 매니저 본인이 해당 가게의 세일즈 매니저로 설정되어 있는 업무 요청 건의 상태가 변경될 때

기존에는 이러한 요청 건의 상태 변경이 사용자 동작에 의해(요청 반려 버튼을 클릭 등) 발생되는 것이므로 프론트 코드에 알림을 발송하는 코드가 있던 상태였죠.

하지만, 프론트에서 알림을 발생시키는 경우엔 다음과 같은 문제가 있습니다.

  • 네트워크 문제로 알림 발송 누락
  • 알림 발송 이후 요청 건의 상태 변경에 실패하면 실제 데이터와 알림이 맞지 않음

이러한 문제를 해결하고자 알림에 대한 처리를 백엔드에서 할 방법이 필요했습니다. 그래서, 요청 건의 상태가 변경되면 DB에 반영되는 것에 착안하여 변경된 데이터를 감지하는 CDC를 선택하게 된 것이죠. 이런 식으로 CDC는 데이터가 변경되는 시점에 해당 항목을 추적하고, 변경에 대응해야 하는 다른 시스템 및 서비스에 알림을 전송하는 데이터 통합 패턴입니다.

이제 CDC에 대해서도 한번 알아볼까요?

CDC의 개념과 동작 방식, 그리고 Kafka CDC

CDC는 Change Data Capture의 약자로, 앞서 말씀드린 것처럼 소스 시스템에서 데이터가 변경된 것을 감지하여, 타깃 시스템이 변경 작업에 대응하는 작업을 수행하도록 하는 프로세스입니다. 여기서 우리 B2B 알림 서비스의 소스 시스템이 바로 DB였던 것이고, 타깃 시스템이 B2B 알림 서비스였던 것이죠. 이렇게 CDC를 사용한다면, 데이터를 사용하는 모든 시스템에서 일관성을 유지할 수 있다는 장점이 있습니다.

CDC에서 데이터 변경을 감지하는 방법에는 Pull 방식과 Push 방식이 존재합니다.

  • Pull 방식: 타깃 시스템의 주기적인 풀링으로 변경 사항이 있는지 확인하는 방법입니다. 쉽게 구현할 수 있다는 장점이 있지만, 실시간성이 떨어진다는 단점 또한 있습니다.
  • Push 방식: 소스 시스템이 변경이 발생할때마다 타깃 시스템에 알려주는 방법입니다. Pull 방식에 비해 소스 시스템이 많은 작업을 해야 하고, 타깃 시스템에 문제가 발생한다면 변경 이벤트에 누락이 발생할 수 있지만, 실시간성이 뛰어나다는 장점이 있습니다.

이런 두 가지 방법 중, Push 방식에서 이벤트 누락의 단점을 메시지큐인 Kafka를 통해 해결하여 CDC 시스템을 만드는 것이 바로 Kafka CDC입니다. B2B 알림서비스에서는 소스 시스템인 DB였는데요, 이 DB로부터 데이터의 변경 이벤트를 감지해서 Kafka 이벤트를 발행해 주는 것이 바로 Debezium MySQL Connector입니다.

Debezium MySQL Connector 주요 특징

Debezium MySQL Connector는 MySQL의 binlog를 읽어 INSERT, UPDATE, DELETE 연산에 대한 변경 이벤트를 만들어 Kafka 토픽으로 이벤트를 전송해줍니다. binlog를 기반으로 데이터를 수집하기 때문에, DB에서 수행된 모든 이벤트가 안정적으로 수집되고, 이벤트 발행 시 정확한 순서가 보장되죠.

B2B 알림 서비스에서는 어떻게 Kafka CDC를 활용했는지 알아봅시다

사전 준비

일단 Kafka + Kafka Connect + Debezium MySQL Connector가 사용할 수 있는 환경에, 이미 설치되어 있다는 것을 가정하고 진행하겠습니다. 이 과정에서 주의해야 하는 것은 MySQL의 설정입니다. binlog 설정이 제대로 되어 있는지 확인해야 하고, Debezium MySQL Connector가 사용할 DB 계정에 SELECT, RELOAD, SHOW DATABASE, REPLICATION SLAVE, REPLICATION CLIENT 등의 권한이 잘 있나 확인해야 합니다.

Kafka CDC를 활용한 코드 작성하기

일단 먼저 Kafka를 통해 넘어오는 이벤트 레코드를 변환해야 하는데 Debezium MySQL Connector는 Apache Avro를 지원하고, 이를 사용하려면 스키마 레지스트리를 사용해야 하죠. 스키마 레지스트리에 등록된 스키마를 받기 위해서는, 프로젝트의 gradle에 설정을 추가해두면 편합니다.

val schemaRegistry = "http://localhost:8081" // 스키마 레지스트리 주소
val downloadInputs = listOf(
    "schema.data-key",
    "schema.data-value"
)
val avroDestination = "org/main/avro" //avro 스키마가 저장될 프로젝트상의 위치
schemaRegistry {
    url.set(schemaRegistry)
    download {
        // 패턴에 해당하는 서브젝트(스키마)를 다운로드
        downloadInputs.forEach {
            subjectPattern(
                inputPattern = it,
                file = avroDestination
            )
        }
    }
}

받아온 avro 스키마는 대략적으로 다음과 같이 생겼죠.

{
    "type": "record",
    "name": "Envelope",
    "namespace": "schema.data",
    "fields": [
        {
            "name": "before",
            "type": [
                "null",
                {
                    "type": "record",
                    "name": "Value",
                    "fields": [
                        {
                            "name": "id",
                            "type": "long"
                        }
                        // ...
                    ],
                    "connect.name": "schema.data"
                }
            ],
            "default": null
        },
        {
            "name": "after",
            "type": [
                "null",
                "Value"
            ],
            "default": null
        },
        {
            "name": "source",
            "type": {
                "type": "record",
                "name": "Source",
                "namespace": "io.debezium.connector.mysql",
                "fields": [
                    {
                        "name": "version",
                        "type": "string"
                    }
                    // ...
                ],
                "connect.name": "io.debezium.connector.mysql.Source"
            }
        },
        {
            "name": "op",
            "type": "string"
        },
        {
            "name": "ts_ms",
            "type": [
                "null",
                "long"
            ],
            "default": null
        },
        {
            "name": "transaction",
            "type": [
                "null",
                {
                    "type": "record",
                    "name": "ConnectDefault",
                    "namespace": "io.confluent.connect.avro",
                    "fields": [
                        {
                            "name": "id",
                            "type": "string"
                        },
                        {
                            "name": "total_order",
                            "type": "long"
                        },
                        {
                            "name": "data_collection_order",
                            "type": "long"
                        }
                    ]
                }
            ],
            "default": null
        }
    ],
    "connect.name": "schema.Envelope"
}

이 상태에서 빌드를 거치고 나면 Envelop 클래스가 생기는데, 이것을 그대로 활용하는 것보단 적절하게 변환해서 사용하는 게 좋습니다. 마침 우리 프로젝트는 Kotlin이었기 때문에 Kotlin의 확장 함수를 이용해서 변환을 쉽게 할 수 있었죠.

fun Envelop.toBefore(): CdcRecord? {
    val before = this.getBefore() ?: return null

    return CdcRecord(
        //...
    )
}

자, 클래스도 준비되었으니 이제 클래스를 활용해서 본격적인 작업을 시작하기에 앞서, Kafka CDC도 Kafka를 이용해야 하므로 ConsumerConfig를 설정해줘야 하겠죠?

@Configuration
class CdcConsumerConfig {
    @Bean(CDC_CONTAINER_FACTORY)
    fun cdcListenerContainerFactory(
        properties: CdcConsumerProperties,
        @Value("\${spring.kafka.bootstrap-servers}") bootstrapServers: String
    ): KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Envelope>> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, Envelope>()
        factory.consumerFactory = DefaultKafkaConsumerFactory(
            mapOf(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to properties.keyDeserializerClass,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to properties.valueDeserializerClass,
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to properties.enableAutoCommit,
                ConsumerConfig.MAX_POLL_RECORDS_CONFIG to properties.maxPollRecords,
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to properties.autoOffsetReset,
                // Schema Registry, Avro 관련 설정 필수
                KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG to properties.schemaRegistryUrl,
                KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG to properties.specificAvroReader,
            )
        )
        // ...
    }
}

이벤트를 받아 처리하는 것이니 이벤트 리스너를 만들어야겠죠?

class CdcEventListener(
    private val cdcEventProcessor: List<CdcEventProcessor>,
) {
    private val sinkQueue = Queues.get<List<Envelope>>(4096).get()
    private val sinks = Sinks.many()
        .unicast()
        .onBackpressureBuffer(sinkQueue)
    private lateinit var disposable: Disposable

    @KafkaListener(
        topics = ["\${kafka.cdc.topic}"],
        groupId = "\${kafka.cdc.groupId}",
        containerFactory = CdcConsumerConfig.CDC_CONTAINER_FACTORY,
    )
    fun listen(
        @Payload payloads: List<Envelope?>,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) partition: Int,
        @Header(KafkaHeaders.RECEIVED_TOPIC) topic: String,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) ts: Long,
        acknowledgment: Acknowledgment,
    ) {
        // ... 
    }
}

B2B 알림 서비스에서는, CDC 이벤트의 종류에 따라서 다른 알림을 보내야 했었는데요. 이를 위해서 서로 다른 이벤트의 종류를 받을 이벤트 프로세서와, 각 이벤트 내에서 어떤 알림을 보낼지 결정하는 이벤트 핸들러를 구현하였습니다. 예를 들어 어떤 종류(예: 가게 로고 수정, 가게 소개 수정 등)의 요청 건에 대한 이벤트인지는 이벤트 프로세서로 구분하고, 해당 요청 건에 대한 어떤 이벤트인지(예: 요청이 승인됨, 요청이 반려됨 등)는 이벤트 핸들러로 구분해서 알림을 보내는 구조이죠. 그림으로 나타내면 아래와 같습니다.

이러한 구조를 반영하려면 이벤트 리스너에 아래와 같이 각 이벤트 프로세서에 이벤트를 전달하는 코드가 추가되어야 합니다.

class CdcEventListener(
    private val cdcEventProcessor: List<CdcEventProcessor>,
) {
    // ...
    @PostConstruct
    protected fun init() {
        disposable = sinks.asFlux()
            // ...
            // 이벤트를 처리 하는 과정에서 doAlarm으로 알림 발송
            .doOnNext(::doAlarm)
            // ...
    }

    private fun doAlarm(CdcRecords: List<Envelope> = emptyList()) {
        Flux.fromIterable(CdcRecords)
            .flatMap {
                Mono.fromCallable {
                    // 각각의 이벤트 프로세서에게 이벤트를 처리 하도록 지시
                    cdcEventProcessor.forEach { processor ->
                        try {
                            processor.process(
                                before = it.toBefore(),
                                after = it.toAfter(),
                            )
                        } catch (e: Exception) {
                            log.warn("[CdcEventProcessor] occured exception", e)
                        }
                    }
                }.subscribeOn(Schedulers.boundedElastic())
            }.subscribe()
    }
}

이제 이벤트를 처리하는 이벤트 프로세서의 차례입니다. 이벤트 프로세서에서는 자신이 전달받은 이벤트에 대해서 처리할 수 있는 이벤트인지 확인한 후, 처리할 수 있는 이벤트의 종류라면 자신이 가지고 있는 이벤트 핸들러들에게 처리를 위임합니다.

@Service
class NotificationCenterAlarmFacade(
    private val notificationHandlers: List<NotificationHandler>
) : CdcEventProcessor {

    override fun process(before: CdcRecord?, after: CdcRecord?) {
        log.debug("[알림서비스] process 진입 before = `{}`, after = `{}`", before, after)

        if (after == null) {
            log.info("[알림서비스] 데이터 삭제건에 대해서는 알림서비스 발송처리를 하지 않습니다. before: `{}`", before)
            return
        }

        notificationHandlers.find { it.accept(before = before, after = after) }
            ?.send(record = after)
    }
}

마지막으로, 핸들러는 본인이 처리할 수 있는 이벤트인지 확인하고 알림을 보내는 것으로 B2B 알림 서비스가 동작하게 됩니다.

@Component
class CompleteHandler : NotificationHandler {
    override fun send(record: CdcRecord) {
        // 알림 발송 로직
    }

    override fun accept(before: CdcRecord?, after: CdcRecord?): Boolean {
        // 완료 이벤트에 대한 알림을 발송하는 핸들러이기 때문에, 완료 이벤트인지 확인하는 조건
        if (before == null || after == null ||
            before.status == Complete || after.status != Complete
        ) {
            return false
        }
        log.info("[알림서비스] 완료 이벤트 감지 `{}`", after.id)
        return true
    }
}

이제 특정 요청 건들의 상태가 변경되면 자동으로 알림을 받아볼 수 있게 되었습니다!
그럼 마지막으로 CDC를 사용할 때 무엇을 주의해야 할지 알아보시죠.

Kafka CDC를 사용할 때 주의할 것

AWS Aurora 환경에서 쓰기 부하가 많은 경우

Debezium MySQL Connector를 연동하면 binlog dump thread가 Aurora MySQL 클러스터 스토리지의 binlog를 읽는데, 이때 잠시 락을 걸게 됩니다. 그런데, Aurora MySQL 2.10.2 미만의 버전(2.10.2 버전 binlog 부하 개선사항)에서는 아키텍처상 문제로 쓰기 부하가 많은 경우 부하가 심해지게 되는 문제가 있습니다. 만약 binlog dump thread의 부하가 심해지는 경우 INSERT, UPDATE, DELETE, COMMIT 등 DML 관련 레이턴시가 증가하게 되고, 이에 따라 장애가 발생할 수 있게 됩니다.

중복 메시지 발생의 가능성

여러 가지 경우로 Kakfa 메시지는 중복될 수 있습니다. 그렇다면 중복으로 메시지가 발생하는 경우에도 대응해야겠죠. 이건 CDC의 문제가 아니라 이벤트 큐를 사용한다면 누구나 발생할 수 있는 문제이기 때문에 여러 해결 방법이 있고, 저는 Redis Cache를 이용해 이 문제를 해결했습니다.

class CdcEventListener(
    private val cdcEventProcessor: List<CdcEventProcessor>,
) {
    // ...
    @PostConstruct
    protected fun init() {
        disposable = sinks.asFlux()
            // ...
            // 이벤트를 처리 하는 과정에서 doCheckDuplicationPrevent 으로 중복 확인 
            .flatMap(::doCheckDuplicationPrevent)
            // ...
    }

    private fun doCheckDuplicationPrevent(cdcRecords: List<Envelope>): Mono<List<Envelope>> {
        return Mono.fromCallable {
            cdcRecords.filter {
                // HashCode를 이용한 RedisKey 생성
                val key = RedisCacheType.DUPLICATION_PREVENT.addPostfix(name = "${it.getAfter().getId()}:${it.hashCode()}")
                // 해당 Key가 이미 존재하는지 확인
                val existKey = redisTemplate.opsForValue().existKey(
                    key = key
                )
                log.debug(
                    "[CDC][EventEmitterSinks] Check Duplication Prevent. Key = `{}`, Value = `{}`",
                    key, existKey
                )
                (!existKey)
            }
        }.subscribeOn(Schedulers.boundedElastic())
    }
}

마무리

지금까지 CDC란 무엇인지, Kafka CDC를 활용해 코드는 어떻게 작성하는지, Kafka CDC를 사용할 때 주의할 것은 무엇인지에 대해서 B2B 알림 서비스의 예시를 통해 알아보았습니다.

이 글의 내용을 참고해 여러분의 프로젝트에 CDC를 도입하는 데 도움이 되길 바라며, 추가적으로 Kafka CDC에 더 관심 있는 분은 Apache Kafka 공식 문서Debezium MySQL Connector 공식 문서를 참고하시기 바랍니다.

또한, 우리팀에 합류해 같이 B2B 알림 서비스를 개선하고 싶은 분이라면 채용공고를 참고하세요!

끝까지 읽어주셔서 감사합니다!