Разделение данных в PySpark

Как стать программистом без образования с нуля Изучение

В этой статье мы изучим разделение данных с помощью PySpark в Python.

В PySpark разделение данных относится к процессу разделения большого набора данных на более мелкие фрагменты или разделы, которые могут обрабатываться одновременно. Это важный аспект распределенных вычислений, поскольку он позволяет более эффективно обрабатывать большие наборы данных за счет разделения рабочей нагрузки между несколькими машинами или процессорами.

Преимущества разделения данных:

  1. Улучшенная производительность: благодаря разделению данных на более мелкие разделы их можно обрабатывать параллельно на нескольких компьютерах, что приводит к сокращению времени обработки и повышению производительности.
  2. Масштабируемость. Разделение обеспечивает горизонтальную масштабируемость, а это означает, что по мере роста объема данных к кластеру можно добавить больше машин для обработки возросшей нагрузки без внесения изменений в код обработки данных.
  3. Улучшенная отказоустойчивость. Разделение также позволяет распределять данные по нескольким компьютерам, что может помочь предотвратить потерю данных в случае отказа одного компьютера.
  4. Организация данных: секционирование позволяет организовать данные более значимым образом, например, по периоду времени или географическому местоположению, что может упростить анализ и запрос данных.

В этой статье мы увидим различные методы разделения данных.

Методы разделения данных в PySpark

  1. Разбиение хэша
  2. Разделение диапазона
  3. Использование разделаBy

Использование хэш-разбиения

Это метод разделения по умолчанию в PySpark. Он работает, присваивая уникальное хэш-значение каждой записи на основе указанного столбца, а затем помещая запись в соответствующий раздел. Это гарантирует, что записи с одинаковым значением для указанного столбца будут помещены в один и тот же раздел. Разделение по хешу — это метод разделения набора данных на разделы на основе хеш-значений указанных столбцов.

Шаги для реализации секционирования хэша:

Шаг 1: Сначала мы импортируем все необходимые библиотеки и создадим образец DataFrame с тремя столбцами id, name и age.

Читайте также:  10 советов, как максимизировать производительность кода Python в 2023 году

Шаг 2: Используйте функцию перераспределения, чтобы выполнить разбиение хэша в DataFrame на основе столбца идентификатора. Укажем, что хотим создать четыре раздела.

Шаг 3: Мы можем проверить разделение, используя метод rdd для доступа к базовому RDD, а затем вызвав метод glom, который возвращает массив всех элементов в каждом разделе.

Вот полный код:

Python3

# Import required modules
from pyspark.sql import SparkSession
 
# Create a SparkSession
spark = SparkSession.builder.appName("hash\
              _partitioning").getOrCreate()
 
# Create a sample DataFrame
df = spark.createDataFrame([
    (1, "Alice", 25),
    (2, "Bob", 30),
    (3, "Charlie", 35),
    (4, "Dave", 40),
    (5, "Eve", 45),
    (6, "Frank", 50)
], ["id", "name", "age"])
 
# Print the DataFrame
df.show()
# Perform hash partitioning on the 
# DataFrame based on the "id" column
df = df.repartition(4, "id")
 
# Print the elements in each partition
print(df.rdd.glom().collect())

Вывод: в приведенном ниже выводе мы видим фрейм данных, к которому мы собираемся применить раздел, а под этим фреймом данных можно увидеть вложенный массив, который содержит разделенные данные.

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 25|
|  2|    Bob| 30|
|  3|Charlie| 35|
|  4|   Dave| 40|
|  5|    Eve| 45|
|  6|  Frank| 50|
+---+-------+---+

[[Row(id=2, name='Bob', age=30), Row(id=4, name='Dave', age=40), Row(id=5, name='Eve', age=45)],
 [Row(id=1, name='Alice', age=25), Row(id=6, name='Frank', age=50)],
 [],
 [Row(id=3, name='Charlie', age=35)]]

Использование разбиения по диапазону

Этот метод включает в себя разделение данных на разделы на основе диапазона значений для указанного столбца. Например, мы можем разбить набор данных на основе диапазона дат, при этом каждый раздел будет содержать записи за определенный период времени. В этом методе мы будем использовать функцию repartitionByRange () для разделения диапазонов в DataFrame на основе столбца возраста.

Python3

from pyspark.sql import SparkSession
 
# Create a SparkSession
spark = SparkSession.builder.appName("range\
              _partitioning").getOrCreate()
 
# Create a sample DataFrame
df = spark.createDataFrame([
    (1, "Alice", 25),
    (2, "Bob", 30),
    (3, "Charlie", 35),
    (4, "Dave", 40),
    (5, "Eve", 45),
    (6, "Frank", 50)
], ["id", "name", "age"])
 
# Perform range partitioning on the 
# DataFrame based on the "age" column
df = df.repartitionByRange(3, "age")
 
# Print the elements in each partition
print(df.rdd.glom().collect())

Вывод: в приведенном ниже выводе мы видим, что фрейм данных разделен на три части, как указано в функции repartitionByRange().

[[Row(id=1, name='Alice', age=25), Row(id=2, name='Bob', age=30)],
 [Row(id=3, name='Charlie', age=35), Row(id=4, name='Dave', age=40)],
 [Row(id=5, name='Eve', age=45), Row(id=6, name='Frank', age=50)]]

Использование метода partitionBy()

Метод partitionBy() в PySpark используется для разделения DataFrame на более мелкие, более управляемые разделы на основе значений в одном или нескольких столбцах. Метод принимает одно или несколько имен столбцов в качестве аргументов и возвращает новый DataFrame, секционированный на основе значений в этих столбцах. Здесь мы будем использовать набор данных крикета, который можно скачать по этой ссылке Cricket_data_set_odi.csv. Давайте посмотрим, как разделить данные с помощью функции partitionBy().

Шаг 1: Импортируйте необходимые модули и прочитайте CSV-файл, а затем распечатайте его схему.

Python3

# importing module
import pyspark
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
 
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
 
# create DataFrame
df=spark.read.option(
"header",True).csv("Cricket_data_set_odi.csv")
 
# Display schema
df.printSchema()

Вывод :

Импортируйте необходимые модули и прочитайте CSV

Шаг 2: На этом этапе мы будем использовать два столбца «Команда» и «Специализация». Все разделы, основанные на командах и их специализациях, хранятся в папке «Team-Speciality» с помощью функции write.option(), а разделение выполняется с помощью функции partitionBy().

Python3

# From above DataFrame, we will be using
# Team and Speciality as a partition key
# for our examples below.
# partitionBy()
df.write.option("header", True) \
        .partitionBy("Team", "Speciality") \
        .mode("overwrite") \
        .csv("Team-Speciality")

Вывод: в этом выводе мы видим обзор папок разделов.

воде мы видим обзор папок

Заключение

Важно отметить, что разделение данных может существенно повлиять на производительность приложения PySpark. Правильное разбиение может значительно повысить скорость и эффективность кода, в то время как неправильное разбиение может привести к снижению производительности и неэффективному использованию ресурсов.

Оцените статью
bestprogrammer.ru
Добавить комментарий