Kafka with Strimzi
Apache Kafka deployment and management using the Strimzi operator for cloud-native Kafka operations.
Overview
Strimzi provides a Kubernetes-native way to deploy and manage Apache Kafka clusters, offering custom resource definitions (CRDs) for Kafka, topics, users, and connectors.
Strimzi Architecture
Core Components
- Cluster Operator: Manages Kafka clusters and resources
- Topic Operator: Manages Kafka topics as Kubernetes resources
- User Operator: Manages Kafka users and access control
- Kafka Connect: Manages connector instances for data integration
Custom Resources
- Kafka: Defines Kafka cluster configuration
- KafkaTopic: Defines topic specifications
- KafkaUser: Defines user credentials and ACLs
- KafkaConnect: Defines Kafka Connect cluster configuration
- KafkaConnector: Defines an individual connector running on a Kafka Connect cluster
Kafka Cluster Configuration
Basic Cluster Setup
Kafka Cluster Resource:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-cluster
  namespace: kafka
spec:
  kafka:
    version: 3.6.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
      - name: external
        port: 9094
        type: nodeport
        tls: false
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      # log.message.format.version is omitted: it is deprecated and has no
      # effect once inter.broker.protocol.version is 3.0 or higher
      inter.broker.protocol.version: "3.6"
    storage:
      type: persistent-claim
      size: 100Gi
      class: fast-ssd
    resources:
      requests:
        memory: 2Gi
        cpu: 500m
      limits:
        memory: 4Gi
        cpu: 2000m
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 20Gi
      class: fast-ssd
    resources:
      requests:
        memory: 1Gi
        cpu: 250m
      limits:
        memory: 2Gi
        cpu: 1000m
  entityOperator:
    topicOperator:
      resources:
        requests:
          memory: 256Mi
          cpu: 100m
        limits:
          memory: 512Mi
          cpu: 500m
    userOperator:
      resources:
        requests:
          memory: 256Mi
          cpu: 100m
        limits:
          memory: 512Mi
          cpu: 500m
Production Configuration
High Availability Setup:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-production
  namespace: kafka
spec:
  # Only the kafka section is shown; the zookeeper and entityOperator
  # sections follow the same shape as in the basic cluster setup.
  kafka:
    version: 3.6.0
    replicas: 5
    listeners:
      - name: internal
        port: 9092
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external
        port: 9094
        type: loadbalancer
        tls: true
        authentication:
          type: tls
    config:
      # Performance optimization
      num.network.threads: 8
      num.io.threads: 16
      socket.send.buffer.bytes: 102400
      socket.receive.buffer.bytes: 102400
      socket.request.max.bytes: 104857600
      # Replication settings
      default.replication.factor: 3
      min.insync.replicas: 2
      unclean.leader.election.enable: false
      # Log settings
      log.retention.hours: 168
      log.segment.bytes: 1073741824
      log.retention.check.interval.ms: 300000
      # Compression
      compression.type: producer
    storage:
      type: persistent-claim
      size: 500Gi
      class: fast-ssd
    resources:
      requests:
        memory: 8Gi
        cpu: 2000m
      limits:
        memory: 16Gi
        cpu: 4000m
    # Strimzi expects the JVM option keys and the -XX values as strings
    jvmOptions:
      "-Xms": 4g
      "-Xmx": 4g
      "-XX":
        UseG1GC: "true"
        MaxGCPauseMillis: "20"
        InitiatingHeapOccupancyPercent: "35"
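High availability also depends on where the broker pods are scheduled. As a hedged sketch, the fragment below enables Strimzi's rack awareness so that partition replicas are spread across availability zones. It assumes the Kubernetes nodes carry the standard `topology.kubernetes.io/zone` label; whether zones are the right failure domain for a given cluster is an assumption to verify.

```yaml
# Fragment only: merge into spec.kafka of the Kafka resource.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-production
  namespace: kafka
spec:
  kafka:
    rack:
      # Assumption: nodes are labelled with their availability zone
      topologyKey: topology.kubernetes.io/zone
```

With this set, Strimzi derives each broker's `broker.rack` from the node label, and Kafka uses it when assigning replicas for newly created partitions.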
Topic Management
Standard Topics
Customs Declaration Topics:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: customs-declarations
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  partitions: 12
  replicas: 3
  config:
    retention.ms: 604800000 # 7 days
    segment.ms: 86400000 # 1 day
    cleanup.policy: delete
    min.insync.replicas: 2
    compression.type: lz4
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: customs-approvals
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  partitions: 6
  replicas: 3
  config:
    retention.ms: 2592000000 # 30 days
    cleanup.policy: delete
    min.insync.replicas: 2
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: customs-notifications
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  partitions: 3
  replicas: 3
  config:
    retention.ms: 86400000 # 1 day
    cleanup.policy: delete
    min.insync.replicas: 2
Compacted Topics
Reference Data Topics:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: customs-reference-data
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  partitions: 1
  replicas: 3
  config:
    cleanup.policy: compact
    segment.ms: 604800000 # 7 days
    min.cleanable.dirty.ratio: 0.1
    delete.retention.ms: 86400000 # 1 day
User Management and Security
Service Users
Application Service Users:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: nucleus-service
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      # Producer permissions
      - resource:
          type: topic
          name: customs-declarations
          patternType: literal
        operation: Write
        host: "*"
      - resource:
          type: topic
          name: customs-notifications
          patternType: literal
        operation: Write
        host: "*"
      # Consumer permissions
      - resource:
          type: topic
          name: customs-approvals
          patternType: literal
        operation: Read
        host: "*"
      - resource:
          type: group
          name: nucleus-consumer-group
          patternType: literal
        operation: Read
        host: "*"
      # Schema registry access
      - resource:
          type: topic
          name: _schemas
          patternType: literal
        operation: Read
        host: "*"
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: approval-service
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      # Consumer permissions
      - resource:
          type: topic
          name: customs-declarations
          patternType: literal
        operation: Read
        host: "*"
      - resource:
          type: group
          name: approval-service-group
          patternType: literal
        operation: Read
        host: "*"
      # Producer permissions
      - resource:
          type: topic
          name: customs-approvals
          patternType: literal
        operation: Write
        host: "*"
Administrative Users
Kafka Admin User:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: kafka-admin
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      # Cluster admin permissions
      - resource:
          type: cluster
        operation: All
        host: "*"
      - resource:
          type: topic
          name: "*"
          patternType: literal
        operation: All
        host: "*"
      - resource:
          type: group
          name: "*"
          patternType: literal
        operation: All
        host: "*"
      - resource:
          type: transactionalId
          name: "*"
          patternType: literal
        operation: All
        host: "*"
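The KafkaUser resources in this section use `tls` authentication and `simple` authorization. These only take effect if the Kafka cluster itself enables a TLS-authenticated listener and the simple authorizer, which the basic cluster setup does not show. A minimal sketch of the matching cluster settings, assuming the `tls` listener on port 9093 is the one clients will use:

```yaml
# Fragment only: merge into spec.kafka of the kafka-cluster resource.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-cluster
  namespace: kafka
spec:
  kafka:
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls      # needed for KafkaUser authentication type tls
    authorization:
      type: simple       # needed for KafkaUser ACLs to be enforced
```

Once authorization is enabled, clients connecting through an unauthenticated listener are treated as anonymous and will generally be denied unless an ACL allows them, so commands that use the plain listener on port 9092 may need adjusting.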
Kafka Connect Integration
Connect Cluster
Kafka Connect Deployment:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect-cluster
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.6.0
  replicas: 3
  bootstrapServers: kafka-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: kafka-cluster-cluster-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      secretName: kafka-connect-user
      certificate: user.crt
      key: user.key
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
  resources:
    requests:
      cpu: 500m
      memory: 1Gi
    limits:
      cpu: 1000m
      memory: 2Gi
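The `kafka-connect-user` Secret referenced above is not created by the KafkaConnect resource. One way to obtain it is a KafkaUser of the same name, for which the User Operator generates a Secret containing `user.crt` and `user.key`. The sketch below is an assumption about how that user might be scoped: it grants access only to the Connect internal topics (by the `connect-cluster-` prefix) and the `connect-cluster` group, and does not cover the data topics that individual connectors read or write.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: kafka-connect-user
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      # Internal offset, config, and status topics share this prefix
      - resource:
          type: topic
          name: connect-cluster-
          patternType: prefix
        operation: Read
        host: "*"
      - resource:
          type: topic
          name: connect-cluster-
          patternType: prefix
        operation: Write
        host: "*"
      - resource:
          type: topic
          name: connect-cluster-
          patternType: prefix
        operation: Create
        host: "*"
      - resource:
          type: topic
          name: connect-cluster-
          patternType: prefix
        operation: Describe
        host: "*"
      # Worker group used by the Connect cluster (group.id)
      - resource:
          type: group
          name: connect-cluster
          patternType: literal
        operation: Read
        host: "*"
```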
Database Connector
SQL Server Source Connector:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mssql-source-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: io.debezium.connector.sqlserver.SqlServerConnector
  tasksMax: 2
  config:
    database.hostname: mssql-server
    database.port: 1433
    database.user: kafka_user
    # Resolved at runtime by the Kafka Connect file config provider
    database.password: "${file:/opt/kafka/external-configuration/connector-config/password:password}"
    database.dbname: CustomsDB
    database.server.name: customs-db
    table.include.list: dbo.declarations,dbo.approvals
    database.history.kafka.bootstrap.servers: kafka-cluster-kafka-bootstrap:9093
    database.history.kafka.topic: schema-changes.customs-db
    database.history.consumer.security.protocol: SSL
    database.history.producer.security.protocol: SSL
    transforms: route
    transforms.route.type: org.apache.kafka.connect.transforms.RegexRouter
    # Double-quoted so that YAML turns \\. into the regex escape \.
    transforms.route.regex: "([^.]+)\\.([^.]+)\\.([^.]+)"
    # Keep only the third segment (the table name) as the topic name
    transforms.route.replacement: "$3"
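The `${file:...}` placeholder in `database.password` is only resolved if the Kafka Connect cluster has the file config provider enabled and the referenced file is mounted into the Connect pods. A hedged sketch of the supporting pieces follows. The Secret name `mssql-credentials` is hypothetical, and `externalConfiguration` is assumed to be available in the Strimzi version in use (newer releases favour other volume mechanisms, so check the documentation for your version).

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: mssql-credentials   # hypothetical name
  namespace: kafka
stringData:
  # The key becomes the file name; the content is a properties file
  # whose "password" property is what the placeholder looks up.
  password: |
    password=<database-password>
---
# Fragment only: merge into the kafka-connect-cluster resource.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect-cluster
  namespace: kafka
spec:
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      # Mounted at /opt/kafka/external-configuration/connector-config
      - name: connector-config
        secret:
          secretName: mssql-credentials
```

Keeping the password in a Secret means the KafkaConnector resource itself never contains the credential.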
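Monitoring and Operations
Metrics Collection
Kafka Metrics:
# In the v1beta2 API the JMX exporter rules live in a ConfigMap that the
# Kafka resource references through metricsConfig. The ConfigMap name and
# key below are illustrative.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-cluster
  namespace: kafka
spec:
  kafka:
    # Fragment only: merge into the full Kafka resource
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yml
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-metrics
  namespace: kafka
data:
  kafka-metrics-config.yml: |
    lowercaseOutputName: true
    rules:
      # Broker metrics
      - pattern: kafka.server<type=(.+), name=(.+)><>Value
        name: kafka_server_$1_$2
      - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+)><>Value
        name: kafka_server_$1_$2
        labels:
          clientId: "$3"
      # Topic metrics
      - pattern: kafka.log<type=LogSize, name=Size, topic=(.+), partition=(.+)><>Value
        name: kafka_log_size
        labels:
          topic: "$1"
          partition: "$2"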
Health Monitoring
Cluster Health Checks:
# Check cluster status
kubectl get kafka kafka-cluster -n kafka
# Check broker pods
kubectl get pods -l strimzi.io/cluster=kafka-cluster -n kafka
# Check topic status
kubectl get kafkatopic -n kafka
# Check user status
kubectl get kafkauser -n kafka
# View cluster events
kubectl get events -n kafka --sort-by=.metadata.creationTimestamp
Performance Monitoring
Key Performance Metrics:
# Message throughput
rate(kafka_server_brokertopicmetrics_messagesinpersec[5m])
# Byte throughput
rate(kafka_server_brokertopicmetrics_bytesinpersec[5m])
# Consumer lag
kafka_consumer_lag_sum
# Broker CPU usage
rate(process_cpu_seconds_total{job="kafka-brokers"}[5m])
# JVM memory usage
jvm_memory_bytes_used / jvm_memory_bytes_max
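Consumer lag is not part of the broker JMX metrics, so a lag query needs a separate source. One option, sketched here as an assumption about the setup, is the Kafka Exporter that Strimzi can deploy alongside the cluster. It exposes lag as `kafka_consumergroup_lag`, which can be aggregated with a query such as `sum(kafka_consumergroup_lag) by (consumergroup, topic)`.

```yaml
# Fragment only: merge into the spec of the kafka-cluster resource.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-cluster
  namespace: kafka
spec:
  kafkaExporter:
    # Export all topics and consumer groups; narrow these to limit cardinality
    topicRegex: ".*"
    groupRegex: ".*"
```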
Troubleshooting
Common Issues
Broker Not Starting:
# Check broker logs
kubectl logs kafka-cluster-kafka-0 -n kafka
# Check persistent volume claims
kubectl get pvc -n kafka
# Check resource constraints
kubectl describe pod kafka-cluster-kafka-0 -n kafka
Topic Creation Issues:
# Check topic operator logs
kubectl logs deployment/kafka-cluster-entity-operator -c topic-operator -n kafka
# Verify topic configuration
kubectl describe kafkatopic <topic-name> -n kafka
# Check cluster configuration
kubectl get kafka kafka-cluster -o yaml -n kafka
Consumer Lag Issues:
# Check consumer group status
kubectl exec kafka-cluster-kafka-0 -n kafka -- \
  bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe --group <group-id>
# Reset consumer group offset
# (the group must have no active members, so stop its consumers first)
kubectl exec kafka-cluster-kafka-0 -n kafka -- \
  bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group <group-id> \
  --reset-offsets --to-earliest \
  --topic <topic-name> --execute
Diagnostic Commands
# List topics
kubectl exec kafka-cluster-kafka-0 -n kafka -- \
  bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
# Describe topic
kubectl exec kafka-cluster-kafka-0 -n kafka -- \
  bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic <topic-name>
# Test producer (-i keeps stdin open so messages can be typed)
kubectl exec -i kafka-cluster-kafka-0 -n kafka -- \
  bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic <topic-name>
# Test consumer
kubectl exec kafka-cluster-kafka-0 -n kafka -- \
  bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic <topic-name> --from-beginning
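Best Practices
Cluster Configuration
- Replication Factor: Use a replication factor of at least 3 in production, paired with min.insync.replicas of 2
- Partition Strategy: Size partition counts from expected throughput and consumer parallelism
- Resource Allocation: Set CPU and memory requests and limits for brokers, keeping the JVM heap well below the container memory limit
- Storage: Use a fast SSD-backed storage class for Kafka logs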
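Topic Design
- Naming Convention: Apply one naming standard to all topics, such as a shared domain prefix
- Partition Count: Balance consumer parallelism against per-partition overhead
- Retention Policy: Set retention per topic to match how long consumers need the data
- Compaction: Use compacted topics for reference data, where only the latest value per key matters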
Security
- TLS Encryption: Enable TLS on every listener and avoid plain listeners in production
- Authentication: Use TLS client certificates or SASL
- Authorization: Grant ACLs with least privilege, scoped per topic and consumer group
- Network Policies: Restrict which pods and networks can reach the Kafka listeners
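As an illustration of the network policy point, Strimzi listeners accept a `networkPolicyPeers` list that limits which pods may connect. The sketch below assumes client pods carry a hypothetical `app: kafka-client` label and that the cluster's network plugin enforces NetworkPolicy resources.

```yaml
# Fragment only: merge into spec.kafka.listeners of the Kafka resource.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-cluster
  namespace: kafka
spec:
  kafka:
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
        networkPolicyPeers:
          # Only pods with this (hypothetical) label may reach port 9093
          - podSelector:
              matchLabels:
                app: kafka-client
```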
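Operations
- Monitoring: Collect broker, topic, and consumer lag metrics and alert on them
- Backup: Back up topic configurations regularly, for example by keeping KafkaTopic resources in version control
- Capacity Planning: Track disk, throughput, and partition growth and plan scaling ahead of need
- Documentation: Keep operational procedures current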
For advanced Kafka and Strimzi configurations, refer to the Strimzi documentation.