Which of the following DataFrame operators is never classified as a wide transformation?
As a general rule: after having gone through the practice tests, you probably have a good feeling for what counts as a wide and what counts as a narrow transformation. If you are unsure, feel free to play around in Spark and display the execution plan via DataFrame.[operation].explain(), for example DataFrame.sort().explain(). If the plan shows that data is repartitioned (an Exchange node), the operation counts as a wide transformation; see the sketch below.
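For illustration, here is a minimal sketch (the DataFrame and column names are made up) showing how a shuffle becomes visible in the plan: a wide transformation such as sort() produces an Exchange node, while a narrow transformation such as select() does not.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical single-column DataFrame for demonstration purposes
df = spark.range(100).withColumnRenamed("id", "value")

# Narrow transformation: no data moves between partitions,
# so the physical plan contains no Exchange (shuffle) node.
df.select("value").explain()

# Wide transformation: sorting compares rows across partitions,
# so the physical plan contains an Exchange node, e.g. "Exchange rangepartitioning(...)".
df.sort("value").explain()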
DataFrame.select()
Correct! A wide transformation involves a shuffle, in which data from an input partition ends up in one or more different output partitions. This is expensive and causes traffic across the cluster. With the select() operation, however, Spark only needs to operate on the slice of data inside each partition. No data needs to be exchanged across partitions; each partition can be processed independently. Thus, select() does not cause a wide transformation.
DataFrame.repartition()
Incorrect. When you repartition a DataFrame, you redefine partition boundaries. Data will flow across your cluster and end up in different partitions after the repartitioning is completed. This is
known as a shuffle and, in turn, is classified as a wide transformation.
DataFrame.aggregate()
No. When you aggregate, you may compare and summarize data across partitions. In the process, data is exchanged across the cluster, and the newly formed output partitions depend on one or more input partitions. This is the typical characteristic of a shuffle, so the aggregate operation may be classified as a wide transformation.
DataFrame.join()
Wrong. Joining multiple DataFrames usually means that large amounts of data are exchanged across the cluster, as new partitions are formed. This is a shuffle and therefore DataFrame.join()
counts as a wide transformation.
DataFrame.sort()
False. When sorting, Spark needs to compare rows across all partitions with each other. This is an expensive operation, since data is exchanged across the cluster and new partitions are formed as the data is reordered. This process classifies as a shuffle and, as a result, DataFrame.sort() counts as a wide transformation.
More info: Understanding Apache Spark Shuffle | Philipp Brunenberg
Which of the following code blocks reads in the two-partition parquet file stored at filePath, making sure all columns are included exactly once even though each partition has a different schema?
Schema of first partition:
root
|-- transactionId: integer (nullable = true)
|-- predError: integer (nullable = true)
|-- value: integer (nullable = true)
|-- storeId: integer (nullable = true)
|-- productId: integer (nullable = true)
|-- f: integer (nullable = true)
Schema of second partition:
root
|-- transactionId: integer (nullable = true)
|-- predError: integer (nullable = true)
|-- value: integer (nullable = true)
|-- storeId: integer (nullable = true)
|-- rollId: integer (nullable = true)
|-- f: integer (nullable = true)
|-- tax_id: integer (nullable = false)
This is a very tricky question: it requires knowledge both of schema merging and of how schemas are handled when reading parquet files.
spark.read.option('mergeSchema', 'true').parquet(filePath)
Correct. The DataFrameReader's mergeSchema option works well here, since the columns that appear in both partitions have matching data types. Note that mergeSchema would fail if one or more columns with the same name had different data types across partitions. A short sketch after the answer explanations below illustrates this behavior.
spark.read.parquet(filePath)
Incorrect. While this would read in data from both partitions, only the schema in the parquet file that is read in first would be considered, so some columns that appear only in the second partition
(e.g. tax_id) would be lost.
nx = 0
for file in dbutils.fs.ls(filePath):
    if not file.name.endswith('.parquet'):
        continue
    df_temp = spark.read.parquet(file.path)
    if nx == 0:
        df = df_temp
    else:
        df = df.union(df_temp)
    nx = nx+1
df
Wrong. The key idea of this solution is the DataFrame.union() command. While this command merges all data, it requires that both DataFrames have exactly the same number of columns, matched by position and with identical data types. Since the two partitions here have different columns, this approach does not work.
spark.read.parquet(filePath, mergeSchema='y')
False. While using the mergeSchema option is the correct way to solve this problem, and it can even be passed to DataFrameReader.parquet() as in this code block, the option accepts the value True as a boolean or a string. 'y' is not a valid value.
nx = 0
for file in dbutils.fs.ls(filePath):
    if not file.name.endswith('.parquet'):
        continue
    df_temp = spark.read.parquet(file.path)
    if nx == 0:
        df = df_temp
    else:
        df = df.join(df_temp, how='outer')
    nx = nx+1
df
No. This provokes a full outer join. While the resulting DataFrame will have all columns of both partitions, the columns that appear in both partitions will be duplicated, whereas the question says that all columns included in the partitions should appear exactly once.
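For reference, here is a minimal sketch (the path and values are made up) that reproduces the situation above: parquet files with two different schemas in one location, read back with and without mergeSchema.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
path = "/tmp/mixed_schema_example"  # hypothetical location

# Write parquet files with two different (but compatible) schemas to the same path.
spark.createDataFrame([(1, 3)], ["transactionId", "productId"]).write.mode("overwrite").parquet(path)
spark.createDataFrame([(2, 42)], ["transactionId", "tax_id"]).write.mode("append").parquet(path)

# Default read: only one of the two schemas is picked up, so one column is missing.
spark.read.parquet(path).printSchema()

# With mergeSchema, the schemas of all files are reconciled and every column appears exactly once.
spark.read.option("mergeSchema", "true").parquet(path).printSchema()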
More info: Merging different schemas in Apache Spark | by Thiago Cordon | Data Arena | Medium
Static notebook | Dynamic notebook: See test 3, Question 37 (Databricks import instructions)
Which of the following code blocks silently writes DataFrame itemsDf in avro format to location fileLocation if a file does not yet exist at that location?
The trick in this question is knowing the 'modes' of the DataFrameWriter. Mode ignore silently skips the write if a file already exists at the target location: it neither replaces that file nor throws an error. Mode errorifexists, the default mode of the DataFrameWriter, would throw an error instead. The question explicitly calls for the DataFrame to be 'silently' written if it does not exist, so you need to specify mode('ignore') here to avoid having Spark report an error if the file already exists.
The 'overwrite' mode would not be right here since, although it would be silent, it would overwrite the already-existing file. This is not what the question asks for.
It is worth noting that the option starting with spark.DataFrameWriter(itemsDf) cannot work, since spark references the SparkSession object, but that object does not provide the DataFrameWriter.
As you can see in the documentation (below), DataFrameWriter is part of PySpark's SQL API, but not of its SparkSession API.
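Based on the explanation above, the correct pattern presumably looks like the following minimal sketch (fileLocation is a placeholder; depending on your environment, the avro format may require the external spark-avro package):

# DataFrameWriter is reached via itemsDf.write, not via the SparkSession object.
# mode('ignore') silently skips the write if data already exists at fileLocation,
# instead of throwing an error (errorifexists, the default) or replacing it (overwrite).
itemsDf.write.format("avro").mode("ignore").save(fileLocation)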
More info:
DataFrameWriter: pyspark.sql.DataFrameWriter.save --- PySpark 3.1.1 documentation
SparkSession API: Spark SQL --- PySpark 3.1.1 documentation
Static notebook | Dynamic notebook: See test 1, Question 59 (Databricks import instructions)
Which of the following code blocks stores DataFrame itemsDf in executor memory and, if insufficient memory is available, serializes it and saves it to disk?
The key to solving this question is knowing (or reading in the documentation) that, by default, cache() stores values in memory and writes any partitions for which there is insufficient memory to disk. persist() can achieve the exact same behavior, but not with the StorageLevel.MEMORY_ONLY option listed here. It is also worth noting that cache() does not take any arguments.
If you have trouble finding the storage level information in the documentation, please also see this student Q&A thread, which sheds some light on the topic.
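As a reference, here is a minimal sketch of the two equivalent forms described above; the exact storage level matching cache()'s default is an assumption and may differ slightly between Spark versions:

from pyspark import StorageLevel

# cache() takes no arguments; by default it keeps partitions in executor memory
# and writes partitions that do not fit in memory to disk.
itemsDf.cache()

# persist() can express the same behavior with an explicit storage level;
# unpersist() first so the new storage level actually takes effect.
itemsDf.unpersist()
itemsDf.persist(StorageLevel.MEMORY_AND_DISK)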
Static notebook | Dynamic notebook: See test 2, Question 30 (Databricks import instructions)
The code block shown below should add a column itemNameBetweenSeparators to DataFrame itemsDf. The column should contain arrays of a maximum of 4 strings. The arrays should be composed of the values in column itemName, split at - or whitespace characters. Choose the answer that correctly fills the blanks in the code block to accomplish this.
Sample of DataFrame itemsDf:
+------+----------------------------------+-------------------+
|itemId|itemName                          |supplier           |
+------+----------------------------------+-------------------+
|1     |Thick Coat for Walking in the Snow|Sports Company Inc.|
|2     |Elegant Outdoors Summer Dress     |YetiX              |
|3     |Outdoors Backpack                 |Sports Company Inc.|
+------+----------------------------------+-------------------+
Code block:
itemsDf.__1__(__2__, __3__(__4__, "[\s\-]", __5__))
This question deals with the parameters of Spark's split operator for strings.
To solve this question, you first need to understand the difference between DataFrame.withColumn() and DataFrame.withColumnRenamed(). The correct option here is DataFrame.withColumn()
since, according to the question, we want to add a column and not rename an existing column. This leaves you with only 3 answers to consider.
The second gap should be filled with the name of the new column to be added to the DataFrame. One of the remaining answers states the column name as itemNameBetweenSeparators, while the
other two state it as 'itemNameBetweenSeparators'. The correct option here is 'itemNameBetweenSeparators', since the other option would let Python try to interpret itemNameBetweenSeparators
as the name of a variable, which we have not defined. This leaves you with 2 answers to consider.
The decision boils down to how to fill gap 5: either with 4 or with 5. The question asks for arrays of a maximum of four strings. The code in gap 5 relates to the limit parameter of Spark's split operator (see the documentation linked below). The documentation states that 'the resulting array's length will not be more than limit', meaning that we should pick the answer option with 4 as the code in the fifth gap.
On a side note: one answer option includes a function str_split. This function does not exist in PySpark.
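Putting the pieces together, and assuming the source column is itemName (as suggested by the sample DataFrame above), the completed code block would look roughly like this:

from pyspark.sql.functions import split

# Gap 1: withColumn adds a new column.
# Gap 2: the new column's name, passed as a string.
# Gaps 3 and 4: split() applied to the itemName column.
# Gap 5: the limit parameter 4 caps each resulting array at four strings.
itemsDf.withColumn("itemNameBetweenSeparators", split("itemName", "[\s\-]", 4))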
More info: pyspark.sql.functions.split --- PySpark 3.1.2 documentation
Static notebook | Dynamic notebook: See test 3, Question 38 (Databricks import instructions)