Avatar
이형석
좋아요
댓글
kafka_Replication Factor

2021.07.06 | 에스코어 오픈소스SW그룹


 

  1. 기술개요
  2. 상세설명
    1. replication
    2. producer
      1. ACKs
  3. 적용사례
    1. cluster & replication factor
      1. leader 선정방법
    2. troubleshooting
  4. 끝맺음
    1. 마무리

 

개요

kafka-apis.png

Kafka HA

Kafka는 분산 시스템으로서 데이터를 여러 곳에 분산 저장하여 서버에 문제가 발생하여도 높은 가용성을 보장합니다.

Kafka 는 다른 application과는 다른 replication 방식을 통해 replication factor라는 그룹을 생성하여 HA를 구성하고 있습니다.

이번 문서에서는 replication factor와 같은 Kafka HA 구성을 위해 알아야 하는 것들을 살펴보도록 하겠습니다.

 

Kafka topic

Kafka 의 메시지는 topic으로 구분되어 저장되며, topic은 메시지를 저장할 때 1개 이상의 partition으로 분산하여 저장합니다.

partition이 하나인 경우 문제가 발생되어 다운되면 해당 메시지를 손실하게 되므로 여러개의 partition을 두어 topic의 메시지를 분산 저장하고 있습니다.

topic은 기본 옵션은 Round-robin 방식으로 각 partition에 분배되며, 한번 증가한 partition의 개수는 원복할 수 없으니 고민해야합니다.

 

상세설명

replication

Kafka에서의 replication은 topic 자체를 복사하는 것이 아니라 topic을 구성하는 각 partition을 복사하는 것입니다.

Kafka topic을 생성할 때 명령어를 보시면 다음과 같습니다.

bin/kafka-topics.sh --create --bootstrap-server kafka-01:9092 --replication-factor 1 --partitions 1 --topic test

여기서 주목해야하는 부분은 바로 "--replication-factor 1" 부분으로, 앞으로 cluster에서 몇 개의 replication을 생성할 것인지에 대한 설정입니다.

이때 leader 역할의 broker는 단 1대 생성되고, topic 으로 통하는 모든 data의 read/write 는 오직 leader에서 이루어집니다.

 

replication factor 수를 지정하여 topic을 생성하면 아래와 같이 생성 될 것입니다.

하나의 topic에 대하여 leader 와 follower가 생성되었고 ISR이라고 묶여있는데, ISR은(In-Sync Replication) 으로 하나의 replication group으로 생각하면 됩니다.

image2020-8-7_14-42-46.png

 

ISR 내에서는 leader 혹은 leader가 포함된 broker 가 다운 되면 follower 들 중 하나가 새로운 leader가 되어 정상작동하게 됩니다.

그렇기 때문에 ISR 내에는 1개의 leader와 n 개의 follower가 존재하여 그룹으로 묶여있습니다.

 

다음은 leader 와 follower 들이 서로의 상태를 확인하며 ISR을 어떤 방법으로 유지하고 있는지를 살펴보겠습니다.

follower는 leader와 동일한 데이터를 유지해야 하므로 지속적으로 leader의 데이터를 pull 하고 있습니다.

매우 짧은 주기로 데이터 정합성을 위해 서로를 체크하고 있습니다.

이러한 상황에서 follower가 일정시간(replica.lag.time.max.ms) 동안 leader에게 요청을 보내지 않으면 해당 follower는 방출되고 ISR은 축소됩니다.

 

 

 

 

producer

ACKs

producer는 topic에 대해 leader에게만 전달하므로 ACK 옵션을 사용하여 follower에게 메시지가 잘 도착하였는지 확인합니다.

kafka document에서는 아래와 같이 ACK 옵션을 설명하였습니다.

Option
Message Loss
Speed
Description
acks = 0 High High Producer는 서버로부터 확인을 기다리지 않습니다. 즉시 소켓 버퍼에 추가되고 전송 된 것으로 간주됩니다.
acks = 1 Medium Medium

Producer는 자신이 보낸 메시지에 대해 서버의 leader가 받았는지 기다립니다.

follower들은 확인하지 않으며 leader가 ACK을 보낸뒤 follower에게 복제되기 전에 실패하면 해당 메시지는 손실될 수 있습니다.

acks = all(-1) Low Low

Producer는 자신이 보낸 메시지에 대해 서버의 leader 가 전체 동기화 된 follower들이 받았는지 승인할때 까지 기다립니다.

최소 하나의 복제본까지 처리됨을 확인하므로 메시지 손실의 확률은 거의 없습니다.

개발환경에 맞추어 옵션을 변경하며 default는 acks=1 입니다.

