Обновить метаданные Pyspark Dataframe

Программное обеспечение для управления заказами Изучение

В этой статье мы обсудим, как обновить метаданные фрейма данных PySpark. В частности, мы рассмотрим следующие темы:

  • Понимание важности метаданных в PySpark DataFrames
  • Как получить доступ и просмотреть метаданные PySpark DataFrame
  • Различные способы обновления метаданных PySpark DataFrame
  • Лучшие практики управления метаданными в PySpark DataFrames

К концу этой статьи у нас будет четкое понимание того, как обновлять метаданные PySpark DataFrame и как эффективно управлять метаданными в проектах PySpark.

Важность метаданных в PySpark DataFrames

Метаданные в PySpark DataFrame относятся к информации о данных, такой как имена столбцов, типы данных и ограничения. Это важно, потому что предоставляет важную информацию о структуре и содержании данных. Эта информация используется PySpark во время таких операций, как запросы, фильтрация и присоединение. Если метаданные неверны или противоречивы, это может привести к ошибкам и неожиданным результатам в операциях PySpark. Кроме того, точные метаданные могут повысить производительность операций PySpark, позволяя оптимизатору принимать более взвешенные решения. Важно поддерживать точность и актуальность метаданных, чтобы обеспечить правильное функционирование PySpark DataFrames и общую целостность данных.

Как получить доступ и просмотреть метаданные PySpark DataFrame

В PySpark мы можем получить доступ к метаданным DataFrame, используя атрибут.schema. Это возвращает объект StructType, который содержит метаданные для DataFrame. Мы можем просмотреть метаданные, вызвав метод printSchema() в DataFrame. Это напечатает метаданные в формате дерева, показывая имена столбцов, типы данных, а также то, может ли столбец принимать значение NULL или нет.

Вот пример доступа и просмотра метаданных фрейма данных:

Python3

# Importing required modules
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import SparkSession
 
# Create a SparkSession
spark = SparkSession.builder.appName("Metadata").getOrCreate()
 
# Define schema of data frame
schema = StructType([
    StructField("name", StringType()),
    StructField("age", IntegerType())
])
 
# Create data frame
data = [("Alice", 25),
        ("Bob", 30), 
        ("Charlie", 35)]
df = spark.createDataFrame(data, schema)
 
# Access and view the metadata
print(df.schema)
df.printSchema()

Вывод: первая строка вывода будет объектом StructType, а следующая будет древовидным форматом метаданных фрейма данных.

StructType([StructField('name', StringType(), True), StructField('age', IntegerType(), True)])
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

Мы также можем использовать атрибут dtypes, чтобы получить имя столбца и информацию о типе данных в формате кортежа.

df.dtypes

Это вернет список кортежей, каждый из которых содержит имя столбца и тип данных.

[('name', 'string'), ('age', 'int')]

Различные способы обновления метаданных PySpark DataFrame

Существует несколько способов обновить метаданные PySpark DataFrame в зависимости от конкретных изменений, которые необходимо внести. Вот несколько примеров:

Изменить имена столбцов фрейма данных в PySpark

Этот метод используется для изменения имени столбца во фрейме данных. Метод withColumnRenamed() используется для изменения имени столбца. В этом мы собираемся изменить имя столбца «имя» на » имя пользователя «. Ниже приведены шаги по изменению имени столбца.

Шаг 1: Сначала мы импортируем все необходимые модули, а затем создаем искровую сессию.

Шаг 2: Создайте фрейм данных PySpark с именами данных и столбцов, такими как «имя» и «возраст».

Шаг 3: Используйте метод withColumnRenamed(), чтобы изменить имя столбца «имя» на «имя пользователя».

Шаг 4: Вызовите метод printSchema(), чтобы распечатать схему DataFrame после изменения, которое показывает, что имя столбца было изменено на «имя пользователя».

Python3

# Importing required modules
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import SparkSession
 
# Create a SparkSession
spark = SparkSession.builder.appName("Metadata").getOrCreate()
 
# Create a DataFrame
schema = StructType([
    StructField("name", StringType()),
    StructField("age", IntegerType())
])
# Create a DataFrame
data = [("Alice", 25),
        ("Bob", 30),
        ("Charlie", 35)]
df = spark.createDataFrame(data,
                           ["name", "age"])
# print schema of data frame
df.printSchema()
 
# Change column names
df = df.withColumnRenamed("name", "username")
df.printSchema()

Вывод перед изменением имени столбца:

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

Вывод после изменения имени столбца:

root
 |-- username: string (nullable = true)
 |-- age: long (nullable = true)

Изменить типы данных во фрейме данных в PySpark

Этот метод используется для изменения типа данных столбца. Метод cast() используется для изменения типа данных столбца. Например, чтобы изменить тип данных столбца » age » с long на double, мы должны выполнить следующие шаги.

