
Spark 2 Kafka Integration

Minimum Required Role: Cluster Administrator (also provided by Full Administrator)

CDH 6 includes a Kafka integration feature that uses the new Kafka consumer API. This new Kafka consumer API supports reading data from secure Kafka clusters. In this context, secure clusters are those that are authenticated by Kerberos, and optionally using TLS/SSL for wire encryption.


Requirements

Reading data securely from Kafka, or using the new Spark-Kafka integration based on the new Kafka consumer API, requires the following software versions:

  • For CDH 6: Any version of CDH 6.x.

  • For CDH 5: Cloudera Distribution of Apache Spark 2.1 release 1 or higher.

  • Cloudera Distribution of Apache Kafka 2.1 or higher. (Although the Kafka consumer API is available starting in Cloudera Distribution of Apache Kafka 2.0, the Spark integration requires Kafka 2.1.)

Running Spark Jobs that Integrate with Kafka

To run jobs that use the new Kafka integration, you can use one of the following two techniques.

  Note:

If you do not intend to use the new Kafka consumer integration and are using the existing Kafka integration (either the Kafka receiver or the direct connector), no action is required, regardless of the Kafka version you are using. If you want to use the new Kafka consumer API integration, use one of the following two techniques to make sure the newer Kafka jars are on the Spark classpath.

Technique #1: Set SPARK_KAFKA_VERSION environment variable

When running jobs that require the new Kafka integration, set SPARK_KAFKA_VERSION=0.10 in the shell before launching spark-submit. Use the appropriate environment variable syntax for your shell, such as:

# Set the environment variable for all subsequent commands in this shell session
export SPARK_KAFKA_VERSION=0.10
spark-submit arguments

# Or:

# Set the environment variable only for the duration of 1 command
SPARK_KAFKA_VERSION=0.10 spark-submit arguments
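
For example, a job that uses the new integration to read from a Kerberos-secured cluster (as described in the introduction) might be launched as follows. This is only a sketch: the JAAS configuration, keytab, and application jar names are placeholders, and the exact options depend on your deployment mode and security setup.

# Sketch only: ship a JAAS configuration and keytab to the driver and executors
SPARK_KAFKA_VERSION=0.10 spark-submit \
  --files jaas.conf,user.keytab \
  --driver-java-options "-Djava.security.auth.login.config=./jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
  your_application.jar

In the application itself, the Kafka consumer properties would also typically set security.protocol (for example, SASL_SSL when TLS/SSL wire encryption is enabled) and sasl.kerberos.service.name.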

Technique #2: Set spark_kafka_version setting through Cloudera Manager

Set the spark_kafka_version configuration in the Cloudera Manager Spark 2 service to 0.10 and redeploy the client configuration. With this technique, you do not need to set any environment variables before launching spark-submit.

Technique #2 is preferable if you have upgraded your Kafka brokers to Cloudera Distribution of Apache Kafka 2.1 or higher, and do not intend to have your Spark jobs communicate with brokers running a version of Kafka earlier than Cloudera Distribution of Apache Kafka 2.1.

If you set the default Kafka version to 0.10 by using technique #2, you can still connect to older Kafka brokers (for example, brokers based on Cloudera Distribution of Apache Kafka 2.0) by setting SPARK_KAFKA_VERSION=0.9 when running your application.
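
For example, reusing the per-command syntax shown above:

# Override the Cloudera Manager default of 0.10 for a single job that talks to older brokers
SPARK_KAFKA_VERSION=0.9 spark-submit arguments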

  Note: To maintain compatibility with existing applications, the default is to use the older (Cloudera Kafka 2.0-based) client jars in the Spark classpath. Those jars let applications communicate with older (Cloudera Kafka 2.0) brokers as well as newer (Cloudera Kafka 2.1) brokers, as long as the old Kafka consumer APIs are used. However, to use the new Kafka consumer API integration, one of the two techniques above is required; this ensures that the new Kafka client jars (from Cloudera Kafka 2.1) are on the Spark classpath. It is not practical to put both Kafka 2.0 and Kafka 2.1 client jars on the Spark classpath because of incompatibilities; the engineering best practice is to use only one version of any jar file in the classpath.

Building Applications

To build applications against the new Kafka integration, you can add the dependency by using the following Maven coordinates:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.1.0.cloudera1</version>
</dependency>
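
For reference, a minimal Spark Streaming application built against this dependency might look like the following sketch. The broker address, topic, and consumer group names are placeholders; the classes used (KafkaUtils, LocationStrategies, ConsumerStrategies) are part of the spark-streaming-kafka-0-10 API.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaStreamExample")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Placeholder broker list, consumer group, and topic
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1.example.com:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "your_consumer_group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("your_topic")

    // Create a direct stream using the new Kafka consumer API
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    // Print the first few record values of each batch
    stream.map(record => record.value).print()

    ssc.start()
    ssc.awaitTermination()
  }
}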

Reading from Authorized Kafka

To read from a Kafka cluster authorized by Sentry, grant privileges to your consumer group as described in the Sentry documentation. You must also grant the same privileges to a second consumer group named spark-executor-your_consumer_group. This is necessary because the driver uses the consumer group specified in your application, but the executors use a separate consumer group, which is hardcoded to spark-executor-your_consumer_group.
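
As an illustration only, granting these privileges with the kafka-sentry shell might look like the following. The role and consumer group names are placeholders, and the exact commands and privilege strings depend on your Sentry configuration, so verify them against the Sentry for Kafka documentation for your release.

# Hypothetical role and consumer group names
kafka-sentry -gpr -r your_role -p "Host=*->ConsumerGroup=your_consumer_group->action=read"
kafka-sentry -gpr -r your_role -p "Host=*->ConsumerGroup=spark-executor-your_consumer_group->action=read"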
