Russell Spitzer's Blog

Some guy's blog


Most folks don’t know that the Spark Cassandra Connector is actually able to connect to multiple Cassandra clusters at the same time. This allows us to move data between Cassandra clusters or even manage multiple clusters from the same application (or even the spark shell)


For SparkSQL Look here

Operations within the Spark Cassandra Connector are goverened by CassandraConnector objects. The default CassandraConnector is created based on the parameters in your SparkConfiguration and the parameters passed on the command line of the form spark.cassandra.*. But this default CassandraConnector does not have to be the only connection configuration used.

To use a different connection configuration, the simplest thing to do is to specify a new code block with an implicit CassandraConnector defined in that block. The implicit will then be used by all of the operations within that block. This works in the shell as well.

####Example of reading from one cluster and writing to another

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._

import org.apache.spark.SparkContext


def twoClusterExample ( sc: SparkContext) = {
  val connectorToClusterOne = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.1"))
  val connectorToClusterTwo = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.2"))

  val rddFromClusterOne = {
    // Sets connectorToClusterOne as default connection for everything in this code block
    implicit val c = connectorToClusterOne
    sc.cassandraTable("ks","tab")
  }

  {
    //Sets connectorToClusterTwo as the default connection for everything in this code block
    implicit val c = connectorToClusterTwo
    rddFromClusterOne.saveToCassandra("ks","tab")
  }

}

Within this example we make two different CassandraConnector objects. Each of them pulls down the defaults set in the SparkContext but overrides the spark.cassandra.connection.host parameter.

val connectorToClusterOne = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.1"))
val connectorToClusterTwo = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.2"))

At this point we haven’t changed anything about the default connection configuration, any operations will still use the default config. So to choose one of these to do something we need to open up a code block and declare it as the implicit CassandraConnector in that block

val rddFromClusterOne = {
// Sets connectorToClusterOne as default connection for everything in this code block
implicit val c = connectorToClusterOne
sc.cassandraTable("ks","tab")
}

Within this block the cassandraTable call will use the implicit CassandraConnector c which is connectorToClusterOne. The RDD that is created will point at the cluster in 127.0.0.1 regardless of what else we do elsewhere in the code.

{
//Sets connectorToClusterTwo as the default connection for everything in this code block
implicit val c = connectorToClusterTwo
rddFromClusterOne.saveToCassandra("ks","tab")
}

The brackets here start our next code block. This allows us to specify a new implicit CassandraConnector this will change the operation of the saveToCassandra method to point towards 127.0.0.2. The rddFromClusterOne is already setup and will not be affected by the new implicit connector. When Spark hits the saveToCassandra call, an action will begin and data will be pulled from the first cluster and placed in the second.