Шаг 1: Сначала мы импортируем все необходимые модули, а затем создаем искровую сессию.

Шаг 2: Создайте фрейм данных с именами данных и столбцов, такими как «имя» и «возраст».

Шаг 3: Используйте метод withColumn() вместе с методом cast(), чтобы изменить тип данных столбца «age» на double.

Шаг 4: Вызовите метод printSchema (), чтобы распечатать схему DataFrame после изменения, которое показывает, что тип данных столбца «age» был изменен на double.

Python3

# Import required modules
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("Metadata").getOrCreate()
# Create a DataFrame
schema = StructType([
    StructField("name", StringType()),
    StructField("age", IntegerType())
])
# Create a DataFrame
data = [("Alice", 25),
        ("Bob", 30),
        ("Charlie", 35)]
df = spark.createDataFrame(data,
                           ["name", "age"])
df.printSchema()
 
# Change column names
from pyspark.sql.types import DoubleType
df = df.withColumn("age",
                   df["age"].cast(DoubleType()))
 
df.printSchema()

Вывод перед изменением типа данных:

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

Вывод после изменения типа данных:

root
 |-- name: string (nullable = true)
 |-- age: double (nullable = true)

Добавить новые столбцы во фрейм данных в PySpark

Этот метод используется для добавления нового столбца во фрейм данных. Метод withColumn() вместе с методом lit() используется для добавления нового столбца. Здесь мы собираемся добавить новый столбец » пол » строкового типа.

Шаг 1: Сначала мы импортируем все необходимые модули, а затем создаем искровую сессию.

Шаг 2: Создайте фрейм данных с именами данных и столбцов, такими как «имя» и «возраст».

Шаг 3: Используйте метод withColumn() вместе с методом lit(), чтобы добавить новый столбец » пол » строкового типа со значением по умолчанию «неизвестно».

Шаг 4: Используйте метод printSchema() для печати схемы DataFrame после изменения, которое показывает, что новый столбец » пол » был добавлен в фрейм данных строкового типа.

Python3

# Import required modules
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("Metadata").getOrCreate()
# Create a DataFrame
schema = StructType([
    StructField("name", StringType()),
    StructField("age", IntegerType())
])
 
# Create a DataFrame
data = [("Alice", 25),
        ("Bob", 30),
        ("Charlie", 35)]
df = spark.createDataFrame(data,
                           ["name", "age"])
df.printSchema()
 
# Add column
from pyspark.sql.functions import lit
df = df.withColumn("gender",
                   lit("unknown"))
 
# Print Schema of data frame
df.printSchema()
df.show()

Вывод перед добавлением нового столбца:

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

Вывод после добавления нового столбца:

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = false)

+-------+---+-------+
|   name|age| gender|
+-------+---+-------+
|  Alice| 25|unknown|
|    Bob| 30|unknown|
|Charlie| 35|unknown|
+-------+---+-------+

Удалить столбцы фрейма данных в Pyspark

В этом мы собираемся удалить столбец фрейма данных, используя метод drop(), который используется для удаления столбца из фрейма данных. Мы собираемся удалить столбец «пол», который мы создали в предыдущем примере. Вот шаги, чтобы сделать это.

Шаг 1: Сначала мы импортируем все необходимые модули, а затем создаем искровую сессию.

Шаг 2: мы создаем фрейм данных с данными, именами столбцов, такими как «имя», «возраст» и «пол».

Шаг 3: Используйте метод drop(), чтобы удалить столбец «пол» из фрейма данных.

Шаг 4: Вызовите метод printSchema(), чтобы распечатать схему DataFrame после изменения, которое показывает, что столбец «пол» был удален.

Python3

# Importing required modules
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import SparkSession
 
# Create a SparkSession
spark = SparkSession.builder.appName("Metadata").getOrCreate()
 
# Create a DataFrame
schema = StructType([
    StructField("name", StringType()),
    StructField("age", IntegerType())
])
 
# Create a DataFrame
data = [("Alice", 25, "female"),
        ("Bob", 30, "male"),
        ("Charlie", 35, "male")]
df = spark.createDataFrame(data,
                           ["name"
                            "age",
                            "gender"])
df.printSchema()
 
# Remove column
df = df.drop("gender")
df.printSchema()

Вывод перед удалением столбца пола:

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)

Вывод после удаления столбца пола:

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

Изменить метаданные столбца фрейма данных в PySpark

В этом мы собираемся изменить метаданные определенного столбца, сделав его «nullable = true» на «nullable = false». Это можно сделать, создав новый объект схемы с помощью класса StructType и передав его методу createDataFrame(). Вот шаги, чтобы сделать это.

