Spark Configurations

Contents
  1. Configurations Overview
  2. Configs via spark-submit command
  3. Configs via SparkConf class
  4. Configs via spark-defaults.conf file
  5. Important Configurations
  6. Calculating Spark Job resource values
1. Overview

The entry point of a Spark application is where you define configurations and call the internal APIs to create RDDs, DataFrames, and Datasets.

In Spark 1.x, the entry points are SparkContext, SQLContext, and HiveContext; the SQLContext and HiveContext are built from the main one, SparkContext.

In Spark 2.x, SparkSession is the unified entry point, and the three contexts above are all available through it.
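As a quick illustration of the unified entry point, here is a minimal PySpark sketch (names and values are only illustrative) showing that the older contexts and APIs are reachable from a single SparkSession:

from pyspark.sql import SparkSession

# SparkSession is the single entry point in Spark 2.x.
spark = SparkSession.builder \
    .appName("entry-point-demo") \
    .master("local[*]") \
    .getOrCreate()

sc = spark.sparkContext   # underlying SparkContext (Spark 1.x style entry point)
df = spark.range(5)       # DataFrame created directly from the session
df.show()

spark.stop()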

Configurations can be specified in three ways; higher-priority sources override lower ones (see the sketch below).
1. spark-submit command (runtime configs) - medium priority
2. Code-level configs (via SparkConf) - highest priority
3. spark-defaults.conf file - lowest priority
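A minimal PySpark sketch of that precedence, assuming spark.executor.memory is also set in spark-defaults.conf or passed with --conf on spark-submit; the value set in code through SparkConf is the one that wins:

from pyspark import SparkConf
from pyspark.sql import SparkSession

# Code-level config: highest priority, overrides --conf and spark-defaults.conf.
conf = SparkConf().set("spark.executor.memory", "4g")

spark = SparkSession.builder \
    .appName("config-precedence-demo") \
    .master("local[*]") \
    .config(conf=conf) \
    .getOrCreate()

# Prints the effective value after all three sources are merged.
print(spark.conf.get("spark.executor.memory"))

spark.stop()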

2. Via Spark Submit command

$ spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
--conf <key>=<value> \
<application-jar> [application-arguments]

$ spark-submit \
--class com.test.examples.TestSparkConnection \
--master local[*] \
--deploy-mode client \
--conf spark.driver.memory=2g \
--conf spark.executor.memory=4g \
<application-jar>

3. Via SparkConf class

SparkConf is a utility class that holds all Spark-related configurations as key-value pairs.
It can be passed to the SparkSession builder to create a SparkSession.
SparkConf() - the default constructor, equivalent to SparkConf(true).
SparkConf(loadDefaults: Boolean) - pass true to also load spark.* values from Java system properties, false to skip them.


import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object Main {
  def main(args: Array[String]): Unit = {

    // Key-value pairs loaded into SparkConf in one call.
    // Note: driver settings such as spark.driver.memory only take effect if set
    // before the driver JVM starts (e.g. via spark-submit or spark-defaults.conf).
    val confs = List(
      ("spark.driver.maxResultSize", "6g"),
      ("spark.driver.cores", "2"),
      ("spark.driver.memory", "4g"),
      ("spark.executor.cores", "2"),
      ("spark.executor.memory", "4g"),
      ("spark.dynamicAllocation.enabled", "true"))

    val sparkConf = new SparkConf()
      .setAppName("TestApp")
      .setMaster("local[*]")
      .setAll(confs)

    // Pass the SparkConf to the builder to create the SparkSession.
    val sparkSession = SparkSession.builder()
      .config(sparkConf)
      .getOrCreate()

    println(sparkSession.version)
    println(sparkSession.conf.getAll)  // effective runtime configuration

    sparkSession.stop()
  }
}


from pyspark import SparkConf
from pyspark.sql import SparkSession


def main():
    print('Spark Init')
    print('Spark Session Init')

    # Key-value pairs loaded into SparkConf in one call.
    configs = [('spark.driver.cores', '1'), ('spark.executor.cores', '1')]
    conf = SparkConf(True).setAll(configs)
    print(conf.getAll())

    spark_builder = SparkSession.builder \
        .config(conf=conf) \
        .appName("dsf") \
        .master('local[*]')

    # Additional configs can still be set on the builder itself.
    spark_builder.config('spark.jars', '/home/mkm/softwares/spark-2.4.8/jars/jackson-module-paranamer-2.7.9.jar')
    spark_builder.enableHiveSupport()

    spark = spark_builder.getOrCreate()
    print(spark.conf)
    print(spark.version)

    empdf = spark.createDataFrame([(1, 'abc'), (2, 'xyz'), (3, 'pqr'), (4, 'lmn')], ['id', 'name'])
    empdf.show()

    print('Closing Spark Session')
    print('Closing Spark App')
    spark.stop()


if __name__ == '__main__':
    main()

4. Via spark-defaults.conf file
By default, the spark-submit command reads the spark-defaults.conf file if it is present in the SPARK_HOME/conf directory. Each line holds a key and a value separated by whitespace.


spark.master             spark://5.6.7.8:7077
spark.executor.memory    4g
spark.eventLog.enabled   true
spark.serializer         org.apache.spark.serializer.KryoSerializer
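To confirm that values from spark-defaults.conf (or from the spark-submit command line) were actually picked up, inspect the merged configuration from inside the application; a minimal PySpark sketch:

from pyspark.sql import SparkSession

# No configs are set in code here, so anything reported below (including
# spark.master and spark.executor.memory) came from spark-defaults.conf
# or from the spark-submit options.
spark = SparkSession.builder.appName("defaults-check").getOrCreate()

for key, value in spark.sparkContext.getConf().getAll():
    print(key, '=', value)

spark.stop()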

5. Some Important Configurations

Legend: A = Application, E = Execution, M = Memory, C = Compression, S = Serialization, R = Resource Allocation

Config | Description | Default value
spark.app.name [A] | Name of your Spark application | none
spark.driver.cores [E] | Number of cores for the driver program | 1
spark.executor.cores [E] | Number of cores for each executor | 1 on YARN; all available cores otherwise
spark.driver.memory [M] | Memory for the driver process | 1g
spark.executor.memory [M] | Memory for each executor | 1g
spark.executor.pyspark.memory [M] | Memory allocated to PySpark in each executor | none
spark.driver.maxResultSize [M] | Limit on the total size of serialized results of an action collected by the driver | 1g
spark.driver.memoryOverheadFactor [M] | Fraction of driver memory allocated for non-JVM overhead; spark.driver.memoryOverhead = driverMemory * spark.driver.memoryOverheadFactor | 0.10
spark.executor.memoryOverheadFactor [M] | Fraction of executor memory allocated for non-JVM overhead; spark.executor.memoryOverhead = executorMemory * spark.executor.memoryOverheadFactor | 0.10
spark.shuffle.compress [C] | Compress the intermediate map output files | true
spark.serializer [S] | Serializer for shuffled and persisted objects; JavaSerializer (default) or KryoSerializer (faster) | org.apache.spark.serializer.JavaSerializer
spark.dynamicAllocation.enabled [R] | Add and remove executors dynamically based on the workload | false
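A minimal PySpark sketch that applies a few of the settings above (the chosen values are only illustrative, not recommendations):

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf() \
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .set("spark.shuffle.compress", "true") \
    .set("spark.driver.maxResultSize", "2g")
# spark.dynamicAllocation.enabled is normally set on a cluster together with an
# external shuffle service or shuffle tracking, so it is left out of this local sketch.

spark = SparkSession.builder \
    .appName("important-configs-demo") \
    .master("local[*]") \
    .config(conf=conf) \
    .getOrCreate()

# Verify what the running session actually picked up.
sc_conf = spark.sparkContext.getConf()
print(sc_conf.get("spark.serializer"))
print(sc_conf.get("spark.shuffle.compress"))
print(sc_conf.get("spark.driver.maxResultSize"))

spark.stop()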
6. Calculating Spark Job resource values



Rules of thumb: keep around 5 cores per executor, and reserve about 7% of the calculated executor memory as memory overhead.

Example 1. 10-node cluster, each node with 16 cores and 32gb memory.
Keep 1 core and 1gb memory per node for OS-related activities, leaving 15 cores and 31gb per node.
Executors per node = usable cores per node / 5 = 15 / 5 = 3
Total executors = executors per node * total nodes = 3 * 10 = 30; 1 executor is reserved for the Application Master, so 29 remain for the job.
Memory per executor = usable memory per node / executors per node = 31 / 3 ≈ 10gb
Memory overhead per executor = 7% of 10gb = 0.7gb = 700mb
Final memory per executor = rounded(10gb - 700mb) ≈ 9g
$ spark-submit --num-executors 29 --executor-cores 5 --executor-memory 9g run-job.py

Example 2. 15-node cluster, each node with 32 cores and 64gb memory.
Keep 1 core and 1gb memory per node for OS-related activities, leaving 31 cores and 63gb per node.
Executors per node = usable cores per node / 5 = 31 / 5 ≈ 6
Total executors = executors per node * total nodes = 6 * 15 = 90; 1 executor is reserved for the Application Master, so 89 remain for the job.
Memory per executor = usable memory per node / executors per node = 63 / 6 ≈ 10gb
Memory overhead per executor = 7% of 10gb = 0.7gb = 700mb
Final memory per executor = rounded(10gb - 700mb) ≈ 9g
$ spark-submit --num-executors 89 --executor-cores 5 --executor-memory 9g run-job.py
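The same arithmetic can be scripted; a minimal Python sketch (the helper name is hypothetical, and the 5-cores-per-executor and 7% overhead figures are the rules of thumb stated above):

def suggest_resources(nodes, cores_per_node, memory_per_node_gb,
                      cores_for_os=1, memory_for_os_gb=1,
                      cores_per_executor=5, overhead_fraction=0.07):
    """Hypothetical helper applying the rules of thumb above."""
    usable_cores = cores_per_node - cores_for_os
    usable_memory = memory_per_node_gb - memory_for_os_gb

    executors_per_node = usable_cores // cores_per_executor
    total_executors = executors_per_node * nodes - 1           # minus 1 for the Application Master
    memory_per_executor = int(usable_memory / executors_per_node)   # e.g. 31 / 3 -> 10
    overhead = memory_per_executor * overhead_fraction              # 7% of executor memory
    final_memory_gb = int(memory_per_executor - overhead)           # e.g. 10 - 0.7 -> 9

    return total_executors, cores_per_executor, final_memory_gb

# Example 1: 10 nodes, 16 cores, 32gb each -> (29, 5, 9)
print(suggest_resources(10, 16, 32))
# Example 2: 15 nodes, 32 cores, 64gb each -> (89, 5, 9)
print(suggest_resources(15, 32, 64))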