Kafka Connect : 카프카 커넥트 내부 토픽 이해

    들어가기 전

    카프카 커넥트는 운영에 필요한 메타 정보를 카프카에 저장해둔다. 따라서 카프카 커넥트에서 사용하는 내부 토픽이 무엇이 있는지, 그리고 어떤 역할을 하는지 이해해야 할 필요가 있다.

    카프카 커넥트 내부 토픽

    카프카 커넥트에서 사용하는 카프카 내부 토픽은 총 4가지가 존재한다. 이 중에서 카프카 커넥트가 만들어서 사용하는 내부 토픽은 3가지고, 하나는 카프카 자체 내부 토픽을 사용한다. 아래에서 카프카 커넥트가 사용하는 내부 토픽 4가지를 알 수 있다

    내부 토픽명 설명
    connect-offsets Source Connector 별로 메세지를 전송한 offset 정보를 가지고 있음. 
    Source Connector가 한번 전송한 메세지를 중복 전송하지 않기 위해 사용함.
     기본 25개의 파티션으로 구성됨.
    connect-configs Connector의 config 정보를 가짐. Connect 재기동 시, 설정된 Connector를 기동함.
    connect-status Connector의 상태 정보를 가짐. Connect 재기동 시, 설정된 Connector를 기동함.
    _consumer_offsets Consumer가 읽어들인 메세지의 offset 정보를 가짐. Sink Connector가 읽어들인 메세지의 Offset 정보가 기록됨. 한번 읽어들인 메세지를 중복해서 읽지 않기 위함. 기본 50개의 파티션으로 구성됨.

     

    connect-configs

    connect-configs 내부 토픽은 커넥터가 커넥트에 등록되었을 때의 설정 정보를 저장하는 내부 토픽이다. 만약 커넥트가 재기동된다면 이 설정 정보를 읽어서 동작한다. 그렇다면 connect-configs의 기본 동작은 어떻게 될까?

    새로운 커넥터를 등록했을 때

    새로운 커넥터를 등록하면 connect-configs 토픽에 아래 메세지가 저장된다. 메세지는 크게 세 가지로 나누어지는 것 같다.

    • connector-<커넥터 이름>
    • task-<커넥터 이름>
    • commit-<커넥터 이름>

    커넥터는 커넥터 인스턴스와 실제로 일을 하는 커넥터 Task로 이루어진다. 따라서 커넥터의 설정 정보를 저장할 때는 커넥터 인스턴스, Task의 설정 정보를 저장하는 메세지가 같이 생성된다. 그리고 필요한 메세지가 다 전달되었으면 Commit 메세지를 보내서 마무리 한다.

    {
      "topic": "connect-configs",
      "partition": 0,
      "offset": 1,
      "tstype": "create",
      "ts": 1673772117625,
      "key": "connector-mysql_cdc_ops_source_avro_01",
      "payload": "{\"properties\":{\"connector.class\":\"io.debezium.connector.mysql.MySqlConnector\",\"tasks.max\":\"1\",\"database.hostname\":\"localhost\",\"database.port\":\"3306\",\"database.user\":\"connect_dev\",\"database.password\":\"connect_dev\",\"database.server.id\":\"30001\",\"database.server.name\":\"mysqlavro\",\"database.include.list\":\"ops\",\"table.include.list\":\"ops.customers, ops.products, ops.orders, ops.order_items, ops.boards\",\"database.history.kafka.bootstrap.servers\":\"localhost:9092\",\"database.history.kafka.topic\":\"schema-changes.mysql.oc\",\"time.precision.mode\":\"connect\",\"database.connectionTimezone\":\"Asia/Seoul\",\"key.converter\":\"io.confluent.connect.avro.AvroConverter\",\"value.converter\":\"io.confluent.connect.avro.AvroConverter\",\"key.converter.schema.registry.url\":\"http://localhost:8081\",\"value.converter.schema.registry.url\":\"http://localhost:8081\",\"transforms\":\"rename_topic, unwrap\",\"transforms.rename_topic.type\":\"org.apache.kafka.connect.transforms.RegexRouter\",\"transforms.rename_topic.regex\":\"(.*)\\\\.(.*)\\\\.(.*)\",\"transforms.rename_topic.replacement\":\"$1-$2-$3\",\"transforms.unwrap.type\":\"io.debezium.transforms.ExtractNewRecordState\",\"transforms.unwrap.drop.tombstones\":\"false\",\"name\":\"mysql_cdc_ops_source_avro_01\"}}"
    }
    {
      "topic": "connect-configs",
      "partition": 0,
      "offset": 2,
      "tstype": "create",
      "ts": 1673772117862,
      "key": "task-mysql_cdc_ops_source_avro_01-0",
      "payload": "{\"properties\":{\"connector.class\":\"io.debezium.connector.mysql.MySqlConnector\",\"transforms.rename_topic.regex\":\"(.*)\\\\.(.*)\\\\.(.*)\",\"tasks.max\":\"1\",\"database.history.kafka.topic\":\"schema-changes.mysql.oc\",\"transforms\":\"rename_topic, unwrap\",\"transforms.unwrap.drop.tombstones\":\"false\",\"transforms.unwrap.type\":\"io.debezium.transforms.ExtractNewRecordState\",\"value.converter\":\"io.confluent.connect.avro.AvroConverter\",\"key.converter\":\"io.confluent.connect.avro.AvroConverter\",\"database.connectionTimezone\":\"Asia/Seoul\",\"database.user\":\"connect_dev\",\"transforms.rename_topic.type\":\"org.apache.kafka.connect.transforms.RegexRouter\",\"database.server.id\":\"30001\",\"database.history.kafka.bootstrap.servers\":\"localhost:9092\",\"time.precision.mode\":\"connect\",\"database.server.name\":\"mysqlavro\",\"database.port\":\"3306\",\"value.converter.schema.registry.url\":\"http://localhost:8081\",\"task.class\":\"io.debezium.connector.mysql.MySqlConnectorTask\",\"database.hostname\":\"localhost\",\"database.password\":\"connect_dev\",\"name\":\"mysql_cdc_ops_source_avro_01\",\"table.include.list\":\"ops.customers, ops.products, ops.orders, ops.order_items, ops.boards\",\"key.converter.schema.registry.url\":\"http://localhost:8081\",\"database.include.list\":\"ops\",\"transforms.rename_topic.replacement\":\"$1-$2-$3\"}}"
    }
    {
      "topic": "connect-configs",
      "partition": 0,
      "offset": 3,
      "tstype": "create",
      "ts": 1673772117880,
      "key": "commit-mysql_cdc_ops_source_avro_01",
      "payload": "{\"tasks\":1}"
    }

    커넥터 일시정지 + Restart

    커넥터를 일시정지하고 재시작하는 작업을 진행해서 connect-configs에 어떤 영향을 미치는지 확인해봤다.

    • target-state-<커넥터 이름>의 Key에 대해서 각 커넥터의 Payload를 관리

    target-state-<커넥터 이름>의 Key에 대해서 Payload로 현재 state를 나타내고 있다. connect-configs에는 카프카 커넥터의 상태까지 관리하는 것을 알 수 있다.

    {
      "topic": "connect-configs",
      "partition": 0,
      "offset": 4,
      "tstype": "create",
      "ts": 1673772538726,
      "key": "target-state-mysql_cdc_ops_source_avro_01",
      "payload": "{\"state\":\"PAUSED\"}"
    }
    
    {
      "topic": "connect-configs",
      "partition": 0,
      "offset": 5,
      "tstype": "create",
      "ts": 1673772616784,
      "key": "target-state-mysql_cdc_ops_source_avro_01",
      "payload": "{\"state\":\"STARTED\"}"
    }

    커넥터 삭제

    존재하던 카프카 커넥터를 삭제했을 때 connect-configs에 어떤 영향이 있는지를 확인봤다.

    • `connector-<커넥터 이름>`
    • `target-state-<커넥터 이름>`

    위의 Key에 대해서 툼스톤 메세지가 생성되는 것을 확인할 수 있다. 삭제되는 커넥터는 connector / taget-state Prefix가 붙은 Key에 툼스톤 메세지를 생성해서 카프카 커넥트에서 삭제하는 것으로 이해할 수 있다. 

    {
      "topic": "connect-configs",
      "partition": 0,
      "offset": 6,
      "tstype": "create",
      "ts": 1673772667089,
      "key": "connector-mysql_cdc_ops_source_avro_01",
      "payload": null
    }
    {
      "topic": "connect-configs",
      "partition": 0,
      "offset": 7,
      "tstype": "create",
      "ts": 1673772667089,
      "key": "target-state-mysql_cdc_ops_source_avro_01",
      "payload": null
    }

     

     

    connect-status

    connect-status 토픽은 커넥트에 등록된 커넥터의 메타 정보를 저장하는 내부 토픽이다. 커넥트가 재기동할 때 connect-status에서 메타 정보를 읽어와서 재기동 직전의 카프카 커넥트 상태를 복원해주는데 사용된다. connect-status의 모든 것들을 살펴보지는 못했지만 다음 Prefix를 가진 Key가 사용되는 것을 확인할 수 있었다. 

    • `status-topic-<커넥터 이름>`
    • `status-connector-<커넥터 이름>`
    • `status-task-<토픽 이름>:connector-<커넥터 이름>`

    `status-connector`, `status-task`는 커넥트에 등록된 커넥터 인스턴스와 커넥터 Task의 상태를 관리해주는 역할을 한다. 주로 이 값을 Key로 가지는 메세지의 Payload에는 현재 State가 Running, Paused, Failed 인지가 기록된다. 

    status-topic에 있는 topic은 커넥터가 Source / Sink System에서 가져온 정보를 저장하는 토픽이다. 예를 들어 Debezium Source Connector를 하나 등록했을 때 다음 토픽이 생성된다고 가정해보자. 

    • `mysqlavro` : source System과 관련된 토픽
    • `mysqlavro-db-table` : Source System Table에서 읽은 데이터가 메세지로 변환되어 저장되는 토픽 

    이 때, `status-topic`이 가리키는 토픽은 `mysqlavro` 토픽이고 이 토픽에는 아래 메세지가 저장되어있다. 값에서 볼 수 있듯이, Source System에서 내부적으로 발생한 것을 보여준다. 

    {
      "source": {
        "version": "1.9.7.Final",
        "connector": "mysql",
        "name": "mysqlavro",
        "ts_ms": 1673774395602,
        "snapshot": {
          "string": "last"
        },
        "db": "ops",
        "sequence": null,
        "table": {
          "string": "products"
        },
        "server_id": 0,
        "gtid": null,
        "file": "binlog.000006",
        "pos": 2030242,
        "row": 0,
        "thread": null,
        "query": null
      },
      "databaseName": {
        "string": "ops"
      },
      "schemaName": null,
      "ddl": {
        "string": "CREATE TABLE `products` (\n  `product_id` int NOT NULL,\n  `product_name` varchar(100) DEFAULT NULL,\n  `product_category` varchar(200) DEFAULT NULL,\n  `unit_price` decimal(10,0) DEFAULT NULL,\n  PRIMARY KEY (`product_id`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci"
      },

     

    커넥터를 등록 했을 때

    커넥터를 커넥트에 등록하면 커넥터 인스턴스와 Task는 커넥트 위에서 동작하고 있는 상태가 된다. 따라서 커넥트의 현재 상태에 대한 기록이 필요하다. 또한 커넥터와 관련된 Topic도 생성되는데 이 Status 역시 함께 connect-status에 등록되게 된다. 따라서 아래 key를 가진 메세지가 connect-status에 등록된다.

    • `status-connector-<커넥터 이름>`
    • `status-task-<커넥터 이름>`
    • `status-topic-<토픽 이름>-connector-<커넥터 이름>`

     

    {
      "topic": "connect-status",
      "partition": 3,
      "offset": 0,
      "tstype": "create",
      "ts": 1673774391200,
      "key": "status-connector-mysql_cdc_ops_source_avro_01",
      "payload": "{\"state\":\"RUNNING\",\"trace\":null,\"worker_id\":\"127.0.1.1:8083\",\"generation\":2}"
    }
    {
      "topic": "connect-status",
      "partition": 3,
      "offset": 1,
      "tstype": "create",
      "ts": 1673774392827,
      "key": "status-task-mysql_cdc_ops_source_avro_01-0",
      "payload": "{\"state\":\"RUNNING\",\"trace\":null,\"worker_id\":\"127.0.1.1:8083\",\"generation\":3}"
    }
    
    {
      "topic": "connect-status",
      "partition": 4,
      "offset": 7,
      "tstype": "create",
      "ts": 1673774398178,
      "key": "status-topic-mysqlavro:connector-mysql_cdc_ops_source_avro_01",
      "payload": "{\"topic\":{\"name\":\"mysqlavro\",\"connector\":\"mysql_cdc_ops_source_avro_01\",\"task\":0,\"discoverTimestamp\":1673774398178}}"
    }

     

     

    커넥터를 일시 정지 했을 때

    커넥터를 일시 정지한다면, 커넥터 인스턴스 + 커넥터 Task가 멈추게 된다. 따라서 이런 상태 변화를 기록해야하는데 이 상태 변화는 connect-configs 뿐만 아니라 connect-status에도 등록되게 된다. 

    `status-connector-<커넥터 이름>`, `status-task-<커넥터 이름>`의 Key에 Payload로 state가 온다. 이 state에 `PAUSED`라고 되어있는 것을 볼 수 있다. 

    {
      "topic": "connect-status",
      "partition": 3,
      "offset": 2,
      "tstype": "create",
      "ts": 1673774611052,
      "key": "status-connector-mysql_cdc_ops_source_avro_01",
      "payload": "{\"state\":\"PAUSED\",\"trace\":null,\"worker_id\":\"127.0.1.1:8083\",\"generation\":3}"
    }
    {
      "topic": "connect-status",
      "partition": 3,
      "offset": 3,
      "tstype": "create",
      "ts": 1673774611130,
      "key": "status-task-mysql_cdc_ops_source_avro_01-0",
      "payload": "{\"state\":\"PAUSED\",\"trace\":null,\"worker_id\":\"127.0.1.1:8083\",\"generation\":3}"
    }

     

     

    connect-offsets

    Source Connector는 주기적으로 Source System과 상호 작용을 한다. Source Connector는 Source System과 어디까지 상호 작용을 했는지 connect-offsets에 기록하고, 이 토픽을 참고해서 Source System의 변경점을 중복 반영되지 않도록 한다. 상호 작용을 했다는 의미는 브로커에게까지 Source System의 변경점이 반영된 것을 의미한다. 

    Kafka Producer는 이런 부분을 따로 관리하는 내부 토픽이 존재하지 않기 때문에 Source Connector에서는 connect-offsets 내부 토픽을 이용해서 이런 내용들을 관리한다. 반면 Sink Connector는 Converter를 통해서 메세지를 읽어오는데, 이미 Kafka Consumer는 중복 읽기 체크를 위해서 __consumer_offsets이라는 내부 토픽이 존재한다. 따라서 Sink Connector는 __consumer_offsets을 이용한다. 

    실제로 connect-offsets에는 어떤 메세지가 기록되어있는지를 살펴봤다. 

    {
      "topic": "connect-offsets",
      "partition": 0,
      "offset": 0,
      "tstype": "create",
      "ts": 1673774401826,
      "key": "[\"mysql_cdc_ops_source_avro_01\",{\"server\":\"mysqlavro\"}]",
      "payload": "{\"ts_sec\":1673774395,\"file\":\"binlog.000006\",\"pos\":2030242}"
    }

    위는 debezium Source Connector가 생성한 connect-offsets 메세지다. debezium Source Connector는 RDBMS의 binary log 파일을 읽어서 변경점을 카프카 메세지로 생성해주는 작업을 한다. 메세지의 key, value를 중점적으로 살펴보자

    • key : converter 이름 + server 이름
    • payload : binlog의 어떤 포지션까지 읽었는지 

    Key, Value의 의미를 조합해보면 다음과 같은 결론을 낼 수 있다.

    debezium Source Connector 중에 mysql_cdc_ops_source_avro_01 커넥터는 mysqlavro라는 서버로부터 binlog.00006 파일의 2030242까지 읽었다

    Source Connector는 mysqlavro라는 Source System으로부터 다음 메세지를 읽어오게 된다면, binlog.00006 파일의 2030242번 이후부터의 변경점을 읽어올 것을 이해할 수 있다. 

     

    최종 정리

    • 카프카 커넥트는 4개의 내부 토픽을 이용해서 카프카 커넥트의 상태와 커넥터의 중복 읽기를 방지한다. 
    • connect-offsets 토픽에는 특정 커넥터가 어디까지 Source Connector로부터 메세지를 생성했는지를 기록한다.
    • connect-status 토픽에는 커넥터 인스턴스, 커넥터 task의 상태, 커넥터와 관련된 토픽 정보가 저장된다. 
    • connect-configs 토픽에는 커넥터 인스턴스. 커넥터 task의 설정값, 상태가 저장된다. 

    댓글

    Designed by JB FACTORY