Big Data Processing in Spark

In the traditional 3-tier architecture, data processing is performed by the application server where the data itself is stored in the database server.Application server and database server are typically two different machine.Therefore, the processing cycle proceeds as follows.


--Application server send a query to the database server to retrieve the necessary data
--Application server perform processing on the received data
--Application server will save the changed data to the database server
In the traditional data processing paradigm, we move data to the code.



Then big data phenomenon arrives.Because the data volume is huge, it cannot be hold by a single database server.Big data is typically partitioned and stored across many physical DB server machines. On the other hand, application servers need to be added to increase the processing power of big data.as we increase the number of App servers and DB servers for storing and processing the big data, more data need to be transfer back and forth across the network during the processing cycle, up to a point where the network becomes a major bottleneck.



Moving code to data
To overcome the network bottleneck, we need a new computing paradigm.  Instead of moving data to the code, we move the code to the data and perform the processing at where the data is stored.
Notice the change of the program structure
--The program execution starts at a driver, which orchestrate the execution happening remotely across many worker servers within a cluster.
--Data is no longer transferred to the driver program, the driver program holds a data reference in its variable rather than the data itself.  The data reference is basically an id to locate the corresponding data residing in the database server
--Code is shipped from the program to the database server, where the execution is happening, and data is modified at the database server without leaving the server machine.
--Finally the program request a save of the modified data.  Since the modified data resides in the database server, no data transfer happens over the network.
By moving the code to the data, the volume of data transfer over network is significantly reduced.  This is an important paradigm shift for big data processing.

In the following session, I will use Apache Spark to illustrate how this big data processing paradigm is implemented.

RDD(Resilient Distributed Dataset)
RDD(Resilient Distributed Dataset) is how Spark implements the data reference concept. RDD(Resilient Distributed Dataset) is a logical reference of a dataset which is partitioned across many server machines in the cluster.
To make a clear distinction between data reference and data itself, a Spark program is organized as a sequence of execution steps, which can either be a "transformation" or an "action".

Programming Model
A typical program is organized as follows
--From an environment variable "context", create some initial data reference RDD(Resilient Distributed Dataset) objects.
--Transform initial RDD(Resilient Distributed Dataset) objects to create more RDD(Resilient Distributed Dataset) objects. Transformation is expressed in terms of functional programming where a code block is shipped from the driver program to multiple remote worker server, which hold a partition of the RDD(Resilient Distributed Dataset).Variable appears inside the code block can either be an item of the RDD(Resilient Distributed Dataset) or a local variable inside the driver program which get serialized over to the worker machine.After the code (and the copy of the serialized variables) is received by the remote worker server, it will be executed there by feeding the items of RDD(Resilient Distributed Dataset) residing in that partition.  Notice that the result of a transformation is a brand new RDD (the original RDD is not mutated)
--At the end, the RDD(Resilient Distributed Dataset) object (the data reference) will need to be materialized. This is achieved through an "action", which will dump the RDD(Resilient Distributed Dataset) into a storage, or return its value data to the driver program.
word count example 
# Get initial RDD from the context
file = spark.textFile("hdfs://...")

# Three consecutive transformation of the RDD
counts = file.flatMap(lambda line: line.split(" "))
             .map(lambda word: (word, 1))
             .reduceByKey(lambda a, b: a + b)


# Materialize the RDD using an action
counts.saveAsTextFile("hdfs://...") 

When the driver program starts its execution, it builds up a graph where nodes are RDD(Resilient Distributed Dataset) and edges are transformation steps.  However, no execution is happening at the cluster until an action is encountered.  At that point, the driver program will ship the execution graph as well as the code block to the cluster, where every worker server will get a copy.

The execution graph is a DAG(
Directed acyclic graph):
--Each DAG is a atomic unit of execution. 
--Each source node (no incoming edge) is an external data source or driver memory
--Each intermediate node is a RDD
--Each sink node (no outgoing edge) is an external data source or driver memory
Green edge connecting to RDD represents a transformation.  Red edge connecting to a sink node represents an action

