Gidhub BE Developer

Kafka Consumer Option

2019-12-18
goodGid

이 글의 코드 및 정보들은 을 바탕으로 작성하였습니다.

컨슈며 주요 기능

  • 특정 파티션을 관리하고 있는

  • 파티션 리더에게

  • 메시지 가져오기 요청을 하는 것이다.

  • 각 요청은 로그의 오프셋을 명시하고

  • 그 위치로부터 로그 메시지를 수신한다.


  • 그래서 컨슈머는 가져올 메시지의 위치를 조정할 수 있고

  • 필요하다면 이미 가져온 데이터도 다시 가져올 수 있다.


  • 예를 들어

  • 가져온 메시지를 통해 작업을 하였는데

  • 작업을 롤백하고

  • 새로 다시 시작해야하는 경우에

  • 이미 가져온 메시지를 다시 가져와

  • 작업을 새로 시작할 수 있다.


  • 이미 가져온 메시지를 다시 가져올 수 있는 기능은

  • 일반적인 메시지큐 솔루션(ex RabbitMQ)에서는 제공하지 않는 기능이다.


컨슈머 종류

  • 컨슈머에는 2가지 종류가 있다.
  1. 올드 컨슈머 (Old Consumer)

  2. 뉴 컨슈머 (New Consumer)

  • 두개의 큰차이는

  • 주키퍼의 사용 유무이다.


  • 구 버전의 카프카에서는

  • 컨슈머의 오프셋을

  • 주키퍼의 지노드에 저장하는 방식을 지원하다

  • 카프카 버전 0.9(2015. 11. 23 출시)부터

  • 컨슈머의 오프셋 저장을

  • 카프카의 토픽에 저장하는 방식으로 변경하였다.


  • 아직까지는 컨슈머의 오프셋 저장 방법이

  • 주키퍼의 지노드 또는 카프카의 토픽을 이용한 방식 모두 지원하지만

  • 향후 릴리즈되는 카프카 버전에서는

  • 주키퍼의 지노드에 저장하는 방식은 사라질 예정이다.


bootstrap.servers

  • 카프카 클러스터에 처음 연결을 하기 위한

  • 호스트포트 정보 로 구성된 리스트 정보를 나타낸다.


  • 정의된 포맷은 다음과 같다.

  • 호스트 이름:포트, 호스트 이름:포트, 호스트 이름:포트


주의 사항

  • 전체 카프카 리스트가 아닌

  • 호스트 하나만 입력해 사용할 수 있지만

  • 추천하지 않는다.


  • 카프카 클러스터는 살아있는 상태지만

  • 입력한 하나의 호스트에

  • 장애가 발생할 경우 접속이 불가하기 때문이다.


fetch.min.bytes

  • 한번에 가져올 수 있는 최소 데이터 사이즈이다.

  • 만약 지정한 사이즈보다 작은 경우

  • 요청에 대해 응답하지 않고

  • 데이터가 누적될 때까지 기다린다.


fetch.max.wait.ms

  • fetch.min.bytes에 의해 설정된 데이터보다 적은 경우

  • 요청에 응답을 기다리는 최대 시간


fetch.max.bytes

  • 한번에 가져올 수 있는 최대 데이터 사이즈

session.timeout.ms

  • 컨슈머와 브로커사이의 세션 타임 아웃 시간

  • 브로커가 컨슈머가 살아있는것으로 판단하는 시간(기본값 10초)


  • 만약 컨슈머가 그룹 코디네이터에게

  • 하트비트(heartbeat)를 보내지 않고

  • session.timeout.ms이 지나면

  • 해당 컨슈머는 종료되거나

  • 장액가 발생한 것으로 판단하고

  • 컨슈머 그룹은 리밸런스(Rebalance)를 시도한다.


  • session.timeout.ms은 하트비트 없이

  • 얼마나 오랫동안 컨슈머가 있을 수 있는지를 제어하며

  • 이 속성은 heartbeat.interval.ms와 밀접한 관련이 있다.

  • 일반적인 경우 두 속성이 함께 수정된다.


  • session.timeout.ms를 기본값보다 낮게 설정하면

  • 실패를 빨리 감지할 수 있지만

  • gc 혹은 poll Loop를 완료하는 시간이 길어지게 되면

  • 원하지 않게 리밸런스가 일어나기도 한다.


  • 반대로 session.timeout.ms를 높게 설정하면

  • 원하지 않는 리밸런스가 일어날 가능성은 줄지만

  • 실제 오류를 감지하는 데 시간이 오래 걸릴 수 있다.


heartbeat.interval.ms

  • 그룹 코디네이터에게 얼마나 자주

  • KafkaConsumer poll () 메소드로

  • 하트비트를 보낼 것인지 조정한다.


  • session.timeout.ms과 밀접한 관계가 있다.

  • session.timeout.ms보다 낮아야한다.

  • 일반적으로 1/3정도로 설정한다.

  • (기본값 3초)


max.poll.records

  • 단일 호출 poll()에 대한 최대 레코드 수를 조정한다.

  • 이 옵션을 통해 어플리케이션이 폴링 루프에서 데이터 양을 조정 할 수 있다.


max.poll.interval.ms

  • 컨슈머가 살아있는지 체크하기 위해

  • 하트비트를 주기적으로 보내는데

  • 컨슈머가 계속해서 하트비트만 보내고

  • 실제로 메시지를 가져가지 않는 경웅가 있을 수도 있다.


  • 이러한 경우 컨슈머가 무한정 해당 파티션을 점유할 수 없도록

  • 주기적으로 poll을 호출하지 않으면

  • 장애라고 판단하고

  • 컨슈머 그룹에서 제외한 후

  • 다른 컨슈머가 해당 파티션에서 메시지를 가져갈 수 있게 한다.


enable.auto.commit

  • 백그라운드로 주기적으로 오프셋을 커밋한다.

auto.commit.interval.ms

  • 주기적으로 오프셋을 커밋하는 시간

auto.offset.reset

  • 카프카에서 초기 오프셋이 없거나

  • 현재 오프셋이 더 이상 존재하지 않은 경우

  • (데이터가 삭제)

  • 다음 옵션으로 reset한다.

  1. earliest : 가장 초기의 오프셋값으로 설정

  2. latest : 가장 마지막 오프셋값으로 설장

  3. none : 이전 오프셋값을 찾지 못하면 에러를 발생


group.id

  • 컨슈머가 속한 컨슈머 그룹을 식별하는 식별자이다.

reqeust.timeout.ms

  • 요청에 대해 응답을 기다리는 최대 시간

Reference


Comments

Index