• 0) 메세징 시스템 이란?
    • 비동기 데이터전송 방식.
    • 예) Pub/Sub (브로드캐스트) 모델 : Publisher -> [Queue] -> Subscriber
    • 전송에 있어서, 발신자와 수신자간 서로를 알 필요가 없다. (decoupled 하다)
    • 즉, 각 프로세스들의 디펜던시를 최소화 할 수 있다.
    • (이 때문에, 구조적으로써~ 네트워크의 확장성이 용이해 진다.)
    • 빠른 대용량 처리에 적합.
      • At Message Once : 메세지 손실이 될수있디만, 재전송없이 최대한 빠른처리 정책.
      • At Least Once : 메세지 손실은 없지만, 중복은 될수도 있는 정책.
      • Exactly Once : 이상적이지만... 굉장히 어렵고, 보장해준다면서~ 타프로세스 예외를 두는 경우가 많다.
  • 1) 카프카 란?
    • 고성능 고가용 메세징 시스템. (분산 리플리케이션 로그 서비스)
    • (대용량 고성능 실시간 처리를 위해서) '배치전송', '페이지 캐시', '파티션', '분산', '리플리케이션, '자동리더선출' 등의 기술이 구현됨.
    • 높은 처리량, 간단한 스케일아웃, 안전한 데이터 영구 디스크보관.
    • 엄밀한 의미에서 '표준MQ'는 아님... (AMQP를 사용하지 않고, 속도를 위해서 TCP기반동작)
    • RabbitMQ : 
      • 표준화된 Advanced Message Queueing Protocal 0.9.1 따름. 
      • (Exchanger) -> [Queue]
        • 내부적으로 Exchanger가 있어서, Queue으로 메세지를 분배하는 복잡한 작업을 한다.
        • (direct exchange, fanout exchange, topic exchange, headers exchange)
      • 메타를 자체적으로 관리해서, 처리량이 높지않다.
      • 손실되지 않아야 하는, 메세지 처리에 적합. (메세지 처리에 필요한 다양한 기능도 지원)
    • Kafka
      • Producer(exchanger) -> (Broker) -> Consumer(exchanger)
        • 고성능을 위해서, Producer 및 Consumer 에게 Exchanger 역활을 넘겼다.
      • 메타를 Zookeeper으로 관리해서, 효율이 좋다.
      • 약간 손실이 되더라도, 빠르고 대량의 메세지 처리에 적합.
    • 'LinkedIn'에서 내부시스템 개선과정에서 개발함
      • 실시간 처리율을 높여야하는 needs
      • 무중단 비중앙 시스템성 needs
      • 타사 제품등등의 이기종 호환성과 유연성 needs
      • 손쉬운 스케일아웃 needs
    • 활용예시
      • 예) '마이크로 서비스 아키텍쳐' 에서 각각의 '프로세스 로그'를 통합한다고 했을때, kafka를 활용하면 딲!
      • 예) 카카오에서는 웹문서 검색 데이터 수집에서...  유실없이 처리한다고 한다.
    • '분산 Application' 으로써 Cluster 구성가능. (꼭 홀수운영 하지않고, 자유롭게 할수있음)
    • 주로 Kafka Broker와 Zookeeper가 직접통신 하면서, 메타정보를 저장하고 상태관리를 하게된다.
  • 2) 쥬키퍼 란?
    • 분산 코디네이터 서비스.
    • 본래, 하둡 산하 프로젝트들이 '분산 Application'으로 개발되면서~ 이 동물들을 코디네이션 하는 프로젝트.
    • '분산 Application' 이 안정적인 서비스를 하도록, 각 정보를 중앙에서 관리 및 동기화 함.
    • 쥬키퍼도 여러대의 서버으로 Cluster(앙상블)로 구성됨. ('분산 Application' 이 <-> 쥬키퍼Cluster 와 통신)
    • 쥬키퍼Cluster는 '홀수 과반수 방식'을 따름. (예: 3대 라면~ 1대die는 OK , 2대die는 ERR)
    • 쥬키퍼는 '계층형 ZNode 디렉토리 구조'에 'Key-Value'를 저장 함.
  • 3) 쥬키퍼 기본
    • 설치
      • tar zxf zookeeper-3.4.12.tar.gz
      • ln -s zookeeper-3.4.12.tar.gz zookeeper
      • mkdir /data : 쥬키퍼가 내부적으로 사용할 별도의 dir생성 (ZNode의 스냅샷 및 로그 저장)
      • echo "1" > /data/myid : 쥬키퍼Cluster 내의 유니크ID 값생성
      • zoo.cfg 설정
        • ...
    • 실행
      • zkServer.sh start , zkServer.sh status , zkServer.sh stop
      • zkCli.sh
        • create /test-node abc : 디폴드 persistent 옵션.
        • create -e /test-node abc : ephemeral옵션. 커넥션이 끊어지면 자동노드삭제. (헬스체크에 유용)
        • get /test-node
  • 3) 쥬키퍼 구조
    • /{루트ZNode}
      • /controller : 카프카 클러스터를 관리하는 컨트롤러 관련값 (토픽 파티션 벨런스 등등)
      • /brokers : 브로커 관련값
        • /topics : 토픽 관련값
      • /consumers : 컨슈머 관련값 (구버젼은 각 컨슈머 오프셋을 쥬키퍼에도 가능했으나... 이젠 토픽에만 권고)
      • /config
        • /topics : 토픽상세정도
  • 4) 카프카 기본
    • 설치
      • tar zxf kafka_2.11-1.1.1.tar.gz
      • ln -s kafka_2.11-1.1.1 kafka
      • server.properties 설정
        • broker.id=1 : 카프카Cluster 내의 유니크ID 값지정
        • log.dirs=/kafka1 : 카프카가 내부적으로 사용할 디렉토리
        • default.replication.factor=1 : 토픽생성시 복제수 옵션 디폴트값 (각 topic create시 재지정 가능)
        • offsets.topic.num.partitions=? : 오프셋 토픽의 파티션 수
        • offsets.topic.replication.factor=? : 오프셋 토픽의 리플리케이션 팩터
        • zookeeper.connect={모든 쥬키퍼 서버:2181}/{루트 할 ZNode이름} : 한 쥬키퍼만 쓸 경우, 걔가 죽으면... 바로장애! 독박! 그리고... 쥬키퍼는 공용임으로~ 적절한 루트ZNode를 정하면 좋음
        • zookeeper.session.timeout=3 : 자바App들은 GC 때문에 종종 멈추니까... 너무짧게하면 낭패
        • unclean.leader.election.enable=true : 전면 장애시, 최후에 리더가 아니라도~ 손실감안하고 새로운 리더를 뽑을 수 있도록 하는 옵션 (장애시간이 무한정 길어지는것 방지)
        • min.insync.replicas=3 : 동기화 됨을 판단하는 최소한의 복제수
    • 실행
      • kafka-server-start.sh server.properties -daemon , kafka-server-stop.sh
      • cat ~/kafka/logs/server.log : 카프카 상태확인
      • kafka-topics.sh --create --zookeeper {쥬키퍼서버들:2181}/test-node --replication-factor 1 --partitions 1 --topic test-topic
      • kafka-console-producer.sh --broker-list {카프카서버들:9092} --topic test-topic
      • kafka-console-consumer.sh --bootstrap-server {카프카서버들:9092} --topic test-topic --from-beginning
  • 5) 카프카 기술
    • 내부적으로 테크니컬한 기술이 구현되어있음.
    • 비중앙 분산 시스템 : 단일 시스템에 비해서, 성능확장과 안정성 및 장애대응에 아주 유리하게 함.
    • 페이지 캐시 : {OS Disk} -> {PageCache Memory} -> {Kafka} , (커널 readBuf -> socketBuf -> NICBuf)
    • 배치 전송 : 자잘한 I/O를 한대 묶어서, 한방으로 처리. (압축프로토콜도 지원.)
  • 6) 카프카 구조
    • (메세지) -> [클러스터] > [브로커] > [토픽] > [파티션] > [오프셋]
    •  : 특정 메세지는 최종적으로 특정 파이션에서 오프셋 단위로 처리됨.
    • 토픽 : 메세지를 구분하는 논리적인 단위.
    • 파티션 :
      • 토픽을 분할한 단위. (한 토픽을 멀티프로세스 하기위한 분할)
      • (너무 쪼게면... 쥬키퍼 관리비용 높아지는 등의 부작용이 있겟죠?)
      • 파티션 갯수만큼의 프로세스가 있으면 좋음. (각자 자기 파티션만 잘 처리하면 되니까)
      • 각각의 Producer 및 Consumer 를 특정 파티션으로 지정 가능.
      • (카프카에서는... 파티션 삭제! 줄이는 기능은 없다고 함... 참고...)
    • 오프셋
      • 각 파티션마다 메세지가 저장되는 위치.
      • 오프셋은 파티션 내에서 유일하고 순차적으로 증가하는 Long Number.
      • 오프셋 순서데로 메시지를 처리함.
      • 즉, 적어도 한 파이션 내부에서는 메세지의 순서가 보장됨!
  • 7) 카프카 HA 및 Replication
    • 분산서버의 장애시, 고가용성을 위해서~ 리플리케이션을 지원.
    • (토픽을 이루는 각각의 파티션들을 '클러스터' 내부에 나누어서, '리더 브로커' 및 '팔로워 브로커' 으로 복제)
    • (replication-factor 값수만큼 복제를 하여, '리더 브로커' 에 장애가 나더라도, '팔로워 브로커'로 즉각 대응)
    • 모든 R/W는 리더에게만 하고, 팔로워는 똑같이 복제만 함.
    • 물론, 리플리케이션을 사용하면... 용량이나 시스템 리소스 등의 감소는 감안 해야죠?
    • ISR(In Sync Replica)
      • 리플리케니션 되는 그룹. (ISR에 속해있는 구성원만이 리더가 될수있는 규칙)
      • ISP에 속한 팔로워 중 동기화에 문제가 감지되면, ISR에서 추방됨.
      • 항상 정확히 동기화된, 팔로워를 유지하기위한 정책.
    • 전면장애, 리더선출 이슈
      • 모든 브로커가 전면장애 일때, 두가지 길의 딜레마가 있다.
        • 1. 최후에 리더가 다시 살아날때까지... 무한정 기다려, 손실없이 복구한다.
        • 2. 장애과정중 ISR에서 추방되었더라도, 손실을 감안하고 복구부터 한다.
      • (마치 마지막 왕좌의 계승을 누가해야하는지? 한편의 영화에 한장면 같다...)
      • unclean.leader.election.enable 옵션으로 결정하면 되시옵니다.
  • 8) 카프카 Producer
    • Producer는 특정 파티션에 메세지를 보내도록 할수있고, 디폴트 Round-Robbin으로 파티션에 분배 할수있음.
    • acks 옵션 : 메세지 완료를 위한 ack의 수. (적을수록 성능좋고 손실안좋고) (--request-required-acks)
      • acks=0 : 어떠한 '완료ack' 없이 바로 종료. (재전송도 당연히 안함) (속도는 겁나빠름)
      • acks=1 : '리더 브로커' 에 기록했다는 '완료ack' 받으면, 종료.
      • acks=all : '리더 및 팔로워 브로커' min.insync.replicas 에 동기화 됬다는 '완료ack' 받으면, 종료.
        • 단순히 무손실 한다고... replication=3, min.insync.replicas=3, acks=all 하면... 졲망함;
        • 1대만 장애되도... min.insync.replicas를 절때 만족 못하게 되니... 어처구니 없는 전면장애;;;
    • compression.type 옵션 : 메세지 압축.
      • 속도 : none > snappy > lz4 > gzip
      • 용량 : gzip > lz4 > snappy > none
    • batch.size 옵션 : 하나의 파티션에 묶어서 보낼 Size 지정.
    • linger.ms 옵션 : 배치Size 미만시, 대기시간.
    • max.request.size 옵션 : 메시지 최대 Size. (디폴트 1MB)
  • 9) 카프카 Consumer
    • Consumer는 특정 파티션을 관리하고, 오프셋을 조절하면서 메세지를 받도록 할수있음. 
    • (구버젼은 쥬키퍼에 각 오프셋을 저장할수있엇고, 신버젼은 토픽에만 각 오프셋을 저장함)
    • ('파티션 리더'에게 fetch요청을 하며, pull방식으로 Consumer 주도적으로 불필요한 대기없이 땡겨옴)
    • group.id 옵션 : 컨슈머가 속한 그룹 식별값. (--group)
    • auto.offset.reset 옵션 : ConsumerGroup 차원에서 오프셋값에 문제가 있을경우... 어떻게 할것이냐?
      • latest : 가장 최신 오프셋값으로 리셋 (디폴트)
      • earliest : 가장 초기 오프셋값으로 리셋 (신규추가 파티션일 경우, earliest 아니면~ 메세지 유실이라 착각함)
      • none : 그냥 에러를 던짐
      • (새로운 group.id 할경우, latest면~ 최신offset 봐서 안되는거 같고... earliest면? 처음부터 보는거 같아!)
    • enable.auto.commit : Consumer가 동적으로 변하는 상황에서는, AutoCommit하면~ 메세지 중복이슈 있음.
    • auto.commit.interval.ms : Consumer가 주기적으로 오프셋값을 커밋함. (주기와 어긋나면... 중복이슈)
    • session.timeout.ms 옵션 : 아마도 ConsumerGroup에서 Consumer체크 제한시간.
    • heartbeat.interval.ms 옵션 : 아마도 ConsumerGroup에서 Consumer체크 주기.
    • (spring.kafka.listener.concurrency=8 : 동시성. '스프링 컨커런트 리스너 컨테이너' 의 쓰레드 갯수...)
    • --partition :
      • 특정 파티션만~ (한 컨슈머가 여러게의 파티션을 처리하면... 순서는 전혀 보장안되겠죠?)
      • 파티션이 하나이면, 절대적으로 순서는 보장됨. (Hyperledger에서 이렇게 Orderer 활용)
      • 하나의 파티션은 하나의 Consumer만 담당할수있다.
      • 하나의 Consumer가 여러게의 파티션을 담당할수는있다.
      • 파티션 오프셋은 각각에 ConsumerGroup이 독립적으로 관리한다.
    • 컨슈머 그룹 :
      • 특정 토픽파티션들에 관한 소유권을 공유하는 그룹.
      • 신규 Consumer가 오면, 각 파티션 담당자를 자동으로 rebalance 한다.
      • (물론, rebalance 과정에서 일시적 중단이 발생함)
      • 특정 Consumer의 Heartbeat에 문제가 있으면, Session-Timeout 되고~ 다시 rebalance 됨.
      • (물론, 다운된 Consumer가 담당하던 파티션은... 다른Consumer가 짊어짐) (알아서 잘 이중화 효과???)
      • 서로다른 컨슈머 그룹은 각자 오프셋이 독립적으로 관리됨으로, 똑같은 토픽을 각자 처리할수있음!!!
    • 커밋
      • 컨슈머 그룹의 Consumer들이 각 자신의 파이션에 오프셋을 기록하는것.
      • rebalance 가 일어나면, 새로운 파티션의 '최근 오프셋' 부터 다시 처리.
      • 따라서, 커밋 방식에 따라... 메시지 중복 or 누락 이 좌지우지 됨.
        • 자동 커밋 : poll() 하면서 주기적으로 커밋시간을 체크하고, 커밋.
        • 수동 커밋 : 사용하는 application-logic 에서, 직접 커밋.
        • (카프카는 At Least Once 정책)

-끝-

'MQ' 카테고리의 다른 글

SQS  (0) 2020.05.16
RabbitMQ (AMQP)  (0) 2020.04.30
Elastic Stack 란?  (1) 2020.04.12
flafka (flume + kafka)  (0) 2019.06.08
빅데이터와 하둡  (0) 2019.05.18

+ Recent posts