选择连接地址及配置连接方式
业务应用访问 Kafka 实例时,有多种连接方式。本文介绍如何选择连接地址,通过该地址访问 Kafka 实例,并提供示例。
目录
服务器提供的连接方式说明及适用场景
创建 Kafka 实例时,服务器会提供多种访问链接组合,并附带相关参数说明:
- 内外网访问:应用与 Kafka 服务器是否处于同一业务集群内。
- 认证机制:是否开启认证以验证客户端身份。认证机制支持 SCRAM-SHA-512 和 TLS 协议(注:TLS 认证机制仅支持在 TLS 加密传输链路上开启)。
- 传输加密:保障数据传输过程的安全,采用数据加密技术防止数据在网络传输过程中被截获或窃听。传输加密协议通常为 TLS。
访问方式 | 认证机制 | 传输加密 | 适用场景 |
---|
内网访问 | 关闭 | 关闭 | 测试场景 |
内网访问 | 关闭 | 开启 | 测试场景 |
内网访问 | 开启 SCRAM-SHA-512(推荐) | 关闭 | 通过内网访问,消息传输不加密,但发送和接收消息需认证。 |
内网访问 | 开启 SCRAM-SHA-512 或 TLS(更安全) | 开启 | 通过内网访问,消息传输加密,发送和接收消息需认证。(安全性更高,性能可能受影响) |
外网访问 | 关闭 | 关闭 | 不推荐 |
外网访问 | 关闭 | 开启 | 测试场景 |
外网访问 | 开启 SCRAM-SHA-512 | 关闭 | 通过外网访问,消息传输不加密 |
外网访问 | 开启 SCRAM-SHA-512 或 TLS(推荐) | 开启 | 通过外网访问,消息传输加密,发送和接收消息需认证。(安全性更高,性能可能受影响) |
客户端配置说明
连接时,客户端需根据以下信息选择合适的链接并配置对应参数:
参数 | 客户端说明 |
---|
传输加密 | 客户端若需使用加密路径,必须使用对应端口并配置 TLS 协议证书。 |
认证与授权 | Kafka 实例开启认证时,客户端需配置对应的用户、密码或协议证书。 |
内外网连接 | 应用根据自身所在位置选择所需连接地址。 |
以下演示几种典型访问链接(Java 客户端),其他组合可根据合适链接进行配置:
- 内网访问,无认证,非加密传输
- 内网访问,有认证(TLS 协议),加密传输
- 外网访问,有认证(SCRAM-SHA-512 协议),非加密传输
- 外网访问,有认证(SCRAM-SHA-512 协议),加密传输
- 外网访问,有认证(TLS 协议),加密传输
1. 内网访问 - 无认证,非加密传输
此时客户端与 Kafka 实例位于同一业务集群内。Kafka 服务器未开启认证,传输过程无需加密。客户端只需获取 Kafka 实例的内网路由地址。
可复制【Kafka 实例 - 访问方式】页签中“集群内访问 - 明文传输”的连接地址,替换到实际代码片段中。
客户端连接代码示例如下:
package com.lanzhiwang.producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ExampleProducer {
public static void main(String[] args) {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "test-kafka-bootstrap:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// ProducerRecord(String topic, K key, V value)
ProducerRecord<String, String> record = new ProducerRecord<>("my-cluster-topic", "Precision Products", "China");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
// 忽略返回值,无法判断消息是否发送成功。
producer.send(record);
// Thread.sleep(2000000); // 延迟 2000 秒
} catch (Exception e) {
// 生产者在发送消息到 Kafka 前遇到错误。
e.printStackTrace();
}
}
}
2. 内网访问 - 有认证(TLS 协议),加密传输
此时客户端与 Kafka 实例位于同一业务集群内。Kafka 服务器开启认证,传输过程需加密。客户端需获取 Kafka 实例的内网路由地址、TLS 协议的用户密码证书及传输证书。
-
Kafka 实例内网路由地址
可复制【Kafka 实例 - 访问方式】页签中“集群内访问 - TLS 传输”的连接地址,替换到实际代码片段中。
-
TLS 协议传输及密码证书
使用 kubectl 命令生成用户密码证书,并将相关证书复制到客户端对应路径。具体命令如下:
-
生成 CA 证书:
$ kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
-
生成 CA 证书密码:
$ kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d
-
TLS 协议用户密码证书
使用 kubectl 命令生成传输及密码证书,并将相关证书复制到客户端对应路径。具体命令如下:
-
生成用户证书:
$ kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
-
生成用户证书密码:
$ kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.password}' | base64 -d
-
客户端连接代码示例如下:
package com.lanzhiwang.producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ExampleProducer {
public static void main(String[] args) {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "test-kafka-bootstrap:9093");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 配置 TLS/SSL 属性
kafkaProps.put("security.protocol", "SSL");
// kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
kafkaProps.put("ssl.truststore.location", "/Users/temp/certs/ca.p12");
// kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d
kafkaProps.put("ssl.truststore.password", "************");
// kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
kafkaProps.put("ssl.keystore.location", "/Users/temp/certs/user.p12");
// kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.password}' | base64 -d
kafkaProps.put("ssl.keystore.password", "************");
// 内网访问无需配置,配置用于开启 nodeport 的 TLS
kafkaProps.put("ssl.endpoint.identification.algorithm", "");
// ProducerRecord(String topic, K key, V value)
ProducerRecord<String, String> record = new ProducerRecord<>("my-cluster-topic", "Precision Products", "China");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
// 忽略返回值,无法判断消息是否发送成功。
producer.send(record);
// Thread.sleep(2000000); // 延迟 2000 秒
} catch (Exception e) {
// 生产者在发送消息到 Kafka 前遇到错误。
e.printStackTrace();
}
}
}
3. 外网访问 - 有认证(SCRAM-SHA-512 协议),非加密传输
此时客户端与 Kafka 实例位于不同业务集群。Kafka 服务器开启认证,传输过程无需加密。客户端需获取 Kafka 实例的外网访问地址,以及 SCRAM-SHA-512 协议下的用户名和密码。
-
可复制【Kafka 实例 - 访问方式】页签中“集群外访问 - 明文传输”的连接地址,替换到实际代码片段中。
-
可在【Kafka 实例 - 用户管理】页签使用已有用户或新建用户。
-
客户端连接代码示例如下:
package com.lanzhiwang.producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ExampleProducer {
public static void main(String[] args) {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "192.168.176.13:31786,192.168.176.23:30535,192.168.176.27:30219");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 配置 SCRAM-SHA-512 属性
kafkaProps.put("sasl.mechanism", "SCRAM-SHA-512");
kafkaProps.put("security.protocol", "SASL_PLAINTEXT");
kafkaProps.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"*************\" password=\"*************\";");
// ProducerRecord(String topic, K key, V value)
ProducerRecord<String, String> record = new ProducerRecord<>("my-cluster-topic", "Precision Products", "China");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
// 忽略返回值,无法判断消息是否发送成功。
producer.send(record);
// Thread.sleep(2000000); // 延迟 2000 秒
} catch (Exception e) {
// 生产者在发送消息到 Kafka 前遇到错误。
e.printStackTrace();
}
}
}
4. 外网访问 - 有认证(SCRAM-SHA-512 协议),加密传输
此时客户端与 Kafka 实例位于不同业务集群。Kafka 服务器开启认证,传输过程需加密。客户端需获取 Kafka 实例的外网访问地址、SCRAM-SHA-512 协议下的用户名和密码,以及 TLS 协议传输证书。
-
可复制【Kafka 实例 - 访问方式】页签中“集群外访问 - 明文传输”的连接地址,替换到实际代码片段中。
-
可在【Kafka 实例 - 用户管理】页签使用已有用户或新建用户。
-
TLS 协议传输及密码证书
使用 kubectl 命令生成用户密码证书,并将相关证书复制到客户端对应路径。具体命令如下:
-
生成 CA 证书:
$ kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
-
生成 CA 证书密码:
$ kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d
-
客户端连接代码示例如下:
package com.lanzhiwang.producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ExampleProducer {
public static void main(String[] args) {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "192.168.176.13:31786,192.168.176.23:30535,192.168.176.27:30219");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("security.protocol", "SASL_SSL");
kafkaProps.put("sasl.mechanism", "SCRAM-SHA-512");
kafkaProps.put("ssl.truststore.location", "/Users/temp/certs/ca.p12");
kafkaProps.put("ssl.truststore.password", "************");
kafkaProps.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"*************\" password=\"*************\";");
// 内网访问无需配置,配置用于开启 nodeport 的 TLS
kafkaProps.put("ssl.endpoint.identification.algorithm", "");
// ProducerRecord(String topic, K key, V value)
ProducerRecord<String, String> record = new ProducerRecord<>("my-cluster-topic", "Precision Products", "China");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
// 忽略返回值,无法判断消息是否发送成功。
producer.send(record);
// Thread.sleep(2000000); // 延迟 2000 秒
} catch (Exception e) {
// 生产者在发送消息到 Kafka 前遇到错误。
e.printStackTrace();
}
}
}
外网访问 - 有认证(TLS 协议),加密传输
此时客户端与 Kafka 实例位于不同业务集群。Kafka 服务器开启认证,传输过程需加密。客户端需获取 Kafka 实例的外网访问地址,以及 SCRAM-SHA-512 协议下的用户名和密码和 TLS 协议传输证书。
-
可复制【Kafka 实例 - 访问方式】页签中“集群外访问 - 明文传输”的连接地址,替换到实际代码片段中。
-
TLS 协议传输及密码证书
使用 kubectl 命令生成用户密码证书,并将相关证书复制到客户端对应路径。具体命令如下:
-
生成 CA 证书:
$ kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
-
生成 CA 证书密码:
$ kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d
-
TLS 协议用户密码证书
使用 kubectl 命令生成传输及密码证书,并将相关证书复制到客户端对应路径。具体命令如下:
-
生成用户证书:
$ kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
-
生成用户证书密码:
$ kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.password}' | base64 -d
-
客户端连接代码示例如下:
package com.lanzhiwang.producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ExampleProducer {
public static void main(String[] args) {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "192.168.176.13:31786,192.168.176.23:30535,192.168.176.27:30219");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 配置 TLS/SSL 属性
kafkaProps.put("security.protocol", "SSL");
// kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
kafkaProps.put("ssl.truststore.location", "/Users/temp/certs/ca.p12");
// kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d
kafkaProps.put("ssl.truststore.password", "************");
// kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
kafkaProps.put("ssl.keystore.location", "/Users/temp/certs/user.p12");
// kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.password}' | base64 -d
kafkaProps.put("ssl.keystore.password", "************");
// 内网访问无需配置,配置用于开启 nodeport 的 TLS
kafkaProps.put("ssl.endpoint.identification.algorithm", "");
// ProducerRecord(String topic, K key, V value)
ProducerRecord<String, String> record = new ProducerRecord<>("my-cluster-topic", "Precision Products", "China");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
// 忽略返回值,无法判断消息是否发送成功。
producer.send(record);
// Thread.sleep(2000000); // 延迟 2000 秒
} catch (Exception e) {
// 生产者在发送消息到 Kafka 前遇到错误。
e.printStackTrace();
}
}
}