Spark (SQL) Thrift Server is an excellent tool built on HiveServer2 that allows multiple remote clients to access Spark. It provides a generic JDBC endpoint, so any client, including BI tools, can connect and tap into the power of Spark. Let’s talk about how it came to be and why you should use it.
TLDR: Spark Thrift Server is not related to the Cassandra Thrift Protocol.
Users of Cassandra who remember the good old days may have a gut reaction of “Thrift? I thought we got rid of that.” Well, let’s start by clarifying what Thrift actually is. Apache Thrift is a framework that lets developers quickly build RPC interfaces to their software. Basically, when developing a client-server application, you want an easy way for the client to call functions on the server. Cassandra was one such application, with clients (drivers) and a server (the Cassandra server). The Cassandra Thrift protocol refers to the original Cassandra protocol built upon the Apache Thrift framework; modern Cassandra instead uses a native protocol developed in-house for efficiency and performance. HiveServer also used a Thrift-based framework for its original client-server communication, but this is unrelated to the old Cassandra Thrift protocol.
This is a bit of a complicated history, basically a “Ship of Theseus” story. We start with our original server, which is almost completely Hive. Then over time Hive code is replaced, piece by piece, until almost none of the original code remains.
Spark originally started out shipping with Shark and SharkServer (a portmanteau of Spark and Hive). In those days there was a lot of Hive code in the mix. SharkServer was Hive: it parsed HiveQL, it did optimizations in Hive, it read Hadoop InputFormats, and at the end of the day it actually ran Hadoop-style Map/Reduce jobs on top of the Spark engine. It was still really cool at the time, as it provided a way to utilize Spark without doing any functional programming. With only HQL you could access all the power of Spark. Unfortunately, Map/Reduce and Hive were not ideal matches for the Spark ecosystem, and all Shark development ended at Spark 1.0 as Spark started moving to more Spark-native expressions of SQL.
Spark began replacing those various Hive-isms. It introduced a new representation for distributed tabular data, called SchemaRDDs, then DataFrames, then Datasets … naming is hard.
And with that, a brand new Spark-native optimization engine known as Catalyst!
Catalyst, a tree-manipulation framework, provided a basis for the query optimization present in everything from GraphFrames to Structured Streaming. The advent of Catalyst meant that old Map/Reduce-style execution could be dropped; instead, Spark-optimized execution plans could be built and run. In addition, Spark released a new API which lets us build Spark-aware interfaces called “DataSources” (like this Cassandra one). The flexibility of DataSources ended the reliance on Hadoop InputFormats (although they are still supported). DataSources can tap directly into the query plans generated by Spark and perform predicate push-downs and other optimizations.
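As a rough sketch of how that looks in practice (assuming a SparkSession named spark; the Cassandra connector format and the keyspace/table names are illustrative, not from the original post):

    // Read a table through a DataSource implementation, here the
    // Cassandra connector. Keyspace and table names are hypothetical.
    val df = spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "ks", "table" -> "users"))
      .load()

    // The filter can be pushed down into the source; look for
    // "PushedFilters" in the printed physical plan.
    df.filter(df("user_id") === 42).explain()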
While all this was happening, the Hive parser was pulled out and replaced with a native Spark parser.
HQL is still accepted, but the syntax has been greatly expanded. Spark SQL can now handle all of the TPC-DS queries, as well as a bunch of Spark-specific extensions. (There was a short period in development where you had to pick between a HiveContext and a SqlContext, both of which had different parsers, but we don’t talk about that anymore. Today all requests start with a SparkSession.)
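For reference, a minimal sketch of that single entry point (the application name is made up for this example):

    import org.apache.spark.sql.SparkSession

    // One entry point for parsing, planning, and execution.
    val spark = SparkSession.builder()
      .appName("example")   // hypothetical name
      .enableHiveSupport()  // optional: Hive catalog support
      .getOrCreate()

    spark.sql("SELECT 1").show()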
Now there is almost no Hive left in Spark. While the SQL Thrift Server is still built on the HiveServer2 code, almost all of the internals are now completely Spark-native.
I’ve written about this before: Spark applications are fat. Each application is a complete self-contained cluster with exclusive execution resources. This is a problem if you want multiple users to share the same pool of cluster resources. The Spark Thrift Server provides a single context with a well-defined external protocol, which means external users can simultaneously send requests for Spark work without pulling in any Spark dependencies.
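As a sketch of what “no Spark dependencies” means, a client only needs the Hive JDBC driver to talk to the Thrift Server; the host name and table below are assumptions:

    import java.sql.DriverManager

    // Connect over plain JDBC; 10000 is the Thrift Server's default port.
    val conn = DriverManager.getConnection(
      "jdbc:hive2://thrift-host:10000/default", "user", "")
    val stmt = conn.createStatement()

    // The SQL string is parsed and executed by the server's SparkSession.
    val rs = stmt.executeQuery("SELECT count(*) FROM users")
    while (rs.next()) println(rs.getLong(1))
    conn.close()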
Spark contexts are also unable to share cached resources with each other. This means that unless you have a single Spark context, it is impossible for multiple users to share cached data. The Spark Thrift Server can be that “single context,” providing a globally-available cache.
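For instance (a sketch reusing the JDBC Statement from above; the table name is hypothetical), one client can cache a catalog table through the shared context and every other connection benefits:

    // Sent by any one client: pin the table in the shared context's cache.
    stmt.execute("CACHE TABLE users")

    // Later queries from any connected client read the cached copy.
    val counts = stmt.executeQuery("SELECT count(*) FROM users")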
The Thrift Server can also benefit from fair scheduling. Fair scheduling means that user requests do not have to be answered in a “first in, first out” manner; instead, tasks from different user queries can be interleaved, so a long-running query will not block a shorter query from completing.
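As a sketch, when the server is started with spark.scheduler.mode=FAIR, a JDBC session can route its queries to a scheduler pool; the pool name here is an assumption:

    // Sent over an existing JDBC connection. "short_queries" is a
    // hypothetical pool defined in the server's fair scheduler config.
    stmt.execute("SET spark.sql.thriftserver.scheduler.pool=short_queries")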
Additionally, the Thrift Server provides a greater level of security by limiting the kinds of jobs a user can run: generic JVM code is prohibited, and only SQL can be parsed and executed.
The modern Thrift Server is a relatively simple application. A single SparkSession is started, and then, on a loop, it accepts new SQL strings and executes them with a .collect. The results are received from the cluster and then delivered to the requester. You can see in the code that the basic execution is

    // Parse and plan the incoming SQL string
    result = sqlContext.sql(statement)
    // Pull the full result-set back into the driver
    resultList = Some(result.collect())

with a bunch of formatting and other bells and whistles. End users connect via JDBC and then send SQL strings. The strings are parsed and run using the same code you might use in a stand-alone Spark application. There are two options most users should be aware of:
spark.sql.hive.thriftServer.singleSession=false
Multiple clients connecting to the JDBC server can either share the same session or not, which fits two different use-cases: should users be able to access each other’s configuration and registered functions, or should every user act independently?
For example:

With a shared session (singleSession=true), as sketched below:
- User 1 registers functions and temporary views
- Users 2, 3, 4, and 5 use those functions and views

With independent sessions (singleSession=false):
- User 1 makes a private view of the data
- User 2 makes a private view of the data
- No user can see another’s work or override their temporary catalog entries
- This will *not* prevent users from editing the underlying sources
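A small sketch of the shared-session case (connection setup omitted; the view and table names are hypothetical):

    // Client 1 registers a temporary view through its JDBC Statement.
    stmt1.execute("CREATE TEMPORARY VIEW recent_users AS " +
      "SELECT * FROM users WHERE signup_date > '2017-01-01'")

    // With singleSession=true, client 2 shares the same session,
    // so the temporary view is visible to it as well.
    val rs = stmt2.executeQuery("SELECT count(*) FROM recent_users")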
spark.sql.thriftServer.incrementalCollect=false
This internal, mostly undocumented feature is necessary for Business Intelligence tools or other clients that request enormous result-sets through the Thrift Server. By default, collect is used as the operation to get results from Spark before they are fed back through JDBC. collect pulls the data-set completely into the driver’s heap. This means that a JDBC request which returns a huge result-set will be placed completely in the heap of the Spark SQL Thrift Server. This can lead to Out of Memory errors (OOMs), surprising users who expected the result-set to be paged through the Thrift Server in small bits at a time. It is sometimes possible to avoid OOMs by increasing the driver heap size to fit the entire result-set, but it is also possible to do some manual paging and achieve the same effect.
The setting incrementalCollect changes the gather method from collect to toLocalIterator. toLocalIterator is a Spark action which only returns one Spark partition’s worth of data at a time. This can hurt performance, but it reduces the amount of RAM required on the Thrift Server heap from the entire result-set down to only a single partition.
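A sketch of the difference on the server side (assuming the SparkSession spark; big_table and sendToClient are hypothetical stand-ins for a real table and the Thrift Server's row-serialization path):

    val result = spark.sql("SELECT * FROM big_table")

    // Default (incrementalCollect=false): the entire result-set is
    // materialized in the driver heap at once.
    val everything = result.collect()

    // With incrementalCollect=true: rows are fetched one partition at a
    // time, so the heap holds at most one partition's worth of data.
    val it = result.toLocalIterator()
    while (it.hasNext) {
      sendToClient(it.next()) // hypothetical handler
    }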
Remember that even with this setting, if multiple users are making requests simultaneously, each request still requires one partition of data in the heap, so it is still important to control the number of concurrent users to avoid OOMs.
I hope this has been useful and please check out these additional blog posts for more information on the Thrift Server.
One whole post about Spark without talking about or using Monads. Oops, ruined it.
Thanks to Marguerite Sheffer, Jaroslaw Grabowski, Jacek Laskowski, and Brian Hess.