Oliver Meyn

Hadoop & Big Data Consultant

Spark 2.0 streaming from SSL Kafka with HDP 2.4

Hortonworks Data Flow (HDF) bundles Apache NiFi, Apache Kafka, and Apache Storm. Together these make a powerful combination for transporting and transforming data in near-real time. With the addition of Apache Spark for machine learning and model evaluation, Apache Hadoop for storage, and Apache Hive for later analytics querying, we have the complete chain for processing data from the Internet of Things (IoT). On a recent customer engagement (in my work for T4G) we were asked to build this workflow, with the strong requirements that all Kafka communication be encrypted via SSL/TLS and that all parts of the architecture authenticate securely via Kerberos. These requirements turned out to pose some significant challenges, and this blog post explains how we overcame them. Note that all the code referenced below is available on GitHub.

The architecture

The flow of data is shown in the following diagram: it originates from television set-top boxes, transits a NiFi instance outside the cluster firewall, passes through the SSL-secured Kafka, is picked up by Spark running inside the cluster (on YARN) for ML evaluation, and is finally written directly to HDFS by a second NiFi.

In this case the cluster version is HDP 2.4.2, which natively provides Spark version 1.6. There is an MIT Kerberos daemon (kdc) inside the cluster, accessible through the firewall. NiFi is a single instance of version 1.0 (from HDF 2.0), running outside the cluster. Kafka is a single 0.10 instance (from HDF), providing SSL connections through a self-signed certificate, and running inside the cluster.

NiFi configuration

NiFi allows the connection of various “processors” into any number of workflows through a very user-friendly GUI. It originated as a product of the NSA, with a small subset of their processors included in the initial Apache release. Since it was open sourced many more processors have been added, but not all of them place security as the highest priority. In this case the PublishKafka_0_10 processor behaves very well: configuring the SSL context within NiFi and adding the requisite jaas.conf file with our Kerberos principal was straightforward (and reasonably well described in the NiFi documentation). Make sure you’re using the SASL_SSL Security Protocol. Once you have the PublishKafka processor working you can easily test connectivity by adding a ConsumeKafka_0_10 processor that writes to local files. It’s also a good idea to set up a simple workflow that can put test messages onto your Kafka topic of choice for later testing with the Spark consumer.
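As a rough sketch of the Kerberos side (the file path and the java.arg index below are arbitrary choices of ours, not fixed names), the jaas.conf is handed to NiFi’s JVM via conf/bootstrap.conf, while the processor itself needs Security Protocol set to SASL_SSL and an SSL Context Service pointing at your truststore:

# conf/bootstrap.conf on the NiFi host (any unused java.arg index will do)
java.arg.15=-Djava.security.auth.login.config=/opt/nifi/conf/jaas.conf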

Spark time!

In HDP 2.4.2 the shipped version of Spark is 1.6, and Spark 1.6 cannot speak SSL to Kafka. Full stop. This is because the Kafka client libraries changed between version 0.8 and 0.10, and SSL is only available as of 0.10 (as described by Databricks). For further proof, the JIRA that states it won’t work is https://issues.apache.org/jira/browse/SPARK-12177 (note that Spark versioning jumps from 1.6 to 2.0 – there’s nothing in between). Sooooo…..

Spark time, for real this time!

Because we’re running Spark on YARN, the application is self-contained and in principle it should be easy (hah!) to package it up such that the cluster is none the wiser. Some differences between Spark 1.6 and 2.0 before we go further:

                1.6     2.0
Scala Version   2.10    2.11
Uses assembly?  Y       N
SSL Support?    N       Y

Hortonworks has a blog post that shows how to do this on HDP 2.5 – Spark 2 is officially supported by Hortonworks on HDP 2.5 but not on HDP 2.4. Note that Spark versions before 2.0 used an assembly jar that held all the needed Spark libraries, and this was often hard to configure when building by hand. Starting with Spark 2 the libraries are loaded as needed and there is no more assembly. The steps for installing on 2.4 are basically the same as for 2.5:

1) Download the Spark distribution (pre-built for Hadoop 2.7) from http://spark.apache.org/downloads.html. For this project we used v2.0.2. We recommend the Scala 2.11 package (the default) for ease of compatibility with other libraries.

2) Unpack it into /usr/hdp/current/spark2-client on the machine where you start Spark jobs (typically an edge node)

3) The user who will run the Spark jobs needs ‘export SPARK_HOME=/usr/hdp/current/spark2-client’ in their environment

4) Presuming you’ve already got Spark 1.6 working, copy its spark-defaults.conf to the new spark2-client directory (e.g. cp /usr/hdp/current/spark-client/conf/spark-defaults.conf /usr/hdp/current/spark2-client/conf/)

5) Add/edit the following lines in spark2-client’s spark-defaults.conf:

spark.driver.extraLibraryPath /usr/hdp/current/hadoop-client/lib/native
spark.executor.extraLibraryPath /usr/hdp/current/hadoop-client/lib/native
spark.driver.extraJavaOptions -Dhdp.version=2.4.2.0-258
spark.yarn.am.extraJavaOptions -Dhdp.version=2.4.2.0-258
spark.hadoop.yarn.timeline-service.enabled false

Note the hdp.version parameters – it is critical that these match your actual cluster version. YARN uses them to build the classpath of its jobs, and HDP ships with some jars (notably lzo) that have the version number in their names, so if the version is wrong the classpath breaks and you’ll get error messages like “Can’t find ApplicationLauncher”. Similarly, disabling the timeline service works around some Jersey classes that are missing in the newer Spark.

This is enough to get Spark 2 running in HDP 2.4, and you can test by calculating Pi:

$SPARK_HOME/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
--driver-memory 2g \
--executor-memory 2g \
--executor-cores 1 \
$SPARK_HOME/examples/jars/spark-examples*.jar \
10

Note you’ll have to kinit before submitting the job, but that should be enough to get you a few digits of pi. Since our goal is to speak SSL to Kafka from a Spark streaming app, we’re not done yet. We need to tell Spark how to authenticate itself to Kafka (via Kerberos) and to trust the self-signed SSL certificate that Kafka presents.
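For example, with the keytab and principal used in the rest of this post (substitute your own):

kinit -kt gandalf.keytab gandalf@DOMAIN.COM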

Adding Kerberos

We need to provide a keytab for our Kerberos principal, and instructions on how to use that keytab in the form of a jaas.conf file (JAAS = Java Authentication and Authorization Service). Note that in these examples we use the principal gandalf@DOMAIN.COM – substitute your principal as needed.

jaas.conf

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="./gandalf.keytab"
storeKey=true
useTicketCache=false
serviceName="kafka"
principal="gandalf@DOMAIN.COM";
};

With the jaas.conf and a keytab handy in the directory from which you’ll launch your job, launch as follows:

$SPARK_HOME/bin/spark-submit \
--master yarn \
--files jaas.conf,gandalf.keytab \
--driver-java-options "-Djava.security.auth.login.config=./jaas.conf -Dhdp.version=2.4.2.0-258" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
--class com.example.ClassName uber-jar-with-deps-and-hive-site.jar

Note the hdp.version showing up again – passing --driver-java-options overrides the spark.driver.extraJavaOptions we set in spark-defaults.conf, so if we don’t include hdp.version here alongside our security option, the classpath will again break.

Adding SSL

Almost there – now we need to trust Kafka. If you have a CA-signed certificate on Kafka then you can technically skip this step, but it won’t hurt. Java ships with a store of certificates that it implicitly trusts – these are the signing authorities of the Web. Here we take a copy of that store and add the certificate from Kafka to it. Be careful with the language here – Java only talks about keystores, but NiFi and Kafka refer to both keystores and truststores. A keystore holds the certificates and private keys of a server that provides a secure connection (e.g. Kafka in our case). A truststore holds the certificates that a client (e.g. Spark in our case) implicitly trusts when opening connections to servers. This is the same process as when using a web browser to connect to a secure web server – when you ignore the warnings that you’re going to an insecure site (say if the certificate has expired, or the names don’t match) and check the “Always ignore this warning” box, you’re adding that certificate to your truststore.

This may require root privileges (and note that the default password for the Java cacerts store is ‘changeit’):

# take a copy of the JVM's default truststore and change its password
cp $JAVA_HOME/jre/lib/security/cacerts .
keytool -storepasswd -new change-me-to-something-safe -keystore cacerts
# grab the certificate chain the Kafka broker presents (</dev/null closes the connection immediately)
openssl s_client -connect your.broker.fqdn:9092 -showcerts </dev/null > kafka-certs.txt
# add the broker certificate to the copied store under the alias kafka1
keytool -import -alias kafka1 -file kafka-certs.txt -keystore cacerts -storepass change-me-to-something-safe
rm -f kafka-certs.txt
mv cacerts kafka.client.truststore.jks
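A quick sanity check that the import worked is to list the new store and look for the kafka1 alias:

keytool -list -alias kafka1 -keystore kafka.client.truststore.jks -storepass change-me-to-something-safe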

Now you have a truststore called kafka.client.truststore.jks and can pass that in when you start your Spark job:

$SPARK_HOME/bin/spark-submit \
--master yarn \
--files jaas.conf,gandalf.keytab,kafka.client.truststore.jks \
--driver-java-options "-Djava.security.auth.login.config=./jaas.conf -Dhdp.version=2.4.2.0-258" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
--class com.example.ClassName uber-jar-with-deps-and-hive-site.jar

Adding SparkSQL support (hive-site.xml)

In our case we needed SparkSQL support (for working with DataFrames) and that means bundling your hive-site.xml in the submitted Spark app. Taking a copy of the hive-site.xml from your cluster is fine, with one important change – make sure the hive.execution.engine is not tez, but mr:

<property>
  <name>hive.execution.engine</name>
  <value>mr</value>
</property>

Spark won’t use MapReduce for anything, but when it tries to load the metastore it will get confused (read: everything breaks) if the execution engine isn’t ‘mr’.
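One simple way to bundle it (assuming a standard Maven layout) is to keep the edited copy under src/main/resources, so it ends up at the root of the uber jar where Spark will find it on the classpath:

src/main/resources/hive-site.xml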

Code

Finally we have all the pieces connected, and we can write some code that actually reads messages from Kafka and does something useful. We’ll leave the something useful to you, but here’s some skeleton code that makes the connection (this code is also available on GitHub):

import grizzled.slf4j.Logger
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.ConsumerStrategies._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SimplestStreaming {
  val logger = Logger[this.type]

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Simplest streaming (spark 2.0) from Kafka SSL")
      .enableHiveSupport()
      .getOrCreate()
    val sparkContext = spark.sparkContext

    val streamingContext = new StreamingContext(sparkContext, Seconds(10))

    // expects jaas.conf, appropriate keytab, and kafka.client.truststore.jks passed in as part of spark-submit
    val kafkaParams = Map[String, Object](
      // set to your broker list (the SSL listener used earlier, e.g. your.broker.fqdn:9092)
      "bootstrap.servers" -> "your.broker.fqdn:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "security.protocol" -> "SASL_SSL",
      "ssl.truststore.location" -> "./kafka.client.truststore.jks",
      "ssl.truststore.password" -> "change-me-to-something-safe"
    )
    val topic = Set("sasl_ssl_test")

    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topic, kafkaParams)
    )

    stream.foreachRDD { rdd =>
      // Get the singleton instance of SparkSession
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._

      // turn each batch of ConsumerRecords into a DataFrame of message values
      val df = rdd.map(consumerRecord => consumerRecord.value()).toDF()

      df.show()
    }

    // start the computation
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

Maven

We use Maven to build the project, and the pom.xml included with the code will let you build this class. Run mvn clean package and your uber jar (with all dependencies except Spark included) will be in the target/ directory.
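If you prefer to assemble your own pom.xml rather than starting from the one on GitHub, the relevant dependencies look roughly like this (versions assumed to match the Spark 2.0.2 / Scala 2.11 download above; the Spark artifacts are ‘provided’ by the spark2-client on the cluster, while the Kafka 0.10 integration has to be bundled into the uber jar):

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.0.2</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.0.2</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.0.2</version>
  <!-- not on the cluster classpath, so it must go into the uber jar -->
</dependency>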

Conclusion

There is a large dent in the wall where we banged our heads to get this all working, but hopefully we’ve now saved you the same fate. Good luck!

Upgrading our cluster from CDH4 to CDH5

Originally published November 26, 2014 at the GBIF Developer blog.

A little over a year ago we wrote about upgrading from CDH3 to CDH4 and now the time had come to upgrade from CDH4 to CDH5. The short version: upgrading the cluster itself was easy, but getting our applications to work with the new classpaths, especially MapReduce v2 (YARN), was painful.

The Cluster

Our cluster has grown since the last upgrade (now 12 slaves and 3 masters), and we no longer had the luxury of splitting the machines to build a new cluster from scratch. So this was an in-place upgrade, using CDH Manager.

UPGRADE CDH MANAGER

The first step was upgrading to CDH Manager 5.2 (from our existing 4.8). The Cloudera documentation is excellent so I don’t need to repeat it here. What we did find was that the management service now requests significantly more RAM for its monitoring services (a minimum “happy” config of 14GB), to the point where our existing masters were overwhelmed. As a stopgap we’ve added a 4th old machine to the “masters” group, used exclusively for the management service. In the longer term we’ll replace the 4 masters with 3 new machines that have enough resources.

UPGRADE CLUSTER MEMBERS

Again the Cloudera documentation is excellent, so I’ll just add a bit. The upgrade process will now ask whether a Java JDK should be installed (an improvement over the old behaviour of just installing one anyway). That means we could finally remove the Oracle JDK 6 rpms that have been lying around on the machines. For some reason the Host Inspector still complains about OpenJDK 7 vs Oracle 7, but we’ve happily been running on OpenJDK 7 since early 2014, and so far so good with CDH5 as well. After the upgrade wizard finished we had to tweak memory settings throughout the cluster, including setting the “Memory Overcommit Validation Threshold” to 0.99, up from its (very conservative) default of 0.8. Cloudera has another nice blog post on figuring out memory settings for YARN. Additionally, Hue’s configuration required some attention because after the upgrade it had forgotten where ZooKeeper and the HBase Thrift server were. All in all quite painless.

The Gotchas

Getting our software to work with CDH5 was definitely not painless. All of our problems stemmed from conflicting versions of jars, due either to changes in CDH’s dependencies, or to changes in how a user classpath is given priority over that of YARN/HBase/Oozie. Additionally it took some time to wrap our heads around the new artifact packaging used by YARN and HBase. Note that we also use Maven for dependency management.

Guava

We’re not alone in our suffering at the hands of mismatched Guava versions (e.g. HADOOP-10101, HDFS-7040), but suffer we did. We resorted to specifying version 14.0.1 in any of our code that touches Hadoop and, more importantly, HBase, and excluding any newer Guava versions from our dependencies. This meant downgrading some actual code that was using Guava 15, but it was the easiest path to a working system.
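In Maven terms that boils down to pinning Guava and excluding newer copies wherever they sneak in. A sketch of the pinning half (the poms linked under Example exclusions below show the real exclusions):

<dependencyManagement>
  <dependencies>
    <!-- force the Hadoop/HBase-compatible Guava everywhere -->
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>14.0.1</version>
    </dependency>
  </dependencies>
</dependencyManagement>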

Jackson

We have many dependencies on Jackson 1.9 and 2+ throughout our code, so downgrading to match HBase’s shipped 1.8.8 was not an option. It meant figuring out the classpath precedence rules described below, and solving the problems (like logging) that doing so introduced.

Logging

Logging in Java is a horrible mess, and with the number of intermingled projects required to make application software run on a Hadoop/HBase cluster it’s no surprise that getting logging to work was brutal. We code to the SLF4J API and use Logback as our implementation of choice. The Hadoop world uses a mix of Java Commons Logging, java.util.logging, and log4j. We thought that meant we’d be clear if we used the same SLF4J API (1.7.5) and used the bridges (log4j-over-slf4j, jcl-over-slf4j, and jul-to-slf4j), which had worked for us up to now. <montage>Angry men smash things angrily over the course of days</montage> Turns out there’s a bug in the 1.7.5 implementation of log4j-over-slf4j, which blows up as we described over at YARN-2875. Short version – use 1.7.6+ in client code that attempts to use YARN and log4j-over-slf4j.
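In practice that just means pinning the bridge to a recent version in the client pom, for example (1.7.7 shown; any 1.7.6+ release should do):

<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>log4j-over-slf4j</artifactId>
  <version>1.7.7</version>
</dependency>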

YARN

The crux of our problems was having our classpath loaded after the Hadoop classpath had been loaded, meaning old versions of our dependencies were loaded first. The new, surprisingly hard to find parameter that tells YARN to load your classpath first is “mapreduce.job.user.classpath.first”. YARN also quizzically claims that the parameter is deprecated, but.. works for me.
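It can be set in the job’s configuration file or programmatically on the Job before submitting; as a property it looks like:

<property>
  <name>mapreduce.job.user.classpath.first</name>
  <value>true</value>
</property>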

Oozie

Convincing Oozie to load our classpath involved another montage of angry faces. It uses the same parameter as YARN, but with a prefix, so what you want is “oozie.launcher.mapreduce.job.user.classpath.first”. We had been setting the old parameter “mapreduce.task.classpath.user.precedence” in each action in the workflow, using the <job-xml> tag to load the configs from a file called hive-default.xml. We then encountered two problems:

1) Note the name – we used hive-default.xml instead of hive-site.xml because of a bug in Oozie (discussed here and here). That was fixed in the CDH5.2 Oozie, but we didn’t get the memo. Now the file is called hive-site.xml, contains our specific configs, and is again being picked up. BUT:

2) Adding oozie.launcher.mapreduce.job.user.classpath.first to hive-site.xml doesn’t work! As we wrote up in Oozie bug OOZIE-2066, this parameter has to be specified for each action, at the action level, in the workflow.xml. Repeating the example workaround from the bug report:

 <action name="run-test">  
  <java>  
   <job-tracker>c1n2.gbif.org:8032</job-tracker>  
   <name-node>hdfs://c1n1.gbif.org:8020</name-node>  
   <configuration>  
    <property>  
     <name>oozie.launcher.mapreduce.task.classpath.user.precedence</name>  
     <value>true</value>  
    </property>  
   </configuration>  
   <main-class>test.CPTest</main-class>  
  </java>  
  <ok to="end" />  
  <error to="kill" />  
 </action>  

New Packaging Woes

We build our jars using a combination of jar-with-dependencies and the shade plugin, but in both cases it means all our dependencies are built in. The problems come when a downstream, transitive dependency loads a different (typically older) version of one of the jars we’ve bundled in our main jar. This happens a lot with the Hadoop and HBase artifacts, especially when it comes to MR1 and logging.

Example exclusions

hbase-server (needed to run MapReduce over HBase): https://github.com/gbif/datacube/blob/master/pom.xml#L268

hbase-testing-util (needed to run mini clusters): https://github.com/gbif/datacube/blob/master/pom.xml#L302

hbase-client: https://github.com/gbif/metrics/blob/master/pom.xml#L226

hadoop-client (removing logging): https://github.com/gbif/metrics/blob/master/pom.xml#L327

Beyond just sorting out conflicting dependencies, we also encountered a problem that presented as “No FileSystem for scheme: file”. It turns out we had projects bringing in both hadoop-common and hadoop-hdfs, so only one of their META-INF/services files survived into the final jar. That meant the FileSystem could read either local files (like jars for the classpath) or HDFS, but not both. The fix was to include the org.apache.hadoop.fs.FileSystem services file in our project explicitly: https://github.com/gbif/metrics/blob/master/cube/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
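For reference, that services file is just a list of FileSystem implementations, one fully qualified class name per line; at minimum the local and HDFS ones:

org.apache.hadoop.fs.LocalFileSystem
org.apache.hadoop.hdfs.DistributedFileSystem

(If you build with the shade plugin, its ServicesResourceTransformer merges the service files from all dependencies and achieves the same result.)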

Finally we had to stop TableMapReduceUtil from bringing in its own dependent jars, which pulled in yet more conflicting jars – this appears to be a change in the default behaviour, where dependent jars are now brought in by default by the shorter versions of initTableMapperJob:
https://github.com/gbif/metrics/blob/master/cube/src/main/java/org/gbif/metrics/cube/occurrence/backfill/BackfillCallback.java#L37
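Concretely, that means calling one of the longer initTableMapperJob overloads so the addDependencyJars flag can be turned off. A sketch with placeholder mapper, key and value classes:

// the boolean after the Job is addDependencyJars; false stops HBase from shipping its own jars
TableMapReduceUtil.initTableMapperJob(
    "my_table", scan,
    MyMapper.class, ImmutableBytesWritable.class, Result.class,
    job,
    false);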

Conclusion

As you can see, the client side of the upgrade was beset on all sides by the iniquities of jars, packaging and old dependencies. It seems strange that upgrading Guava is considered a no-no, major breaking change by these projects, yet discussions about removing HTablePool are proceeding apace and will definitely break many projects (including any of ours that touch HBase). While we’re ultimately pleased that everything now works, and we’re looking forward to the performance improvements and new features of CDH5, it wasn’t a great trip. Hopefully our experience will help others migrate more smoothly.