반응형
Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- container
- variable
- bash
- WEB
- ioredirection
- python
- namespace
- Vagrant
- k8s
- multivm
- httpd실행
- springboot
- devops #engineer
- nginx
- DOIK
- 쿠버네티스
- Kubernetes
- RSS
- docker
- Engineer
- aws #engineer
- 컨테이너
- 도커
- mongodb operator
- Strimzi
- java
- 파이썬
- devops #jenkins
- linux
- 초간단파이썬
Archives
- Today
- Total
샤인의 IT (막 적는) 메모장
[DOIK 스터디] Kafka와 Kubernetes Strimzi Operator 본문
반응형
Kafka?
- Kafka 란? : 실시간으로 기록 스트림을 게시, 구독, 저장 및 처리할 수 있는 분산 데이터 스트리밍 플랫폼
- Kafka 주요 특징
- 처리량 : 묶음 단위 배치 처리로 실시간 로그 데이터 처리
- 확장성 : 브로커 Scale-in/Out
- 영속성 : 데이터를 메모리에 저장하지 않고 파일시스템에 저장함, 파일 I/O 성능 향상을 위해 메모리에 페이지 캐시 사용
- 고가용성 : 서버/브로커 장애 시 데이터 복제를 통한 지속적인 데이터 처리 가능
- Kafka 아키텍처 정리
- Zookeeper : 분산 처리된 메시지 큐 정보를 관리
- Broker : Kafka 서버
- Topic : 메시지가 생산되고 소비되는 주제 (각 메시지 별로 분류)
- Partition : Topic 내 분산되어 저장되는 데이터 단위
- Offset : Partition 내 메시지를 식별할 수 있는 값
- Producer : 메시지를 정해진 Topic으로 기록하는 역할
- Consumer Group : 정해진 Topic으로 메시지 요청해서 처리하는 역할
- Kafka 아키텍처 구성도
- Producer 세부
- ProducerRecord : 프로듀서에서 생성하는 레코드
- Send() : 레코드 전송 요청 메소드
- Partitioner : 어느 파티션으로 전송할 지 지정함
- UniformStrickyPartitioner : Accumulator에서 레코드들이 배치로 묶일 때 까지 기다렸다가 전송, 파티션을 순회하면서 전송하기 때문에 분배 전송
- RoundRobinPartitioner : 레코드가 들어오는데로 파티션 순회하면서 전송, 버퍼에 묶이는 정도가 적어 전송 성능 낮음
- CustomPartitioner : 사용자 지정 파티셔너
- Accumulator : 배치로 묶어 전송할 데이터를 모으는 버퍼
- Sender : Accumulator의 메시지를 Fetch 하여 Kafka 브로커로 전송
- Producer 구성도
- Consumer 세부
- Fetcher : 리더 파티션으로부터 레코드들을 가져와서 대기
- Poll : Fetcher에 있는 레코드를 리턴
- ConsumerRecord : 처리하고자 하는 레코드 모임
- log Segment
- 카프카에서는 데이터 삭제가 세그먼트 단위로 일어나기 떄문에, 레코드 단위로 삭제는 불가능
- 이미 적재된 데이터에 대해 수정 불가능
- Replication
- 브로커 중 일부 장애 발생 시, 데이터 유실 없이 안전하게 사용하기 위함
- 카프카 데이터 복제는 파티션 단위로 이루어짐
- 복제 개수는 최소 1, 최대 브로커 수 사용 가능
- 복제 개수만큼 저장 용량이 증가하기 때문에 고려 필요
- 리더 파티션이 있는 브로커가 다운될 경우 다른 브로커가 리더 파티션으로 변경
- ISR(In-Sync-Replicas) : 리더 파티션과 팔로워 파티션이 모두 싱크된 상태
- ISR 설정이 중요한 이유는 In-Sync 상태가 다를 경우 메시지를 받지 못하기 때문
- Topic과 Partition
- Topic은 데이터를 분류하기 위해 사용하는 단위이며, Partition은 프로듀서가 보낸 데이터가 저장되어 있음
- Partition에 저장되어 있는 데이터를 레코드라고 함
- 토픽 생성 시 기본적으로 브로커에 한대 씩 라운드-로빈 방식으로 생성됨
- 클라이언트로 접속 시 리더 파티션과 통신하며 여러 브로커에 골고루 통신함
- 파티션을 늘리는건 자유지만, 파티션을 줄일 경우에는 토픽 삭제하고 재생성 해야됨
- Record
- Timestamp, Offset, Header, Key, Value로 구성되어 있음
- 프로듀서가 생성한 레코드가 브로커로 전송되면 오프셋과 타임스탬프를 지정하여 저장
- 브로커에 적재된 레코드는 삭제가 불가능하고 리텐션 기간이 지나거나 용량 문제가 생길 경우만 삭제 가능
Strimzi?
- Strimzi 란? : 쿠버네티스 환경에서 Kafka 운영 관리할 수 있는 Operator
- Strimzi 특징
- Kafka 클러스터 구성요소 배포 관리
- Kafka 커넥트 설정
- Kafka 업그레이드 설정
- 브로커 관리
- 토픽, 유저 생성 관리
- Strimzi가 제공하는 Operator
- Cluster Operator : Kafka & Zookeeper 클러스터를 배포하고 관리하는 Operator
- Topic Operator : 클러스터 내 Topic을 생성하고 관리하는 Operator
- User Operator : 클러스터 내 User를 생성하고 관리하는 Operator
- Operator 테스트 구성도
- AWS 환경에서 실습
- Kubernetes Cluster가 구성되어 있어야 함.
- Strimzi 설치
- 기본적으로 쿠버네티스 클러스터가 구성되어 있어야 함.
- K8s v1.23.6 버전 테스트
- Strimzi Operator 설치
#Kafka 네임스페이스 생성
kubectl create namespace kafka
#Helm 레포지토리 추가
helm repo add strimzi https://strimzi.io/charts/
#Master 노드에 Operator 설치
#노드셀렉터로 마스터 노드 지정
printf 'tolerations: [{key: node-role.kubernetes.io/master, operator: Exists, effect: NoSchedule}]\n' | \
helm install kafka-operator strimzi/strimzi-kafka-operator --version 0.29.0 --namespace kafka \
--set nodeSelector."kubernetes\.io/hostname"=k8s-m --values /dev/stdin
#CRD 목록 확인
kubectl get crd -n kafka
kubectl describe crd -n kafka
작업 확인
- Kafka 클러스터 배포
- 오브젝트를 통해 배포
- 배포 시 Zookeeper → Broker → Entity Operator 순서대로 배포됨
- kafka.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
#version: 3.1.1
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: false
- name: external
port: 9094
type: nodeport
tls: false
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 10Gi
deleteClaim: true
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
#inter.broker.protocol.version: "3.1.1"
template:
pod:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app.kubernetes.io/name
operator: In
values:
- kafka
topologyKey: "kubernetes.io/hostname"
zookeeper:
replicas: 3
storage:
type: persistent-claim
size: 10Gi
deleteClaim: true
template:
pod:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app.kubernetes.io/name
operator: In
values:
- zookeeper
topologyKey: "kubernetes.io/hostname"
entityOperator:
topicOperator: {}
userOperator: {}
#오브젝트 적용
kubectl apply -f kafka-1.yaml -n kafka
#Pod/PVC 확인
kubectl get po,pvc -n kafka
#kafka Cluster 확인
kubectl get kafka -n kafka
#Kafka 리스너 정보 확인
kubectl get kafka -n kafka my-cluster -o jsonpath={.status} | jq -r ".listeners"
작업 확인
- Kafka 접속 클라이언트 파드 배포
- Client Pod를 배포해서 Kafka 클러스터 설정 정보 확인
- Client.yaml
apiVersion: v1
kind: Pod
metadata:
name: ${PODNAME}
labels:
app: myclient
spec:
nodeName: k8s-m
containers:
- name: ${PODNAME}
image: bitnami/kafka:3.2
command: ["tail"]
args: ["-f", "/dev/null"]
# Client Pod 3대 배포
cat ~/DOIK/3/myclient.yaml
for ((i=1; i<=3; i++)); do PODNAME=myclient$i envsubst < ~/DOIK/3/myclient.yaml | kubectl apply -f - ; done
# Client Pod 배포 확인
kubectl get po -l app=myclient
# Kafka client 에서 제공되는 kafka 관련 도구들 확인
kubectl exec -it myclient1 -- ls /opt/bitnami/kafka/bin
# Kafka 접속을 위한 환경변수 지정
SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092
echo $SVCDNS
#브로커 정보 확인
kubectl exec -it myclient1 -- kafka-broker-api-versions.sh --bootstrap-server $SVCDNS
#브로커에 설정된 기본값 확인
kubectl exec -it myclient1 -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 0 --all --describe
#토픽 확인
kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --list
#CRD로 토픽 리스트 확인
kubectl get kafkatopics -n kafka
배포 확인
- 토픽 생성
- topic.yaml
kind: KafkaTopic
metadata:
name: ${TOPICNAME}
labels:
strimzi.io/cluster: "my-cluster"
#Partition 1 Replicas가 3개인 토픽을 생성하며 ISR은 2로 지정
spec:
partitions: 1
replicas: 3
config:
retention.ms: 7200000
segment.bytes: 1073741824
min.insync.replicas: 2
#토픽 생성 오브젝트 적용
TOPICNAME=mytopic1 envsubst < ~/DOIK/3/mytopic.yaml | kubectl apply -f - -n kafka
#생성 토픽 확인
kubectl get kafkatopics -n kafka
#Client로 토픽 생성
kubectl exec -it myclient1 -- kafka-topics.sh --create --bootstrap-server $SVCDNS --topic mytopic2 --partitions 1 --replication-factor 3 --config retention.ms=17280000
# 토픽 상세 정보 확인
kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --describe
# 토픽 파티션 갯수 늘리기
kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter --partitions 2
kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --describe
#ISR 변경
kubectl exec -it myclient1 -- kafka-configs.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter -add-config min.insync.replicas=3
kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --describe
배포 확인
- Kafka 토픽 데이터 주고 받기
- Client Pod에서 Topic에 데이터를 넣어보고 확인하기
# 토픽에 데이터 넣어보기
kubectl exec -it myclient1 -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic1
# 토픽 데이터 확인
kubectl exec -it myclient2 -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --from-beginning
배포 확인
마무리
완전 생소한 개념에다 처음 접하다 보니 감도 안잡히고 너무 어려웠는데 그래도 알아가는 게 있어 뿌듯하다
컨슈머 그룹하고 페일오버도 좀 더 정리해서 올려야겠다..😁
반응형
'Kubernetes 심화 주제' 카테고리의 다른 글
[DOIK 스터디] NoSQL와 MongoDB Operator (0) | 2022.06.23 |
---|
Comments