acks = all 인 경우 'min.insync.replicas' 옵션을 설정하여 leader가 전파할 follower 수를 지정할 수 있습니다.

min.insync.replicas 옵션은 메시지를 보낼 때 write를 성공하기 위한 최소 복제본의 수를 의미하며 default = 1 입니다.

 

image2020-7-28_10-22-46.png

replication factor : 3

min.insync.replicas : 2

위 경우 leader 1개, follower 1개로 최소 요구 수인 2를 충족시키므로 오른쪽 follower에 복제가 안되어도 producer는 ACK을 받습니다.

image2020-7-22_16-42-37.png

하지만 follower 모두 복제되지 않는다면 최소요구수를 충족시키지 못해 ACK을 보낼 수 없습니다.

 

 

적용사례

image2020-8-10_10-47-24.png

replication factor를 구성하면 자동으로 leader가 선정이 됩니다. 

하지만 장애가 발생했을 때 leader는 어떻게 선출하게 될까요? 선출방법으로는 두 가지 방법이 있습니다.

$ vi ~/kafka/config/server.properties 에서

 

1. 가장 최근의 leader가 살아나기를 기다린다. (복구 시 데이터 일관성을 우선시하는 설정)

unclean.leader.election.enable = false

가장 최근의 leader 복구를 기다려야 하므로 서비스 장애가 좀 더 지속될 수 있지만, 데이터 유실은 발생하지 않습니다.

 

2. 가장 먼저 복구되는 broker를 leader로 선출한다. (서비스 제공을 우선시하는 설정)

unclean.leader.election.enable = true (default)

가장 먼저 복구되는 broker를 leader로 선출하여 서비스를 빠르게 재시작하는 방법입니다.

서비스 장애시간은 최소화할 수 있지만 복제 상태에 따른 데이터 유실이 발생할 수 있습니다.

 

위 상황을 다음 순서대로 그림과 함께 설명드리겠습니다.

  1. 초기상태
  2. Message 1 도착 & follower 다운
  3. Message 2 도착 & follower 다운
  4. Message 3 도착 & leader 다운
  5. leader 복구
    1. Message 4 도착 & 먼저 복구되는 broker leader 선출
    2. Message 4 도착 & 가장 최근 leader 복구

 

1. 초기상태

image2020-7-29_9-41-48.png

현재 메시지 없이 1 leader & 2 follower 가 replication factor로 구성되어 있습니다.

 

image2020-8-4_15-53-40.png

-producer

$ ./bin/kafka-console-producer.sh --broker-list kafka-01:9092 --topic testRF
>

-consumer

kafka1$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka-01:9092 --topic testRF

 

 

2. Message 1 도착 & follower 다운

 

message 1이 모두 전달 image2020-7-29_10-57-6.png되었지만, 전달되자마자 broker A가 다운 되었습니다.

image2020-8-6_10-48-8.png

-producer

$ ./bin/kafka-console-producer.sh --broker-list kafka-01:9092 --topic testRF
>Message1
>

-consumer

$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka-01:9092, kafka-02:9093, kafka-03:9094 --topic testRF
Message1

 

 

3. Message 2 도착 & follower 다운

image2020-7-30_14-6-39.png

마찬가지로 message 2가 모두 전달되자마자 broker B가 다운 되었습니다.

image2020-8-6_10-50-20.png

-producer

$ ./bin/kafka-console-producer.sh --broker-list kafka-01:9092, kafka-02:9093, kafka-03:9094 --topic testRF
>Message1
>Message2
>

-consumer

$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka-01:9092, kafka-02:9093, kafka-03:9094 --topic testRF
Message1
Message2

 

 

4. Message 3 도착 & leader 다운image2020-7-30_14-7-31.png

 

마지막으로 leader 가 message 3을 전달 받자마자 다운되었습니다.

-producer