Data Shuffling
Although we ship the code to worker server where the data processing happens, data movement cannot be completely eliminated.  For example, if the processing requires data residing in different partitions to be grouped first, then we need to shuffle data among worker server.
Spark carefully distinguish "transformation" operation in two types.
--"Narrow transformation" refers to the processing where the processing logic depends only on data that is already residing in the partition and data shuffling is unnecessary.  Examples of narrow transformation includes filter(), sample(), map(), flatMap() .... etc.
--"Wide transformation" refers to the processing where the processing logic depends on data residing in multiple partitions and therefore data shuffling is needed to bring them together in one place.  Example of wide transformation includes groupByKey(), reduceByKey() ... etc.



Joining two RDD can also affect the amount of data being shuffled.  Spark provides two ways to join data.  In a shuffle join implementation, data of two RDD with the same key will be redistributed to the same partition.  In other words, each of the items in each RDD will be shuffled across worker servers.

Beside shuffle join, Spark provides another alternative call broadcast join.  In this case, one of the RDD will be broadcasted and copied over to every partition.  Imagine the situation when one of the RDD is significantly smaller relative to the other, then broadcast join will reduce the network traffic because only the small RDD need to be copied to all worker servers while the large RDD doesn't need to be shuffled at all.

In some cases, transformation can be re-ordered to reduce the amount of data shuffling.  Below is an example of a JOIN between two huge RDDs followed by a filtering.

Plan1 is a naive implementation which follows the given order.  It first join the two huge RDD and then apply the filter on the join result.  This ends up causing a big data shuffling because the two RDD is huge, even though the result after filtering is small.

Plan2 offers a smarter way by using the "push-down-predicate" technique where we first apply the filtering in both RDDs before joining them.  Since the filtering will reduce the number of items of each RDD significantly, the join processing will be much cheaper.

Execution planning
As explain above, data shuffling incur the most significant cost in the overall data processing flow.  Spark provides a mechanism that generate an execute plan from the DAG that minimize the amount of data shuffling.
--Analyze the DAG to determine the order of transformation.  Notice that we starts from the action (terminal node) and trace back to all dependent RDDs.
--To minimize data shuffling, we group the narrow transformation together in a "stage" where all transformation tasks can be performed within the partition and no data shuffling is needed.  The transformations becomes tasks that are chained together within a stage
--Wide transformation sits at the boundary of two stages, which requires data to be shuffled to a different worker server.  When a stage finishes its execution, it persist the data into different files (one per partition) of the local disks.  Worker nodes of the subsequent stage will come to pickup these files and this is where data shuffling happens
Below is an example how the execution planning turns the DAG into an execution plan involving stages and tasks.

Reliability and Fault Resiliency
Since the DAG defines a deterministic transformation steps between different partitions of data within each RDD, fault recovery is very straightforward.  Whenever a worker server crashes during the execution of a stage, another worker server can simply re-execute the stage from the beginning by pulling the input data from its parent stage that has the output data stored in local files.  In case the result of the parent stage is not accessible (e.g. the worker server lost the file), the parent stage need to be re-executed as well.  Imagine this is a lineage of transformation steps, and any failure of a step will trigger a restart of execution from its last step.

Since the DAG itself is an atomic unit of execution, all the RDD values will be forgotten after the DAG finishes its execution.  Therefore, after the driver program finishes an action (which execute a DAG to its completion), all the RDD value will be forgotten and if the program access the RDD again in subsequent statement, the RDD needs to be recomputed again from its dependents.  To reduce this repetitive processing, Spark provide a caching mechanism to remember RDDs in worker server memory (or local disk).  Once the execution planner finds the RDD is already cache in memory, it will use the RDD right away without tracing back to its parent RDDs.  This way, we prune the DAG once we reach an RDD that is in the cache.

