Oliver Meyn

Hadoop & Big Data Consultant

Spark 2.0 streaming from SSL Kafka with HDP 2.4

Hortonworks Data Flow (HDF) bundles Apache NiFi, Apache Kafka, and Apache Storm. Together these make a powerful combination for transporting and transforming data in near-real time. With the addition of Apache Spark for machine learning and model evaluation, Apache Hadoop for storage, and Apache Hive for later analytics querying, we have the complete chain for processing data from the Internet of Things (IoT). A recent customer (in my work for T4G) asked us to build this workflow, with the strong requirements that all Kafka communication be encrypted via SSL/TLS and that all parts of the architecture authenticate securely via Kerberos. These requirements caused some significant challenges, and this blog post explains how we overcame them. Note that all the code referenced later is available on GitHub.

The architecture

The data flows as in the following diagram – originating from television set-top boxes, transiting a NiFi instance outside the cluster firewall, passing through the SSL-secured Kafka, getting picked up by Spark running inside the cluster (on YARN) for ML evaluation, and finally being written directly to HDFS by a second NiFi.

In this case the cluster version is HDP 2.4.2, which natively provides Spark version 1.6. There is an MIT Kerberos daemon (kdc) inside the cluster, accessible through the firewall. NiFi is a single instance of version 1.0 (from HDF 2.0), running outside the cluster. Kafka is a single 0.10 instance (from HDF), providing SSL connections through a self-signed certificate, and running inside the cluster.

NiFi configuration

NiFi allows the connection of various “processors” into any number of workflows through a very user-friendly GUI. It originated as a product of the NSA, with a small subset of their processors included in the initial Apache release. Since it was open sourced many more processors have been added, but not all of them place security as the highest priority. In this case the PublishKafka_0_10 processor behaves very well, and configuring the SSL context within NiFi and adding the requisite jaas.conf file with our Kerberos principal was straightforward (and reasonably well described in the NiFi documentation). Make sure you’re using the SASL_SSL Security Protocol. Once you have the PublishKafka processor working you can easily test connectivity by adding a ConsumeKafka_0_10 processor that writes to local files. It’s also a good idea to set up a simple workflow that can put test messages onto your Kafka topic of choice, for later testing with the Spark consumer.
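As a pointer: NiFi picks up the JAAS file for its Kafka processors through a JVM argument in conf/bootstrap.conf, roughly like the line below (the argument number just needs to be one not already in use, and the KafkaClient block in the jaas.conf has the same shape as the one shown later in this post):

java.arg.15=-Djava.security.auth.login.config=/path/to/jaas.conf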

Spark time!

In HDP 2.4.2 the shipped version of Spark is 1.6, and Spark 1.6 cannot speak SSL to Kafka. Full stop. This is because the Kafka client libraries changed between version 0.8 and 0.10, and SSL support is only available as of 0.10 (as described by Databricks). For further proof, the JIRA that states it won’t work is https://issues.apache.org/jira/browse/SPARK-12177 (note that Spark versioning jumps from 1.6 to 2.0 – there’s nothing in between). Sooooo…..

Spark time, for real this time!

Because we’re running Spark on YARN, the application is self-contained and in principle it should be easy (hah!) to package it up such that the cluster is none the wiser. Some differences between Spark 1.6 and 2.0 before we go further:

                Spark 1.6   Spark 2.0
Scala version   2.10        2.11
Uses assembly?  Y           N
SSL support?    N           Y

Hortonworks has a blog post that shows how to do this on HDP 2.5 – Spark 2 is officially supported by Hortonworks on HDP 2.5 but not on HDP 2.4. Note that Spark versions before 2.0 used an assembly jar that held all the needed Spark libraries, which was often hard to configure when building by hand. Starting with Spark 2 the libraries are loaded as needed and there is no more assembly. The steps for installing on 2.4 are basically the same as for 2.5:

1) Download the Spark libraries from http://spark.apache.org/downloads.html, choosing “Pre-built for Hadoop 2.7”. For this project we used v2.0.2. We recommend the Scala 2.11 package (the default) for ease of compatibility with other libraries. (A consolidated shell sketch of steps 1–4 follows the config block in step 5.)

2) Unpack the archive into /usr/hdp/current/spark2-client on the machine where you start Spark jobs (typically an Edge node)

3) The user who will run the Spark jobs needs ‘export SPARK_HOME=/usr/hdp/current/spark2-client’ in their environment

4) Presuming you’ve got Spark 1.6 working fine already, copy its spark-defaults.conf to the new spark2-client directory (e.g. cp /usr/hdp/current/spark-client/conf/spark-defaults.conf /usr/hdp/current/spark2-client/conf/)
5) Add/edit the following lines to the spark2-client’s spark-defaults.conf:

spark.driver.extraLibraryPath /usr/hdp/current/hadoop-client/lib/native
spark.executor.extraLibraryPath /usr/hdp/current/hadoop-client/lib/native
spark.driver.extraJavaOptions -Dhdp.version=2.4.2.0-258
spark.yarn.am.extraJavaOptions -Dhdp.version=2.4.2.0-258
spark.hadoop.yarn.timeline-service.enabled false
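For convenience, here is a rough consolidation of steps 1–4 as shell commands (the download URL, archive name, and target paths are examples based on the versions used in this post – adjust for your environment):

cd /tmp
wget https://archive.apache.org/dist/spark/spark-2.0.2/spark-2.0.2-bin-hadoop2.7.tgz
tar -xzf spark-2.0.2-bin-hadoop2.7.tgz
mkdir -p /usr/hdp/current/spark2-client
mv spark-2.0.2-bin-hadoop2.7/* /usr/hdp/current/spark2-client/
export SPARK_HOME=/usr/hdp/current/spark2-client
cp /usr/hdp/current/spark-client/conf/spark-defaults.conf $SPARK_HOME/conf/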

Note the hdp.version parameters – it is critical that these match your actual cluster version. YARN uses them to build the classpath for its jobs, and HDP ships some jars (notably lzo) with the version number in their names, so if the version is wrong the classpath breaks and you’ll see errors like “Can’t find ApplicationLauncher”. Similarly, disabling the timeline service works around Jersey classes that are missing from the newer Spark.
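A quick way to confirm the exact version string on your cluster (the value after hdp.version= must match one of these) is:

hdp-select versions
# or simply list the install directories
ls /usr/hdp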

This is enough to get Spark 2 running in HDP 2.4, and you can test by calculating Pi:

$SPARK_HOME/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
--driver-memory 2g \
--executor-memory 2g \
--executor-cores 1 \
$SPARK_HOME/examples/jars/spark-examples*.jar \
10

Note you’ll have to kinit before submitting the job, but that should be enough to get you a few digits of pi. Since our goal is to speak SSL to Kafka from a Spark streaming app, we’re not done yet. We need to tell Spark how to authenticate itself to Kafka (via Kerberos) and to trust the self-signed SSL certificate that Kafka presents.
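A quick aside on that kinit: once you have the keytab described in the next section, it can be done non-interactively, e.g.:

kinit -kt gandalf.keytab gandalf@DOMAIN.COM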

Adding Kerberos

We need to provide a keytab for our Kerberos principal, and instructions on how to use that keytab in the form of a jaas.conf file (JAAS = Java Authentication and Authorization Service). Note that in these examples we use the principal gandalf@DOMAIN.COM – substitute your own principal as needed.

jaas.conf

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="./gandalf.keytab"
  storeKey=true
  useTicketCache=false
  serviceName="kafka"
  principal="gandalf@DOMAIN.COM";
};
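If you don’t already have a keytab for the principal, one way to create it is with MIT Kerberos’ ktutil (a sketch – the encryption type has to match what your KDC issues, and of course substitute your own principal):

ktutil
ktutil:  addent -password -p gandalf@DOMAIN.COM -k 1 -e aes256-cts-hmac-sha1-96
Password for gandalf@DOMAIN.COM:
ktutil:  wkt gandalf.keytab
ktutil:  quit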

With the jaas.conf and a keytab handy in the directory from which you’ll launch your job, launch as follows:

$SPARK_HOME/bin/spark-submit \
--master yarn \
--files jaas.conf,gandalf.keytab \
--driver-java-options "-Djava.security.auth.login.config=./jaas.conf -Dhdp.version=2.4.2.0-258" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
--class com.example.ClassName uber-jar-with-deps-and-hive-site.jar

Note the hdp.version showing up again – by passing --driver-java-options we’re overriding the value set in spark-defaults.conf, so if we don’t repeat hdp.version here alongside our security option, the classpath will break again.

Adding SSL

Almost there – now we need to trust Kafka. If you have a CA-signed certificate in Kafka then you can technically skip this step, but it won’t hurt. Java ships with a store of certificates that it implicitly trusts – these are the signing authorities of the Web. Here we take a copy of that store and add the Kafka certificate to it.

Be careful with terminology here – Java only talks about keystores, but NiFi and Kafka refer to both keystores and truststores. A keystore is used by a server that provides a secure connection (Kafka in our case), and holds that server’s certificates and private keys. A truststore holds the certificates that a client (Spark in our case) implicitly trusts when opening connections to servers. This is the same process as when using a web browser to connect to a secure web server – when you ignore the warning that you’re going to an insecure site (say the certificate has expired, or the names don’t match) and check the “Always ignore this warning” box, you’re adding that certificate to your truststore.

This may require root privileges (and note that the default password for the Java cacerts store is ‘changeit’):

cp $JAVA_HOME/jre/lib/security/cacerts .
# change the copied store's password from the default 'changeit'
keytool -storepasswd -storepass changeit -new change-me-to-something-safe -keystore cacerts
# grab the certificate chain the broker presents; the redirect from /dev/null makes s_client exit after the handshake
openssl s_client -connect your.broker.fqdn:9092 -showcerts < /dev/null > kafka-certs.txt
keytool -import -alias kafka1 -file kafka-certs.txt -keystore cacerts -storepass change-me-to-something-safe
rm -f kafka-certs.txt
mv cacerts kafka.client.truststore.jks

Now you have a truststore called kafka.client.truststore.jks and can pass that in when you start your Spark job:

$SPARK_HOME/bin/spark-submit \
--master yarn \
--files jaas.conf,gandalf.keytab,kafka.client.truststore.jks \
--driver-java-options "-Djava.security.auth.login.config=./jaas.conf -Dhdp.version=2.4.2.0-258" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
--class com.example.ClassName uber-jar-with-deps-and-hive-site.jar

Adding SparkSQL support (hive-site.xml)

In our case we needed SparkSQL support (for working with DataFrames), and that means bundling your hive-site.xml in the submitted Spark app. Taking a copy of the hive-site.xml from your cluster is fine, with one important change – make sure hive.execution.engine is not tez, but mr:

<property>
  <name>hive.execution.engine</name>
  <value>mr</value>
</property>

Spark won’t use MapReduce for anything, but when it tries to load the metastore it will get confused (read: everything breaks) if the engine isn’t ‘mr’.

Code

Finally we have all the pieces connected, so now we can write some code that actually reads messages from Kafka and does something useful. We’ll leave the something useful to you, but here’s some skeleton code that makes the connection (this code is also available on GitHub):

import grizzled.slf4j.Logger
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.ConsumerStrategies._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SimplestStreaming {
  val logger = Logger[this.type]

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Simplest streaming (spark 2.0) from Kafka SSL")
      .enableHiveSupport()
      .getOrCreate()
    val sparkContext = spark.sparkContext

    val streamingContext = new StreamingContext(sparkContext, Seconds(10))

    // expects jaas.conf, appropriate keytab, and kafka.client.truststore.jks passed in as part of spark-submit
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "your.broker.fqdn:9092", // substitute your broker's SASL_SSL host:port
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "security.protocol" -> "SASL_SSL",
      "ssl.truststore.location" -> "./kafka.client.truststore.jks",
      "ssl.truststore.password" -> "change-me-to-something-safe"
    )
    val topic = Set("sasl_ssl_test")

    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topic, kafkaParams)
    )

    stream.foreachRDD { rdd =>
      // Get the singleton instance of SparkSession
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._

      // pull the message value out of each ConsumerRecord and show the batch as a DataFrame
      val df = rdd.map(consumerRecord => {
        consumerRecord.value()
      }).toDF()

      df.show()
    }

    // start the computation
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

Maven

We use Maven to build the project, and the pom.xml included with the code will let you build this class. Run mvn clean package and your uber jar (with all dependencies except Spark included) will appear in the target/ dir.
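If you’re building your own project rather than cloning ours, the important dependencies look roughly like the following (versions and scopes are what we’d expect for this setup; the pom.xml in the repo is the authoritative reference). Note that spark-streaming-kafka-0-10 is not provided by the cluster, so it must be bundled into the uber jar:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.0.2</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>2.0.2</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.0.2</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.0.2</version>
</dependency>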

Conclusion

There is a large dent in the wall where we banged our heads to get this all working, but hopefully we’ve now saved you the same fate. Good luck!

Don’t fill your HDFS disks (upgrading to CDH 5.4.2)

Originally published May 29, 2015 at the GBIF Developer blog.

Just a short post on the dangers of filling your HDFS disks. It’s a warning you’ll hear at conferences and in best practices blog posts like this one, but usually with only a vague consequence of “bad things will happen”. We upgraded from CDH 5.2.0 to CDH 5.4.2 this past weekend and learned the hard way: bad things will happen.

The Machine Configuration

The upgrade went fine in our dev cluster (which has almost no data in HDFS) so we weren’t expecting problems in production. Our production cluster is of course slightly different from our (much smaller) dev cluster. In production we have 3 masters, where one holds the NameNode and another holds the SecondaryNameNode (we’re not yet using a High Availability setup, but it’s in the plan). We have 12 DataNodes, each with 13 disks dedicated to HDFS storage: 12 are 1TB and one is 512GB. They are formatted with 0% reserved blocks for root. The machines are evenly split into two racks.
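(For reference, the 0% root reservation on an ext4 volume is set at format time with mkfs.ext4 -m 0, or afterwards with tune2fs; the device name here is only illustrative:)

tune2fs -m 0 /dev/sdb1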

Pre Upgrade Status

We were at about 75% total HDFS usage with only a few percent difference between machines. We were configured to use Round Robin block placement (dfs.datanode.fsdataset.volume.choosing.policy) with 10GB reserved for non-HDFS use (dfs.datanode.du.reserved), which are the defaults in CDH Manager. Each of the 1TB disks was around 700GB used (of 932GB usable), and the 512GB disks were all at their limit: 456GB used (of 466GB usable). That left only the configured 10GB free for non-HDFS use on the small disks. Our disks are mounted in the pattern /mnt/disk_a, /mnt/disk_b and so on, with /mnt/disk_m as the small disk. We’re using the free version of CDH Manager so we can’t do rolling upgrades, meaning this upgrade would bring everything down. And because our cluster is getting full (> 80% usage is another rumoured “bad things” threshold) we had reduced one class of data (users’ occurrence downloads) to a replication factor of 2 (from the default of 3). This is considered somewhere between naughty and criminal, and you’ll see why below.
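For reference, dropping the replication factor on an existing directory tree is a one-liner of the following form (the path is illustrative):

hdfs dfs -setrep 2 /user/occurrence-downloads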

Upgrade Time

We followed the recommended procedure and did the Oozie, Hive, and CDH Manager backups, downloaded the latest parcels, and pressed the big Update button. Everything appeared to be going fine until HDFS tried to start up again, where the symptom was that it was taking a really long time (several minutes, after which the CDH Manager upgrade process finally gave up, saying the DataNodes weren’t making contact). Looking at the DataNode logs we saw that each node was performing a “Block Pool Upgrade”, which took between 90 and 120 seconds for each of our ~700GB disks. Here’s an excerpt of where it worked without problems:

2015-05-23 20:18:53,715 INFO org.apache.hadoop.hdfs.server.common.Storage: Lock on /mnt/disk_a/dfs/dn/in_use.lock acquired by nodename 27117@c4n1.gbif.org
2015-05-23 20:18:53,811 INFO org.apache.hadoop.hdfs.server.common.Storage: Analyzing storage directories for bpid BP-2033573672-130.226.238.178-1367832131535
2015-05-23 20:18:53,811 INFO org.apache.hadoop.hdfs.server.common.Storage: Locking is disabled for /mnt/disk_a/dfs/dn/current/BP-2033573672-130.226.238.178-1367832131535
2015-05-23 20:18:53,823 INFO org.apache.hadoop.hdfs.server.common.Storage: Upgrading block pool storage directory /mnt/disk_a/dfs/dn/current/BP-2033573672-130.226.238.178-1367832131535.
   old LV = -56; old CTime = 1416737045694.
   new LV = -56; new CTime = 1432405112136
2015-05-23 20:20:33,565 INFO org.apache.hadoop.hdfs.server.common.Storage: HardLinkStats: 59768 Directories, including 53157 Empty Directories, 0 single Link operations, 6611 multi-Link operations, linking 22536 files, total 22536 linkable files.  Also physically copied 0 other files.
2015-05-23 20:20:33,609 INFO org.apache.hadoop.hdfs.server.common.Storage: Upgrade of block pool BP-2033573672-130.226.238.178-1367832131535 at /mnt/disk_a/dfs/dn/current/BP-2033573672-130.226.238.178-1367832131535 is complete

That upgrade time happens sequentially for each disk, so even though the machines were upgrading in parallel, we were still looking at ~30 minutes of downtime for the whole cluster. As if that wasn’t sufficiently worrying, we then finally got to disk_m, our nearly full 512GB disk:

2015-05-23 20:53:05,814 INFO org.apache.hadoop.hdfs.server.common.Storage: Lock on /mnt/disk_m/dfs/dn/in_use.lock acquired by nodename 12424@c4n1.gbif.org
2015-05-23 20:53:05,869 INFO org.apache.hadoop.hdfs.server.common.Storage: Analyzing storage directories for bpid BP-2033573672-130.226.238.178-1367832131535
2015-05-23 20:53:05,870 INFO org.apache.hadoop.hdfs.server.common.Storage: Locking is disabled for /mnt/disk_m/dfs/dn/current/BP-2033573672-130.226.238.178-1367832131535
2015-05-23 20:53:05,886 INFO org.apache.hadoop.hdfs.server.common.Storage: Upgrading block pool storage directory /mnt/disk_m/dfs/dn/current/BP-2033573672-130.226.238.178-1367832131535.
   old LV = -56; old CTime = 1416737045694.
   new LV = -56; new CTime = 1432405112136
2015-05-23 20:54:12,469 WARN org.apache.hadoop.hdfs.server.common.Storage: Failed to analyze storage directories for block pool BP-2033573672-130.226.238.178-1367832131535
java.io.IOException: Cannot create directory /mnt/disk_m/dfs/dn/current/BP-2033573672-130.226.238.178-1367832131535/current/finalized/subdir91/subdir168
        at org.apache.hadoop.hdfs.server.datanode.DataStorage.linkBlocksHelper(DataStorage.java:1259)
        at org.apache.hadoop.hdfs.server.datanode.DataStorage.linkBlocksHelper(DataStorage.java:1296)
        at org.apache.hadoop.hdfs.server.datanode.DataStorage.linkBlocksHelper(DataStorage.java:1296)
        at org.apache.hadoop.hdfs.server.datanode.DataStorage.linkBlocks(DataStorage.java:1023)
        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage.linkAllBlocks(BlockPoolSliceStorage.java:647)
        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage.doUpgrade(BlockPoolSliceStorage.java:456)
        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage.doTransition(BlockPoolSliceStorage.java:390)
        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage.loadStorageDirectory(BlockPoolSliceStorage.java:171)
        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage.loadBpStorageDirectories(BlockPoolSliceStorage.java:214)
        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage.recoverTransitionRead(BlockPoolSliceStorage.java:242)
        at org.apache.hadoop.hdfs.server.datanode.DataStorage.addStorageLocations(DataStorage.java:396)
        at org.apache.hadoop.hdfs.server.datanode.DataStorage.recoverTransitionRead(DataStorage.java:478)
        at org.apache.hadoop.hdfs.server.datanode.DataNode.initStorage(DataNode.java:1397)
        at org.apache.hadoop.hdfs.server.datanode.DataNode.initBlockPool(DataNode.java:1362)
        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.verifyAndSetNamespaceInfo(BPOfferService.java:317)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.connectToNNAndHandshake(BPServiceActor.java:227)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.run(BPServiceActor.java:839)
        at java.lang.Thread.run(Thread.java:745)
2015-05-23 20:54:12,476 WARN org.apache.hadoop.hdfs.server.common.Storage: Failed to add storage for block pool: BP-2033573672-130.226.238.178-1367832131535 : Cannot create directory /mnt/disk_m/dfs/dn/current/BP-2033573672-130.226.238.178-1367832131535/current/finalized/subdir91/subdir168

The somewhat misleading “Cannot create directory” is not a file permission problem but rather a disk full problem. During this block pool upgrade some temporary space is needed for rewriting metadata, and that space is apparently more than the 10GB that was available to “non-HDFS” (which we’ve concluded means “not HDFS storage files, but everything else is fair game”). Because some space is available to start the upgrade, it begins, but then when it exhausts the disk it fails, and This Kills The DataNode. It does clean up after itself, but prevents the DataNode from starting, meaning our cluster was on its knees and in no danger of standing up.

So the problem was lack of free space, which on 10 of our 12 machines we were able to solve by wiping temporary files from the colocated yarn directory. Those 10 machines were then able to upgrade their disk_m and started up. We still had two nodes down and unfortunately they were in different racks, so that meant we had a big pile of our replication factor 2 files missing blocks (the default HDFS block replication policy places the second and subsequent copies on a different rack from the first copy).

While digging around in the different properties that we thought could affect our disks and HDFS behaviour we were also restarting the failing DataNodes regularly. At some point the log message changed to:

WARN org.apache.hadoop.hdfs.server.common.Storage: java.io.FileNotFoundException: /mnt/disk_m/dfs/dn/in_use.lock (No space left on device)

After that message the DataNode started, but with disk_m marked as a failed volume. We’re not sure why this happened, but presume that after one of our failures it didn’t clean up its temp files on disk_m, then on subsequent restarts found the disk completely full, (rightly) considered it unusable, and tried to carry on. With the final two DataNodes up we had almost all of our cluster, minus the two failed volumes. There were only 35 corrupted files (missing blocks) left after they came up. These were files set to replication factor 2 that, by bad luck, had both copies of some of their blocks on the failed disk_m volumes (one in rack 1, one in rack 2).
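For anyone in the same spot, hdfs fsck is the standard way to list the affected files (the second form gives per-block detail for a specific path):

hdfs fsck / -list-corruptfileblocks
hdfs fsck /path/to/suspect/file -files -blocks -locations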

It would not have been the end of the world to just delete the corrupted user downloads (they were all over a year old) but on principle, it would not be The Right Thing To Do.

On inodes and hardlinks

The normal directory structure of the dfs dir in a DataNode is /dfs/dn/current/&lt;block pool id&gt;/current/finalized, and within finalized is a whole series of directories that fan out the various blocks the volume contains. During the block pool upgrade a copy of ‘finalized’ is made, called previous.tmp. It’s not a normal copy however – it uses hardlinks in order to avoid duplicating all of the data (which obviously wouldn’t work). The copy is needed during the upgrade and is removed afterwards. Since our upgrade failed halfway through we had both directories, and had no choice but to move the entire /dfs directory off of disk_m to a temporary disk and complete the upgrade there. We first tried a copy (use cp -a to preserve hardlinks) to a mounted NFS share. The copy looked fine but on startup the DataNode didn’t understand the mounted drive (“drive not formatted”). Then we tried copying to a USB drive plugged into the machine, and that ultimately worked (despite feeling decidedly un-Yahoo). Once the USB drive was upgraded and online in the cluster, replication took over and copied all of its blocks to new homes in rack 2. We then unmounted the USB drive, wiped both disk_m volumes, and let replication balance out again. Final result: no lost blocks.
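In shell terms the move looked roughly like this (mount points are illustrative, the DataNode must be stopped first, and -a is what preserves the hardlinks):

cp -a /mnt/disk_m/dfs /mnt/usb_drive/dfs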

Mitigation

With the cluster happy again we made a few changes to hopefully ensure this doesn’t happen again:

dfs.datanode.du.reserved: 25GB – guarantees 25GB free on each volume (up from 10GB), which should be enough to allow a future upgrade to happen

dfs.datanode.fsdataset.volume.choosing.policy: AvailableSpace
dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction: 1.0 – together these two direct new blocks to the disks with more free space, leaving our now-full disk_m alone
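We set these through CDH Manager, but expressed as raw hdfs-site.xml properties they would look roughly like this (the reserved value is in bytes; 25GB = 26843545600):

<property>
  <name>dfs.datanode.du.reserved</name>
  <value>26843545600</value>
</property>
<property>
  <name>dfs.datanode.fsdataset.volume.choosing.policy</name>
  <value>org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy</value>
</property>
<property>
  <name>dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction</name>
  <value>1.0</value>
</property>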

Conclusion

This was one small taste of what can go wrong with filling heterogeneous disks in an HDFS cluster. We’re sure there are worse dangers lurking on the full-disk horizon, so hopefully you’ve learned from our pain and will give yourself some breathing room when things start to fill up. Also, don’t use a replication factor of less than 3 if there’s any way you can help it.

Upgrading our cluster from CDH4 to CDH5

Originally published November 26, 2014 at the GBIF Developer blog.

A little over a year ago we wrote about upgrading from CDH3 to CDH4 and now the time had come to upgrade from CDH4 to CDH5. The short version: upgrading the cluster itself was easy, but getting our applications to work with the new classpaths, especially MapReduce v2 (YARN), was painful.

The Cluster

Our cluster has grown since the last upgrade (now 12 slaves and 3 masters), and we no longer had the luxury of splitting the machines to build a new cluster from scratch. So this was an in-place upgrade, using CDH Manager.

UPGRADE CDH MANAGER

The first step was upgrading to CDH Manager 5.2 (from our existing 4.8). The Cloudera documentation is excellent so I don’t need to repeat it here. What we did find was that the management service now requests significantly more RAM for its monitoring services (a minimum “happy” config of 14GB), to the point where our existing masters were overwhelmed. As a stopgap we’ve added a 4th old machine to the “masters” group, used exclusively for the management service. In the longer term we’ll replace the 4 masters with 3 new machines that have enough resources.

UPGRADE CLUSTER MEMBERS

Again the Cloudera documentation is excellent, but I’ll add a bit. The upgrade process will now ask if a Java JDK should be installed (an improvement over the old behaviour of just installing one anyway). That means we could finally remove the Oracle JDK 6 rpms that had been lying around on the machines. For some reason the Host Inspector still complains about OpenJDK 7 vs Oracle 7, but we’ve happily been running on OpenJDK 7 since early 2014, and so far so good with CDH5 as well. After the upgrade wizard finished we had to tweak memory settings throughout the cluster, including setting the “Memory Overcommit Validation Threshold” to 0.99, up from its (very conservative) default of 0.8. Cloudera has another nice blog post on figuring out memory settings for YARN. Additionally Hue’s configuration required some attention, because after the upgrade it had forgotten where ZooKeeper and the HBase Thrift server were. All in all quite painless.

The Gotchas

Getting our software to work with CDH5 was definitely not painless. All of our problems stemmed from conflicting versions of jars, due either to changes in CDH dependencies, or to changes in how a user classpath is given priority over that of YARN/HBase/Oozie. Additionally it took some time to wrap our heads around the new artifact packaging used by YARN and HBase. Note that we also use Maven for dependency management.

Guava

We’re not alone in our suffering at the hands of mismatched Guava versions (e.g. HADOOP-10101, HDFS-7040), but suffer we did. We resorted to specifying version 14.0.1 in any of our code that touches Hadoop and, more importantly, HBase, and to excluding any newer Guava versions from our dependencies. This meant downgrading some actual code that was using Guava 15, but it was the easiest path to a working system.
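The exclusions we used are in the poms linked further down; the other half of the trick is pinning the version, which in Maven looks something like this:

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>14.0.1</version>
    </dependency>
  </dependencies>
</dependencyManagement>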

Jackson

We have many dependencies on Jackson 1.9 and 2+ throughout our code, so downgrading to match HBase’s shipped 1.8.8 was not an option. It meant figuring out the classpath precedence rules described below, and solving the problems (like logging) that doing so introduced.

Logging

Logging in Java is a horrible mess, and with the number of intermingled projects required to make application software run on a Hadoop/HBase cluster it’s no surprise that getting logging to work was brutal. We code to the SLF4J API and use Logback as our implementation of choice. The Hadoop world uses a mix of Java Commons Logging, java.util.logging, and log4j. We thought that meant we’d be clear if we used the same SLF4J API (1.7.5) and the bridges (log4j-over-slf4j, jcl-over-slf4j, and jul-to-slf4j), which has worked for us up to now. <montage>Angry men smash things angrily over the course of days</montage> Turns out, there’s a bug in the 1.7.5 implementation of log4j-over-slf4j, which blows up as we described over at YARN-2875. Short version – use 1.7.6+ in client code that attempts to use YARN and log4j-over-slf4j.
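In Maven terms that just means making sure the bridges resolve to 1.7.6 or later, e.g.:

<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>log4j-over-slf4j</artifactId>
  <version>1.7.6</version>
</dependency>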

YARN

The crux of our problems was having our classpath loaded after the Hadoop classpath, meaning the older versions of our dependencies that ship with Hadoop were loaded first. The new, surprisingly hard-to-find parameter that tells YARN to load your classpath first is “mapreduce.job.user.classpath.first”. YARN also quizzically claims that the parameter is deprecated, but… it works for me.
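Expressed as a plain configuration property (e.g. in the job configuration or mapred-site.xml) it looks like:

<property>
  <name>mapreduce.job.user.classpath.first</name>
  <value>true</value>
</property>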

Oozie

Convincing Oozie to load our classpath involved another montage of angry faces. It uses the same parameter as YARN, but with a prefix, so what you want is “oozie.launcher.mapreduce.job.user.classpath.first”. We had been loading the old parameter “mapreduce.task.classpath.user.precedence” in each action in the workflow, using the <job-xml> tag to load the configs from a file called hive-default.xml. We then encountered two problems:

1) Note the file name – we used hive-default.xml instead of hive-site.xml because of a bug in Oozie (discussed here and here). That was fixed in the CDH 5.2 Oozie, but we didn’t get the memo. Now the file is called hive-site.xml, contains our specific configs, and is again being picked up. BUT:

2) Adding oozie.launcher.mapreduce.job.user.classpath.first to hive-site.xml doesn’t work! As we wrote up in Oozie bug OOZIE-2066, this parameter has to be specified for each action, at the action level, in the workflow.xml. Repeating the example workaround from the bug report:

 <action name="run-test">  
  <java>  
   <job-tracker>c1n2.gbif.org:8032</job-tracker>  
   <name-node>hdfs://c1n1.gbif.org:8020</name-node>  
   <configuration>  
    <property>  
     <name>oozie.launcher.mapreduce.task.classpath.user.precedence</name>  
     <value>true</value>  
    </property>  
   </configuration>  
   <main-class>test.CPTest</main-class>  
  </java>  
  <ok to="end" />  
  <error to="kill" />  
 </action>  

New Packaging Woes

We build our jars using a combination of jar-with-dependencies and the shade plugin, but in both cases it means all our dependencies are built in. The problems come when a downstream, transitive dependency loads a different (typically older) version of one of the jars we’ve bundled in our main jar. This happens a lot with the Hadoop and HBase artifacts, especially when it comes to MR1 and logging.

Example exclusions

hbase-server (needed to run MapReduce over HBase): https://github.com/gbif/datacube/blob/master/pom.xml#L268

hbase-testing-util (needed to run mini clusters): https://github.com/gbif/datacube/blob/master/pom.xml#L302

hbase-client: https://github.com/gbif/metrics/blob/master/pom.xml#L226

hadoop-client (removing logging): https://github.com/gbif/metrics/blob/master/pom.xml#L327

Beyond just sorting out conflicting dependencies, we also encountered a problem that presented as “No FileSystem for scheme: file”. It turns out we had projects bringing in both hadoop-common and hadoop-hdfs, and so we were getting only one of their META-INF/services files in the final jar. That meant the FileSystem API could read either local files (like jars for the classpath) or HDFS, but not both. The fix was to include the org.apache.hadoop.fs.FileSystem services file in our project explicitly: https://github.com/gbif/metrics/blob/master/cube/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
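At a minimum such a services file needs to declare both the local and HDFS implementations, along these lines:

org.apache.hadoop.fs.LocalFileSystem
org.apache.hadoop.hdfs.DistributedFileSystem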

Finally we had to stop TableMapReduceUtil from bringing in its own dependent jars, which brought in yet more conflicting jars – this appears to be a change in the default behaviour, where dependent jars are now added automatically by the shorter versions of initTableMapperJob:
https://github.com/gbif/metrics/blob/master/cube/src/main/java/org/gbif/metrics/cube/occurrence/backfill/BackfillCallback.java#L37
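In code that means calling the longer overload of initTableMapperJob and passing false for the final addDependencyJars argument – a sketch (the table and mapper names are illustrative):

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

// wires up a TableMapper without letting HBase ship its own (conflicting) dependency jars
public static void configureBackfill(Job job, Scan scan) throws java.io.IOException {
  TableMapReduceUtil.initTableMapperJob(
      "my_table",                    // source table (illustrative)
      scan,                          // the Scan to run over the table
      MyMapper.class,                // your TableMapper implementation (illustrative)
      ImmutableBytesWritable.class,  // map output key class
      Result.class,                  // map output value class
      job,
      false);                        // addDependencyJars = false
}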

Conclusion

As you can see, the client side of the upgrade was beset on all sides by the iniquities of jars, packaging, and old dependencies. It seems strange that upgrading Guava is considered a no-no, major breaking change by these projects, yet discussions about removing HTablePool are proceeding apace and will definitely break many projects (including any of ours that touch HBase). While we’re ultimately pleased that everything now works, and we’re looking forward to the performance improvements and new features of CDH5, it wasn’t a great trip. Hopefully our experience will help others migrate more smoothly.