샤인의 IT (막 적는) 메모장

[DOIK 스터디] Kafka와 Kubernetes Strimzi Operator 본문

Kubernetes 심화 주제

[DOIK 스터디] Kafka와 Kubernetes Strimzi Operator

신샤인 2022. 6. 9. 23:10
반응형

Kafka?

  • Kafka 란? : 실시간으로 기록 스트림을 게시, 구독, 저장 및 처리할 수 있는 분산 데이터 스트리밍 플랫폼
  • Kafka 주요 특징
    • 처리량 : 묶음 단위 배치 처리로 실시간 로그 데이터 처리
    • 확장성 : 브로커 Scale-in/Out
    • 영속성 : 데이터를 메모리에 저장하지 않고 파일시스템에 저장함, 파일 I/O 성능 향상을 위해 메모리에 페이지 캐시 사용
    • 고가용성 : 서버/브로커 장애 시 데이터 복제를 통한 지속적인 데이터 처리 가능
  • Kafka 아키텍처 정리
    • Zookeeper : 분산 처리된 메시지 큐 정보를 관리
    • Broker : Kafka 서버
    • Topic : 메시지가 생산되고 소비되는 주제 (각 메시지 별로 분류)
    • Partition : Topic 내 분산되어 저장되는 데이터 단위
    • Offset : Partition 내 메시지를 식별할 수 있는 값
    • Producer : 메시지를 정해진 Topic으로 기록하는 역할
    • Consumer Group : 정해진 Topic으로 메시지 요청해서 처리하는 역할
  • Kafka 아키텍처 구성도

Kafka Architecture

  • Producer 세부
    • ProducerRecord : 프로듀서에서 생성하는 레코드
    • Send() : 레코드 전송 요청 메소드
    • Partitioner : 어느 파티션으로 전송할 지 지정함
      • UniformStrickyPartitioner : Accumulator에서 레코드들이 배치로 묶일 때 까지 기다렸다가 전송, 파티션을 순회하면서 전송하기 때문에 분배 전송
      • RoundRobinPartitioner : 레코드가 들어오는데로 파티션 순회하면서 전송, 버퍼에 묶이는 정도가 적어 전송 성능 낮음
      • CustomPartitioner : 사용자 지정 파티셔너
    • Accumulator : 배치로 묶어 전송할 데이터를 모으는 버퍼
    • Sender : Accumulator의 메시지를 Fetch 하여 Kafka 브로커로 전송
    • Producer 구성도

Producer 구성

 

  • Consumer 세부
    • Fetcher : 리더 파티션으로부터 레코드들을 가져와서 대기
    • Poll : Fetcher에 있는 레코드를 리턴
    • ConsumerRecord : 처리하고자 하는 레코드 모임

Consumer 구성

 

  • log Segment
    • 카프카에서는 데이터 삭제가 세그먼트 단위로 일어나기 떄문에, 레코드 단위로 삭제는 불가능
    • 이미 적재된 데이터에 대해 수정 불가능
  • Replication
    • 브로커 중 일부 장애 발생 시, 데이터 유실 없이 안전하게 사용하기 위함
    • 카프카 데이터 복제는 파티션 단위로 이루어짐
    • 복제 개수는 최소 1, 최대 브로커 수 사용 가능
    • 복제 개수만큼 저장 용량이 증가하기 때문에 고려 필요
    • 리더 파티션이 있는 브로커가 다운될 경우 다른 브로커가 리더 파티션으로 변경
    • ISR(In-Sync-Replicas) : 리더 파티션과 팔로워 파티션이 모두 싱크된 상태
    • ISR 설정이 중요한 이유는 In-Sync 상태가 다를 경우 메시지를 받지 못하기 때문

