Kafka, Producer 부터
Consumer 까지

Index

  • Producer
  • Broker
  • Consumer
  • Q&A

Producer

Producer Code

KafkaProducer 생성

KafkaProducer 생성

  • Accumulator
    사용자가 send한 Record를 메모리(RecordBatch)에 차곡차곡 쌓아주는 역할

  • Network Thread
    Accumulator에서 만든 RecordBatch를 Broker로 전송하는 역할

Producer Code

Accumulator

Accumulator

record를 topic + partition 별로 모아두는 대기 공간(?)

Accumulator에서 사용하는
메모리 크기

batch.size = default 16KB

buffer.memory = default 32MB

default 설정 이라면 최대 2048개(32MB / 16KB)의 record batch가 생성

broker로 보내는 속도보다 쌓이는 속도가 더 빠르다면 buffer.memory 만큼 채워지고

max.block.ms 시간 만큼 블락된다

상태가 안 바뀌고 시간이 더 지나면 throw exception

Network Thread

NetworkThread는 1, 2, 3을 쉴틈 없이 실행 (약간의 텀을 주는 방법은 p.11)

Network Thread

linger.ms: Network Thread에서 accumulator에 쌓인 데이터를 가져올때 대기하는 시간
쌓이는 속도가 느려서 record batch가 가득 차지 않았는데 보내면 뭔가 아쉽다. 약간의 대기를 통해 record batch가 좀 더 찬 상태에서 보낼 수 있다.
default: 0, 0이 아닌 값을 설정하면
broker 전송 수는 줄겠지만 latency도 준다. 서비스에 맞게 조절

max.request.size: Broker한번 전송할 때의 최대 크기, default 1MB

max.request.size(1MB) / batch.size(16KB) = 한번에 전송 가능한 최대 record batch 개수 64개

Network Thread

max.in.flight.requests.per.connection = 3

max.in.flight.requests.per.connection = 1

왼쪽이 Network Thread, 오른쪽이 Broker

max.in.flight.requests.per.connection: ack를 받지 않고 전송할 수 있는 최대 request 개수

retries가 켜진 상태에서(retries값이 1이상, default 2147483647) max.in.flight.requests.per.connection이 2 이상이면 메시지 순서가 바뀔 수 있다

Broker

어떻게 저장되나

[Topic name]-[partition] 폴더 구조

Disk 헤더 개수, 장비 노후화를 고려

4TB 한개 보다 1TB 4개를 권장

어떻게 저장되나

Segment 단위로 파일 저장

*.index, *.log, *.timeindex

마지막 파일이 active segment

파일명은 segment에 기록된 첫 offset

즉 223448069.log의 제일 마지막에는 224827222 offset이 저장되고
22482723.log 제일 처음에는 224827223 offset이 저장되어있다.

어떻게 저장되나

Topic

Partition

Segment

데이터가 들어오면 active segment에 append

어떻게 저장되나

index파일에는 해당 메시지의 offset과 log 파일에서의 위치

log 파일에는 offset과 position, 크기 그리고 실제 데이터

검색엔진의 역색인 구조와 비슷하다.

어떻게 저장되나

실제로는 모든 offset 번호가 index 파일에 써지는게 아니라 index.interval.bytes 만큼 record.batch가 log파일에 쓰일때마다 index 작성

log 파일에는 하나의 offset에 하나의 record만 존재하기도 하지만

대부분 여러 record가 저장되어있다. (producer에서 보낸 record batch)

출처: https://thehoard.blog/how-kafkas-storage-internals-work-3a29b02e026

어떻게 저장되나

Consumer

Consumer Code

Kafka Consumer 생성

Kafka Consumer 생성

  • Fetcher
    poll 함수가 실행되면 적절한 크기의 records 리턴하고,
    내부에 records가 없다면 Broker에게 records를 요청하고 저장.
    그리
    적절한 크기의 record 리턴하는 역할
  • Coordinator
    어떤 토픽, 파티션을 consume 할지
    Broker의 group coordinator와 통신하는 역할
    heartbeat, offset commit, consumer group join 도 합니다~

데이터 가져오기

데이터 가져오기

consumer.poll(100) 함수가 실행 될때의 consumer 내부 코드

