Spark Basics

Spark on a local machine


To run Spark interactively in a Python interpreter, use bin/pyspark (local[2] runs Spark locally with two worker threads):

./bin/pyspark --master local[2]

Submit jobs

./bin/spark-submit examples/src/main/python/pi.py 10

Spark and MongoDB

Spark and NLP

Here is a complete set of examples showing how to use DL4J (Deep Learning for Java) with UIMA on the Spark platform:

Deep Learning for Java

and the following project shows how to use the cTAKES UIMA module from within the Spark framework:

Natural Language Processing with Apache Spark


Spark on AWS EMR


Create a Cluster With Spark

To launch a cluster with Spark installed using the console

The following procedure creates a cluster with Spark installed.

  1. Open the Amazon EMR console at https://console.aws.amazon.com/elasticmapreduce/.
  2. Choose Create cluster to use Quick Create.
    1. For the Software Configuration field, choose Amazon Release Version emr-5.0.0 or later.
    2. In the Select Applications field, choose either All Applications or Spark.
    3. Select other options as necessary and then choose Create cluster. Note: To configure Spark when you are creating the cluster, see Configure Spark.

To launch a cluster with Spark installed using the AWS CLI

Create the cluster with the following command:

aws emr create-cluster --name "Spark cluster" --release-label emr-5.0.0 --applications Name=Spark \
--ec2-attributes KeyName=myKey --instance-type m3.xlarge --instance-count 3 --use-default-roles

Note: For Windows, replace the above Linux line continuation character (\) with the caret (^).

With the AWS CLI

Simple cluster:

aws emr create-cluster --name "Spark cluster" --release-label emr-5.0.0 --applications Name=Spark \
--ec2-attributes KeyName=myKey --instance-type m3.xlarge --instance-count 3 --use-default-roles

With config file:

aws emr create-cluster --release-label emr-5.0.0 --applications Name=Spark \
--instance-type m3.xlarge --instance-count 3 --configurations https://s3.amazonaws.com/mybucket/myfolder/myConfig.json


    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "true"

With Spot instances:

aws emr create-cluster --name "Spot cluster" --release-label emr-5.0.0 --applications Name=Spark \
--use-default-roles --ec2-attributes KeyName=myKey \
--instance-groups InstanceGroupType=MASTER,InstanceType=m3.xlarge,InstanceCount=1,BidPrice=0.25 \

# InstanceGroupType=TASK,BidPrice=0.10,InstanceType=m3.xlarge,InstanceCount=3

In Java:

// Start a Spark cluster on EMR using the AWS SDK for Java
// (classes come from com.amazonaws.services.elasticmapreduce and its model package)
AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(credentials);

Application sparkApp = new Application().withName("Spark");
List<Application> myApps = new ArrayList<>();
myApps.add(sparkApp);

RunJobFlowRequest request = new RunJobFlowRequest()
    .withName("Spark Cluster")
    .withApplications(myApps)
    .withReleaseLabel("emr-5.0.0")
    .withInstances(new JobFlowInstancesConfig()
        .withEc2KeyName("myKeyName")
        .withInstanceCount(1)
        .withKeepJobFlowAliveWhenNoSteps(true)
        .withMasterInstanceType("m3.xlarge")
        .withSlaveInstanceType("m3.xlarge"));

RunJobFlowResult result = emr.runJobFlow(request);

Connect to the Master Node Using SSH

To connect to the master node using SSH, you need the public DNS name of the master node and your Amazon EC2 key pair private key. The Amazon EC2 key pair private key is specified when you launch the cluster.

To retrieve the public DNS name of the master node using the AWS CLI

  1. To retrieve the cluster identifier, type the following command:

     aws emr list-clusters

     The output lists your clusters including the cluster IDs. Note the cluster ID for the cluster to which you are connecting.

     "Status": {
         "Timeline": {
             "ReadyDateTime": 1408040782.374,
             "CreationDateTime": 1408040501.213
         },
         "State": "WAITING",
         "StateChangeReason": {
             "Message": "Waiting after step completed"
         }
     },
     "NormalizedInstanceHours": 4,
     "Id": "j-2AL4XXXXXX5T9",
     "Name": "My cluster"

  2. To list the cluster instances, including the master public DNS name for the cluster, type one of the following commands. Replace j-2AL4XXXXXX5T9 with the cluster ID returned by the previous command.

     aws emr list-instances --cluster-id j-2AL4XXXXXX5T9

     Or:

     aws emr describe-cluster --cluster-id j-2AL4XXXXXX5T9

View Web Interfaces Hosted on Amazon EMR Clusters

Launching Applications with spark-submit

[Launching applications with spark-submit](https://spark.apache.org/docs/latest/submitting-applications.html#launching-applications-with-spark-submit)

Apache Zeppelin

Connect to Zeppelin using the same SSH tunneling method used to connect to other web interfaces on the master node. The Zeppelin server is found on port 8890.


DataFrames API

DataFrame operations:

  - printSchema()
  - select()
  - show()
  - count()
  - groupBy()
  - sum()
  - limit()
  - orderBy()
  - filter()
  - withColumnRenamed()
  - join()
  - withColumn()

// In the Regular Expression below:
// ^  - Matches beginning of line
// .* - Matches any characters, except newline

 .show() // By default, show will return 20 rows

// Import the sql functions package, which includes statistical functions like sum, max, min, avg, etc.
import org.apache.spark.sql.functions._
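For example, a minimal sketch (assuming a hypothetical DataFrame df with columns "article" and "requests") chaining several of the operations listed above:

// Hypothetical DataFrame df with columns "article" (string) and "requests" (integer)
// Assumes the functions import above, plus import spark.implicits._ for the $ syntax
val topArticlesDF = df
  .select("article", "requests")
  .filter($"requests" > 1000)                 // keep only frequently requested articles
  .groupBy("article")
  .agg(sum("requests").as("total_requests"))  // sum comes from the functions package
  .orderBy($"total_requests".desc)
  .limit(10)

topArticlesDF.printSchema()
topArticlesDF.show()                          // show() prints the first 20 rows by default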



A new Column can be constructed from the columns present in a DataFrame:

df("columnName") // On a specific DataFrame.
col("columnName") // A generic column no yet associated with a DataFrame.
col("columnName.field") // Extracting a struct field
col("`a.column.with.dots`") // Escape `.` in column names.
$"columnName" // Scala short hand for a named column.
expr("a + 1") // A column that is constructed from a parsed SQL Expression.
lit("abc") // A column that produces a literal (constant) value.

Column objects can be composed to form complex expressions:

$"a" + 1
$"a" === $"b"

File Read

CSV - Create a DataFrame with the anticipated structure

val clickstreamDF = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("delimiter", "\\t")
  .option("mode", "PERMISSIVE")
  .option("inferSchema", "true")
  .load("/path/to/clickstream.tsv")  // load() is required to actually read the file; the path is a placeholder

PARQUET - Create a Dataset[Row] (a DataFrame) using SparkSession:

val people = spark.read.parquet("...")
val department = spark.read.parquet("...")

people.filter("age > 30")
  .join(department, people("deptId") === department("id"))
  .groupBy(department("name"), "gender")
  .agg(avg(people("salary")), max(people("age")))

Repartitioning / Caching

val clickstreamNoIDs8partDF = clickstreamNoIDsDF.repartition(8)

An ideal partition size in Spark is about 50-200 MB. Cached DataFrames are stored in Project Tungsten's compressed, binary, columnar format.
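
A minimal sketch of caching the repartitioned DataFrame (an action such as count() is needed to actually materialize the cache):

clickstreamNoIDs8partDF.cache()    // mark the DataFrame for caching
clickstreamNoIDs8partDF.count()    // run an action to materialize the cache in memory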