[2020-08-06 11:10:35,301] WARN [Producer clientId=console-producer] Connection to node 1 (kafka-02/----:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-08-06 11:10:35,502] WARN [Producer clientId=console-producer] Connection to node 2 (kafka-03/----:9094) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-08-06 11:10:35,804] WARN [Producer clientId=console-producer] Connection to node 0 (kafka-01/----:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
...
[2020-08-06 11:10:37,004] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 9 : {testRF=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2020-08-06 11:10:37,105] WARN [Producer clientId=console-producer] 1 partitions have leader brokers without a matching listener, including [testRF-0] (org.apache.kafka.clients.NetworkClient)
...

-consumer

[2020-08-06 11:10:39,897] WARN [Consumer clientId=consumer-console-consumer-20230-1, groupId=console-consumer-20230] Connection to node 0 (kafka-01/----:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-08-06 11:10:40,872] WARN [Consumer clientId=consumer-console-consumer-20230-1, groupId=console-consumer-20230] Connection to node 0 (kafka-01/----:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
...

이때, 위에서 설정한 값에 따라 다음과 같이 결과가 나오게 되니 운영하시는 서비스 상황에 맞는 우선순위를 선택하셔서 설정하시기 바랍니다.

 

5-1. leader복구 - Message 4 도착 & 먼저 복구되는 broker leader 선출

image2020-7-30_14-13-18.png

image2020-7-30_14-16-59.png

default 값인 가장 먼저 복구된 broker가 leader 역할을 수행하는 상황입니다.

기존의 message 2, message 3 은 유실되지만 도착한 message 4를 전달 받으며 서비스를 빠르게 가동할 수 있습니다.

 

unclean.leader.election.enable = true 로 설정 후, 종료된 순서 1>2>0 과 동일하게 서비스를 시작하였습니다.

image2020-8-6_11-11-19.png

가장 먼저 1이 살아나면서 leader가 되고 뒤따라 실행된 0,2 가 follower가 되었습니다.

 

 

5-2. leader복구 - Message 4 도착 & 가장 최근 leader 복구

image2020-7-30_14-17-32.png

가장 최근 leader 였던 broker가 복구될 때 까지 대기하고 나서 서비스를 구동하는 상황입니다.

기존 message 들의 유실이 없지만 복구를 기다리는 시간이 오래 걸릴 수 있습니다.

 

unclean.leader.election.enable = false 로 설정 후, 기존 종료된 순서 1 > 2 > 0 순서대로 시작하였지만 leader = -1 이 표시되있는걸 확인할 수 있습니다.

image2020-8-6_13-46-36.png

image2020-8-4_15-53-40.png

기존 leader였던 0가 살아나 leader를 연임하면서 다시 replication factor를 구성한 것을 볼 수 있습니다.

 

troubleshooting

OSR(Out of Sync Replication)

replication factor에서 정상적으로 복제된 ISR(In-Sync Replication)을 보았습니다.

원본 메시지에 비해 정해진 시간을 초과하여 복제되는 경우는 Out of Sync(원본 메시지 복제가 지연되는 경우)이라고 합니다.

이러한 오류가 발생하는 이유와 어떻게 replication 지연을 감지하는지 보겠습니다.

 

Out of Sync의 주요 원인으로는 다음과 같습니다.

  1. slow replica
    'leader에서 message를 disk에 쓰는 속도' > 'follower에서 message를 disk에 쓰는 속도'
    인 경우 대부분 follower의 disk I/O 성능 문제로 발생합니다.

  2. stuck replica
    follower 가 leader로 부터 message를 가져오지 못하는 상황입니다.
    follower 측의 서버 문제거나 Full GC가 발생하여 프로세스가 멈추는 등의 상황이 이에 해당합니다.

  3. bootstrapping replica
    사용자가 topic의 replication factor 를 증가시키면서 follower가 leader의 메시지를 따라잡지 못하는 경우가 이에 해당합니다.

 

replication 지연은 크게 두 가지로 확인이 가능합니다.

1. Message 지연 개수로 확인
"replica.lag.max.messages" 로 leader-follower 간 메시지가 지연된 개수로 지연을 감지합니다.
설정된 값 과 leader - follower의 메시지 차이가 같은 경우를 의미합니다.
replica.lag.max.messages=3 일 때, leader - 5 / follower - 2 의 메시지를 가지고 있는 경우가 이에 해당합니다.

다만 주의할 점은 replica.lag.max.messages 보다 낮은 양으로 항상 일정하게 들어오다가 갑자기 유입량이 증가(2msg/s → 5 msg/s)하게 되면 kafka 는 지연되었다고 판단합니다.

하지만 실제로 follower는 정상작동 중이며 잠깐의 지연이지만 위 옵션을 이용하면 OSR로 판단되어 불필요한 경고가 발생할 수 있습니다.

2. 지연 시간으로 확인 

follower에서 leader로 fetch 요청을 보내는 시간을 확인하여 지연을 감지합니다.

replica.lag.time.max.ms =1000 이라면 follower 가 leader로 1s 내에 fetch 요청하면 정상으로 판단합니다.

이 방식은 message로 판단하는 것과 다르게 갑작스럽게 message의 양이 변경되어도 follower가 정상작동 중이라면 불필요하게 OSR로 변경되지 않습니다.

 

Topic create issue

토픽을 생성하는 과정에서 replication의 개수와 확인하는 수를 지정하여 생성하는 방법을 알아봤었습니다.

하지만 여기서 주의해야할 점은 min.insync.replicas 의 수가 replication factor의 수보다 많아도 topic이 생성은 된다는 점이다.

kafka-topics --create --zookeeper kafka-01:2181 --config min.insync.replicas=2 --topic test --partitions 1 --replication-factor 1

위의 경우 min.insync.replicas 의 수가 1  더 많아 에러가 발생할 것이 예상이 되지만 topic은 정상적으로 생성이 됩니다.

acks=all 을 하지 않는다면 문제되지 않는 설정이기 때문에 acks=1 또는 acks=0 에서 정상작동 되므로 오류가 아닙니다.

하지만 acks=all 인 상태로 message를 생성하면 leader broker에서 아래와 같은 NotEnoughReplicas 에러가 발생합니다.

$ ./bin/kafka-console-producer.sh --broker-list kafka-01:9092 --topic minTest --request-required-acks all
>test
>[2020-08-07 10:34:05,986] WARN [Producer clientId=console-producer] Got error produce response with correlation id 4 on topic-partition minTest-0, retrying (2 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender)
[2020-08-07 10:34:06,087] WARN [Producer clientId=console-producer] Got error produce response with correlation id 5 on topic-partition minTest-0, retrying (1 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender)
[2020-08-07 10:34:06,190] WARN [Producer clientId=console-producer] Got error produce response with correlation id 6 on topic-partition minTest-0, retrying (0 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender)
[2020-08-07 10:34:06,294] ERROR Error when sending message to topic minTest with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.

acks=all 옵션으로 topic을 생성할때에는 위와 같은 경우를 주의해주시기 바랍니다.

 

끝맺음

이 문서를 통해 kafka의 replication factor에 대한 내용을 설명드렸습니다.

이 문서가 현재 운영중인 Kafka 클러스터의 HA 구성에 도움이 되기를 바랍니다.

감사합니다.

 

 


 

일반적으로 topic 의 leader는 자동적으로 설정되지만 수동적으로 위치를 지정할 수 있습니다.

kafka에서 제공하는 ~/kafka/bin/kafka-reassign-partitions.sh 을 사용할 수 있고,

이전 문서에서 알아본 CMAK을 사용하여 변경할 수 있습니다.

 

-sh 사용 

json 코드를 생성하여 kafka가 reassign을 하도록 도와줍니다.

$vi test-reassignment.json
 
 
{
    "version":1,
    "partitions":[
    {"topic":"RF3""partition":0"replicas":[0,1,2]},
    {"topic":"RF3""partition":1"replicas":[1,0,2]},
    {"topic":"RF3""partition":2"replicas":[2,0,1]}
    ]
}

아래와 같이 적용하면 json에 명시된 replicas의 가장 앞 broker가 leader로 변경되게 됩니다.

$ ./bin/kafka-reassign-partitions.sh --zookeeper zk-01:2181, zk-02:2182, zk-03:2183 --reassignment-json-file test-reassignment.json --execute
 
Current partition replica assignment
 
{"version":1,"partitions":[{"topic":"RF3","partition":2,"replicas":[1,2,0],"log_dirs":["any","any","any"]},{"topic":"RF3","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"RF3","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]}]}
 
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
 
 
 
#적용 전
$ ./bin/kafka-topics.sh --describe --zookeeper zk-01:2181, zk-02:2182, zk-03:2183 --topic RF3
Topic: RF3  PartitionCount: 3   ReplicationFactor: 3    Configs:
    Topic: RF3  Partition: 0    Leader: 2   Replicas: 2,0,1 Isr: 2,0,1
    Topic: RF3  Partition: 1    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2
    Topic: RF3  Partition: 2    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
 
 
#적용 후
$ ./bin/kafka-topics.sh --describe --zookeeper zk-01:2181, zk-02:2182, zk-03:2183 --topic RF3
Topic: RF3  PartitionCount: 3   ReplicationFactor: 3    Configs:
    Topic: RF3  Partition: 0    Leader: 0   Replicas: 0,1,2 Isr: 1,0,2
    Topic: RF3  Partition: 1    Leader: 1   Replicas: 1,0,2 Isr: 1,0,2
    Topic: RF3  Partition: 2    Leader: 2   Replicas: 2,0,1 Isr: 1,0,2

 

 

-CMAK 화면 

<반영 전>

image2020-8-4_11-24-43.png

상단메뉴에서 preferred replica election > Run Preferred Replica Election 실행하여 아래와 같이 json 파일을 적용시켜 줍니다.

<반영후>

image2020-8-4_13-50-32.png


위처럼 설정을 통해 미리 지정된 leader partition은 따로 preferred leader라고 명칭합니다.