Choosing Connection Address and Configuring Connection Method

When business applications access Kafka instances, there are multiple connection methods. This section describes how to choose the connection address, access the Kafka instance through this address, and provides examples.


Description of Connection Methods Provided by the Server and Applicable Scenarios

When creating a Kafka instance, the server provides several combinations of access endpoints. The relevant parameters are:

  • Internal/External Access: Whether the application and Kafka server are within the same business cluster.
  • Authentication Mechanism: Whether client identity verification is enabled. Supported mechanisms are SCRAM-SHA-512 and TLS (note: TLS authentication can only be enabled on a TLS-encrypted transmission link).
  • Transport Encryption: Ensures the security of data in transit by encrypting traffic so it cannot be intercepted or eavesdropped during network transmission. The transport encryption protocol is typically TLS.
| Access Method | Authentication Mechanism | Transport Encryption | Applicable Scenarios |
| --- | --- | --- | --- |
| Internal | Disabled | Disabled | Testing scenario |
| Internal | Disabled | Enabled | Testing scenario |
| Internal | Enabled: SCRAM-SHA-512 (recommended) | Disabled | Access through the internal network; messages are transmitted without encryption, but authentication is required for sending and receiving messages. |
| Internal | Enabled: SCRAM-SHA-512 or TLS (more secure) | Enabled | Access through the internal network; messages are transmitted with encryption, and authentication is required for sending and receiving messages. (Higher security; performance may be affected.) |
| External | Disabled | Disabled | Not recommended |
| External | Disabled | Enabled | Testing scenario |
| External | Enabled: SCRAM-SHA-512 | Disabled | Access through the external network; messages are transmitted without encryption, but authentication is required for sending and receiving messages. |
| External | Enabled: SCRAM-SHA-512 or TLS (recommended) | Enabled | Access through the external network; messages are transmitted with encryption, and authentication is required for sending and receiving messages. (Higher security; performance may be affected.) |

Client Configuration Description

When connecting, the client needs to choose the appropriate link and configure the corresponding parameters based on the following information:

| Parameter | Client Configuration |
| --- | --- |
| Transport Encryption | If the client needs an encrypted path, it must use the corresponding port and configure the TLS certificate. |
| Authentication and Authorization | If authentication is enabled on the Kafka instance, the client must configure the user, password, or certificate required by the mechanism. |
| Internal/External Connection | The application selects the connection address that matches its location. |
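These switches map one-to-one onto the Kafka client's four security.protocol values (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL). A minimal sketch of the mapping; the class and helper names here are illustrative, not part of the Kafka client API:

```java
import java.util.Properties;

public class SecurityProtocolChooser {

    // Maps the server-side switches to the client-side security.protocol
    // value. SCRAM authentication goes through SASL; TLS client
    // authentication rides on the SSL channel itself, so it uses
    // security.protocol=SSL and therefore requires an encrypted link.
    public static String protocolFor(String authMechanism, boolean encrypted) {
        switch (authMechanism) {
            case "SCRAM-SHA-512":
                return encrypted ? "SASL_SSL" : "SASL_PLAINTEXT";
            case "TLS":
                if (!encrypted) {
                    throw new IllegalArgumentException(
                        "TLS authentication requires an encrypted link");
                }
                return "SSL";
            default: // no authentication
                return encrypted ? "SSL" : "PLAINTEXT";
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("security.protocol", protocolFor("SCRAM-SHA-512", true));
        System.out.println(props.getProperty("security.protocol")); // SASL_SSL
    }
}
```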

The following demonstrates several typical access paths using the Java client; other combinations can be configured analogously by choosing the appropriate endpoint:

  • Internal access, without authentication, non-encrypted transmission
  • Internal access, with authentication (TLS protocol), encrypted transmission
  • External access, with authentication (SCRAM-SHA-512 protocol), non-encrypted transmission
  • External access, with authentication (SCRAM-SHA-512 protocol), encrypted transmission
  • External access, with authentication (TLS protocol), encrypted transmission

1. Internal Access - Without Authentication, Non-encrypted Transmission

At this point, the client and the Kafka instance are located in the same business cluster, and the Kafka server has neither authentication nor transport encryption enabled. The client only needs the internal address of the Kafka instance.

You can copy the connection address for "Access within the Cluster - Plain Transmission" under the 【Kafka Instance - Access Method】 tab and substitute it into the code example below.

The client connection code example is as follows:

package com.lanzhiwang.producer;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExampleProducer {
    public static void main(String[] args) {

        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "test-kafka-bootstrap:9092");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // ProducerRecord(String topic, K key, V value)
        ProducerRecord<String, String> record = new ProducerRecord<>("my-cluster-topic", "Precision Products", "China");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
            // Fire-and-forget: the returned Future is discarded, so we cannot tell whether the send succeeded.
            producer.send(record);
            // Thread.sleep(2000000); // Delay 2000 seconds
        } catch (Exception e) {
            // Errors thrown before the record is handed off to Kafka, e.g. serialization or configuration problems.
            e.printStackTrace();
        }
    }
}

2. Internal Access - With Authentication (TLS Protocol), Encrypted Transmission

At this point, the client and the Kafka instance are located in the same business cluster. The Kafka server has TLS authentication enabled, which requires an encrypted transmission link. The client needs the internal address of the Kafka instance, the cluster CA certificate and password for transport encryption, and the TLS user certificate and password for authentication.

  1. Internal route address of the Kafka instance

    You can copy the connection address for "Access within the Cluster - TLS Transmission" under the 【Kafka Instance - Access Method】 tab and substitute it into the code example below.

  2. Cluster CA certificate and password (for transport encryption)

    Use kubectl to extract the cluster CA certificate and its password, then copy them to the appropriate paths on the client. The specific commands are:

    • Extract the CA certificate:

      $ kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
    • Extract the CA certificate password:

      $ kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d
  3. TLS user certificate and password (for authentication)

    Use kubectl to extract the user certificate and its password, then copy them to the appropriate paths on the client. The specific commands are:

    • Extract the user certificate:

      $ kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
    • Extract the user certificate password:

      $ kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.password}' | base64 -d
  4. The client connection code example is as follows:

package com.lanzhiwang.producer;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExampleProducer {
    public static void main(String[] args) {

        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "test-kafka-bootstrap:9093");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Configure TLS/SSL properties
        kafkaProps.put("security.protocol", "SSL");
        // kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
        kafkaProps.put("ssl.truststore.location", "/Users/temp/certs/ca.p12");
        // kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d
        kafkaProps.put("ssl.truststore.password", "************");
        // kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
        kafkaProps.put("ssl.keystore.location", "/Users/temp/certs/user.p12");
        // kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.password}' | base64 -d
        kafkaProps.put("ssl.keystore.password", "************");
        // Disable hostname verification. Not strictly needed for internal access; required when TLS is enabled on a NodePort listener, where the advertised address may not match the broker certificate.
        kafkaProps.put("ssl.endpoint.identification.algorithm", "");
        // ProducerRecord(String topic, K key, V value)
        ProducerRecord<String, String> record = new ProducerRecord<>("my-cluster-topic", "Precision Products", "China");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
            // Fire-and-forget: the returned Future is discarded, so we cannot tell whether the send succeeded.
            producer.send(record);
            // Thread.sleep(2000000); // Delay 2000 seconds
        } catch (Exception e) {
            // Errors thrown before the record is handed off to Kafka, e.g. serialization or configuration problems.
            e.printStackTrace();
        }
    }
}
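The .p12 files extracted above are standard PKCS#12 keystores; the ssl.truststore.location and ssl.keystore.location properties in the example point at exactly such files. A minimal stdlib sketch of the round trip, creating a keystore, writing it to disk under a password, and reloading it the way the Kafka client does (file path and password here are illustrative):

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyStore;

public class Pkcs12Demo {

    // Round-trips an (empty) PKCS#12 keystore through a password-protected
    // file and returns the type of the reloaded store.
    public static String roundTrip(char[] password) throws Exception {
        Path path = Files.createTempFile("demo", ".p12");

        // Create a keystore and write it to disk protected by the password,
        // analogous to the user.p12 / user password pair extracted above.
        KeyStore store = KeyStore.getInstance("PKCS12");
        store.load(null, password); // null stream = start from an empty store
        try (FileOutputStream out = new FileOutputStream(path.toFile())) {
            store.store(out, password);
        }

        // Load it back, exactly as the Kafka client does with
        // ssl.keystore.location / ssl.keystore.password.
        KeyStore loaded = KeyStore.getInstance("PKCS12");
        try (FileInputStream in = new FileInputStream(path.toFile())) {
            loaded.load(in, password);
        }
        Files.delete(path);
        return loaded.getType();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("changeit".toCharArray())); // PKCS12
    }
}
```

If the password is wrong, KeyStore.load throws an IOException, which is what a Kafka client misconfiguration of ssl.keystore.password surfaces as.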

3. External Access - With Authentication (SCRAM-SHA-512 Protocol), Non-encrypted Transmission

At this point, the client and the Kafka instance are located in different business clusters. The Kafka server has authentication enabled but transport encryption disabled. The client needs the external access address of the Kafka instance, along with a username and password for the SCRAM-SHA-512 mechanism.

  1. You can copy the connection address for "Access from outside the Cluster - Plain Transmission" under the 【Kafka Instance - Access Method】 tab and substitute it into the code example below.

  2. You can use an existing user or create a new user under the 【Kafka Instance - User Management】 tab.

  3. The client connection code example is as follows:

package com.lanzhiwang.producer;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExampleProducer {
    public static void main(String[] args) {

        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "192.168.176.13:31786,192.168.176.23:30535,192.168.176.27:30219");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Configure SCRAM-SHA-512 properties
        kafkaProps.put("sasl.mechanism", "SCRAM-SHA-512");
        kafkaProps.put("security.protocol", "SASL_PLAINTEXT");
        kafkaProps.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"*************\" password=\"*************\";");
        // ProducerRecord(String topic, K key, V value)
        ProducerRecord<String, String> record = new ProducerRecord<>("my-cluster-topic", "Precision Products", "China");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
            // Fire-and-forget: the returned Future is discarded, so we cannot tell whether the send succeeded.
            producer.send(record);
            // Thread.sleep(2000000); // Delay 2000 seconds
        } catch (Exception e) {
            // Errors thrown before the record is handed off to Kafka, e.g. serialization or configuration problems.
            e.printStackTrace();
        }
    }
}
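The sasl.jaas.config value is easy to get wrong: the credentials must be double-quoted and the string must end with a semicolon. A small helper that assembles it may be safer than hand-editing the literal; the class and method names here are illustrative, not part of the Kafka API:

```java
public class JaasConfigBuilder {

    // Builds the sasl.jaas.config value for SCRAM authentication.
    // The double quotes around the credentials and the trailing
    // semicolon are required by the JAAS config parser.
    public static String scramJaasConfig(String username, String password) {
        return String.format(
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"%s\" password=\"%s\";",
            username, password);
    }

    public static void main(String[] args) {
        // Credentials here are placeholders; use a user created under
        // the [Kafka Instance - User Management] tab.
        System.out.println(scramJaasConfig("demo-user", "demo-pass"));
    }
}
```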

4. External Access - With Authentication (SCRAM-SHA-512 Protocol), Encrypted Transmission

At this point, the client and the Kafka instance are located in different business clusters. The Kafka server has both authentication and transport encryption enabled. The client needs the external access address of the Kafka instance, a username and password for the SCRAM-SHA-512 mechanism, and the cluster CA certificate for transport encryption.

  1. You can copy the connection address for "Access from outside the Cluster - TLS Transmission" under the 【Kafka Instance - Access Method】 tab and substitute it into the code example below.

  2. You can use an existing user or create a new user under the 【Kafka Instance - User Management】 tab.

  3. Cluster CA certificate and password (for transport encryption)

    Use kubectl to extract the cluster CA certificate and its password, then copy them to the appropriate paths on the client. The specific commands are:

    • Extract the CA certificate:

      $ kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
    • Extract the CA certificate password:

      $ kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d
  4. The client connection code example is as follows:

package com.lanzhiwang.producer;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExampleProducer {
    public static void main(String[] args) {

        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "192.168.176.13:31786,192.168.176.23:30535,192.168.176.27:30219");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        kafkaProps.put("security.protocol", "SASL_SSL");
        kafkaProps.put("sasl.mechanism", "SCRAM-SHA-512");
        kafkaProps.put("ssl.truststore.location", "/Users/temp/certs/ca.p12");
        kafkaProps.put("ssl.truststore.password", "************");
        kafkaProps.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"*************\" password=\"*************\";");
        // Disable hostname verification. Not needed for internal access; required when TLS is enabled on a NodePort listener, where the advertised address may not match the broker certificate.
        kafkaProps.put("ssl.endpoint.identification.algorithm", "");
        // ProducerRecord(String topic, K key, V value)
        ProducerRecord<String, String> record = new ProducerRecord<>("my-cluster-topic", "Precision Products", "China");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
            // Fire-and-forget: the returned Future is discarded, so we cannot tell whether the send succeeded.
            producer.send(record);
            // Thread.sleep(2000000); // Delay 2000 seconds
        } catch (Exception e) {
            // Errors thrown before the record is handed off to Kafka, e.g. serialization or configuration problems.
            e.printStackTrace();
        }
    }
}

5. External Access - With Authentication (TLS Protocol), Encrypted Transmission

At this point, the client and the Kafka instance are located in different business clusters. The Kafka server has TLS authentication enabled, which requires an encrypted transmission link. The client needs the external access address of the Kafka instance, the cluster CA certificate and password for transport encryption, and the TLS user certificate and password for authentication.

  1. You can copy the connection address for "Access from outside the Cluster - TLS Transmission" under the 【Kafka Instance - Access Method】 tab and substitute it into the code example below.

  2. Cluster CA certificate and password (for transport encryption)

    Use kubectl to extract the cluster CA certificate and its password, then copy them to the appropriate paths on the client. The specific commands are:

    • Extract the CA certificate:

      $ kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
    • Extract the CA certificate password:

      $ kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d
  3. TLS user certificate and password (for authentication)

    Use kubectl to extract the user certificate and its password, then copy them to the appropriate paths on the client. The specific commands are:

    • Extract the user certificate:

      $ kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
    • Extract the user certificate password:

      $ kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.password}' | base64 -d
  4. The client connection code example is as follows:

package com.lanzhiwang.producer;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExampleProducer {
    public static void main(String[] args) {

        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "192.168.176.13:31786,192.168.176.23:30535,192.168.176.27:30219");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Configure TLS/SSL properties
        kafkaProps.put("security.protocol", "SSL");
        // kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
        kafkaProps.put("ssl.truststore.location", "/Users/temp/certs/ca.p12");
        // kubectl -n <namespace> get secret <name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d
        kafkaProps.put("ssl.truststore.password", "************");
        // kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
        kafkaProps.put("ssl.keystore.location", "/Users/temp/certs/user.p12");
        // kubectl -n <namespace> get secret <Kafka User Name> -o jsonpath='{.data.user\.password}' | base64 -d
        kafkaProps.put("ssl.keystore.password", "************");
        // Disable hostname verification. Required when TLS is enabled on a NodePort listener, where the advertised address may not match the broker certificate.
        kafkaProps.put("ssl.endpoint.identification.algorithm", "");
        // ProducerRecord(String topic, K key, V value)
        ProducerRecord<String, String> record = new ProducerRecord<>("my-cluster-topic", "Precision Products", "China");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
            // Fire-and-forget: the returned Future is discarded, so we cannot tell whether the send succeeded.
            producer.send(record);
            // Thread.sleep(2000000); // Delay 2000 seconds
        } catch (Exception e) {
            // Errors thrown before the record is handed off to Kafka, e.g. serialization or configuration problems.
            e.printStackTrace();
        }
    }
}