Apache Spark(Apache Spark™ is a fast and general engine for large-scale data processing.)provides a powerful framework for big data processing.  By the caching mechanism that holds previous computation result in memory, Spark out-performs Hadoop significantly because it doesn't need to persist all the data into disk for each round of parallel processing.
Speed
Run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk.
--Spark has an advanced DAG execution engine that supports cyclic data flow and in-memory computing.
Logistic regression in Hadoop and Spark
Ease of Use
Write applications quickly in Java, Scala, Python, R.
--Spark offers over 80 high-level operators that make it easy to build parallel apps. And you can use it interactively from the Scala, Python and R shells.
text_file = spark.textFile("hdfs://...")

text_file.flatMap(lambda line: line.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a+b)
Word count in Spark's Python API
Generality
Combine SQL, streaming, and complex analytics.Spark powers a stack of libraries including SQL and DataFrames, MLlib for machine learning, GraphX, and Spark Streaming. You can combine these libraries seamlessly in the same application.
Runs Everywhere
Spark runs on Hadoop, Mesos, standalone, or in the cloud. It can access diverse data sources including HDFS, Cassandra, HBase, and S3.You can run Spark using its standalone cluster mode, on EC2, on Hadoop YARN, or on Apache Mesos. Access data in HDFS, Cassandra, HBase,Hive, Tachyon, and any Hadoop data source.

Spark Examples

These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects. You create a dataset from external data, then apply parallel operations to it. There are two types of operations: transformations, which define a new dataset based on previous ones, and actions, which kick off a job to execute on a cluster.

Text Search

In this example, we search through the error messages in a log file:
text_file = spark.textFile("hdfs://...")
errors = text_file.filter(lambda line: "ERROR" in line)
# Count all the errors
errors.count()
# Count errors mentioning MySQL
errors.filter(lambda line: "MySQL" in line).count()
# Fetch the MySQL errors as an array of strings
errors.filter(lambda line: "MySQL" in line).collect()
The red code fragments are function literals (closures) that get passed automatically to the cluster. The blue ones are Spark operations.

In-Memory Text Search

Spark can cache datasets in memory to speed up reuse. In the example above, we can load just the error messages in RAM using:
errors.cache()
After the first action that uses errors, later ones will be much faster.

Word Count

In this example, we use a few more transformations to build a dataset of (String, Int) pairs called counts and then save it to a file.
text_file = spark.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")

Estimating Pi

Spark can also be used for compute-intensive tasks. This code estimates π by "throwing darts" at a circle. We pick random points in the unit square ((0, 0) to (1,1)) and see how many fall in the unit circle. The fraction should be π / 4, so we use this to get our estimate.
def sample(p):
    x, y = random(), random()
    return 1 if x*x + y*y < 1 else 0

count = spark.parallelize(xrange(0, NUM_SAMPLES)).map(sample) \
             .reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)

Logistic Regression

This is an iterative machine learning algorithm that seeks to find the best hyperplane that separates two sets of points in a multi-dimensional feature space. It can be used to classify messages into spam vs non-spam, for example. Because the algorithm applies the same MapReduce operation repeatedly to the same dataset, it benefits greatly from caching the input in RAM across iterations.
points = spark.textFile(...).map(parsePoint).cache()
w = numpy.random.ranf(size = D) # current separating plane
for i in range(ITERATIONS):
    gradient = points.map(
        lambda p: (1 / (1 + exp(-p.y*(w.dot(p.x)))) - 1) * p.y * p.x
    
).reduce(lambda a, b: a + b)
    w -= gradient
print "Final separating plane: %s" % w
Note that the current separating plane, w, gets shipped automatically to the cluster with every map call.
The graph below compares the running time per iteration of this Spark program against a Hadoop implementation on 100 GB of data on a 100-node cluster, showing the benefit of in-memory caching:

Additional Examples

Many additional examples are distributed with Spark:
Built-in Libraries:
Third-Party Packages

Spark 1.6.0 released
We are happy to announce the availability of Spark 1.6.0! Spark 1.6.0 is the seventh release on the API-compatible 1.X line. With this release the Spark community continues to grow, with contributions from 248 developers!
Visit the release notes to read about the new features, or download the release today.

Spark Documentation

Setup instructions, programming guides, and other documentation are available for each version of Spark below:

Books

Related Posts