Broker 내 Topic Replication 구성

  • Topic과 Partition
    • Topic은 데이터를 분류하기 위해 사용하는 단위이며, Partition은 프로듀서가 보낸 데이터가 저장되어 있음
    • Partition에 저장되어 있는 데이터를 레코드라고 함
    • 토픽 생성 시 기본적으로 브로커에 한대 씩 라운드-로빈 방식으로 생성됨
    • 클라이언트로 접속 시 리더 파티션과 통신하며 여러 브로커에 골고루 통신함
    • 파티션을 늘리는건 자유지만, 파티션을 줄일 경우에는 토픽 삭제하고 재생성 해야됨
  • Record
    • Timestamp, Offset, Header, Key, Value로 구성되어 있음
    • 프로듀서가 생성한 레코드가 브로커로 전송되면 오프셋과 타임스탬프를 지정하여 저장
    • 브로커에 적재된 레코드는 삭제가 불가능하고 리텐션 기간이 지나거나 용량 문제가 생길 경우만 삭제 가능

 

Strimzi?

  • Strimzi 란? : 쿠버네티스 환경에서 Kafka 운영 관리할 수 있는 Operator
  • Strimzi 특징
    • Kafka 클러스터 구성요소 배포 관리
    • Kafka 커넥트 설정
    • Kafka 업그레이드 설정
    • 브로커 관리
    • 토픽, 유저 생성 관리
  • Strimzi가 제공하는 Operator
    • Cluster Operator : Kafka & Zookeeper 클러스터를 배포하고 관리하는 Operator
    • Topic Operator : 클러스터 내 Topic을 생성하고 관리하는 Operator
    • User Operator : 클러스터 내 User를 생성하고 관리하는 Operator
  • Operator 테스트 구성도
    • AWS 환경에서 실습
    • Kubernetes Cluster가 구성되어 있어야 함.

Strimzi Operator 테스트를 위한 구성도

 

  • Strimzi 설치
    • 기본적으로 쿠버네티스 클러스터가 구성되어 있어야 함.
    • K8s v1.23.6 버전 테스트
    • Strimzi Operator 설치
#Kafka 네임스페이스 생성
kubectl create namespace kafka


#Helm 레포지토리 추가
helm repo add strimzi https://strimzi.io/charts/


#Master 노드에 Operator 설치
#노드셀렉터로 마스터 노드 지정


printf 'tolerations: [{key: node-role.kubernetes.io/master, operator: Exists, effect: NoSchedule}]\n' | \
helm install kafka-operator strimzi/strimzi-kafka-operator --version 0.29.0 --namespace kafka \
  --set nodeSelector."kubernetes\.io/hostname"=k8s-m --values /dev/stdin


#CRD 목록 확인
kubectl get crd -n kafka
kubectl describe crd -n kafka

 

 

작업 확인

 

Kafka Namespace 생성 후 Helm Repo 추가
Strimzi Operator 배포
배포 후 CRD 목록 확인

 

  • Kafka 클러스터 배포
    • 오브젝트를 통해 배포
    • 배포 시 Zookeeper → Broker → Entity Operator 순서대로 배포됨
    • kafka.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    #version: 3.1.1
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: false
      - name: external
        port: 9094
        type: nodeport
        tls: false
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 10Gi
        deleteClaim: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      #inter.broker.protocol.version: "3.1.1"
    template:
      pod:
        affinity:
          podAntiAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              - labelSelector:
                  matchExpressions:
                    - key: app.kubernetes.io/name
                      operator: In
                      values:
                        - kafka
                topologyKey: "kubernetes.io/hostname"
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: true
    template:
      pod:
        affinity:
          podAntiAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              - labelSelector:
                  matchExpressions:
                    - key: app.kubernetes.io/name
                      operator: In
                      values:
                        - zookeeper
                topologyKey: "kubernetes.io/hostname"
  entityOperator:
    topicOperator: {}
    userOperator: {}
#오브젝트 적용
kubectl apply -f kafka-1.yaml -n kafka


#Pod/PVC 확인
kubectl get po,pvc -n kafka


#kafka Cluster 확인
kubectl get kafka -n kafka


#Kafka 리스너 정보 확인
kubectl get kafka -n kafka my-cluster -o jsonpath={.status} | jq -r ".listeners"

 

작업 확인

