Выбор адреса подключения и настройка способа подключения
Когда бизнес-приложения обращаются к экземплярам Kafka, существует несколько способов подключения. В этом разделе описывается, как выбрать адрес подключения, получить доступ к экземпляру Kafka через этот адрес, а также приведены примеры.
Содержание
Описание способов подключения, предоставляемых сервером, и применимые сценарии
При создании экземпляра Kafka сервер предоставляет различные комбинации ссылок для доступа с соответствующим описанием параметров:
- Внутренний/Внешний доступ: находятся ли приложение и сервер Kafka в одном бизнес-кластере.
- Механизм аутентификации: включена ли аутентификация для проверки идентичности клиента. Механизмы аутентификации поддерживают SCRAM-SHA-512 и протокол TLS (Примечание: механизм аутентификации TLS поддерживается только при включении по TLS-зашифрованному каналу передачи).
- Шифрование транспорта: обеспечивает безопасность данных при передаче, используя технологию шифрования данных, чтобы предотвратить перехват или прослушивание данных в сети. Протокол шифрования транспорта обычно TLS.
Описание конфигурации клиента
При подключении клиент должен выбрать соответствующую ссылку и настроить соответствующие параметры на основе следующей информации:
Ниже приведены несколько типичных ссылок доступа (Java клиент), другие комбинации можно настроить соответственно на основе подходящей ссылки:
- Внутренний доступ, без аутентификации, передача без шифрования
- Внутренний доступ, с аутентификацией (протокол TLS), зашифрованная передача
- Внешний доступ, с аутентификацией (протокол SCRAM-SHA-512), передача без шифрования
- Внешний доступ, с аутентификацией (протокол SCRAM-SHA-512), зашифрованная передача
- Внешний доступ, с аутентификацией (протокол TLS), зашифрованная передача
1. Внутренний доступ — без аутентификации, передача без шифрования
В этом случае клиент и экземпляр Kafka находятся в одном бизнес-кластере. Поскольку на сервере Kafka аутентификация не включена, процесс передачи не требует шифрования. Клиенту достаточно получить внутренний маршрут экземпляра Kafka.
Можно скопировать адрес подключения для «Доступ внутри кластера — Plain Transmission» на вкладке 【Kafka Instance - Access Method】 и заменить его в реальном коде.
Пример кода подключения клиента:
package com.lanzhiwang.producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ExampleProducer {
public static void main(String[] args) {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "test-kafka-bootstrap:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// ProducerRecord(String topic, K key, V value)
ProducerRecord<String, String> record = new ProducerRecord<>("my-cluster-topic", "Precision Products", "China");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
// Игнорируем возвращаемое значение, нет способа узнать, было ли сообщение успешно отправлено.
producer.send(record);
// Thread.sleep(2000000); // Задержка 2000 секунд
} catch (Exception e) {
// Если у продюсера возникли ошибки до отправки сообщения в Kafka.
e.printStackTrace();
}
}
}
2. Внутренний доступ — с аутентификацией (протокол TLS), зашифрованная передача
В этом случае клиент и экземпляр Kafka находятся в одном бизнес-кластере. Поскольку на сервере Kafka включена аутентификация, процесс передачи должен быть зашифрован. Клиенту необходимо получить внутренний адрес маршрута экземпляра Kafka, сертификаты пользователя и пароля для протокола TLS, а также сертификат передачи.
-
Внутренний адрес маршрута экземпляра Kafka
Можно скопировать адрес подключения для «Доступ внутри кластера — TLS Transmission» на вкладке 【Kafka Instance - Access Method】 и заменить его в реальном коде.
-
Сертификаты передачи и пароля для протокола TLS
Используйте команду kubectl для генерации сертификатов пользователя и пароля, скопируйте соответствующие сертификаты в нужные пути на клиенте. Конкретные команды:
-
Сертификаты пользователя и пароля для протокола TLS
Используйте команду kubectl для генерации сертификата пользователя и пароля, скопируйте соответствующие сертификаты в нужные пути на клиенте. Конкретные команды:
-
Пример кода подключения клиента:
package com.lanzhiwang.producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ExampleProducer {
public static void main(String[] args) {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "test-kafka-bootstrap:9093");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Настройка свойств TLS/SSL
kafkaProps.put("security.protocol", "SSL");
// kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
kafkaProps.put("ssl.truststore.location", "/Users/temp/certs/ca.p12");
// kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d
kafkaProps.put("ssl.truststore.password", "************");
// kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
kafkaProps.put("ssl.keystore.location", "/Users/temp/certs/user.p12");
// kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.password}' | base64 -d
kafkaProps.put("ssl.keystore.password", "************");
// Внутренний доступ не требует настройки, настройка для включения TLS на nodeport
kafkaProps.put("ssl.endpoint.identification.algorithm", "");
// ProducerRecord(String topic, K key, V value)
ProducerRecord<String, String> record = new ProducerRecord<>("my-cluster-topic", "Precision Products", "China");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
// Игнорируем возвращаемое значение, нет способа узнать, было ли сообщение успешно отправлено.
producer.send(record);
// Thread.sleep(2000000); // Задержка 2000 секунд
} catch (Exception e) {
// Если у продюсера возникли ошибки до отправки сообщения в Kafka.
e.printStackTrace();
}
}
}
3. Внешний доступ — с аутентификацией (протокол SCRAM-SHA-512), передача без шифрования
В этом случае клиент и экземпляр Kafka находятся в разных бизнес-кластерах. Поскольку на сервере Kafka включена аутентификация, процесс передачи не требует шифрования. Клиенту необходимо получить внешний адрес доступа экземпляра Kafka, а также имя пользователя и пароль по протоколу SCRAM-SHA-512.
-
Можно скопировать адрес подключения для «Доступ снаружи кластера — Plain Transmission» на вкладке 【Kafka Instance - Access Method】 и заменить его в реальном коде.
-
Можно использовать существующего пользователя или создать нового на вкладке 【Kafka Instance - User Management】.
-
Пример кода подключения клиента:
package com.lanzhiwang.producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ExampleProducer {
public static void main(String[] args) {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "192.168.176.13:31786,192.168.176.23:30535,192.168.176.27:30219");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Настройка свойств SCRAM-SHA-512
kafkaProps.put("sasl.mechanism", "SCRAM-SHA-512");
kafkaProps.put("security.protocol", "SASL_PLAINTEXT");
kafkaProps.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"*************\" password=\"*************\";");
// ProducerRecord(String topic, K key, V value)
ProducerRecord<String, String> record = new ProducerRecord<>("my-cluster-topic", "Precision Products", "China");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
// Игнорируем возвращаемое значение, нет способа узнать, было ли сообщение успешно отправлено.
producer.send(record);
// Thread.sleep(2000000); // Задержка 2000 секунд
} catch (Exception e) {
// Если у продюсера возникли ошибки до отправки сообщения в Kafka.
e.printStackTrace();
}
}
}
4. Внешний доступ — с аутентификацией (протокол SCRAM-SHA-512), зашифрованная передача
В этом случае клиент и экземпляр Kafka находятся в разных бизнес-кластерах. Поскольку на сервере Kafka включена аутентификация, процесс передачи должен быть зашифрован. Клиенту необходимо получить внешний адрес доступа экземпляра Kafka, имя пользователя и пароль по протоколу SCRAM-SHA-512, а также сертификат передачи по протоколу TLS.
-
Можно скопировать адрес подключения для «Доступ снаружи кластера — Plain Transmission» на вкладке 【Kafka Instance - Access Method】 и заменить его в реальном коде.
-
Можно использовать существующего пользователя или создать нового на вкладке 【Kafka Instance - User Management】.
-
Сертификаты передачи и пароля для протокола TLS
Используйте команду kubectl для генерации сертификатов пользователя и пароля, скопируйте соответствующие сертификаты в нужные пути на клиенте. Конкретные команды:
-
Пример кода подключения клиента:
package com.lanzhiwang.producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ExampleProducer {
public static void main(String[] args) {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "192.168.176.13:31786,192.168.176.23:30535,192.168.176.27:30219");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("security.protocol", "SASL_SSL");
kafkaProps.put("sasl.mechanism", "SCRAM-SHA-512");
kafkaProps.put("ssl.truststore.location", "/Users/temp/certs/ca.p12");
kafkaProps.put("ssl.truststore.password", "************");
kafkaProps.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"*************\" password=\"*************\";");
// Внутренний доступ не требует настройки, настройка для включения TLS на nodeport
kafkaProps.put("ssl.endpoint.identification.algorithm", "");
// ProducerRecord(String topic, K key, V value)
ProducerRecord<String, String> record = new ProducerRecord<>("my-cluster-topic", "Precision Products", "China");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
// Игнорируем возвращаемое значение, нет способа узнать, было ли сообщение успешно отправлено.
producer.send(record);
// Thread.sleep(2000000); // Задержка 2000 секунд
} catch (Exception e) {
// Если у продюсера возникли ошибки до отправки сообщения в Kafka.
e.printStackTrace();
}
}
}
Внешний доступ — с аутентификацией (протокол TLS), зашифрованная передача
В этом случае клиент и экземпляр Kafka находятся в разных бизнес-кластерах. Поскольку на сервере Kafka включена аутентификация, процесс передачи должен быть зашифрован. Клиенту необходимо получить внешний адрес доступа экземпляра Kafka, а также имя пользователя и пароль по протоколу SCRAM-SHA-512 и сертификат передачи по протоколу TLS.
-
Можно скопировать адрес подключения для «Доступ снаружи кластера — Plain Transmission» на вкладке 【Kafka Instance - Access Method】 и заменить его в реальном коде.
-
Сертификаты передачи и пароля для протокола TLS
Используйте команду kubectl для генерации сертификатов пользователя и пароля, скопируйте соответствующие сертификаты в нужные пути на клиенте. Конкретные команды:
-
Сертификаты пользователя и пароля для протокола TLS
Используйте команду kubectl для генерации сертификата пользователя и пароля, скопируйте соответствующие сертификаты в нужные пути на клиенте. Конкретные команды:
-
Пример кода подключения клиента:
package com.lanzhiwang.producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ExampleProducer {
public static void main(String[] args) {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "192.168.176.13:31786,192.168.176.23:30535,192.168.176.27:30219");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Настройка свойств TLS/SSL
kafkaProps.put("security.protocol", "SSL");
// kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
kafkaProps.put("ssl.truststore.location", "/Users/temp/certs/ca.p12");
// kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d
kafkaProps.put("ssl.truststore.password", "************");
// kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
kafkaProps.put("ssl.keystore.location", "/Users/temp/certs/user.p12");
// kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.password}' | base64 -d
kafkaProps.put("ssl.keystore.password", "************");
// Внутренний доступ не требует настройки, настройка для включения TLS на nodeport
kafkaProps.put("ssl.endpoint.identification.algorithm", "");
// ProducerRecord(String topic, K key, V value)
ProducerRecord<String, String> record = new ProducerRecord<>("my-cluster-topic", "Precision Products", "China");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
// Игнорируем возвращаемое значение, нет способа узнать, было ли сообщение успешно отправлено.
producer.send(record);
// Thread.sleep(2000000); // Задержка 2000 секунд
} catch (Exception e) {
// Если у продюсера возникли ошибки до отправки сообщения в Kafka.
e.printStackTrace();
}
}
}