Some guy's blog
One of the biggest issues with working with Spark and Cassandra is dealing with large Partitions. There are several problems we need to overcome before we can really handle the challenge well. I’m going to use this blog post as a way of formalizing my thoughts on the issue. Let’s get into the details.
Spark is a Data Parallel system. A large set of data is broken into partitions (splits for Hadoop users), all of which are worked on independently. You can see this in the RDD class. The work done in each Spark task is defined by the metadata in a single Partition. The work itself is coded in the compute method. Note that compute takes a single partition as its argument, which means that the work in a task depends only on that one partition. The partition metadata is also only made once, before the work begins. These aspects are what make RDDs not only immutable but also more or less deterministic. Each partition is independent of every other partition, and this lets Spark spread work to any machine and even recompute partitions without ill effects (most of the time[^1]).
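The contract described above can be sketched without Spark at all. What follows is a toy model and not Spark's actual RDD class: ToyPartition, ToyRDD, and RangeRDD are hypothetical names made up for illustration. It only shows the shape of the idea, which is that partition metadata is built once and compute sees exactly one partition.

```scala
// Toy model of the partition/compute contract; these are NOT Spark's classes.
object ToyRddModel {
  // Metadata describing one slice of work.
  final case class ToyPartition(index: Int, start: Int, end: Int)

  abstract class ToyRDD[T] {
    // Partition metadata is built exactly once, before any work runs.
    lazy val partitions: Vector[ToyPartition] = getPartitions
    protected def getPartitions: Vector[ToyPartition]
    // The work for one task sees only its own partition.
    def compute(split: ToyPartition): Iterator[T]
    // Each partition is computed on its own and the results are concatenated.
    def collect(): Vector[T] = partitions.flatMap(p => compute(p))
  }

  // Example: the integers [0, total) cut into numSlices independent pieces.
  final class RangeRDD(total: Int, numSlices: Int) extends ToyRDD[Int] {
    protected def getPartitions: Vector[ToyPartition] =
      Vector.tabulate(numSlices) { i =>
        ToyPartition(i, i * total / numSlices, (i + 1) * total / numSlices)
      }
    def compute(split: ToyPartition): Iterator[Int] =
      (split.start until split.end).iterator
  }
}
```

Because compute here depends on nothing but its partition, calling it twice on the same partition gives the same rows, which is the property that makes recomputation safe.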
One of the difficulties of writing a datasource RDD is determining how that partitioning will be done. There are many different DataSources for reading data into Spark and they all take different approaches to the partitioning problem. JDBC sources require a specific partitioning column as well as min/max bounds and a step value, all of which must be provided before the target database is contacted. Each partition is built from the results of a query with a where clause bounded by the step range. Most of the file system sources map one file to one Spark Partition. The Spark Cassandra connector takes another approach based on the underlying distribution of data within Cassandra.
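To make the JDBC style concrete, here is a minimal sketch of turning a column name, bounds, and a partition count into one where clause per partition. It is a simplification written for this post, not Spark's own implementation; JdbcStylePartitioning and whereClauses are hypothetical names, and leaving the first and last clauses open-ended is an assumption made so that rows outside the supplied bounds are not dropped.

```scala
object JdbcStylePartitioning {
  // Builds one WHERE clause per partition from bounds supplied up front,
  // before the database is ever contacted.
  def whereClauses(column: String, lower: Long, upper: Long, numPartitions: Int): Vector[String] = {
    require(numPartitions > 0 && lower < upper)
    val stride = math.max(1L, (upper - lower) / numPartitions)
    Vector.tabulate(numPartitions) { i =>
      val lo = lower + i * stride
      val hi = lo + stride
      // First and last partitions are left open so out-of-bounds rows still land somewhere.
      val lowerBound = if (i == 0) None else Some(s"$column >= $lo")
      val upperBound = if (i == numPartitions - 1) None else Some(s"$column < $hi")
      val parts = List(lowerBound, upperBound).flatten
      if (parts.isEmpty) "1 = 1" else parts.mkString(" AND ")
    }
  }
}
```

Note that nothing in this function looks at the data. If the real values cluster in one stride, one partition gets nearly everything, which is the same blindness discussed for Cassandra below.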
Cassandra distributes data based on its own partitioner, which is separate from the Spark partitioning and Spark Partitioners. The partitioner takes incoming data and uses a hash function (Murmur3-ish [^2]) to map a piece of data to a particular value. The mapped value is the Token and the range of all possible tokens is the Token Range. Different nodes in the cluster will own (or be a replica for) particular portions of the token range. The piece of the data which is hashed is called the Partition Key. Within each Cassandra partition the order of the data is determined by other columns which are called the Clustering Keys. With this mechanism any piece of data can be located by computing the hash of its Partition Key to find its Token, then mapping that Token to the node which owns the correct token range.
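The lookup path (hash the Partition Key to a Token, then find the node owning the range that contains it) can be sketched with a sorted map. This is an illustration under loud assumptions: ToyTokenRing is a hypothetical name, the hash is Scala's built-in 32-bit MurmurHash3 standing in for Cassandra's 64-bit variant, and replicas are ignored entirely.

```scala
import scala.collection.immutable.TreeMap
import scala.util.hashing.MurmurHash3

object ToyTokenRing {
  // Stand-in hash: Scala's 32-bit MurmurHash3, NOT Cassandra's Murmur3 variant.
  def token(partitionKey: String): Long =
    MurmurHash3.stringHash(partitionKey).toLong

  // ring maps the last token of each range (inclusive) to the node owning that range.
  def owner(ring: TreeMap[Long, String], t: Long): String = {
    require(ring.nonEmpty)
    // First range end at or after the token; wrap around to the smallest otherwise.
    val it = ring.iteratorFrom(t)
    if (it.hasNext) it.next()._2 else ring.head._2
  }
}
```

A three-node ring would be TreeMap(-100L -> "a", 0L -> "b", 100L -> "c"), and a key is located with ToyTokenRing.owner(ring, ToyTokenRing.token("some-key")).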
The Spark Cassandra connector makes Spark Partitions by taking the Token Range and dividing it into pieces based on the ranges already owned by the nodes in the cluster. The number and size of the Spark Partitions are based on metadata which is read from the Cassandra cluster. The estimated data size[^3] is used to build Spark Partitions containing the amount of data specified by a user parameter, split_size_in_mb.
For more information see this visual guide.
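As a rough picture of how size estimates can drive the grouping, the sketch below greedily packs consecutive token ranges until the next one would push a group past the target size. It is not the connector's code: SplitPlanner, TokenRange, and estimatedMb are hypothetical names, and real concerns such as replica locality are left out.

```scala
object SplitPlanner {
  // A token range with a size estimate, assumed to come from cluster metadata.
  final case class TokenRange(start: Long, end: Long, estimatedMb: Double)

  // Greedily packs consecutive ranges; a group is closed when adding the next
  // range would exceed splitSizeMb. A single oversized range stays whole.
  def plan(ranges: Seq[TokenRange], splitSizeMb: Double): Vector[Vector[TokenRange]] = {
    val start = (Vector.empty[Vector[TokenRange]], Vector.empty[TokenRange], 0.0)
    val (done, current, _) = ranges.foldLeft(start) {
      case ((acc, cur, size), r) =>
        if (cur.nonEmpty && size + r.estimatedMb > splitSizeMb)
          (acc :+ cur, Vector(r), r.estimatedMb)
        else
          (acc, cur :+ r, size + r.estimatedMb)
    }
    if (current.isEmpty) done else done :+ current
  }
}
```

The oversized range in this sketch is exactly the hotspot case: a grouping step like this can merge small ranges, but it has no way to cut inside a single large Cassandra partition.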
This strategy for partitioning has a few direct results
Because Spark Partitions always must hold a full Cassandra Partition we can run into problems when Cassandra has hotspots (partitions which are exceedingly large). We can also run into issues if all of our partitions start exceeding a manageable size for Spark.
The real problem with breaking up large partitions is that it is impossible for us to know which partitions are truly large without reading them first. As I noted previously, we need to specify the partitioning of the RDD before we actually do any computation. This means we are blind to many aspects of the data. That said, let’s explore some approaches we might be able to take to break up these large partitions.
The key problem with randomly choosing the partition breakpoints is that the domain of all possible clustering column values is exceedingly large, while the domain actually in use is small and relatively dense.
Consider a single clustering column c which is an integer. If we have no information on the actual data’s bounds, we must assume that the data could span Int.MinValue to Int.MaxValue. If the end user was only using values between 1 and 1000000, it would require roughly 2147 partitions before we made our first slice which would actually divide the user’s data.
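One way to arrive at a figure of about 2147, assuming equal-width slices laid out over the non-negative half of the Int domain (this layout is my assumption for the arithmetic, and BlindSlicing is a hypothetical name): a slice boundary only lands inside the user's values once the slice width drops below 1000000.

```scala
object BlindSlicing {
  // Width of each slice when [0, domainMax] is cut into n equal pieces.
  def sliceWidth(domainMax: Long, n: Long): Double = domainMax.toDouble / n

  // True once the first boundary above zero falls inside the values actually
  // in use, i.e. strictly below usedMax.
  def dividesUserData(domainMax: Long, usedMax: Long, n: Long): Boolean =
    sliceWidth(domainMax, n) < usedMax
}
```

With domainMax = Int.MaxValue and usedMax = 1000000, 2147 slices are still too wide to cut the user's data and 2148 is the first count that does.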
The problem becomes even more difficult if the clustering key is made of multiple columns. Consider a clustering key (c: Int, d: String, e: Int). Now our range is all of ValidIntegers * ValidStrings * ValidIntegers. A random partitioning of this enormous range (which is also unbounded because of the String) would almost certainly not match a user’s actual domain.
We could do much better at choosing our breakpoints if we had some approximate information about column distributions prior to deciding where to make our cuts. SSTables could be built with attached metadata describing approximate unique value counts, minimums, and maximums. These values could be merged to get a conservative estimate of the actual bounds of column values. This would be significantly better than assuming the true max bounds of each field. Unfortunately this would require a good deal of upstream work inside of the Cassandra project.
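The merge step is simple enough to sketch. Everything here is hypothetical, since Cassandra does not expose such metadata today: ColumnStats and Stats are made-up names for a per-SSTable summary of one integer column.

```scala
object ColumnStats {
  // Hypothetical per-SSTable summary for one integer column.
  final case class Stats(min: Int, max: Int, approxDistinct: Long)

  // Conservative merge: the widest bounds seen, and a distinct count that is
  // only an upper bound because the same value may live in several SSTables.
  def merge(a: Stats, b: Stats): Stats =
    Stats(math.min(a.min, b.min), math.max(a.max, b.max), a.approxDistinct + b.approxDistinct)

  // None when there are no SSTables to summarize.
  def mergeAll(tables: Seq[Stats]): Option[Stats] = tables.reduceOption(merge)
}
```

The result can only overestimate the live range, for instance when the extreme values have since been deleted or overwritten, but an overestimate of 1 to 1000000 is still far tighter than the full Int domain.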
Another approach is to run a “pre-scan” job which would distribute a bunch of small tasks. A fully independent Spark Job could be launched before the full scan was actually executed. Each small task would select some portion of the existing partitions and read them into memory. We would then compute our own statistics live on this data and use that information to break up the full scan we would subsequently do.
This is a pretty expensive solution as it requires running a pre-job for every full job. Sampling randomly from within Partitions also seems like it may be a problem to me.
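For the statistics half of such a pre-scan, one option is to take evenly spaced quantiles of whatever clustering values the sample returned and use them as breakpoints. This is a sketch of that single step, with PreScan and breakpoints as hypothetical names; how the sample is gathered from Cassandra is deliberately left out.

```scala
object PreScan {
  // Picks up to numSplits - 1 breakpoints at evenly spaced quantiles of the
  // sampled clustering values; duplicates are dropped.
  def breakpoints(sample: Seq[Int], numSplits: Int): Vector[Int] = {
    require(numSplits > 0)
    val sorted = sample.sorted.toVector
    if (sorted.isEmpty) Vector.empty
    else (1 until numSplits).map(i => sorted(i * sorted.size / numSplits)).distinct.toVector
  }
}
```

Quantiles follow the density of the sampled data rather than the width of the type's domain, so the cuts land where the rows actually are. They are only as good as the sample, which is the concern raised above about sampling within Partitions.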
Pushing the task to the end user is also a possibility (and is the current workaround). Most end users already understand why they have large partitions and know in general the domain their column values fall in. This makes it possible for them to manually divide up a request so that it chops up large partitions.
For example, assuming the user knows clustering column c spans from 1 to 1000000, they could write code like:
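```scala
import com.datastax.spark.connector._

// Assumes sc is an existing SparkContext configured for the Cassandra cluster.
val minRange = 0
val maxRange = 1000000
val numSplits = 10
val subSize = (maxRange - minRange) / numSplits

// One RDD per sub-range of c; union stitches them back into a single RDD.
sc.union(
  (minRange to maxRange by subSize)
    .map(start =>
      sc.cassandraTable("ks", "tab")
        // Half-open range [start, start + subSize) so no boundary row is skipped.
        // The s prefix is required for $start to be interpolated.
        .where(s"c >= $start and c < ${start + subSize}"))
)
```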
Each RDD would contain a unique set of tasks, each drawing only a portion of the full partitions. The union operation joins all those disparate tasks into a single RDD. The maximum number of distinct c values any single Spark Partition would draw from a single Cassandra partition would be limited to (maxRange - minRange) / numSplits. This approach, while requiring user intervention, would preserve locality and would still minimize the jumps between disk sectors.
Pursuing more SSTable-adjacent metadata seems like the right course from here on out. The SSTables are immutable and we can do some calculations to get conservative estimates of column statistics. The difficulty in having completely accurate statistics comes from the lack of monotonicity in Cassandra operations. Even having estimates within an order of magnitude is probably a good first step. This sort of column metadata could be useful even outside the realm of Cassandra-specific operations, as the data could be fed into the Catalyst cost-based optimizer.
[^1]: For example, a compute method could read data from a database whose contents change. Computing the same partition at different times would yield different results. Another example would be a compute method that explicitly generates a random number with a seed that is not tied to the Partition metadata. This would also yield different results if the partition was recomputed.
[^2]: Cassandra has three partitioners: ByteOrdered, which only experts should use and most that do use it regret it; Random, which is slightly larger and slower than Murmur3 and was the old default; and Murmur3, which is actually not a canonical (read: slightly incorrect) implementation, so it's actually more like Murmur3-ish. The “ish” attribute is only important if you are writing a new driver and want to get token range awareness right, or you are trying to mimic the behavior of the internal Cassandra partitioning for some other reason outside the helper functions provided by the common drivers.
[^3]: This is only a very rough estimate of the amount of data in a particular Token Range.