- 컨슈며 주요 기능
- 컨슈머 종류
- bootstrap.servers
- fetch.min.bytes
- fetch.max.wait.ms
- fetch.max.bytes
- session.timeout.ms
- heartbeat.interval.ms
- max.poll.records
- max.poll.interval.ms
- enable.auto.commit
- auto.commit.interval.ms
- auto.offset.reset
- group.id
- reqeust.timeout.ms
- Reference
이 글의 코드 및 정보들은 책을 바탕으로 작성하였습니다.
컨슈며 주요 기능
-
특정 파티션을 관리하고 있는
-
파티션 리더에게
-
메시지 가져오기 요청을 하는 것이다.
-
각 요청은 로그의 오프셋을 명시하고
-
그 위치로부터 로그 메시지를 수신한다.
-
그래서 컨슈머는 가져올 메시지의 위치를 조정할 수 있고
-
필요하다면 이미 가져온 데이터도 다시 가져올 수 있다.
-
예를 들어
-
가져온 메시지를 통해 작업을 하였는데
-
작업을 롤백하고
-
새로 다시 시작해야하는 경우에
-
이미 가져온 메시지를 다시 가져와
-
작업을 새로 시작할 수 있다.
-
이미 가져온 메시지를 다시 가져올 수 있는 기능은
-
일반적인 메시지큐 솔루션(ex RabbitMQ)에서는 제공하지 않는 기능이다.
컨슈머 종류
- 컨슈머에는 2가지 종류가 있다.
-
올드 컨슈머 (Old Consumer)
-
뉴 컨슈머 (New Consumer)
-
두개의 큰차이는
-
주키퍼의 사용 유무이다.
-
구 버전의 카프카에서는
-
컨슈머의 오프셋을
-
주키퍼의 지노드에 저장하는 방식을 지원하다
-
카프카 버전 0.9(2015. 11. 23 출시)부터
-
컨슈머의 오프셋 저장을
-
카프카의 토픽에 저장하는 방식으로 변경하였다.
-
아직까지는 컨슈머의 오프셋 저장 방법이
-
주키퍼의 지노드 또는 카프카의 토픽을 이용한 방식 모두 지원하지만
-
향후 릴리즈되는 카프카 버전에서는
-
주키퍼의 지노드에 저장하는 방식은 사라질 예정이다.
bootstrap.servers
-
카프카 클러스터에 처음 연결을 하기 위한
-
호스트 와 포트 정보 로 구성된 리스트 정보를 나타낸다.
-
정의된 포맷은 다음과 같다.
-
호스트 이름:포트, 호스트 이름:포트, 호스트 이름:포트
주의 사항
-
전체 카프카 리스트가 아닌
-
호스트 하나만 입력해 사용할 수 있지만
-
추천하지 않는다.
-
카프카 클러스터는 살아있는 상태지만
-
입력한 하나의 호스트에
-
장애가 발생할 경우 접속이 불가하기 때문이다.
fetch.min.bytes
-
한번에 가져올 수 있는 최소 데이터 사이즈이다.
-
만약 지정한 사이즈보다 작은 경우
-
요청에 대해 응답하지 않고
-
데이터가 누적될 때까지 기다린다.
fetch.max.wait.ms
-
fetch.min.bytes에 의해 설정된 데이터보다 적은 경우
-
요청에 응답을 기다리는 최대 시간
fetch.max.bytes
- 한번에 가져올 수 있는 최대 데이터 사이즈
session.timeout.ms
-
컨슈머와 브로커사이의 세션 타임 아웃 시간
-
브로커가 컨슈머가 살아있는것으로 판단하는 시간(기본값 10초)
-
만약 컨슈머가 그룹 코디네이터에게
-
하트비트(heartbeat)를 보내지 않고
-
session.timeout.ms이 지나면
-
해당 컨슈머는 종료되거나
-
장액가 발생한 것으로 판단하고
-
컨슈머 그룹은 리밸런스(Rebalance)를 시도한다.
-
session.timeout.ms은 하트비트 없이
-
얼마나 오랫동안 컨슈머가 있을 수 있는지를 제어하며
-
이 속성은 heartbeat.interval.ms와 밀접한 관련이 있다.
-
일반적인 경우 두 속성이 함께 수정된다.
-
session.timeout.ms를 기본값보다 낮게 설정하면
-
실패를 빨리 감지할 수 있지만
-
gc 혹은 poll Loop를 완료하는 시간이 길어지게 되면
-
원하지 않게 리밸런스가 일어나기도 한다.
-
반대로 session.timeout.ms를 높게 설정하면
-
원하지 않는 리밸런스가 일어날 가능성은 줄지만
-
실제 오류를 감지하는 데 시간이 오래 걸릴 수 있다.
heartbeat.interval.ms
-
그룹 코디네이터에게 얼마나 자주
-
KafkaConsumer poll () 메소드로
-
하트비트를 보낼 것인지 조정한다.
-
session.timeout.ms과 밀접한 관계가 있다.
-
session.timeout.ms보다 낮아야한다.
-
일반적으로 1/3정도로 설정한다.
-
(기본값 3초)
max.poll.records
-
단일 호출 poll()에 대한 최대 레코드 수를 조정한다.
-
이 옵션을 통해 어플리케이션이 폴링 루프에서 데이터 양을 조정 할 수 있다.
max.poll.interval.ms
-
컨슈머가 살아있는지 체크하기 위해
-
하트비트를 주기적으로 보내는데
-
컨슈머가 계속해서 하트비트만 보내고
-
실제로 메시지를 가져가지 않는 경웅가 있을 수도 있다.
-
이러한 경우 컨슈머가 무한정 해당 파티션을 점유할 수 없도록
-
주기적으로 poll을 호출하지 않으면
-
장애라고 판단하고
-
컨슈머 그룹에서 제외한 후
-
다른 컨슈머가 해당 파티션에서 메시지를 가져갈 수 있게 한다.
enable.auto.commit
- 백그라운드로 주기적으로 오프셋을 커밋한다.
auto.commit.interval.ms
- 주기적으로 오프셋을 커밋하는 시간
auto.offset.reset
-
카프카에서 초기 오프셋이 없거나
-
현재 오프셋이 더 이상 존재하지 않은 경우
-
(데이터가 삭제)
-
다음 옵션으로 reset한다.
-
earliest : 가장 초기의 오프셋값으로 설정
-
latest : 가장 마지막 오프셋값으로 설장
-
none : 이전 오프셋값을 찾지 못하면 에러를 발생
group.id
- 컨슈머가 속한 컨슈머 그룹을 식별하는 식별자이다.
reqeust.timeout.ms
- 요청에 대해 응답을 기다리는 최대 시간