• Русский
  • Выбор адреса подключения и настройка способа подключения

    Когда бизнес-приложения обращаются к экземплярам Kafka, существует несколько способов подключения. В этом разделе описывается, как выбрать адрес подключения, получить доступ к экземпляру Kafka через этот адрес, а также приведены примеры.

    Содержание

    Описание способов подключения, предоставляемых сервером, и применимые сценарии

    При создании экземпляра Kafka сервер предоставляет различные комбинации ссылок для доступа с соответствующим описанием параметров:

    • Внутренний/Внешний доступ: находятся ли приложение и сервер Kafka в одном бизнес-кластере.
    • Механизм аутентификации: включена ли аутентификация для проверки идентичности клиента. Механизмы аутентификации поддерживают SCRAM-SHA-512 и протокол TLS (Примечание: механизм аутентификации TLS поддерживается только при включении по TLS-зашифрованному каналу передачи).
    • Шифрование транспорта: обеспечивает безопасность данных при передаче, используя технологию шифрования данных, чтобы предотвратить перехват или прослушивание данных в сети. Протокол шифрования транспорта обычно TLS.
    Способ доступаМеханизм аутентификацииШифрование транспортаПрименимые сценарии
    ВнутреннийОтключеноОтключеноТестовый сценарий
    ВнутреннийОтключеноВключеноТестовый сценарий
    ВнутреннийВключено SCRAM-SHA-512 (рекомендуется)ОтключеноДоступ через внутреннюю сеть, сообщения передаются без шифрования, но требуется аутентификация для отправки и получения сообщений.
    ВнутреннийВключено SCRAM-SHA-512 или TLS (более безопасно)ВключеноДоступ через внутреннюю сеть, сообщения передаются с шифрованием, требуется аутентификация для отправки и получения сообщений. (Более высокая безопасность, возможное влияние на производительность)
    ВнешнийОтключеноОтключеноНе рекомендуется
    ВнешнийОтключеноВключеноТестовый сценарий
    ВнешнийВключено SCRAM-SHA-512ОтключеноДоступ через внешнюю сеть, сообщения передаются без шифрования
    ВнешнийВключено SCRAM-SHA-512 или TLS (рекомендуется)ВключеноДоступ через внешнюю сеть, сообщения передаются с шифрованием, требуется аутентификация для отправки и получения сообщений. (Более высокая безопасность, возможное влияние на производительность)

    Описание конфигурации клиента

    При подключении клиент должен выбрать соответствующую ссылку и настроить соответствующие параметры на основе следующей информации:

    ПараметрКлиент
    Шифрование транспортаЕсли клиент должен использовать зашифрованный канал, необходимо использовать соответствующий порт и настроить сертификат протокола TLS.
    Аутентификация и авторизацияЕсли для экземпляра Kafka включена аутентификация, клиент должен настроить соответствующего пользователя, пароль или сертификат для протокола.
    Внутреннее/внешнее подключениеПриложение выбирает необходимый адрес подключения в зависимости от своего расположения.

    Ниже приведены несколько типичных ссылок доступа (Java клиент), другие комбинации можно настроить соответственно на основе подходящей ссылки:

    • Внутренний доступ, без аутентификации, передача без шифрования
    • Внутренний доступ, с аутентификацией (протокол TLS), зашифрованная передача
    • Внешний доступ, с аутентификацией (протокол SCRAM-SHA-512), передача без шифрования
    • Внешний доступ, с аутентификацией (протокол SCRAM-SHA-512), зашифрованная передача
    • Внешний доступ, с аутентификацией (протокол TLS), зашифрованная передача

    1. Внутренний доступ — без аутентификации, передача без шифрования

    В этом случае клиент и экземпляр Kafka находятся в одном бизнес-кластере. Поскольку на сервере Kafka аутентификация не включена, процесс передачи не требует шифрования. Клиенту достаточно получить внутренний маршрут экземпляра Kafka.

    Можно скопировать адрес подключения для «Доступ внутри кластера — Plain Transmission» на вкладке 【Kafka Instance - Access Method】 и заменить его в реальном коде.

    Пример кода подключения клиента:

    package com.lanzhiwang.producer;
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class ExampleProducer {
        public static void main(String[] args) {
    
            Properties kafkaProps = new Properties();
            kafkaProps.put("bootstrap.servers", "test-kafka-bootstrap:9092");
            kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            // ProducerRecord(String topic, K key, V value)
            ProducerRecord<String, String> record = new ProducerRecord<>("my-cluster-topic", "Precision Products", "China");
    
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
                // Игнорируем возвращаемое значение, нет способа узнать, было ли сообщение успешно отправлено.
                producer.send(record);
                // Thread.sleep(2000000); // Задержка 2000 секунд
            } catch (Exception e) {
                // Если у продюсера возникли ошибки до отправки сообщения в Kafka.
                e.printStackTrace();
            }
        }
    }

    2. Внутренний доступ — с аутентификацией (протокол TLS), зашифрованная передача

    В этом случае клиент и экземпляр Kafka находятся в одном бизнес-кластере. Поскольку на сервере Kafka включена аутентификация, процесс передачи должен быть зашифрован. Клиенту необходимо получить внутренний адрес маршрута экземпляра Kafka, сертификаты пользователя и пароля для протокола TLS, а также сертификат передачи.

    1. Внутренний адрес маршрута экземпляра Kafka

      Можно скопировать адрес подключения для «Доступ внутри кластера — TLS Transmission» на вкладке 【Kafka Instance - Access Method】 и заменить его в реальном коде.

    2. Сертификаты передачи и пароля для протокола TLS

      Используйте команду kubectl для генерации сертификатов пользователя и пароля, скопируйте соответствующие сертификаты в нужные пути на клиенте. Конкретные команды:

      • Генерация CA-сертификата:

        $ kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
      • Получение пароля CA-сертификата:

        $ kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d
    3. Сертификаты пользователя и пароля для протокола TLS

      Используйте команду kubectl для генерации сертификата пользователя и пароля, скопируйте соответствующие сертификаты в нужные пути на клиенте. Конкретные команды:

      • Генерация сертификата пользователя:

        $ kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
      • Получение пароля сертификата пользователя:

        $ kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.password}' | base64 -d
    4. Пример кода подключения клиента:

      package com.lanzhiwang.producer;
      import java.util.Properties;
      
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      
      public class ExampleProducer {
          public static void main(String[] args) {
      
              Properties kafkaProps = new Properties();
              kafkaProps.put("bootstrap.servers", "test-kafka-bootstrap:9093");
              kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      
              // Настройка свойств TLS/SSL
              kafkaProps.put("security.protocol", "SSL");
              // kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
              kafkaProps.put("ssl.truststore.location", "/Users/temp/certs/ca.p12");
              // kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d
              kafkaProps.put("ssl.truststore.password", "************");
              // kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
              kafkaProps.put("ssl.keystore.location", "/Users/temp/certs/user.p12");
              // kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.password}' | base64 -d
              kafkaProps.put("ssl.keystore.password", "************");
              // Внутренний доступ не требует настройки, настройка для включения TLS на nodeport
              kafkaProps.put("ssl.endpoint.identification.algorithm", "");
              // ProducerRecord(String topic, K key, V value)
              ProducerRecord<String, String> record = new ProducerRecord<>("my-cluster-topic", "Precision Products", "China");
      
              try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
                  // Игнорируем возвращаемое значение, нет способа узнать, было ли сообщение успешно отправлено.
                  producer.send(record);
                  // Thread.sleep(2000000); // Задержка 2000 секунд
              } catch (Exception e) {
                  // Если у продюсера возникли ошибки до отправки сообщения в Kafka.
                  e.printStackTrace();
              }
          }
      }

    3. Внешний доступ — с аутентификацией (протокол SCRAM-SHA-512), передача без шифрования

    В этом случае клиент и экземпляр Kafka находятся в разных бизнес-кластерах. Поскольку на сервере Kafka включена аутентификация, процесс передачи не требует шифрования. Клиенту необходимо получить внешний адрес доступа экземпляра Kafka, а также имя пользователя и пароль по протоколу SCRAM-SHA-512.

    1. Можно скопировать адрес подключения для «Доступ снаружи кластера — Plain Transmission» на вкладке 【Kafka Instance - Access Method】 и заменить его в реальном коде.

    2. Можно использовать существующего пользователя или создать нового на вкладке 【Kafka Instance - User Management】.

    3. Пример кода подключения клиента:

      package com.lanzhiwang.producer;
      import java.util.Properties;
      
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      
      public class ExampleProducer {
          public static void main(String[] args) {
      
              Properties kafkaProps = new Properties();
              kafkaProps.put("bootstrap.servers", "192.168.176.13:31786,192.168.176.23:30535,192.168.176.27:30219");
              kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      
              // Настройка свойств SCRAM-SHA-512
              kafkaProps.put("sasl.mechanism", "SCRAM-SHA-512");
              kafkaProps.put("security.protocol", "SASL_PLAINTEXT");
              kafkaProps.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"*************\" password=\"*************\";");
              // ProducerRecord(String topic, K key, V value)
              ProducerRecord<String, String> record = new ProducerRecord<>("my-cluster-topic", "Precision Products", "China");
      
              try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
                  // Игнорируем возвращаемое значение, нет способа узнать, было ли сообщение успешно отправлено.
                  producer.send(record);
                  // Thread.sleep(2000000); // Задержка 2000 секунд
              } catch (Exception e) {
                  // Если у продюсера возникли ошибки до отправки сообщения в Kafka.
                  e.printStackTrace();
              }
          }
      }

    4. Внешний доступ — с аутентификацией (протокол SCRAM-SHA-512), зашифрованная передача

    В этом случае клиент и экземпляр Kafka находятся в разных бизнес-кластерах. Поскольку на сервере Kafka включена аутентификация, процесс передачи должен быть зашифрован. Клиенту необходимо получить внешний адрес доступа экземпляра Kafka, имя пользователя и пароль по протоколу SCRAM-SHA-512, а также сертификат передачи по протоколу TLS.

    1. Можно скопировать адрес подключения для «Доступ снаружи кластера — Plain Transmission» на вкладке 【Kafka Instance - Access Method】 и заменить его в реальном коде.

    2. Можно использовать существующего пользователя или создать нового на вкладке 【Kafka Instance - User Management】.

    3. Сертификаты передачи и пароля для протокола TLS

      Используйте команду kubectl для генерации сертификатов пользователя и пароля, скопируйте соответствующие сертификаты в нужные пути на клиенте. Конкретные команды:

      • Генерация CA-сертификата:

        $ kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
      • Получение пароля CA-сертификата:

        $ kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d
    4. Пример кода подключения клиента:

      package com.lanzhiwang.producer;
      import java.util.Properties;
      
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      
      public class ExampleProducer {
          public static void main(String[] args) {
      
              Properties kafkaProps = new Properties();
              kafkaProps.put("bootstrap.servers", "192.168.176.13:31786,192.168.176.23:30535,192.168.176.27:30219");
              kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      
              kafkaProps.put("security.protocol", "SASL_SSL");
              kafkaProps.put("sasl.mechanism", "SCRAM-SHA-512");
              kafkaProps.put("ssl.truststore.location", "/Users/temp/certs/ca.p12");
              kafkaProps.put("ssl.truststore.password", "************");
              kafkaProps.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"*************\" password=\"*************\";");
              // Внутренний доступ не требует настройки, настройка для включения TLS на nodeport
              kafkaProps.put("ssl.endpoint.identification.algorithm", "");
              // ProducerRecord(String topic, K key, V value)
              ProducerRecord<String, String> record = new ProducerRecord<>("my-cluster-topic", "Precision Products", "China");
      
              try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
                  // Игнорируем возвращаемое значение, нет способа узнать, было ли сообщение успешно отправлено.
                  producer.send(record);
                  // Thread.sleep(2000000); // Задержка 2000 секунд
              } catch (Exception e) {
                  // Если у продюсера возникли ошибки до отправки сообщения в Kafka.
                  e.printStackTrace();
              }
          }
      }

    Внешний доступ — с аутентификацией (протокол TLS), зашифрованная передача

    В этом случае клиент и экземпляр Kafka находятся в разных бизнес-кластерах. Поскольку на сервере Kafka включена аутентификация, процесс передачи должен быть зашифрован. Клиенту необходимо получить внешний адрес доступа экземпляра Kafka, а также имя пользователя и пароль по протоколу SCRAM-SHA-512 и сертификат передачи по протоколу TLS.

    1. Можно скопировать адрес подключения для «Доступ снаружи кластера — Plain Transmission» на вкладке 【Kafka Instance - Access Method】 и заменить его в реальном коде.

    2. Сертификаты передачи и пароля для протокола TLS

      Используйте команду kubectl для генерации сертификатов пользователя и пароля, скопируйте соответствующие сертификаты в нужные пути на клиенте. Конкретные команды:

      • Генерация CA-сертификата:

        $ kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
      • Получение пароля CA-сертификата:

        $ kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d
    3. Сертификаты пользователя и пароля для протокола TLS

      Используйте команду kubectl для генерации сертификата пользователя и пароля, скопируйте соответствующие сертификаты в нужные пути на клиенте. Конкретные команды:

      • Генерация сертификата пользователя:

        $ kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
      • Получение пароля сертификата пользователя:

        $ kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.password}' | base64 -d
    4. Пример кода подключения клиента:

      package com.lanzhiwang.producer;
      import java.util.Properties;
      
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      
      public class ExampleProducer {
          public static void main(String[] args) {
      
              Properties kafkaProps = new Properties();
              kafkaProps.put("bootstrap.servers", "192.168.176.13:31786,192.168.176.23:30535,192.168.176.27:30219");
              kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      
              // Настройка свойств TLS/SSL
              kafkaProps.put("security.protocol", "SSL");
              // kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
              kafkaProps.put("ssl.truststore.location", "/Users/temp/certs/ca.p12");
              // kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d
              kafkaProps.put("ssl.truststore.password", "************");
              // kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
              kafkaProps.put("ssl.keystore.location", "/Users/temp/certs/user.p12");
              // kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.password}' | base64 -d
              kafkaProps.put("ssl.keystore.password", "************");
              // Внутренний доступ не требует настройки, настройка для включения TLS на nodeport
              kafkaProps.put("ssl.endpoint.identification.algorithm", "");
              // ProducerRecord(String topic, K key, V value)
              ProducerRecord<String, String> record = new ProducerRecord<>("my-cluster-topic", "Precision Products", "China");
      
              try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
                  // Игнорируем возвращаемое значение, нет способа узнать, было ли сообщение успешно отправлено.
                  producer.send(record);
                  // Thread.sleep(2000000); // Задержка 2000 секунд
              } catch (Exception e) {
                  // Если у продюсера возникли ошибки до отправки сообщения в Kafka.
                  e.printStackTrace();
              }
          }
      }