Apache Spark is an open-source big data processing framework that enables fast and distributed processing of large data sets. Spark provides an interface for programming distributed data processing across clusters of computers, using a high-level API.
Spark's key feature is its ability to distribute data across many nodes in a cluster and execute computations in parallel, making it possible to process large data sets faster than traditional batch processing systems. It achieves this through in-memory processing and a DAG (Directed Acyclic Graph) engine that optimizes the execution of complex workflows.
What is a Cluster?
A cluster is a set of computers that are viewed as a single system.
Example (Configuration of a cluster):
- Capacity of each worker node: 16 CPU cores, 64 GB RAM
- Total workers: 10
- Total cluster capacity: 160 CPU cores, 640 GB RAM
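The arithmetic in this example can be checked with a few lines of Python. This is only an illustrative sketch: the names WorkerNode and cluster_capacity are made up for this example and are not part of any Spark API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkerNode:
    """Hypothetical description of one worker node (not a Spark class)."""
    cpu_cores: int
    ram_gb: int


def cluster_capacity(workers):
    """Sum the cores and RAM of all workers, as if they were one system."""
    total_cores = sum(w.cpu_cores for w in workers)
    total_ram_gb = sum(w.ram_gb for w in workers)
    return total_cores, total_ram_gb


# Ten identical workers with 16 cores and 64 GB of RAM each.
workers = [WorkerNode(cpu_cores=16, ram_gb=64) for _ in range(10)]
cores, ram_gb = cluster_capacity(workers)
print(f"{cores} CPU cores, {ram_gb} GB RAM")  # 160 CPU cores, 640 GB RAM
```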
What is a Worker?
The workers or worker nodes are responsible for providing the physical resources to the Spark framework. A worker has physical resources of CPU, RAM, and system storage. Workers facilitate Spark's distributed processing. A Spark application can have one or more workers.
Runtime Application (Spark + YARN)
How does a Spark program run on a cluster?
- It all starts when we run a spark-submit command and submit our application to the Resource Manager
- The Resource Manager checks whether resources are available in the cluster. If so, it asks a Node Manager to start a container that runs the application's Application Master
The application can be written in PySpark (Python) or Scala.
Spark is written in Scala, which runs on a Java Virtual Machine (JVM). To port Spark to Python, a Java Wrapper had to be created on top of Spark (written in Scala) and then a Python Wrapper on top of Java. The Python Wrapper communicates with the Java Wrapper through a Py4J connection (allowing a Python application to call a Java application).
Our PySpark application initializes the Java application, and it calls the Spark APIs in the JVM. The PySpark main will be our PySpark Driver while the Java main will be the Application Driver.
Note: We will only have a PySpark Driver when we write applications in this language. If we do it in Scala, we will only have the Application Driver.
Next, our Application Driver requests resources from the Resource Manager, which creates Spark executor containers and assigns them to our driver to execute the tasks. The executors are also JVMs.
Note: If we use Python libraries or Python UDFs, we will have a Python Worker in each executor, which is a Python Runtime Environment.
If we use only the PySpark API (for example, built-in DataFrame operations), we will not have this Python Worker, because those calls are translated into calls on the JVM through Py4J.
In the following image, we can see all the steps described previously in a summarized way.
Spark-submit is a command that allows us to run Spark applications in a cluster. It is important to know some of its options.
spark-submit --class <main-class> --master <master-url> --deploy-mode <deploy-mode> <application-jar> [application-args]
• spark-submit --master yarn --deploy-mode cluster --driver-cores 1 --driver-memory 4G --num-executors 4 --executor-cores 4 --executor-memory 16G hello-spark.py
• spark-submit --class md.learning.HelloSpark --master yarn --deploy-mode cluster --driver-cores 1 --driver-memory 4G --num-executors 4 --executor-cores 4 --executor-memory 16G hello-spark.jar
--class -> md.learning.HelloSpark
• The name of the class where the main method is defined. Not available for PySpark
--master -> yarn, local[3]
• Identifies the cluster manager to use (YARN, Mesos, Kubernetes, etc.). To run locally, use local; local[3], for example, runs Spark locally with 3 threads
--deploy-mode -> client or cluster
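• Where the driver is deployed. There is no local deploy mode; local is a value for --master
--conf -> spark.executor.memoryOverheadFactor=0.20
• Lets us pass additional configuration properties as key=value pairs. A fraction such as 0.20 belongs to spark.executor.memoryOverheadFactor (Spark 3.3 and later); spark.executor.memoryOverhead expects an absolute size such as 2g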
--driver-cores -> 2
• Number of CPU cores that our driver container will have
--driver-memory -> 8G
• Amount of RAM that our driver container will have
--num-executors -> 4
• Number of executor containers
--executor-cores -> 4
• Number of CPU cores that each executor container will have
--executor-memory -> 16G
• Amount of RAM that each executor container will have
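To show how these options fit together, the following sketch assembles the first example command from its parts. The helper build_spark_submit is hypothetical (it is not part of Spark); it only builds the argument list and does not launch anything.

```python
import shlex


def build_spark_submit(app, master="yarn", deploy_mode="cluster",
                       main_class=None, conf=None, **resources):
    """Return the spark-submit command as a list of arguments.

    `resources` maps option names written with underscores
    (e.g. num_executors) to values; they become flags such as
    --num-executors.
    """
    cmd = ["spark-submit"]
    if main_class is not None:
        # --class only applies to JVM (Scala/Java) applications.
        cmd += ["--class", main_class]
    cmd += ["--master", master, "--deploy-mode", deploy_mode]
    for key, value in (conf or {}).items():
        cmd += ["--conf", f"{key}={value}"]
    for name, value in resources.items():
        cmd += [f"--{name.replace('_', '-')}", str(value)]
    cmd.append(app)
    return cmd


cmd = build_spark_submit(
    "hello-spark.py",
    driver_cores=1, driver_memory="4G",
    num_executors=4, executor_cores=4, executor_memory="16G",
)
print(shlex.join(cmd))
```

On a machine where spark-submit is installed, the resulting list could be passed to subprocess.run(cmd, check=True) to actually submit the application.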
- The cluster manager is in charge of managing the cluster's resources
- The driver communicates with the cluster manager through the SparkContext. In this way the driver requests resources for the executors to be deployed
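As a rough illustration of what the driver ends up requesting, the sketch below estimates the total cores and memory for the spark-submit example shown earlier when it runs in cluster mode. It assumes Spark's default memory overhead rule (10% of the container's memory, with a minimum of 384 MiB), applies the same factor to the driver for simplicity, and ignores any rounding the cluster manager applies to container sizes. The function names are made up for this example.

```python
MIN_OVERHEAD_MIB = 384  # Spark's documented minimum memory overhead


def container_memory_mib(heap_mib, overhead_factor=0.10):
    """Heap plus off-heap overhead requested for one container, in MiB."""
    overhead = max(int(heap_mib * overhead_factor), MIN_OVERHEAD_MIB)
    return heap_mib + overhead


def total_request(num_executors, executor_cores, executor_mem_mib,
                  driver_cores, driver_mem_mib, overhead_factor=0.10):
    """Approximate cores and memory (MiB) requested in cluster mode."""
    cores = num_executors * executor_cores + driver_cores
    executors_mem = num_executors * container_memory_mib(
        executor_mem_mib, overhead_factor)
    driver_mem = container_memory_mib(driver_mem_mib, overhead_factor)
    return cores, executors_mem + driver_mem


# 4 executors with 4 cores and 16G each, plus a driver with 1 core and 4G.
cores, memory_mib = total_request(
    num_executors=4, executor_cores=4, executor_mem_mib=16 * 1024,
    driver_cores=1, driver_mem_mib=4 * 1024,
)
print(cores, memory_mib)  # 17 76593
```

Compared with the 160 cores and 640 GB of the example cluster, this application would occupy only a small share of the available resources.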
Cluster Manager Types
- Standalone: A cluster manager that ships with Spark. One of its limitations is that Apache Spark must be the only framework running in the cluster. If we want to run multiple frameworks, such as Apache Spark and Apache Flink, in parallel on the same cluster, we should consider YARN.
- YARN: The Hadoop 2 and 3 resource manager allows us to work with multiple frameworks in the same cluster.
- Mesos: A general-purpose cluster manager that can also run other frameworks such as MapReduce. Spark's support for Mesos has been deprecated since Spark 3.2.
The driver is responsible for providing and coordinating the resources needed to execute an application. There is only one driver per application.
When we submit an application via the spark-submit command, the driver takes the application and identifies all the transformations and actions it contains. These operations are grouped into a logical flow (the logical plan), called a Directed Acyclic Graph (DAG), which is then converted into a physical execution plan.
Directed Acyclic Graph
A DAG has no cycles. It contains the Spark execution hierarchy: jobs, which consist of stages, which in turn consist of tasks. DAGs represent the logical execution and are lazy, that is, they are executed only when an action is invoked.
- Jobs -> Stages -> Tasks
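The lazy behavior can be imitated in plain Python. The class below is a toy stand-in, not Spark code: LazyDataset and its methods are hypothetical names, and the "plan" is just a tuple of steps. It only illustrates that transformations are recorded and nothing runs until an action is called.

```python
class LazyDataset:
    """Toy stand-in for a Spark dataset: records a plan, runs it on an action."""

    def __init__(self, data, plan=()):
        self._data = data
        self._plan = plan  # tuple of (kind, function) steps, in order

    # Transformations: return a new dataset with a longer plan, compute nothing.
    def map(self, fn):
        return LazyDataset(self._data, self._plan + (("map", fn),))

    def filter(self, fn):
        return LazyDataset(self._data, self._plan + (("filter", fn),))

    # Action: walk the recorded plan and produce a result.
    def collect(self):
        rows = iter(self._data)
        for kind, fn in self._plan:
            # Inside a method, map/filter refer to the Python built-ins.
            rows = map(fn, rows) if kind == "map" else filter(fn, rows)
        return list(rows)


calls = []


def double(x):
    calls.append(x)  # record that real work happened
    return x * 2


ds = LazyDataset([1, 2, 3, 4]).map(double).filter(lambda x: x > 4)
print(len(calls))    # 0: nothing has run yet
print(ds.collect())  # [6, 8]
print(len(calls))    # 4: the action triggered the work
```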
The executors are responsible for executing the tasks assigned by the driver and returning the results to the driver once they finish. In a cluster, a worker node can host one or more executors, where each executor has an assigned amount of resources (CPU, RAM, and storage) and can store data in memory or on disk. An executor stays alive only until the application finishes. The slots are the CPU cores available to an executor, and they determine how many tasks it can run in parallel.
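The relationship between slots and parallelism can be sketched with simple arithmetic. The example assumes each task uses one core (Spark's default) and that all tasks take about the same time; the function names and the figure of 200 tasks are made up for illustration.

```python
import math


def total_slots(num_executors, cores_per_executor):
    """Number of tasks that can run at the same time across all executors."""
    return num_executors * cores_per_executor


def task_waves(num_tasks, slots):
    """How many rounds are needed if every slot runs one task per round."""
    return math.ceil(num_tasks / slots)


slots = total_slots(num_executors=4, cores_per_executor=4)
print(slots)                   # 16
print(task_waves(200, slots))  # 13
```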
Note: In the case of Databricks, the concepts of executor and worker are the same. That is, we only indicate the number of workers/executors and their capacity (CPU, RAM, and storage).
- Shuffling: It is the process used to redistribute data across cluster nodes. Data is moved across the network, and shuffle groups data into buckets where each bucket forms a partition.
- Job: A job is the unit of work that Spark runs in response to an action. A Spark application can have one or more jobs depending on its processing.
- Stage: A stage consists of multiple tasks. When we execute a job, Spark determines the number of stages needed. Each stage is delimited by a shuffle, that is, a new stage is created every time data must be moved across the network.
- Tasks: A task is the simplest and most basic unit of processing in Spark. Tasks are executed by the executors. They are always based on the partitioning of our data, which allows them to be executed in parallel by one or more executors.
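The way a job breaks down into stages and tasks can be sketched as follows. This is a simplified model, not how Spark's scheduler is implemented: the set of shuffle-causing operations, the operation names in the example job, and the partition counts are all assumptions chosen for illustration, and the shuffle operation is placed at the start of the new stage for simplicity.

```python
# Hypothetical classification: these operations are assumed to need a shuffle.
WIDE = {"groupBy", "join", "repartition"}


def split_into_stages(operations):
    """Group operations into stages, starting a new stage at each shuffle."""
    stages = [[]]
    for op in operations:
        if op in WIDE:
            stages.append([])  # the shuffle ends the current stage
        stages[-1].append(op)
    return stages


job = ["read", "filter", "groupBy", "count"]
stages = split_into_stages(job)
print(stages)  # [['read', 'filter'], ['groupBy', 'count']]

# One task per partition in each stage (assumed partition counts).
partitions = [8, 4]
tasks = sum(partitions)
print(f"{len(stages)} stages, {tasks} tasks")  # 2 stages, 12 tasks
```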
Spark Deploy Modes
Cluster Mode: When we use this mode, our Application Driver runs on a container inside one of the worker nodes of the cluster. Some advantages of this mode are that once the spark-submit is done, we can disconnect from our client machine, and the latency time between the driver and the executor is minimal. In production environments we should always choose to use Cluster Mode.
Client Mode: When we use this mode, our Application Driver runs on the client machine. This mode is used by spark-shell and by interactive notebooks, but it creates a dependency between the client and the cluster. If the client fails or shuts down, the driver dies, the executors assigned to it are orphaned, and the Resource Manager terminates them.