SVDS held our first public meetup on October 7th, hosting the Advanced Spark Meetup at our headquarters in Mountain View. To date, we’ve used Spark on five client projects, from ingestion pipelines to PySpark-enabled analytics platforms, using Java, Scala, and Python, and are always keen to dissect technical details with other big data practitioners. (We even have a contributor—high five, Andrew!)
Our audience of engineers got right into the guts of Spark’s GraySort benchmark win last year with Chris Fregly from IBM Spark Technology Center. Here are a few highlights from the meetup.
Configuration, configuration, configuration
Getting the best performance out of Spark when every byte matters means digging into its internals and tuning parameters. Chris spent a large portion of the talk going over the configuration that was used and why. This deserves some props; these parameters are important, but don’t get a lot of airtime because they can be, well, boring. Chris’s goal was to pull the curtain back on some of Spark’s configuration and make it more approachable. You’ll thank him later when you are comparing a ShuffleJoin vs. a BroadcastJoin.
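As one concrete example of this kind of parameter (shown here with its stock default as a sketch, not the benchmark's configuration), Spark SQL decides between a shuffle join and a broadcast join based on a size threshold:

```properties
# spark-defaults.conf — illustrative value (the Spark SQL default)
# Tables smaller than this threshold (in bytes) are broadcast to every
# executor, turning a shuffle join into a broadcast join.
spark.sql.autoBroadcastJoinThreshold   10485760
```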
Beyond demystifying the parameters themselves, the config discussion illuminated relevant details about how Spark performs the sort. For example (from the Databricks GraySort paper):
“Spark’s scheduler uses a technique called Delay Scheduling to schedule tasks based on the locality preference, i.e. it tries to schedule tasks onto nodes with local data. In this experiment, we increased the delay scheduling wait to a very large number so our sorting job gets 100% locality. Locality scheduling coupled with HDFS short circuit local reads result in all tasks reading from local disks directly rather than going through the HDFS client. It also means the map stage does not have any network I/O.”
If you didn’t know about Delay Scheduling and its consequences before, you do now!
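The knob behind this is Spark's locality wait, which controls how long the scheduler holds a task hoping for a data-local slot. As a sketch (the value below is illustrative, not the benchmark's exact setting):

```properties
# spark-defaults.conf — illustrative; the benchmark used "a very large number"
# Wait (effectively) forever for a node-local slot instead of falling back
# to rack-local or arbitrary executors.
spark.locality.wait   100000s
```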
(Also, note that there was no compression used in the shuffle stage for this benchmark.)
Choice of algorithm and its corresponding running time complexity is obviously a critical decision for any large-scale application. When performance really matters, we also care about that pre-factor that we could ignore in CS classes. Clever implementations can cut running times in half by leveraging cache locality or sequential disk reads. Here, TimSort was used on 16-byte key-pointer records.
TimSort first looks for a “run” of ascending or strictly descending elements (a descending run is reversed in place), and uses insertion sort to extend it until it is at least minrun elements long (minrun is derived from the array’s size). It then processes the next run, merging only consecutive runs. The rules are designed to create balanced merges that exploit cache locality (hence the mechanical sympathy) as well as the partially-sorted nature of non-random data, improving real-world average-case sort performance.
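The run-detection phase is easy to sketch in Python (fittingly, CPython’s own built-in sort is TimSort). The `find_run` helper below is our illustrative reimplementation of that first step, not Spark’s or CPython’s actual code:

```python
def find_run(a, start):
    """Length of the run beginning at `start`: either ascending
    (a[i] >= a[i-1]) or strictly descending (a[i] < a[i-1])."""
    n = len(a)
    i = start + 1
    if i >= n:
        return 1
    if a[i] >= a[start]:          # ascending run
        while i < n and a[i] >= a[i - 1]:
            i += 1
    else:                         # strictly descending run (TimSort reverses it)
        while i < n and a[i] < a[i - 1]:
            i += 1
    return i - start

data = [1, 2, 3, 7, 5, 2, 0, 4, 4, 9]
runs, i = [], 0
while i < len(data):
    length = find_run(data, i)
    runs.append(data[i:i + length])
    i += length
# runs == [[1, 2, 3, 7], [5, 2, 0], [4, 4, 9]]
```

Real TimSort would then reverse the descending run, pad short runs to minrun with insertion sort, and merge consecutive runs under its balancing rules.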
Perhaps even more hardware-sympathetic is the use of a key/pointer sort. Instead of sorting the actual 100-byte records, the sort is performed on an array of 16-byte entries, each holding the record’s 10-byte key prefix plus a pointer to the record. This avoids shuffling entire records around in memory and improves cache performance.
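A minimal sketch of the idea in Python (the record layout mirrors the GraySort format of 100-byte records with a 10-byte key; this is our own illustration, not Spark’s implementation):

```python
import os

# Hypothetical 100-byte records; the first 10 bytes are the sort key.
records = [os.urandom(100) for _ in range(1000)]

# Build a compact key/pointer array: (10-byte key prefix, index of record).
key_ptrs = [(rec[:10], i) for i, rec in enumerate(records)]

# Only the small (key, pointer) entries move during the sort;
# the 100-byte records stay put until the final gather.
key_ptrs.sort()
sorted_records = [records[i] for _, i in key_ptrs]
```

The final gather is the only pass that touches full records, so the comparison-heavy inner loop works over a much smaller, cache-friendlier array.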
Shuffle improvements (more below) also improved disk I/O. As everyone knows: avoid disk seeks and prefer sequential reads.
Whether you call it Amdahl’s Law (as an engineer) or Goldratt’s Theory of Constraints (as a manager), focus on improving the rate-limiting step to get the biggest speedup for your efforts. Our takeaway: better (aka easier-to-use) profiling in Spark, ideally both before and during production, will help us design more efficient applications by identifying bottlenecks.
Know Thy Shuffle
Depending on your application and data, the shuffle stage is most often the performance bottleneck (and most non-trivial use cases require at least one). There are numerous parameters you can use to fine-tune shuffle performance, but the biggest gains came from more fundamental optimizations, including:
- replacing Spark’s hash-based shuffle with a sort-based shuffle: http://0x0fff.com/spark-architecture-shuffle/;
- tuning to fit shuffle files in-memory (i.e., do not spill shuffle to disk and perform external sorts); and
- using Netty. From the Databricks blog: “That [Netty] was critical in scaling up Spark’s shuffle operation and winning the Sort Benchmark.”
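Gathered into one place as a sketch (these are real Spark 1.x property names, but the values are illustrative rather than the benchmark’s exact configuration), those optimizations map to settings like:

```properties
# spark-defaults.conf — illustrative, not the benchmark's exact values
spark.shuffle.manager                sort    # sort-based rather than hash-based shuffle
spark.shuffle.spill                  false   # keep shuffle data in memory; no external sort
spark.shuffle.blockTransferService   netty   # Netty-based network transport
spark.shuffle.compress               false   # the benchmark ran with shuffle compression off
```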
Chris also pointed out a pitfall that we have encountered in real workloads. If your data is highly skewed, one of the tasks might hang during the shuffle because, for example, a huge number of records are partitioned to one reducer. Check your Spark UI. If one task is processing a huge amount of data and taking forever, look at your data and the partitioning of the offending operation.
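A common fix for this kind of skew is key salting: append a small random or round-robin suffix to the hot key so its records spread across several reducers, then re-aggregate afterwards. The sketch below (plain Python, our own illustration of hash partitioning rather than Spark code) shows the effect:

```python
from collections import Counter

NUM_PARTITIONS = 8
SALT = 16

# Skewed dataset: 90% of the records share the key "hot" (illustrative).
keys = ["hot"] * 900 + [f"k{i}" for i in range(100)]

# Plain hash partitioning sends every "hot" record to one reducer.
plain = Counter(hash(k) % NUM_PARTITIONS for k in keys)

# Salting splits "hot" into 16 sub-keys; the downstream aggregation
# must combine the partial results per original key afterwards.
salted = Counter(
    hash(f"{k}#{i % SALT}") % NUM_PARTITIONS for i, k in enumerate(keys)
)

print("plain  max partition size:", max(plain.values()))
print("salted max partition size:", max(salted.values()))
```

With plain partitioning one reducer receives at least the 900 hot records; after salting, the hot key’s load is spread across up to 16 sub-keys.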
In general, keep a close eye on your shuffle performance (e.g., spilling to disk), and of course avoid shuffles where possible (DataFrames make this easier). Also, for current versions, check out the TungstenSort implementation.
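For reference (Spark 1.5-era; check the docs for your version), the Tungsten shuffle is enabled with a single setting:

```properties
spark.shuffle.manager   tungsten-sort
```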
SVDS will be hosting more meetups in the future, discussing engineering and data science lessons learned from doing client work across many industries. Stay tuned to our upcoming events.