Apache Spark is a data processing system that promises in-memory analytics and data locality, but how does that actually work? How does Spark know where the data is? The Spark UI is full of messages about NODE_LOCAL, ANY, or PROCESS_LOCAL. What are these things, and how do they get set? Let's explore how Spark handles data locality when processing data.
Inside every Spark application there is an extremely important class called the DAGScheduler. You have probably seen references to it in Task Failed exceptions or other runtime exceptions. DAGScheduler stands for Directed Acyclic Graph Scheduler, and it is the core of actually getting any work done in a Spark cluster. This is the class that takes "tasks" from a Spark application and sends them out to Executors to be processed. This is why, whenever a remote Executor fails to run a task, DAGScheduler ends up in the stack trace.
When our code calls a Spark action like saveToCassandra, the request ends up in the DAGScheduler. The action submits a job to the DAGScheduler, which transforms the RDD action and all its dependencies into a series of stages made up of tasks. Each task can be thought of as a series of functions applied to a single Spark partition of data.
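As a rough sketch of that flow (the file path and transformations here are placeholders, not from the original example):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("locality-demo"))

// Narrow transformations chain together into a single stage; nothing runs yet.
val upper = sc.textFile("hdfs:///data/events") // hypothetical path
  .map(_.toUpperCase)
  .filter(_.nonEmpty)

// The action is what submits a job to the DAGScheduler, which builds stages
// and ships one task per partition out to the Executors.
upper.count()
```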
The first hint of locality comes in a little function called submitMissingTasks. This function is used whenever there is work to be done that has not yet been done. One of the first things it does is probe the head RDD for a partition-level piece of information called preferredLocations. This list of strings is the only message the underlying RDD can send to tell the scheduler about data locality.
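You can probe those hints yourself; a minimal sketch, reusing the sc from above and assuming an HDFS-backed RDD:

```scala
// Ask the head RDD what it reports for each partition. For an HDFS-backed
// RDD these strings are the hosts holding each underlying block.
val events = sc.textFile("hdfs:///data/events") // hypothetical path
events.partitions.foreach { p =>
  println(s"partition ${p.index} prefers: ${events.preferredLocations(p).mkString(", ")}")
}
```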
PreferredLocation is one of the basic components of an RDD. See the clearly labeled Javadoc, which states that this function can be "Optionally overridden by subclasses to specify placement preferences." RDD designers, like the authors of the DSE Spark Cassandra Connector, can use this function to indicate that a particular Spark partition is located on a particular IP address or set of IP addresses.
Hadoop RDDs can use the location of the underlying HDFS data to indicate where their partitions should be run. Kafka RDDs similarly indicate that a Kafka Spark partition should get its data from the same machine hosting that Kafka topic.
In the case of Spark Streaming jobs using Receivers, every partition is marked as local to the node
where the receiver is running and has stored the data. Other RDDs won’t implement this function at
all since they are assumed to be working in place on data that comes from a parent RDD.
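To make the contract concrete, here is a minimal, entirely hypothetical RDD that pins each partition to a host name; it only illustrates the override, it is not how any of the connectors above are actually implemented:

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// A trivially serializable partition that just carries its index.
case class HostPartition(index: Int) extends Partition

// Hypothetical RDD: partition i claims to live on hosts(i).
class PinnedRDD(sc: SparkContext, hosts: Seq[String])
  extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](hosts.length)(HostPartition(_))

  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    Iterator(s"computed partition ${split.index}")

  // The list of hints the scheduler matches against currently running executors.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(hosts(split.index))
}
```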
The important thing to remember is that the decision about what is "local" for any given Spark task is based solely on what the implementer of the RDD decided would be "local," and on whether that Seq[String] contains a match for any of the executors currently running.
The DAGScheduler passes tasks to a TaskScheduler, which in turn builds a TaskSet that actually decides which tasks are run at which times. Inside the TaskSet, tasks are sorted in priority order based on four locality levels:
* PROCESS_LOCAL // This task will be run within the same process as the source data
* NODE_LOCAL // This task will be run on the same machine as the source data
* RACK_LOCAL // This task will be run in the same rack as the source data
* NO_PREF (Shows up as ANY) // This task cannot be run in the same process as the source data, or it doesn't matter
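If you want to see which level each task actually ran at, beyond watching the Spark UI, here is a small sketch using a SparkListener (again reusing the sc from above):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Print the locality level each finished task actually ran at; this is the
// same per-task value the Spark UI displays.
sc.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
    println(s"task ${taskEnd.taskInfo.taskId} ran at ${taskEnd.taskInfo.taskLocality}")
})
```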
When actually running tasks, those with the highest level of locality, PROCESS_LOCAL, are assigned first, then the NODE_LOCAL tasks, and finally the RACK_LOCAL tasks. But what if we have NODE_LOCAL tasks for a specific node and run out of execution space on that machine?
Imagine we are pulling data from a co-located Kafka node in a 3-node Spark cluster. Kafka is running on node A of Spark nodes A, B, and C. All of the data will be marked as being NODE_LOCAL to node A. This means that once every core on A is occupied, we will be left with tasks whose preferred location is A while we only have execution space on B and C. Spark has only two options: wait for cores to become available on A, or downgrade the locality level of the task, find a space for it, and take whatever penalty there is for running non-local.
The spark.locality.wait parameter describes how long to wait before downgrading tasks that could potentially be run at a higher locality level to a lower level. This parameter is basically our estimate of how much time waiting for locality is worth. The default is 3 seconds, which means that in our Kafka example, once our co-located node A is saturated with tasks, our other machines B and C will stand idle for 3 seconds before tasks which could have been NODE_LOCAL are downgraded to ANY* and run (code reference).
* If ‘A’ and ‘C’ were specified to be in the same rack, the locality would be downgraded to RACK_LOCAL first and the task run on ‘C’ instead. Once ‘C’ was full, the task would wait another 3 seconds before the final downgrade to ANY and be run on ‘B’.
In streaming applications, I've seen many users just set spark.locality.wait to 0 to ensure that all tasks are being worked on immediately. A batch window of 5 seconds makes the default wait of 3 seconds excruciatingly long. After all, waiting 60% of your batch time before starting work is probably inefficient. In other applications I've seen locality be extremely important and spark.locality.wait increased to a near-infinite value. The correct setting will depend on the latency goals and locality benefit for your particular application. There are even finer-grained controls (spark.locality.wait.process, spark.locality.wait.node, and spark.locality.wait.rack) if you would like to make the delays between different levels of locality different.
With these controls it should be easy to tune your application for maximum throughput.
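A minimal sketch of those knobs; the values are illustrative, not recommendations:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("locality-tuning")

// Streaming-style: never wait for locality, schedule tasks immediately.
conf.set("spark.locality.wait", "0")

// Or, for locality-sensitive jobs: wait longer, with per-level overrides
// (each falls back to spark.locality.wait when unset).
// conf.set("spark.locality.wait", "30s")
// conf.set("spark.locality.wait.process", "30s")
// conf.set("spark.locality.wait.node", "30s")
// conf.set("spark.locality.wait.rack", "10s")

val sc = new SparkContext(conf)
```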
Thanks to Gerard Maas for reviewing!