Шаг 1: Сначала мы импортируем все необходимые модули, а затем создаем искровую сессию.

Шаг 2: Создайте фрейм данных с данными, именами столбцов как «имя» и «возраст».

Шаг 3: Создайте новый объект схемы, создав список полей с обновленными метаданными, в частности, сделав столбцы недействительными.

Шаг 4: Создайте новый фрейм данных с помощью метода createDataFrame() и передайте ему RDD исходного фрейма данных и новую схему, которая обновит метаданные фрейма данных.

Python3

# Importing required modules
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import SparkSession
 
# Create a SparkSession
spark = SparkSession.builder.appName("Metadata").getOrCreate()
 
# Create a DataFrame
data = [("Alice", 25),
        ("Bob", 30),
        ("Charlie", 35)]
 
df = spark.createDataFrame(data,
                           ["name", "age"])
df.printSchema()
 
# Change column metadata
fields = [StructField(field.name,
          field.dataType,
          False) for field in df.schema.fields]
 
# Store changed data frame in new_schema
new_schema = StructType(fields)
df = spark.createDataFrame(df.rdd,
                           new_schema)
df.printSchema()

Вывод перед изменением метаданных столбца:

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

Вывод после изменения метаданных столбца:

root
 |-- name: string (nullable = false)
 |-- age: long (nullable = false)

Обновить схему с помощью функции withMetadata() в PySpark

Функция withMetadata() не является встроенной функцией PySpark для обновления метаданных DataFrame. Однако мы можем обновить метаданные DataFrame, используя функцию withMetadata(), определяемым пользователем способом.

Шаг 1: Мы начинаем с импорта необходимых модулей, освещенных из pyspark.sql.functions и JSON.

Шаг 2: Создайте DataFrame с именами данных и столбцов как » имя » и » возраст «, используя метод createDataFrame ().

Шаг 3: Мы определяем функцию withMetadata(), которая принимает два аргумента: DataFrame и словарь метаданных.

Шаг 4: Внутри функции мы обновляем метаданные DataFrame, используя различные операции, такие как переименование столбцов, изменение типов данных, добавление и удаление столбцов и изменение метаданных столбцов.

Шаг 5: Преобразуйте метаданные, переданные в виде словаря, в строку JSON с помощью метода json.dumps().

Шаг 6: Добавьте метаданные в DataFrame, добавив новый столбец «метаданные» со значением переданных метаданных в формате строки JSON, используя метод withColumn() и функцию lit().

Шаг 7: Вызовите функцию withMetadata() и передайте DataFrame и метаданные в качестве аргументов.

Шаг 8: Вызовите метод printSchema(), чтобы распечатать схему DataFrame после изменений, которая показывает, что новый столбец «метаданные» был добавлен с переданными метаданными в формате строки JSON.

Вот шаги, как мы можем использовать функцию withMetadata() для обновления метаданных DataFrame:

Python3

# Import required modules
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import lit
 
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("Metadata").getOrCreate()
from pyspark.sql.functions import lit
import json
# Create a DataFrame
data = [("Alice", 25),
        ("Bob", 30),
        ("Charlie", 35)]
df = spark.createDataFrame(data,
                           ["name", "age"])
df.printSchema()
 
# Define a function to update the metadata
def withMetadata(df, metadata):
    # Update the metadata of the DataFrame
    df = df.withColumnRenamed("name",
                              "username")
    df = df.withColumn("age",
                       df["age"].cast("double"))
    df = df.withColumn("gender",
                       lit("unknown"))
    df = df.drop("gender")
    fields = [StructField(field.name,
              field.dataType,
              False) for field in df.schema.fields]
    new_schema = StructType(fields)
    df = spark.createDataFrame(df.rdd,
                               new_schema)
     
    # Add the metadata to the DataFrame
    df = df.withColumn("metadata",
                       lit(json.dumps(metadata)))
    return df
 
# Update the metadata of the DataFrame
df = withMetadata(df, {"source": "file",
                       "date": "2022-01-01"})
df.printSchema()

Вывод перед вызовом функции withMetadata():

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

Вывод после вызова функции withMetadata():

root
 |-- username: string (nullable = false)
 |-- age: double (nullable = false)
 |-- metadata: string (nullable = false)

Заключение

Подводя итог, метаданные в PySpark DataFrames относятся к информации о данных, такой как имена столбцов, типы данных и ограничения. Это важно, потому что он предоставляет важную информацию о структуре и содержании данных и используется PySpark во время таких операций, как запросы, фильтрация и объединение. Чтобы обновить метаданные PySpark DataFrame.

Читайте также:  React или Angular или Vue — какой фреймворк лучше?
Оцените статью
bestprogrammer.ru
Добавить комментарий