В этой статье мы обсудим, как обновить метаданные фрейма данных PySpark. В частности, мы рассмотрим следующие темы:
- Понимание важности метаданных в PySpark DataFrames
- Как получить доступ и просмотреть метаданные PySpark DataFrame
- Различные способы обновления метаданных PySpark DataFrame
- Лучшие практики управления метаданными в PySpark DataFrames
К концу этой статьи у нас будет четкое понимание того, как обновлять метаданные PySpark DataFrame и как эффективно управлять метаданными в проектах PySpark.
- Важность метаданных в PySpark DataFrames
- Как получить доступ и просмотреть метаданные PySpark DataFrame
- Различные способы обновления метаданных PySpark DataFrame
- Изменить типы данных во фрейме данных в PySpark
- Добавить новые столбцы во фрейм данных в PySpark
- Удалить столбцы фрейма данных в Pyspark
- Изменить метаданные столбца фрейма данных в PySpark
- Обновить схему с помощью функции withMetadata() в PySpark
- Заключение
Важность метаданных в PySpark DataFrames
Метаданные в PySpark DataFrame относятся к информации о данных, такой как имена столбцов, типы данных и ограничения. Это важно, потому что предоставляет важную информацию о структуре и содержании данных. Эта информация используется PySpark во время таких операций, как запросы, фильтрация и присоединение. Если метаданные неверны или противоречивы, это может привести к ошибкам и неожиданным результатам в операциях PySpark. Кроме того, точные метаданные могут повысить производительность операций PySpark, позволяя оптимизатору принимать более взвешенные решения. Важно поддерживать точность и актуальность метаданных, чтобы обеспечить правильное функционирование PySpark DataFrames и общую целостность данных.
Как получить доступ и просмотреть метаданные PySpark DataFrame
В PySpark мы можем получить доступ к метаданным DataFrame, используя атрибут.schema. Это возвращает объект StructType, который содержит метаданные для DataFrame. Мы можем просмотреть метаданные, вызвав метод printSchema() в DataFrame. Это напечатает метаданные в формате дерева, показывая имена столбцов, типы данных, а также то, может ли столбец принимать значение NULL или нет.
Вот пример доступа и просмотра метаданных фрейма данных:
Python3
# Importing required modules
from
pyspark.sql.types
import
StructType, StructField, StringType, IntegerType
from
pyspark.sql
import
SparkSession
# Create a SparkSession
spark
=
SparkSession.builder.appName(
"Metadata"
).getOrCreate()
# Define schema of data frame
schema
=
StructType([
StructField(
"name"
, StringType()),
StructField(
"age"
, IntegerType())
])
# Create data frame
data
=
[(
"Alice"
,
25
),
(
"Bob"
,
30
),
(
"Charlie"
,
35
)]
df
=
spark.createDataFrame(data, schema)
# Access and view the metadata
(df.schema)
df.printSchema()
Вывод: первая строка вывода будет объектом StructType, а следующая будет древовидным форматом метаданных фрейма данных.
StructType([StructField('name', StringType(), True), StructField('age', IntegerType(), True)]) root |-- name: string (nullable = true) |-- age: integer (nullable = true)
Мы также можем использовать атрибут dtypes, чтобы получить имя столбца и информацию о типе данных в формате кортежа.
df.dtypes
Это вернет список кортежей, каждый из которых содержит имя столбца и тип данных.
[('name', 'string'), ('age', 'int')]
Различные способы обновления метаданных PySpark DataFrame
Существует несколько способов обновить метаданные PySpark DataFrame в зависимости от конкретных изменений, которые необходимо внести. Вот несколько примеров:
Изменить имена столбцов фрейма данных в PySpark
Этот метод используется для изменения имени столбца во фрейме данных. Метод withColumnRenamed() используется для изменения имени столбца. В этом мы собираемся изменить имя столбца «имя» на » имя пользователя «. Ниже приведены шаги по изменению имени столбца.
Шаг 1: Сначала мы импортируем все необходимые модули, а затем создаем искровую сессию.
Шаг 2: Создайте фрейм данных PySpark с именами данных и столбцов, такими как «имя» и «возраст».
Шаг 3: Используйте метод withColumnRenamed(), чтобы изменить имя столбца «имя» на «имя пользователя».
Шаг 4: Вызовите метод printSchema(), чтобы распечатать схему DataFrame после изменения, которое показывает, что имя столбца было изменено на «имя пользователя».
Python3
# Importing required modules
from
pyspark.sql.types
import
StructType, StructField, StringType, IntegerType
from
pyspark.sql
import
SparkSession
# Create a SparkSession
spark
=
SparkSession.builder.appName(
"Metadata"
).getOrCreate()
# Create a DataFrame
schema
=
StructType([
StructField(
"name"
, StringType()),
StructField(
"age"
, IntegerType())
])
# Create a DataFrame
data
=
[(
"Alice"
,
25
),
(
"Bob"
,
30
),
(
"Charlie"
,
35
)]
df
=
spark.createDataFrame(data,
[
"name"
,
"age"
])
# print schema of data frame
df.printSchema()
# Change column names
df
=
df.withColumnRenamed(
"name"
,
"username"
)
df.printSchema()
Вывод перед изменением имени столбца:
root |-- name: string (nullable = true) |-- age: long (nullable = true)
Вывод после изменения имени столбца:
root |-- username: string (nullable = true) |-- age: long (nullable = true)
Изменить типы данных во фрейме данных в PySpark
Этот метод используется для изменения типа данных столбца. Метод cast() используется для изменения типа данных столбца. Например, чтобы изменить тип данных столбца » age » с long на double, мы должны выполнить следующие шаги.
Шаг 1: Сначала мы импортируем все необходимые модули, а затем создаем искровую сессию.
Шаг 2: Создайте фрейм данных с именами данных и столбцов, такими как «имя» и «возраст».
Шаг 3: Используйте метод withColumn() вместе с методом cast(), чтобы изменить тип данных столбца «age» на double.
Шаг 4: Вызовите метод printSchema (), чтобы распечатать схему DataFrame после изменения, которое показывает, что тип данных столбца «age» был изменен на double.
Python3
# Import required modules
from
pyspark.sql.types
import
StructType, StructField, StringType, IntegerType
from
pyspark.sql
import
SparkSession
# Create a SparkSession
spark
=
SparkSession.builder.appName(
"Metadata"
).getOrCreate()
# Create a DataFrame
schema
=
StructType([
StructField(
"name"
, StringType()),
StructField(
"age"
, IntegerType())
])
# Create a DataFrame
data
=
[(
"Alice"
,
25
),
(
"Bob"
,
30
),
(
"Charlie"
,
35
)]
df
=
spark.createDataFrame(data,
[
"name"
,
"age"
])
df.printSchema()
# Change column names
from
pyspark.sql.types
import
DoubleType
df
=
df.withColumn(
"age"
,
df[
"age"
].cast(DoubleType()))
df.printSchema()
Вывод перед изменением типа данных:
root |-- name: string (nullable = true) |-- age: long (nullable = true)
Вывод после изменения типа данных:
root |-- name: string (nullable = true) |-- age: double (nullable = true)
Добавить новые столбцы во фрейм данных в PySpark
Этот метод используется для добавления нового столбца во фрейм данных. Метод withColumn() вместе с методом lit() используется для добавления нового столбца. Здесь мы собираемся добавить новый столбец » пол » строкового типа.
Шаг 1: Сначала мы импортируем все необходимые модули, а затем создаем искровую сессию.
Шаг 2: Создайте фрейм данных с именами данных и столбцов, такими как «имя» и «возраст».
Шаг 3: Используйте метод withColumn() вместе с методом lit(), чтобы добавить новый столбец » пол » строкового типа со значением по умолчанию «неизвестно».
Шаг 4: Используйте метод printSchema() для печати схемы DataFrame после изменения, которое показывает, что новый столбец » пол » был добавлен в фрейм данных строкового типа.
Python3
# Import required modules
from
pyspark.sql.types
import
StructType, StructField, StringType, IntegerType
from
pyspark.sql
import
SparkSession
# Create a SparkSession
spark
=
SparkSession.builder.appName(
"Metadata"
).getOrCreate()
# Create a DataFrame
schema
=
StructType([
StructField(
"name"
, StringType()),
StructField(
"age"
, IntegerType())
])
# Create a DataFrame
data
=
[(
"Alice"
,
25
),
(
"Bob"
,
30
),
(
"Charlie"
,
35
)]
df
=
spark.createDataFrame(data,
[
"name"
,
"age"
])
df.printSchema()
# Add column
from
pyspark.sql.functions
import
lit
df
=
df.withColumn(
"gender"
,
lit(
"unknown"
))
# Print Schema of data frame
df.printSchema()
df.show()
Вывод перед добавлением нового столбца:
root |-- name: string (nullable = true) |-- age: long (nullable = true)
Вывод после добавления нового столбца:
root |-- name: string (nullable = true) |-- age: long (nullable = true) |-- gender: string (nullable = false) +-------+---+-------+ | name|age| gender| +-------+---+-------+ | Alice| 25|unknown| | Bob| 30|unknown| |Charlie| 35|unknown| +-------+---+-------+
Удалить столбцы фрейма данных в Pyspark
В этом мы собираемся удалить столбец фрейма данных, используя метод drop(), который используется для удаления столбца из фрейма данных. Мы собираемся удалить столбец «пол», который мы создали в предыдущем примере. Вот шаги, чтобы сделать это.
Шаг 1: Сначала мы импортируем все необходимые модули, а затем создаем искровую сессию.
Шаг 2: мы создаем фрейм данных с данными, именами столбцов, такими как «имя», «возраст» и «пол».
Шаг 3: Используйте метод drop(), чтобы удалить столбец «пол» из фрейма данных.
Шаг 4: Вызовите метод printSchema(), чтобы распечатать схему DataFrame после изменения, которое показывает, что столбец «пол» был удален.
Python3
# Importing required modules
from
pyspark.sql.types
import
StructType, StructField, StringType, IntegerType
from
pyspark.sql
import
SparkSession
# Create a SparkSession
spark
=
SparkSession.builder.appName(
"Metadata"
).getOrCreate()
# Create a DataFrame
schema
=
StructType([
StructField(
"name"
, StringType()),
StructField(
"age"
, IntegerType())
])
# Create a DataFrame
data
=
[(
"Alice"
,
25
,
"female"
),
(
"Bob"
,
30
,
"male"
),
(
"Charlie"
,
35
,
"male"
)]
df
=
spark.createDataFrame(data,
[
"name"
,
"age"
,
"gender"
])
df.printSchema()
# Remove column
df
=
df.drop(
"gender"
)
df.printSchema()
Вывод перед удалением столбца пола:
root |-- name: string (nullable = true) |-- age: long (nullable = true) |-- gender: string (nullable = true)
Вывод после удаления столбца пола:
root |-- name: string (nullable = true) |-- age: long (nullable = true)
Изменить метаданные столбца фрейма данных в PySpark
В этом мы собираемся изменить метаданные определенного столбца, сделав его «nullable = true» на «nullable = false». Это можно сделать, создав новый объект схемы с помощью класса StructType и передав его методу createDataFrame(). Вот шаги, чтобы сделать это.
Шаг 1: Сначала мы импортируем все необходимые модули, а затем создаем искровую сессию.
Шаг 2: Создайте фрейм данных с данными, именами столбцов как «имя» и «возраст».
Шаг 3: Создайте новый объект схемы, создав список полей с обновленными метаданными, в частности, сделав столбцы недействительными.
Шаг 4: Создайте новый фрейм данных с помощью метода createDataFrame() и передайте ему RDD исходного фрейма данных и новую схему, которая обновит метаданные фрейма данных.
Python3
# Importing required modules
from
pyspark.sql.types
import
StructType, StructField, StringType, IntegerType
from
pyspark.sql
import
SparkSession
# Create a SparkSession
spark
=
SparkSession.builder.appName(
"Metadata"
).getOrCreate()
# Create a DataFrame
data
=
[(
"Alice"
,
25
),
(
"Bob"
,
30
),
(
"Charlie"
,
35
)]
df
=
spark.createDataFrame(data,
[
"name"
,
"age"
])
df.printSchema()
# Change column metadata
fields
=
[StructField(field.name,
field.dataType,
False
)
for
field
in
df.schema.fields]
# Store changed data frame in new_schema
new_schema
=
StructType(fields)
df
=
spark.createDataFrame(df.rdd,
new_schema)
df.printSchema()
Вывод перед изменением метаданных столбца:
root |-- name: string (nullable = true) |-- age: long (nullable = true)
Вывод после изменения метаданных столбца:
root |-- name: string (nullable = false) |-- age: long (nullable = false)
Обновить схему с помощью функции withMetadata() в PySpark
Функция withMetadata() не является встроенной функцией PySpark для обновления метаданных DataFrame. Однако мы можем обновить метаданные DataFrame, используя функцию withMetadata(), определяемым пользователем способом.
Шаг 1: Мы начинаем с импорта необходимых модулей, освещенных из pyspark.sql.functions и JSON.
Шаг 2: Создайте DataFrame с именами данных и столбцов как » имя » и » возраст «, используя метод createDataFrame ().
Шаг 3: Мы определяем функцию withMetadata(), которая принимает два аргумента: DataFrame и словарь метаданных.
Шаг 4: Внутри функции мы обновляем метаданные DataFrame, используя различные операции, такие как переименование столбцов, изменение типов данных, добавление и удаление столбцов и изменение метаданных столбцов.
Шаг 5: Преобразуйте метаданные, переданные в виде словаря, в строку JSON с помощью метода json.dumps().
Шаг 6: Добавьте метаданные в DataFrame, добавив новый столбец «метаданные» со значением переданных метаданных в формате строки JSON, используя метод withColumn() и функцию lit().
Шаг 7: Вызовите функцию withMetadata() и передайте DataFrame и метаданные в качестве аргументов.
Шаг 8: Вызовите метод printSchema(), чтобы распечатать схему DataFrame после изменений, которая показывает, что новый столбец «метаданные» был добавлен с переданными метаданными в формате строки JSON.
Вот шаги, как мы можем использовать функцию withMetadata() для обновления метаданных DataFrame:
Python3
# Import required modules
from
pyspark.sql.types
import
StructType, StructField, StringType, IntegerType
from
pyspark.sql.functions
import
lit
from
pyspark.sql
import
SparkSession
# Create a SparkSession
spark
=
SparkSession.builder.appName(
"Metadata"
).getOrCreate()
from
pyspark.sql.functions
import
lit
import
json
# Create a DataFrame
data
=
[(
"Alice"
,
25
),
(
"Bob"
,
30
),
(
"Charlie"
,
35
)]
df
=
spark.createDataFrame(data,
[
"name"
,
"age"
])
df.printSchema()
# Define a function to update the metadata
def
withMetadata(df, metadata):
# Update the metadata of the DataFrame
df
=
df.withColumnRenamed(
"name"
,
"username"
)
df
=
df.withColumn(
"age"
,
df[
"age"
].cast(
"double"
))
df
=
df.withColumn(
"gender"
,
lit(
"unknown"
))
df
=
df.drop(
"gender"
)
fields
=
[StructField(field.name,
field.dataType,
False
)
for
field
in
df.schema.fields]
new_schema
=
StructType(fields)
df
=
spark.createDataFrame(df.rdd,
new_schema)
# Add the metadata to the DataFrame
df
=
df.withColumn(
"metadata"
,
lit(json.dumps(metadata)))
return
df
# Update the metadata of the DataFrame
df
=
withMetadata(df, {
"source"
:
"file"
,
"date"
:
"2022-01-01"
})
df.printSchema()
Вывод перед вызовом функции withMetadata():
root |-- name: string (nullable = true) |-- age: long (nullable = true)
Вывод после вызова функции withMetadata():
root |-- username: string (nullable = false) |-- age: double (nullable = false) |-- metadata: string (nullable = false)
Заключение
Подводя итог, метаданные в PySpark DataFrames относятся к информации о данных, такой как имена столбцов, типы данных и ограничения. Это важно, потому что он предоставляет важную информацию о структуре и содержании данных и используется PySpark во время таких операций, как запросы, фильтрация и объединение. Чтобы обновить метаданные PySpark DataFrame.