DSE 5.0.7, Spark 1.6.3

Accessing Spark

Accessing Spark from outside the Spark cluster

dse spark --master spark://MASTER_ADDRESS:7077

Spark SQL

$ dse -u avm_analytics -p avm1111 spark-sql  
spark-sql> select count(*) from avm.transactions where txn_date='2017_05_08';  
446313   
Time taken: 523.556 seconds, Fetched 1 row(s)  

PySpark

$ dse -u avm_analytics -p avm1111 pyspark  
>>> sqlContext.sql("select count(*) from avm.test")  
DataFrame[_c0: bigint]  
>>> sqlContext.sql("select count(*) from avm.test").collect()  
[Row(_c0=4)]  
>>> sqlContext.sql("select * from avm.test").collect()  
[Row(event_id=30, event_name=u'cat'), Row(event_id=1, event_name=u'aaa'), Row(event_id=2, event_name=u'bbb'), Row(event_id=40, event_name=u'fox')] 

Scala

$ dse -u avm_analytics -p avm1111 spark  
Welcome to  
     ____              __
    / __/__  ___ _____/ /__
   _\ \/ _ \/ _ `/ __/  '_/
  /___/ .__/\_,_/_/ /_/\_\   version 1.6.3
     /_/
Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.8.0_111)  
Type in expressions to have them evaluated.  
Type :help for more information.  
Initializing SparkContext with MASTER: spark://172.31.12.201:7077  
Created spark context..  
Spark context available as sc.  
Hive context available as sqlContext. Will be initialized on first use.  
scala> :showSchema avm  
scala> val rdd = sc.cassandraTable("avm","test")  
rdd: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15  
scala> rdd.toArray.foreach(println)  
warning: there were 1 deprecation warning(s); re-run with -deprecation for details  
CassandraRow{event_id: 1, event_name: aaa}  
CassandraRow{event_id: 2, event_name: bbb}  
scala> rdd.first  
scala> val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))  
collection: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:60  
scala> collection.saveToCassandra("avm","test", SomeColumns("event_name","event_id"))  
scala> rdd.toArray.foreach(println)  
warning: there were 1 deprecation warning(s); re-run with -deprecation for details  
CassandraRow{event_id: 30, event_name: cat}  
CassandraRow{event_id: 1, event_name: aaa}  
CassandraRow{event_id: 2, event_name: bbb}  
CassandraRow{event_id: 40, event_name: fox}  

Spark Job

ubuntu@ip-172-31-30-155:~/scripts$ dse -u cassandra -p **** spark-submit --total-executor-cores 1 --executor-memory 1G --conf spark.cassandra.auth.username=cassandra --conf spark.cassandra.auth.password=**** sample.py  
cat sample.py  
from pyspark import SparkContext, SparkConf  
from pyspark.sql import SQLContext, Row  
from pyspark.sql.functions import col, lit,concat  
from pyspark.sql import types  
conf = SparkConf()\  
 .set("spark.cassandra.output.ignoreNulls", "true") \  
 .set("spark.cassandra.input.consistency.level", "QUORUM") \  
 .set("spark.cassandra.output.consistency.level", "QUORUM")  
sc = SparkContext(appName="My Sample Spark Job", conf=conf)  
sc.setLocalProperty("spark.scheduler.pool", "admin")  
csc = SQLContext(sc)  
csc.sql("""CREATE TEMPORARY TABLE users \  
 USING org.apache.spark.sql.cassandra \  
 OPTIONS ( table "users", \  
 keyspace "avm", \  
 cluster "DSETestCluster", \  
 pushdown "true") \  
 """)  
query = "SELECT * FROM users WHERE userid='123456789'"  
print query  
df = csc.sql(query)  
df.show()  

Spark Thrift / Beeline

ubuntu@ip-172-31-26-109:~$ dse -u cassandra -p **** spark-beeline --total-executor-cores 2 --executor-memory 2G  
The log file is at /home/ubuntu/.spark-beeline.log  
Beeline version 1.2.1.2_dse_spark by Apache Hive  
beeline> !connect jdbc:hive2://localhost:10000 cassandra  
Connecting to jdbc:hive2://localhost:10000  
Enter password for jdbc:hive2://localhost:10000: ****************  
Connected to: Spark SQL (version 1.6.3)  
Driver: Hive JDBC (version 1.2.1.2_dse_spark)  
Transaction isolation: TRANSACTION_REPEATABLE_READ  
0: jdbc:hive2://localhost:10000> SELECT * FROM avm.users WHERE userid='123456789';  

Note

The difference between a Thrift query and a spark-sql query is that the Thrift server uses a shared SparkSQL context, whereas each spark-sql session gets its own context.
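A practical consequence, sketched below in pyspark (table names are only illustrative): a temporary table registered in one application's context is not visible from a separate spark-sql or pyspark session, while beeline clients connected to the same Thrift server share one context and can therefore see each other's temporary tables.

>>> df = sqlContext.sql("select * from avm.users")  
>>> df.registerTempTable("users_tmp")   # registered only in this application's HiveContext  
>>> sqlContext.sql("select count(*) from users_tmp").collect()   # visible here  
# a second, independent pyspark or spark-sql session will not find users_tmp;  
# a temp table registered through the Thrift server is shared by all of its beeline clients  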

Accessing Cassandra Materialized Views from Spark

users_by_email_mview is a materialized view in Cassandra

$ dse -u avm_analytics -p avm1111 pyspark  
>>> sqlContext.sql("""CREATE TEMPORARY TABLE users_by_email_mview \  
... USING org.apache.spark.sql.cassandra \  
... OPTIONS ( table "users_by_email_mview", \  
... keyspace "avm", \  
... cluster "DSETestCluster", \  
... pushdown "true") \  
... """)  
DataFrame[]  
>>> sqlContext.sql("select * from users_by_email_mview where email ='harry@gmail.com'").show()  
$ dse -u avm_analytics -p avm1111 spark-sql  
spark-sql> select * from avm.users_by_email_mview where email ='harry@gmail.com';  

Accessing Cassandra List datatype from Spark

spark-sql> select * from avm.k1;  
1 1 [10,30]  
2 2 [100]  
3 3 [100,130]  
Time taken: 5.84 seconds, Fetched 3 row(s)  
spark-sql> select c[0] from avm.k1;  
10  
100  
100  
Time taken: 1.564 seconds, Fetched 3 row(s)  
spark-sql> select c[0]+c[1] from avm.k1;  
40  
NULL  
230  
Time taken: 1.033 seconds, Fetched 3 row(s)  
spark-sql> select c[0]+nvl(c[1],0) from avm.k1;  
40  
100  
230  
Time taken: 0.934 seconds, Fetched 3 row(s)  
spark-sql> select concat(c[0],c[1]) from avm.k1;  
1030  
NULL  
100130  
Time taken: 0.95 seconds, Fetched 3 row(s)  
spark-sql> select array_contains(c,100) from avm.k1;  
false  
true  
true  
spark-sql> select a,b,explode(c) from avm.k1;  
1 1 10  
1 1 30  
2 2 100  
3 3 100  
3 3 130  
spark-sql> select * from avm.k1 where c[0]+nvl(c[1],0) > 200;  
3 3 [100,130]  
spark-sql> select * from avm.k1 where size(c) > 1;  
1 1 [10,30]  
3 3 [100,130]  

Spark SQL Date/Time functions

select txn_time,timestamp(date_add(txn_time,1/2)) from avm.transactions limit 1;  
2016-03-26T14:44:19.000363 2016-03-26T20:14:19.000363 2016-03-26T00:00:00  
select txn_time,date_add(txn_time,1/2) from avm.transactions limit 1;  
2016-03-26T14:44:19.000363 2016-03-26T20:14:19.000363 2016-03-26  
admin@cqlsh> select toDate(txn_time),txn_time from avm.transactions limit 1;  
system.todate(txn_time) | txn_time  
--------------------------------------+---------------------------------  
 2016-03-20 | 2016-03-20 12:20:26.000000+0000  
(1 rows)  
Group by Day  
select date_sub(txn_time,0),count(*) from avm.transactions group by date_sub(txn_time,0) order by date_sub(txn_time,0)  
Group by Month  
select trunc(txn_time,'MM'),count(*) from avm.transactions group by trunc(txn_time,'MM') order by trunc(txn_time,'MM')  

Spark IS NOT NULL

Cassandra uses the concept of unset values because it treats nulls as tombstones. Spark, however, was not designed specifically for Cassandra, so IS NOT NULL checks only for actual nulls. You will most likely also have to check whether the value is an empty string. Optionally, you can check whether the column compares to the unset value exposed by the Spark Cassandra Connector.

select email, concat(concat('#',email),'#') from users where email IS NOT NULL limit 2;  
harry@gmail.com #harry@gmail.com#  
 ## => this row has empty email, but is returned in the resultset  
select email, length(email) from users where email IS NOT NULL limit 2;  
harry@gmail.com 15  
 0 => this row has empty email, but is returned in the resultset 

So, to be safe, include both of the above conditions when you want to filter out columns with no data, as in the sketch below.
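A minimal pyspark sketch, assuming the same avm.users table, that combines both checks (the length() guard covers the empty-string case):

>>> sqlContext.sql("select email from avm.users where email is not null and length(email) > 0 limit 2").show()  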

Fetching Large Data onto the client

Fetching large amounts of data from Spark onto the client can cause the driver to run out of memory, so it is recommended to bring the data to the client partition by partition.

ubuntu@ip-172-31-26-109:~$ dse -u cassandra -p **** pyspark --conf spark.executor.memory=2G --conf spark.cores.max=1  
Python 2.7.6 (default, Oct 26 2016, 20:30:19)  
[GCC 4.8.4] on linux2  
Type "help", "copyright", "credits" or "license" for more information.  
Welcome to  
     ____              __
    / __/__  ___ _____/ /__
   _\ \/ _ \/ _ `/ __/  '_/
  /___/ .__/\_,_/_/ /_/\_\   version 1.6.3
     /_/
Using Python version 2.7.6 (default, Oct 26 2016 20:30:19)  
SparkContext available as sc, HiveContext available as sqlContext.  
>>> df = sqlContext.createDataFrame([[1],[2],[3]])  
>>> it = df.rdd.toLocalIterator()  
>>> row = next(it)  
>>> row  
Row(_1=1)  
>>> row = next(it)  
>>> row  
Row(_1=2)  
>>> row = next(it)  
>>> row  
Row(_1=3)  
>>> row = next(it)  
Traceback (most recent call last):  
 File "<stdin>", line 1, in <module>  
StopIteration  
>>> rdd = sc.parallelize(range(10))  
>>> [x for x in rdd.toLocalIterator()]  
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]  

If you have a wide Cassandra partition, fetching even a single Spark partition might cause an OOM error, so use repartition to split the data into smaller Spark partitions, as in the snippet and the combined sketch below.

rdd = sc.parallelize(range(100))  
rdd.getNumPartitions()  
rdd = rdd.repartition(10) => 10 partitions   
rdd.getNumPartitions()  
rdd.glom().collect() => lists the elements of each partition (safe only for small data)  
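Putting the two together, a minimal sketch (assuming the avm.transactions table from earlier and an arbitrary target of 50 partitions) that first splits a Cassandra-backed DataFrame into smaller Spark partitions and then streams it to the driver one partition at a time:

>>> df = sqlContext.sql("select * from avm.transactions")  
>>> small = df.rdd.repartition(50)          # more, smaller Spark partitions  
>>> for row in small.toLocalIterator():     # only one partition held on the driver at a time  
...     pass                                # process or write out each row here  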

Saving Spark Results to Textfile

DSEFS is enabled by default in DSE 5.1.0, but not in 5.0.7.

cat sparkcsv.py  
from pyspark import SparkContext, SparkConf  
from pyspark.sql import SQLContext, Row  
from pyspark.sql.functions import col, lit,concat  
from pyspark.sql import types  
conf = SparkConf()\  
 .setAppName("Save As Text File Job") \  
 .set("spark.cassandra.connection.host", "172.31.26.109,172.31.20.239,172.31.10.93,172.31.0.4") \  
 .set("spark.cassandra.auth.username", "cassandra") \  
 .set("spark.cassandra.auth.password", "****") \  
 .set("spark.cassandra.output.ignoreNulls",True)  
sc = SparkContext("local", "transactions", conf=conf)  
csc = SQLContext(sc)  
csc.sql("""CREATE TEMPORARY TABLE transactions \  
 USING org.apache.spark.sql.cassandra \  
 OPTIONS ( table "transactions", \  
 keyspace "avm", \  
 cluster "DSETestCluster", \  
 pushdown "true") \  
 """)  
query = "select * from transactions limit 10"  
csc.sql(query).rdd.saveAsTextFile('/tmp/k.csv') => Will save the string representation of each Row object  
OR  
df = csc.sql(query)  
df_csv = df.rdd.map(lambda r: "\t".join([str(c) for c in r])) => Will create a tab separated file  
df_csv.saveAsTextFile('/tmp/kk.csv')  
OR  
df.write.format('com.databricks.spark.csv').options(header='true').save("/tmp/kk.csv") => Will create a csv file  

Access Postgres from Spark SQL

scala> sqlContext.sql("""  
 CREATE TEMPORARY TABLE users  
 USING org.apache.spark.sql.jdbc  
 OPTIONS (  
 url "jdbc:postgresql://postgreshost:port?user=avm&password=avm1111",  
 dbtable "users"  
 )""")  
res0: org.apache.spark.sql.DataFrame = []  
scala> sqlContext.sql("SELECT * from users limit 1").collect => Reads the table in Postgres  
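For reference, a pyspark equivalent using the generic DataFrame JDBC reader (same placeholder URL and table; the PostgreSQL JDBC driver jar has to be on the driver/executor classpath, for example via --jars):

>>> df = sqlContext.read.format("jdbc").options(  
...     url="jdbc:postgresql://postgreshost:port?user=avm&password=avm1111",  
...     dbtable="users").load()  
>>> df.limit(1).collect()  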

Access files in S3 from Spark

/etc/dse/spark/hive-site.xml  
 <property>  
 <name>fs.s3.awsAccessKeyId</name>  
 <value>AKIAIMVY6BF5J6TLABCA</value>  
 </property>  
 <property>  
 <name>fs.s3.awsSecretAccessKey</name>  
 <value>AAD7sGFfszI1TVFalAci7n4t/ABC3Q9iFjd/gyiV</value>  
 </property>  
 <property>  
 <name>fs.s3n.awsAccessKeyId</name>  
 <value>AKIAIMVY6BF5J6TLABCA</value>  
 </property>  
 <property>  
 <name>fs.s3n.awsSecretAccessKey</name>  
 <value>AAD7sGFfszI1TVFalAci7n4t/ABC3Q9iFjd/gyiV</value>  
 </property>  
 <property>  
 <name>fs.s3a.access.key</name>  
 <value>AKIAIMVY6BF5J6TLABCA</value>  
 </property>  
 <property>  
 <name>fs.s3a.secret.key</name>  
 <value>AAD7sGFfszI1TVFalAci7n4t/ABC3Q9iFjd/gyiV</value>  
 </property> 

spark-sql uses AWS credentials from .bashrc

export AWS_ACCESS_KEY_ID="AKIAIMVY6BF5J6TLABCA"  
export AWS_SECRET_ACCESS_KEY="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"  
export AWS_REGION="ap-southeast-1"  
ubuntu@ip-172-31-26-109:~$ dse -u cassandra -p **** spark-beeline --total-executor-cores 2 --executor-memory 2G  
0: jdbc:hive2://localhost:10000> CREATE TEMPORARY TABLE test_table (name string,phone string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 's3a://bucket/folder/';  
0: jdbc:hive2://localhost:10000> select count(*) from test_table;
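Once the keys above are picked up (via hive-site.xml or the exported environment variables), the same S3 path can also be read from pyspark; a minimal sketch with placeholder bucket/prefix, assuming the s3a filesystem classes are available to DSE Spark:

>>> rdd = sc.textFile("s3a://bucket/folder/")   # reads the objects under the prefix as text lines  
>>> rdd.count()  
>>> rdd.take(2)  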