- 0) 메세징 시스템 이란?
- 비동기 데이터전송 방식.
- 예) Pub/Sub (브로드캐스트) 모델 : Publisher -> [Queue] -> Subscriber
- 전송에 있어서, 발신자와 수신자간 서로를 알 필요가 없다. (decoupled 하다)
- 즉, 각 프로세스들의 디펜던시를 최소화 할 수 있다.
- (이 때문에, 구조적으로써~ 네트워크의 확장성이 용이해 진다.)
- 빠른 대용량 처리에 적합.
- At Message Once : 메세지 손실이 될수있디만, 재전송없이 최대한 빠른처리 정책.
- At Least Once : 메세지 손실은 없지만, 중복은 될수도 있는 정책.
- Exactly Once : 이상적이지만... 굉장히 어렵고, 보장해준다면서~ 타프로세스 예외를 두는 경우가 많다.
- 1) 카프카 란?
- 고성능 고가용 메세징 시스템. (분산 리플리케이션 로그 서비스)
- (대용량 고성능 실시간 처리를 위해서) '배치전송', '페이지 캐시', '파티션', '분산', '리플리케이션, '자동리더선출' 등의 기술이 구현됨.
- 높은 처리량, 간단한 스케일아웃, 안전한 데이터 영구 디스크보관.
- 엄밀한 의미에서 '표준MQ'는 아님... (AMQP를 사용하지 않고, 속도를 위해서 TCP기반동작)
- RabbitMQ :
- 표준화된 Advanced Message Queueing Protocal 0.9.1 따름.
- (Exchanger) -> [Queue]
- 내부적으로 Exchanger가 있어서, Queue으로 메세지를 분배하는 복잡한 작업을 한다.
- (direct exchange, fanout exchange, topic exchange, headers exchange)
- 메타를 자체적으로 관리해서, 처리량이 높지않다.
- 손실되지 않아야 하는, 메세지 처리에 적합. (메세지 처리에 필요한 다양한 기능도 지원)
- Kafka
- Producer(exchanger) -> (Broker) -> Consumer(exchanger)
- 고성능을 위해서, Producer 및 Consumer 에게 Exchanger 역활을 넘겼다.
- 메타를 Zookeeper으로 관리해서, 효율이 좋다.
- 약간 손실이 되더라도, 빠르고 대량의 메세지 처리에 적합.
- Producer(exchanger) -> (Broker) -> Consumer(exchanger)
- 'LinkedIn'에서 내부시스템 개선과정에서 개발함
- 실시간 처리율을 높여야하는 needs
- 무중단 비중앙 시스템성 needs
- 타사 제품등등의 이기종 호환성과 유연성 needs
- 손쉬운 스케일아웃 needs
- 활용예시
- 예) '마이크로 서비스 아키텍쳐' 에서 각각의 '프로세스 로그'를 통합한다고 했을때, kafka를 활용하면 딲!
- 예) 카카오에서는 웹문서 검색 데이터 수집에서... 유실없이 처리한다고 한다.
- '분산 Application' 으로써 Cluster 구성가능. (꼭 홀수운영 하지않고, 자유롭게 할수있음)
- 주로 Kafka Broker와 Zookeeper가 직접통신 하면서, 메타정보를 저장하고 상태관리를 하게된다.
- 2) 쥬키퍼 란?
- 분산 코디네이터 서비스.
- 본래, 하둡 산하 프로젝트들이 '분산 Application'으로 개발되면서~ 이 동물들을 코디네이션 하는 프로젝트.
- '분산 Application' 이 안정적인 서비스를 하도록, 각 정보를 중앙에서 관리 및 동기화 함.
- 쥬키퍼도 여러대의 서버으로 Cluster(앙상블)로 구성됨. ('분산 Application' 이 <-> 쥬키퍼Cluster 와 통신)
- 쥬키퍼Cluster는 '홀수 과반수 방식'을 따름. (예: 3대 라면~ 1대die는 OK , 2대die는 ERR)
- 쥬키퍼는 '계층형 ZNode 디렉토리 구조'에 'Key-Value'를 저장 함.
- 3) 쥬키퍼 기본
- 설치
- tar zxf zookeeper-3.4.12.tar.gz
- ln -s zookeeper-3.4.12.tar.gz zookeeper
- mkdir /data : 쥬키퍼가 내부적으로 사용할 별도의 dir생성 (ZNode의 스냅샷 및 로그 저장)
- echo "1" > /data/myid : 쥬키퍼Cluster 내의 유니크ID 값생성
- zoo.cfg 설정
- ...
- 실행
- zkServer.sh start , zkServer.sh status , zkServer.sh stop
- zkCli.sh
- create /test-node abc : 디폴드 persistent 옵션.
- create -e /test-node abc : ephemeral옵션. 커넥션이 끊어지면 자동노드삭제. (헬스체크에 유용)
- get /test-node
- 설치
- 3) 쥬키퍼 구조
- /{루트ZNode}
- /controller : 카프카 클러스터를 관리하는 컨트롤러 관련값 (토픽 파티션 벨런스 등등)
- /brokers : 브로커 관련값
- /topics : 토픽 관련값
- /consumers : 컨슈머 관련값 (구버젼은 각 컨슈머 오프셋을 쥬키퍼에도 가능했으나... 이젠 토픽에만 권고)
- /config
- /topics : 토픽상세정도
- /{루트ZNode}
- 4) 카프카 기본
- 설치
- tar zxf kafka_2.11-1.1.1.tar.gz
- ln -s kafka_2.11-1.1.1 kafka
- server.properties 설정
- broker.id=1 : 카프카Cluster 내의 유니크ID 값지정
- log.dirs=/kafka1 : 카프카가 내부적으로 사용할 디렉토리
- default.replication.factor=1 : 토픽생성시 복제수 옵션 디폴트값 (각 topic create시 재지정 가능)
- offsets.topic.num.partitions=? : 오프셋 토픽의 파티션 수
- offsets.topic.replication.factor=? : 오프셋 토픽의 리플리케이션 팩터
- zookeeper.connect={모든 쥬키퍼 서버:2181}/{루트 할 ZNode이름} : 한 쥬키퍼만 쓸 경우, 걔가 죽으면... 바로장애! 독박! 그리고... 쥬키퍼는 공용임으로~ 적절한 루트ZNode를 정하면 좋음
- zookeeper.session.timeout=3 : 자바App들은 GC 때문에 종종 멈추니까... 너무짧게하면 낭패
- unclean.leader.election.enable=true : 전면 장애시, 최후에 리더가 아니라도~ 손실감안하고 새로운 리더를 뽑을 수 있도록 하는 옵션 (장애시간이 무한정 길어지는것 방지)
- min.insync.replicas=3 : 동기화 됨을 판단하는 최소한의 복제수
- 실행
- kafka-server-start.sh server.properties -daemon , kafka-server-stop.sh
- cat ~/kafka/logs/server.log : 카프카 상태확인
- kafka-topics.sh --create --zookeeper {쥬키퍼서버들:2181}/test-node --replication-factor 1 --partitions 1 --topic test-topic
- kafka-console-producer.sh --broker-list {카프카서버들:9092} --topic test-topic
- kafka-console-consumer.sh --bootstrap-server {카프카서버들:9092} --topic test-topic --from-beginning
- 설치
- 5) 카프카 기술
- 내부적으로 테크니컬한 기술이 구현되어있음.
- 비중앙 분산 시스템 : 단일 시스템에 비해서, 성능확장과 안정성 및 장애대응에 아주 유리하게 함.
- 페이지 캐시 : {OS Disk} -> {PageCache Memory} -> {Kafka} , (커널 readBuf -> socketBuf -> NICBuf)
- 배치 전송 : 자잘한 I/O를 한대 묶어서, 한방으로 처리. (압축프로토콜도 지원.)
- 6) 카프카 구조
- (메세지) -> [클러스터] > [브로커] > [토픽] > [파티션] > [오프셋]
- : 특정 메세지는 최종적으로 특정 파이션에서 오프셋 단위로 처리됨.
- 토픽 : 메세지를 구분하는 논리적인 단위.
- 파티션 :
- 토픽을 분할한 단위. (한 토픽을 멀티프로세스 하기위한 분할)
- (너무 쪼게면... 쥬키퍼 관리비용 높아지는 등의 부작용이 있겟죠?)
- 파티션 갯수만큼의 프로세스가 있으면 좋음. (각자 자기 파티션만 잘 처리하면 되니까)
- 각각의 Producer 및 Consumer 를 특정 파티션으로 지정 가능.
- (카프카에서는... 파티션 삭제! 줄이는 기능은 없다고 함... 참고...)
- 오프셋 :
- 각 파티션마다 메세지가 저장되는 위치.
- 오프셋은 파티션 내에서 유일하고 순차적으로 증가하는 Long Number.
- 오프셋 순서데로 메시지를 처리함.
- 즉, 적어도 한 파이션 내부에서는 메세지의 순서가 보장됨!
- 7) 카프카 HA 및 Replication
- 분산서버의 장애시, 고가용성을 위해서~ 리플리케이션을 지원.
- (토픽을 이루는 각각의 파티션들을 '클러스터' 내부에 나누어서, '리더 브로커' 및 '팔로워 브로커' 으로 복제)
- (replication-factor 값수만큼 복제를 하여, '리더 브로커' 에 장애가 나더라도, '팔로워 브로커'로 즉각 대응)
- 모든 R/W는 리더에게만 하고, 팔로워는 똑같이 복제만 함.
- 물론, 리플리케이션을 사용하면... 용량이나 시스템 리소스 등의 감소는 감안 해야죠?
- ISR(In Sync Replica) :
- 리플리케니션 되는 그룹. (ISR에 속해있는 구성원만이 리더가 될수있는 규칙)
- ISP에 속한 팔로워 중 동기화에 문제가 감지되면, ISR에서 추방됨.
- 항상 정확히 동기화된, 팔로워를 유지하기위한 정책.
- 전면장애, 리더선출 이슈
- 모든 브로커가 전면장애 일때, 두가지 길의 딜레마가 있다.
- 1. 최후에 리더가 다시 살아날때까지... 무한정 기다려, 손실없이 복구한다.
- 2. 장애과정중 ISR에서 추방되었더라도, 손실을 감안하고 복구부터 한다.
- (마치 마지막 왕좌의 계승을 누가해야하는지? 한편의 영화에 한장면 같다...)
- unclean.leader.election.enable 옵션으로 결정하면 되시옵니다.
- 모든 브로커가 전면장애 일때, 두가지 길의 딜레마가 있다.
- 8) 카프카 Producer
- Producer는 특정 파티션에 메세지를 보내도록 할수있고, 디폴트 Round-Robbin으로 파티션에 분배 할수있음.
- acks 옵션 : 메세지 완료를 위한 ack의 수. (적을수록 성능좋고 손실안좋고) (--request-required-acks)
- acks=0 : 어떠한 '완료ack' 없이 바로 종료. (재전송도 당연히 안함) (속도는 겁나빠름)
- acks=1 : '리더 브로커' 에 기록했다는 '완료ack' 받으면, 종료.
- acks=all : '리더 및 팔로워 브로커' min.insync.replicas 에 동기화 됬다는 '완료ack' 받으면, 종료.
- 단순히 무손실 한다고... replication=3, min.insync.replicas=3, acks=all 하면... 졲망함;
- 1대만 장애되도... min.insync.replicas를 절때 만족 못하게 되니... 어처구니 없는 전면장애;;;
- compression.type 옵션 : 메세지 압축.
- 속도 : none > snappy > lz4 > gzip
- 용량 : gzip > lz4 > snappy > none
- batch.size 옵션 : 하나의 파티션에 묶어서 보낼 Size 지정.
- linger.ms 옵션 : 배치Size 미만시, 대기시간.
- max.request.size 옵션 : 메시지 최대 Size. (디폴트 1MB)
- 9) 카프카 Consumer
- Consumer는 특정 파티션을 관리하고, 오프셋을 조절하면서 메세지를 받도록 할수있음.
- (구버젼은 쥬키퍼에 각 오프셋을 저장할수있엇고, 신버젼은 토픽에만 각 오프셋을 저장함)
- ('파티션 리더'에게 fetch요청을 하며, pull방식으로 Consumer 주도적으로 불필요한 대기없이 땡겨옴)
- group.id 옵션 : 컨슈머가 속한 그룹 식별값. (--group)
- auto.offset.reset 옵션 : ConsumerGroup 차원에서 오프셋값에 문제가 있을경우... 어떻게 할것이냐?
- latest : 가장 최신 오프셋값으로 리셋 (디폴트)
- earliest : 가장 초기 오프셋값으로 리셋 (신규추가 파티션일 경우, earliest 아니면~ 메세지 유실이라 착각함)
- none : 그냥 에러를 던짐
- (새로운 group.id 할경우, latest면~ 최신offset 봐서 안되는거 같고... earliest면? 처음부터 보는거 같아!)
- enable.auto.commit : Consumer가 동적으로 변하는 상황에서는, AutoCommit하면~ 메세지 중복이슈 있음.
- auto.commit.interval.ms : Consumer가 주기적으로 오프셋값을 커밋함. (주기와 어긋나면... 중복이슈)
- session.timeout.ms 옵션 : 아마도 ConsumerGroup에서 Consumer체크 제한시간.
- heartbeat.interval.ms 옵션 : 아마도 ConsumerGroup에서 Consumer체크 주기.
- (spring.kafka.listener.concurrency=8 : 동시성. '스프링 컨커런트 리스너 컨테이너' 의 쓰레드 갯수...)
- --partition :
- 특정 파티션만~ (한 컨슈머가 여러게의 파티션을 처리하면... 순서는 전혀 보장안되겠죠?)
- 파티션이 하나이면, 절대적으로 순서는 보장됨. (Hyperledger에서 이렇게 Orderer 활용)
- 하나의 파티션은 하나의 Consumer만 담당할수있다.
- 하나의 Consumer가 여러게의 파티션을 담당할수는있다.
- 파티션 오프셋은 각각에 ConsumerGroup이 독립적으로 관리한다.
- 컨슈머 그룹 :
- 특정 토픽의 파티션들에 관한 소유권을 공유하는 그룹.
- 신규 Consumer가 오면, 각 파티션 담당자를 자동으로 rebalance 한다.
- (물론, rebalance 과정에서 일시적 중단이 발생함)
- 특정 Consumer의 Heartbeat에 문제가 있으면, Session-Timeout 되고~ 다시 rebalance 됨.
- (물론, 다운된 Consumer가 담당하던 파티션은... 다른Consumer가 짊어짐) (알아서 잘 이중화 효과???)
- 서로다른 컨슈머 그룹은 각자 오프셋이 독립적으로 관리됨으로, 똑같은 토픽을 각자 처리할수있음!!!
- 커밋
- 컨슈머 그룹의 Consumer들이 각 자신의 파이션에 오프셋을 기록하는것.
- rebalance 가 일어나면, 새로운 파티션의 '최근 오프셋' 부터 다시 처리.
- 따라서, 커밋 방식에 따라... 메시지 중복 or 누락 이 좌지우지 됨.
- 자동 커밋 : poll() 하면서 주기적으로 커밋시간을 체크하고, 커밋.
- 수동 커밋 : 사용하는 application-logic 에서, 직접 커밋.
- (카프카는 At Least Once 정책)
-끝-
'MQ' 카테고리의 다른 글
SQS (0) | 2020.05.16 |
---|---|
RabbitMQ (AMQP) (0) | 2020.04.30 |
Elastic Stack 란? (1) | 2020.04.12 |
flafka (flume + kafka) (0) | 2019.06.08 |
빅데이터와 하둡 (0) | 2019.05.18 |