You can create a Kafka instance to build a high-throughput, low-latency real-time data pipeline that supports the diverse needs of your business systems in scenarios such as streaming data processing and service decoupling.
Create a Kafka instance via the command line:
cat << EOF | kubectl -n default create -f -
apiVersion: middleware.alauda.io/v1
kind: RdsKafka
metadata:
  name: my-cluster
spec:
  entityOperator:
    topicOperator:
      resources:
        limits:
          cpu: 1
          memory: 2Gi
        requests:
          cpu: 1
          memory: 2Gi
    userOperator:
      resources:
        limits:
          cpu: 1
          memory: 2Gi
        requests:
          cpu: 1
          memory: 2Gi
    tlsSidecar:
      resources:
        limits:
          cpu: 200m
          memory: 128Mi
        requests:
          cpu: 200m
          memory: 128Mi
  version: "3.8"
  replicas: 3
  config:
    auto.create.topics.enable: "false"
    auto.leader.rebalance.enable: "true"
    background.threads: "10"
    compression.type: producer
    default.replication.factor: "3"
    delete.topic.enable: "true"
    log.retention.hours: "168"
    log.roll.hours: "168"
    log.segment.bytes: "1073741824"
    message.max.bytes: "1048588"
    min.insync.replicas: "1"
    num.io.threads: "8"
    num.network.threads: "3"
    num.recovery.threads.per.data.dir: "1"
    num.replica.fetchers: "1"
    unclean.leader.election.enable: "false"
  resources:
    limits:
      cpu: 2
      memory: 4Gi
    requests:
      cpu: 2
      memory: 4Gi
  storage:
    size: 1Gi
    # Replace with an available storage class
    class: local-path
    deleteClaim: false
  kafka:
    listeners:
      plain: {}
      external:
        type: nodeport
        tls: false
  zookeeper:
    # Currently the same as the Kafka brokers
    replicas: 3
    resources:
      limits:
        cpu: 1
        memory: 2Gi
      requests:
        cpu: 1
        memory: 2Gi
    # Storage is kept consistent with the Kafka brokers
    storage:
      size: 1Gi
      # Replace with an available storage class
      class: local-path
      deleteClaim: false
EOF
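Once the command is accepted, the operator starts creating the underlying Kubernetes resources for the instance. As a quick sanity check, you can list the pods, services, and PVCs that carry the instance name; the exact resource names are generated by the operator and may differ in your environment:
kubectl get pods,svc,pvc -n default | grep my-cluster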
After the instance is created, you can check its status with the following command:
$ kubectl get rdskafka -n <namespace> -o=custom-columns=NAME:.metadata.name,VERSION:.spec.version,STATUS:.status.phase,MESSAGE:.status.reason,CreationTimestamp:.metadata.creationTimestamp
NAME         VERSION   STATUS   MESSAGE                                   CreationTimestamp
my-cluster   3.8       Active   <none>                                    2025-03-06T08:46:57Z
test38       3.8       Failed   Pod is unschedulable or is not starting   2025-03-06T08:46:36Z
The fields in the output are described below:
Field | Description |
---|---|
NAME | Name of the instance |
VERSION | Currently only the following 4 versions are supported: 2.5.0, 2.7.0, 2.8.2, and 3.8 |
STATUS | Current status of the instance; possible values include Active and Failed, as shown in the sample output above |
MESSAGE | Reason for the instance's current status |
CreationTimestamp | Timestamp at which the instance was created |
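For scripting or automation you may prefer a single value over the full table. The sketch below reads the same .status.phase and .status.reason fields used by the custom-columns command above (adjust the instance name and namespace to your own):
kubectl get rdskafka my-cluster -n default -o jsonpath='{.status.phase}'
kubectl get rdskafka my-cluster -n default -o jsonpath='{.status.reason}'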
It is recommended to set the number of broker nodes to 3 when creating an instance. If you use fewer than the recommended 3 broker nodes, you must adjust certain replication-related parameters at creation time, as highlighted in the example below.
Create a single-node Kafka instance via the command line:
cat << EOF | kubectl -n default create -f -
apiVersion: middleware.alauda.io/v1
kind: RdsKafka
metadata:
  name: my-cluster
spec:
  entityOperator:
    topicOperator:
      resources:
        limits:
          cpu: 1
          memory: 2Gi
        requests:
          cpu: 1
          memory: 2Gi
    userOperator:
      resources:
        limits:
          cpu: 1
          memory: 2Gi
        requests:
          cpu: 1
          memory: 2Gi
    tlsSidecar:
      resources:
        limits:
          cpu: 200m
          memory: 128Mi
        requests:
          cpu: 200m
          memory: 128Mi
  version: "3.8"
  replicas: 1
  config:
    auto.create.topics.enable: "false"
    auto.leader.rebalance.enable: "true"
    background.threads: "10"
    compression.type: producer
    delete.topic.enable: "true"
    log.retention.hours: "168"
    log.roll.hours: "168"
    log.segment.bytes: "1073741824"
    message.max.bytes: "1048588"
    min.insync.replicas: "1"
    num.io.threads: "8"
    num.network.threads: "3"
    num.recovery.threads.per.data.dir: "1"
    num.replica.fetchers: "1"
    unclean.leader.election.enable: "false"
    ## Make sure the following parameters are configured correctly
    default.replication.factor: "1"
    offsets.topic.replication.factor: "1"
    transaction.state.log.replication.factor: "1"
    transaction.state.log.min.isr: "1"
  resources:
    limits:
      cpu: 2
      memory: 4Gi
    requests:
      cpu: 2
      memory: 4Gi
  storage:
    size: 1Gi
    # Replace with an available storage class
    class: local-path
    deleteClaim: false
  kafka:
    listeners:
      plain: {}
      external:
        type: nodeport
        tls: false
  zookeeper:
    # Currently the same as the Kafka brokers
    replicas: 1
    resources:
      limits:
        cpu: 1
        memory: 2Gi
      requests:
        cpu: 1
        memory: 2Gi
    # Storage is kept consistent with the Kafka brokers
    storage:
      size: 1Gi
      # Replace with an available storage class
      class: local-path
      deleteClaim: false
EOF
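Because the external listener uses type nodeport with tls: false, external clients connect through a NodePort service exposed by the operator. The service and port names are operator-generated, so look them up first; the smoke test below uses the standard Kafka CLI tools and assumes you replace <node-ip> and <node-port> with a node address and the NodePort shown by the first command (the topic name test is only an example):
kubectl get svc -n default | grep my-cluster
bin/kafka-topics.sh --bootstrap-server <node-ip>:<node-port> --create --topic test --partitions 1 --replication-factor 1
bin/kafka-console-producer.sh --bootstrap-server <node-ip>:<node-port> --topic test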