카프카 커넥트 : 커넥트의 파티셔닝

    카프카 커넥트의 파티셔닝

    카프카 커넥트에 올려진 커넥터는 Source 시스템에서 데이터를 가져올 때, 파티셔닝 한 후에 데이터를 가져온다. 여기서 이야기하는 파티셔닝은 카프카에서 이야기하는 파티셔닝과는 다른 개념이다. 

    카프카의 파티셔닝은 토픽마다 나누어진 파티션에 메세지의 키 값을 해시 알고리즘으로 구하여 이루어진다. 카프카 차원의 파티셔닝은 메세지가 토픽의 어떤 파티션으로 보내져야 하는지를 의미한다. 그렇지만 커넥터에서 이야기하는 파티셔닝은 이것과는 다른 개념이다.

    커넥터의 파티셔닝은 Source System에서 처리해야 할 자원과 커넥터 인스턴스가 가지고 있는 Task에게 일을 나누어 주는 것이다. 한 가지 예를 들어서 알아보자. Source System을 MySQL로 가정하고 Table이 4개가 존재한다고 가정해보자. Source Connector는 최대 2개의 Task가 생성될 수 있다고 가정해보자. 이 경우 커넥터의 파티셔닝은 Table 4개가 Task가 분배되는 것을 의미한다. 

    즉, Task #1은 1~2번 Table에서 데이터를 읽어온다. Task #2는 3~4번 Table에서 데이터를 읽어온다. Source System에서 처리해야 할 자원을 Task에 알맞게 분배하는 작업을 Connector의 파티셔닝이라고 이해할 수 있다. 

    https://docs.confluent.io/platform/current/connect/devguide.html#partitions-and-records

    위의 그림을 살펴보면 커넥터의 파티셔닝과 카프카의 파티셔닝의 차이를 좀 더 명확히 이해할 수 있다. 

    • 커넥터 파티셔닝 : Table 4개가 Connector 2개에 각각 분배된다. 
    • 카프카 파티셔닝 : 커넥터는 Task를 통해서 읽은 데이터로 카프카 메세지를 생성한다. 이 때, 각 메세지의 파티션을 결정한다. 

    커넥터의 Task와 파티셔닝 유무

    일부 커넥터들은 Multi Task를 지원한다. 예를 들어 아래의 JdbcSourceConnector는 Multi Task를 지원한다. 만약 Multi Task를 사용하고 싶다면 tasks.max의 설정값을 1보다 큰 값으로 설정하면 된다. 바꿔 이야기하면 몇몇 커넥터는 Multi Task를 지원하지 않을 수 있다는 것이다. Single Task를 사용하는 커넥터라면 커넥터 차원에서의 자원 파티셔닝은 일어나지 않을 것이다. 

    // JdbcSourceConnector
    {
    	"name": "mysql_jdbc_om_source_00",
        "config": {
        	"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "tasks.max": "1",
            "connection.url": "jdbc:mysql://localhost:3306/om",
            "connection.user": "connect_dev",
            "connecftion.password": "connect_dev",
            "topic.prefix": "mysql_om_",
            "topic.creation.default.replication.factor": 1,
            "topic.creation.default.partitions": 1,
            "catalog.pattern": "om",
            "table.whitelist": "om.customers",
            "poll.interval.ms": 10000,
            "mode": "incrementing",
            "incrementing.column.name": "customer_id"
        }
    }

    커넥터의 파티셔닝 기준은?

    카프카 커넥터는 제각각 다른 파티셔닝 기준을 가진다. 따라서 카프카 커넥터를 사용하기 전에 커넥터가 파티셔닝을 할 때, 어떤 자원이 기준이 되는지를 반드시 살펴봐야 한다. 예를 들어 JdbcSourceConnector는 연결된 DB의 Table이 파티셔닝의 기준이 된다고 한다. 

    MySQL의 Table이 3개, JdbcSourceConnector의 "tasks.max" 값이 2로 설정되어 있다면 2개의 Task가 생성되어서 3개의 Table을 분배받는다. 

    MySQL의 Table이 3개, "task.max" 값이 4로 설정되어 있다면 3개의 Task가 생성되어서 파티셔닝 된다. 최대값은 4지만 4개가 생성된다는 것을 의미하지는 않는다. 


    MultiTask와 커넥터의 오프셋

    카프카 커넥터는 Source System에서 어디까지 읽어왔는지를 connect-offsets 토픽에 저장한다고 했다. 그렇다면 커넥터를 Multi Task 모드로 등록하면, 각 Task마다 서로 다른 오프셋이 등록되는 것일까? connect-offsets를 살펴보면 그렇지는 않은 것 같다. 

    {
      "topic": "connect-offsets",
      "partition": 8,
      "offset": 6,
      "tstype": "create",
      "ts": 1673949518286,
      "key": "[\"mysql_jdbc_om_source_00\",{\"protocol\":\"1\",\"table\":\"om.customers\"}]",
      "payload": "{\"incrementing\":4}"
    }
    {
      "topic": "connect-offsets",
      "partition": 8,
      "offset": 7,
      "tstype": "create",
      "ts": 1673949548293,
      "key": "[\"mysql_jdbc_om_source_00\",{\"protocol\":\"1\",\"table\":\"om.customers\"}]",
      "payload": "{\"incrementing\":8}"
    }

    커넥터가 기록하는 오프셋을 살펴보면 Key에는 이런 정보가 저장되어 있다. 

    • Connector 이름 : mysql_jdbc_om_source_00
    • Table 명 : Source System에서 읽어온 테이블 이름
    • payload : 어디까지 읽었는지 기록

    커넥터는 분산 환경에서 동작하고 Task가 나누어지더라도 connect-offset에는 그것을 구별할 수 있는 내용이 남아있지 않다. 커넥터가 connect-offsets에 기록하는 정보는 "어떤 커넥터가 어떤 Source System의 어디까지 읽어왔는지"만 기록된다. 어떤 Task가 그 일을 했는지는 기록되지 않는다. 

     

    요약

    • Multi Task, Single Task를 지원하는 커넥터가 각각 존재한다.
    • Multi Task를 지원하는 커넥터의 경우 Source System, Sink System의 처리해야 할 부분을 파티셔닝해서 각 Task에 분배한다. 
    • 커넥터의 파티셔닝과 카프카의 파티셔닝은 다른 개념이다.
    • 커넥터 파티셔닝 기준은 커넥터마다 다르고, 주로 N:1(Source System : Connector) 의 관계다. 
    • connect-offsets에는 커넥터 이름 / Source System 이름 / 어디까지 처리되었는지에 대한 정보만 적혀져 있다. Task에 대한 메타정보는 가지지 않는다. 

     

    참고

    댓글

    Designed by JB FACTORY