데이터 가져오기 (fetcher에 records가 있는 경우)

데이터 가져오기 (fetcher에 records가 있는 경우)

max.poll.records: fetcher로부터 가져올 record의 최대 개수, default 500개

max.poll.interval.ms: poll 함수를 실행 해야할 최대 시간 간격, 설정된 시간 안에 poll 함수를 실행하지 않으면
consumer에 문제가 있다고(?) 판단해서 consumer group에서 빠지고 해당 consumer group은 rebalance 실행

default 5 분

데이터 가져오기 (fetcher에 records가 없는 경우)

데이터 가져오기 (fetcher에 records가 없는 경우)

데이터 가져오기 (fetcher에 records가 없는 경우)

max.partition.fetch.bytes: 하나의 topic + partition에서 가져 올 데이터의 최대 크기, default 1MB

fetch.min.bytes: 하나의 Broker에서 가져올 데이터의 최소 크기, default는 바로 리턴, default 1byte

fetch.max.wait.ms: fetch.min.bytes큼 쌓일때까지 기다리는 최대 시간, default 500ms producerlinger.ms 와 비슷하지 않나요?


fetch.max.bytes: 하나의 Broker에서 가져올 데이터의 최대 크기, record batch 단위로 가져오기 때문에 대략적인 값이다, default 50MB

ex) 하나의 Broker에서 3개(TopicA_0, TopicA_2, TopicB_1) 파티션 데이터를 가져온다고 가정 하면 Broker에서는 아래와 같이 동작

readPartitionInfos = List((TopicA, 0), (TopicA, 2), (TopicB, 1))

maxBytes = fetch.max.bytes

readPartitionInfos.foreach{ case(topic, partition) =>

size = math.min(max.partition.fetch.bytes, maxBytes)

result += read(topic, partition, size)

maxBytes -= size

}

데이터 가져오기 (fetcher에 records가 없는 경우)

데이터 가져오기 (fetcher에 records가 없는 경우)

부록

Consumer Rebalance

Consumer Rebalance

Consumer Rebalance

Consumer Rebalance

Consumer Rebalance

Consumer Rebalance

group coordinator가

Consumer group “Group1”의 consumer node들에게 토픽,파티션을 할당 해줬는데

Consumer Rebalance

“Group1”에 노드 한개가 추가되면 Rebalance가 시작되고 해당 Consumer Group의 모든 consumer 노드는 조인을 해야한다.

Consumer Rebalance

제한 시간(broker config: group.initial.rebalance.delay.ms) 내에 각 consumer가 join 해야하는데,

records의 처리 시간이 길어서 제때 join 하지 못하면 해당 consumer group에서 제외된다.

Consumer Rebalance

Rebalnce는 언제 발생하나?

Consumer Group offset

Consumer Group offset

Ver. 0.9 미만 에서는 zookeeper에 consumer offset을 저장

zookeeper로 offset 관리했을때는 consumer offset을 수정하기 위해 zookeeper client로 접속해 값 변경

Consumer node가 많아지면 zookeeper의 부하 증가 -> 속도 저하

Consumer Group offset

Ver. 0.9 이상 __consumer_offset 토픽을 사용

Group name

Topic name

partition

offset

Commit time

test-01

my-topic

1

0

1551191950

test-02

my-topic

1

0

1551193842

test-01

my-topic

1

10

1551203421

test-01

my-topic

1

19

1551243229

OLD

NEW

offset commit을 한다는 건 __consumer_offset 토픽에 offset 정보 record를 produce 하는 것이라 볼 수 있다.

Consumer Group offset

그럼 consumer group의 offset을 찾기 위해서는 처음부터 훑어야 하는건가? 하는 의문이 들수 있지만

compaction이 실행 된다면!!

못다한 이야기

  • Exactly-Once(Transaction)
  • LogCleaner and Log Compaction
  • Purgatory
  • Controller in Broker and Leader Election
  • Metrics
  • Kafka Connect

Kafka에 모든걸 알면 좋지만 시간은 유한하기 때문에 내 현실에 도움 될만한 것들을 추려보자...

감사합니다.

Kafka, Producer 부터 Consumer 까지 - Google Slides