Apache Spark is a data processing system that promises in-memory analytics and data locality, but how does that actually work? How does Spark know where the data is? The Spark UI is full of messages about NODE_LOCAL, ANY, or PROCESS_LOCAL. What are these things, and how do they get set? Let's explore how Spark handles data locality when processing data.
Inside every Spark application there is an extremely important class called the DAGScheduler. You have probably seen references to it in Task Failed exceptions or other runtime exceptions. DAGScheduler stands for Directed Acyclic Graph Scheduler, and it is the core of actually getting any work done in a Spark cluster. This is the class that takes "tasks" from a Spark application and sends them out to Executors to be processed. This is why, whenever a remote Executor fails to run a task, DAGScheduler ends up in the stack trace.
When our code calls a Spark action like saveToCassandra, the request ends up in the DAGScheduler. The action submits a job to the DAGScheduler, which transforms the RDD action and all its dependencies into a series of stages made up of tasks. Each task can be thought of as a series of functions applied to a single Spark partition of data.
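As a rough sketch of that flow (the file path and transformations here are placeholders, not from the original example):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("locality-demo"))

// Narrow transformations chain together into a single stage; nothing runs yet.
val upper = sc.textFile("hdfs:///data/events") // hypothetical path
  .map(_.toUpperCase)
  .filter(_.nonEmpty)

// The action is what submits a job to the DAGScheduler, which builds stages
// and ships one task per partition out to the Executors.
upper.count()
```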
The first hint of locality comes in a little function called submitMissingTasks. This function is used whenever there is work to be done that has not yet been done. One of the first things it does is probe the head RDD for a partition-level piece of information called preferredLocations. This list of strings is the only message the underlying RDD can send to tell the scheduler about data locality.
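You can probe those hints yourself; a minimal sketch, reusing the sc from above and assuming an HDFS-backed RDD:

```scala
// Ask the head RDD what it reports for each partition. For an HDFS-backed
// RDD these strings are the hosts holding each underlying block.
val events = sc.textFile("hdfs:///data/events") // hypothetical path
events.partitions.foreach { p =>
  println(s"partition ${p.index} prefers: ${events.preferredLocations(p).mkString(", ")}")
}
```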
PreferredLocation is one of the basic components of an RDD. See the clearly labeled Javadoc, which states that this function can be "Optionally overridden by subclasses to specify placement preferences." RDD designers, like the authors of the DSE Spark Cassandra Connector, can use this function to indicate that a particular Spark partition is located on a particular IP address or set of IP addresses.
Hadoop RDDs can use the location of the underlying HDFS data to indicate where their partitions should be run. Kafka RDDs similarly indicate that a Kafka Spark partition should get its data from the same machine hosting that Kafka topic.
In the case of Spark Streaming jobs using Receivers, every partition is marked as local to the node
where the receiver is running and has stored the data. Other RDDs won’t implement this function at
all since they are assumed to be working in place on data that comes from a parent RDD.
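To make the contract concrete, here is a minimal, entirely hypothetical RDD that pins each partition to a host name; it only illustrates the override, it is not how any of the connectors above are actually implemented:

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// A trivially serializable partition that just carries its index.
case class HostPartition(index: Int) extends Partition

// Hypothetical RDD: partition i claims to live on hosts(i).
class PinnedRDD(sc: SparkContext, hosts: Seq[String])
  extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](hosts.length)(HostPartition(_))

  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    Iterator(s"computed partition ${split.index}")

  // The list of hints the scheduler matches against currently running executors.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(hosts(split.index))
}
```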
The important thing to remember is that the decision about what is "local" for any given Spark task is based solely on what the implementer of the RDD decided would be "local," and on whether that Seq[String] contains a match for any of the executors currently running.
The DAGScheduler passes tasks to a TaskScheduler, which in turn builds a TaskSet that actually decides which tasks are run at which times. Inside the TaskSet, tasks are sorted in priority order based on four locality levels:
* PROCESS_LOCAL // This task will be run within the same process as the source data
* NODE_LOCAL // This task will be run on the same machine as the source data
* RACK_LOCAL // This task will be run in the same rack as the source data
* NO_PREF (Shows up as ANY) // This task cannot be run in the same process as the source data, or it doesn't matter
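If you want to see which level each task actually ran at, beyond watching the Spark UI, here is a small sketch using a SparkListener (again reusing the sc from above):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Print the locality level each finished task actually ran at; this is the
// same per-task value the Spark UI displays.
sc.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
    println(s"task ${taskEnd.taskInfo.taskId} ran at ${taskEnd.taskInfo.taskLocality}")
})
```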
When actually running tasks, those with the highest level of locality, PROCESS_LOCAL, are assigned first, then the NODE_LOCAL tasks, and finally the RACK_LOCAL tasks. But what if we have NODE_LOCAL tasks for a specific node and run out of execution space on that machine?
Imagine we are pulling data from a co-located Kafka node in a 3-node Spark cluster. Kafka is running on node A of Spark nodes A, B, and C. All of the data will be marked as being NODE_LOCAL to node A. This means that once every core on A is occupied, we will be left with tasks whose preferred location is A while we only have execution space on B and C. Spark has only two options: wait for cores to become available on A, or downgrade the locality level of the task, find a space for it, and take whatever penalty there is for running non-local.
The spark.locality.wait parameter describes how long to wait before downgrading tasks that could potentially be run at a higher locality level to a lower level. This parameter is basically our estimate of how much time waiting for locality is worth. The default is 3 seconds, which means that in our Kafka example, once our co-located node A is saturated with tasks, our other machines B and C will stand idle for 3 seconds before tasks which could have been NODE_LOCAL are downgraded to ANY* and run (code reference).
* If ‘A’ and ‘C’ were specified to be in the same rack, the locality would be downgraded to RACK_LOCAL first and the task run on ‘C’ instead. Once ‘C’ was full, the task would wait another 3 seconds before the final downgrade to ANY and be run on ‘B’.
In streaming applications, I've seen many users just set spark.locality.wait to 0 to ensure that all tasks are being worked on immediately. A batch window of 5 seconds makes the default wait of 3 seconds excruciatingly long. After all, waiting 60% of your batch time before starting work is probably inefficient. In other applications I've seen locality be extremely important and spark.locality.wait increased to a near-infinite value. The correct setting will depend on the latency goals and locality benefit for your particular application. There are even finer-grained controls (spark.locality.wait.process, spark.locality.wait.node, and spark.locality.wait.rack) if you would like to make the delays between different levels of locality different.
With these controls it should be easy to tune your application for maximum throughput.
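A minimal sketch of those knobs; the values are illustrative, not recommendations:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("locality-tuning")

// Streaming-style: never wait for locality, schedule tasks immediately.
conf.set("spark.locality.wait", "0")

// Or, for locality-sensitive jobs: wait longer, with per-level overrides
// (each falls back to spark.locality.wait when unset).
// conf.set("spark.locality.wait", "30s")
// conf.set("spark.locality.wait.process", "30s")
// conf.set("spark.locality.wait.node", "30s")
// conf.set("spark.locality.wait.rack", "10s")

val sc = new SparkContext(conf)
```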
Thanks to Gerard Maas for reviewing!