오브젝트 적용
Zookeeper 배포 후 Broker 배포 확인
Broker 배포 후 Entity Operator 배포 확인
배포된 Kafka 클러스터 리소스 확인

 

 

 

  • Kafka 접속 클라이언트 파드 배포
    • Client Pod를 배포해서 Kafka 클러스터 설정 정보 확인
    • Client.yaml
apiVersion: v1
kind: Pod
metadata:
  name: ${PODNAME}
  labels:
    app: myclient
spec:
  nodeName: k8s-m
  containers:
  - name: ${PODNAME}
    image: bitnami/kafka:3.2
    command: ["tail"]
    args: ["-f", "/dev/null"]
# Client Pod 3대 배포
cat ~/DOIK/3/myclient.yaml
for ((i=1; i<=3; i++)); do PODNAME=myclient$i envsubst < ~/DOIK/3/myclient.yaml | kubectl apply -f - ; done


# Client Pod 배포 확인
kubectl get po -l app=myclient


# Kafka client 에서 제공되는 kafka 관련 도구들 확인
kubectl exec -it myclient1 -- ls /opt/bitnami/kafka/bin


# Kafka 접속을 위한 환경변수 지정
SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092
echo $SVCDNS


#브로커 정보 확인
kubectl exec -it myclient1 -- kafka-broker-api-versions.sh --bootstrap-server $SVCDNS


#브로커에 설정된 기본값 확인
kubectl exec -it myclient1 -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 0 --all --describe


#토픽 확인
kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --list


#CRD로 토픽 리스트 확인
kubectl get kafkatopics -n kafka

 

배포 확인

Client Pod 배포
Client에서 제공하는 Kakfa 명령어 확인
Client를 통한 Broker 정보 확인
Client를 통해 Broker 기본값과 토픽 확인 후 CRD 토픽 목록 확인

 

 

 

  • 토픽 생성
    • topic.yaml
kind: KafkaTopic
metadata:
  name: ${TOPICNAME}
  labels:
    strimzi.io/cluster: "my-cluster"
#Partition 1 Replicas가 3개인 토픽을 생성하며 ISR은 2로 지정
spec:
  partitions: 1
  replicas: 3
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
    min.insync.replicas: 2
#토픽 생성 오브젝트 적용
TOPICNAME=mytopic1 envsubst < ~/DOIK/3/mytopic.yaml | kubectl apply -f - -n kafka


#생성 토픽 확인
kubectl get kafkatopics -n kafka


#Client로 토픽 생성
kubectl exec -it myclient1 -- kafka-topics.sh --create --bootstrap-server $SVCDNS --topic mytopic2 --partitions 1 --replication-factor 3 --config retention.ms=17280000


# 토픽 상세 정보 확인
kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --describe


# 토픽 파티션 갯수 늘리기
kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter --partitions 2
kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --describe


#ISR 변경
kubectl exec -it myclient1 -- kafka-configs.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter -add-config min.insync.replicas=3
kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --describe

배포 확인

오브젝트를 통한 Topic 생성
Client를 통한 Topic 생성
Partition 수 늘리기 (늘릴 땐 마음대로라도 줄일땐 아니란다..)
ISR 변경

 

  • Kafka 토픽 데이터 주고 받기
    • Client Pod에서 Topic에 데이터를 넣어보고 확인하기

 

# 토픽에 데이터 넣어보기
kubectl exec -it myclient1 -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic1


# 토픽 데이터 확인
kubectl exec -it myclient2 -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --from-beginning

배포 확인

Topic 데이터 확인 (중간에 데이터 넣는 과정이 날라감..ㅠ)

 

 

 

 

 

마무리

완전 생소한 개념에다 처음 접하다 보니 감도 안잡히고 너무 어려웠는데 그래도 알아가는 게 있어 뿌듯하다

컨슈머 그룹하고 페일오버도 좀 더 정리해서 올려야겠다..😁

 

반응형

'Kubernetes 심화 주제' 카테고리의 다른 글

[DOIK 스터디] NoSQL와 MongoDB Operator  (0) 2022.06.23
Comments