Spark submit что это
Submitting Applications
The spark-submit script in Spark’s bin directory is used to launch applications on a cluster. It can use all of Spark’s supported cluster managers through a uniform interface so you don’t have to configure your application especially for each one.
Bundling Your Application’s Dependencies
If your code depends on other projects, you will need to package them alongside your application in order to distribute the code to a Spark cluster. To do this, create an assembly jar (or “uber” jar) containing your code and its dependencies. Both sbt and Maven have assembly plugins. When creating assembly jars, list Spark and Hadoop as provided dependencies; these need not be bundled since they are provided by the cluster manager at runtime. Once you have an assembled jar you can call the bin/spark-submit script as shown here while passing your jar.
Launching Applications with spark-submit
Once a user application is bundled, it can be launched using the bin/spark-submit script. This script takes care of setting up the classpath with Spark and its dependencies, and can support different cluster managers and deploy modes that Spark supports:
Some of the commonly used options are:
† A common deployment strategy is to submit your application from a gateway machine that is physically co-located with your worker machines (e.g. Master node in a standalone EC2 cluster). In this setup, client mode is appropriate. In client mode, the driver is launched directly within the spark-submit process which acts as a client to the cluster. The input and output of the application is attached to the console. Thus, this mode is especially suitable for applications that involve the REPL (e.g. Spark shell).
Alternatively, if your application is submitted from a machine far from the worker machines (e.g. locally on your laptop), it is common to use cluster mode to minimize network latency between the drivers and the executors. Currently, the standalone mode does not support cluster mode for Python applications.
Master URLs
The master URL passed to Spark can be in one of the following formats:
Loading Configuration from a File
The spark-submit script can load default Spark configuration values from a properties file and pass them on to your application. By default, it will read options from conf/spark-defaults.conf in the Spark directory. For more detail, see the section on loading default configurations.
Advanced Dependency Management
Spark uses the following URL scheme to allow different strategies for disseminating jars:
Note that JARs and files are copied to the working directory for each SparkContext on the executor nodes. This can use up a significant amount of space over time and will need to be cleaned up. With YARN, cleanup is handled automatically, and with Spark standalone, automatic cleanup can be configured with the spark.worker.cleanup.appDataTtl property.
More Information
Once you have deployed your application, the cluster mode overview describes the components involved in distributed execution, and how to monitor and debug applications.
Запускаем Apache Spark на Kubernetes
Дорогие читатели, доброго дня. Сегодня поговорим немного про Apache Spark и его перспективы развития.
В современном мире Big Data Apache Spark является де факто стандартом при разработке задач пакетной обработки данных. Помимо этого, он также используется для создания стриминговых приложений, работающих в концепции micro batch, обрабатывающих и отгружающих данные маленькими порциями (Spark Structured Streaming). И традиционно он являлся частью общего стека Hadoop, используя в качестве менеджера ресурсов YARN (или, в некоторых случаях, Apache Mesos). К 2020 году его использование в традиционном виде для большинства компаний находится под большим вопросом в виду отсутствия приличных дистрибутивов Hadoop — развитие HDP и CDH остановлено, CDH недостаточно проработан и имеет высокую стоимость, а остальные поставщики Hadoop либо прекратили своё существование, либо имеют туманное будущее. Поэтому всё больший интерес у сообщества и крупных компаний вызывает запуск Apache Spark с помощью Kubernetes — став стандартом в оркестрации контейнеров и управлении ресурсами в приватных и публичных облаках, он решает проблему с неудобным планированием ресурсов задач Spark на YARN и предоставляет стабильно развивающуюся платформу с множеством коммерческих и открытых дистрибутивов для компаний всех размеров и мастей. К тому же на волне популярности большинство уже успело обзавестись парой-тройкой своих инсталляций и нарастить экспертизу в его использовании, что упрощает переезд.
Начиная с версии 2.3.0 Apache Spark обзавёлся официальной поддержкой запуска задач в кластере Kubernetes и сегодня, мы поговорим о текущей зрелости данного подхода, различных вариантах его использования и подводных камнях, с которыми предстоит столкнуться при внедрении.
Прежде всего, рассмотрим процесс разработки задач и приложений на базе Apache Spark и выделим типовые случаи, в которых требуется запустить задачу на кластере Kubernetes. При подготовке данного поста в качестве дистрибутива используется OpenShift и будут приведены команды, актуальные для его утилиты командной строки (oc). Для других дистрибутивов Kubernetes могут быть использованы соответствующие команды стандартной утилиты командной строки Kubernetes (kubectl) либо их аналоги (например, для oc adm policy).
Первый вариант использования — spark-submit
В процессе разработки задач и приложений разработчику требуется запускать задачи для отладки трансформации данных. Теоретически для этих целей могут быть использованы заглушки, но разработка с участием реальных (пусть и тестовых) экземпляров конечных систем, показала себя в этом классе задач быстрее и качественнее. В том случае, когда мы производим отладку на реальных экземплярах конечных систем, возможны два сценария работы:
Первый вариант имеет право на существование, но влечёт за собой ряд недостатков:
Расскажем подробнее о процессе настройки Spark для локального запуска. Чтобы начать пользоваться Spark его требуется установить:
Собираем необходимые пакеты для работы с Kubernetes:
Полная сборка занимает много времени, а для создания образов Docker и их запуска на кластере Kubernetes в реальности нужны только jar файлы из директории «assembly/», поэтому можно собрать только данный подпроект:
Для запуска задач Spark в Kubernetes требуется создать образ Docker, который будет использоваться в качестве базового. Здесь возможны 2 подхода:
С её помощью можно создавать образы Docker и осуществлять их загрузку в удалённые реестры, но по умолчанию она имеет ряд недостатков:
С её помощью собираем базовый образ Spark, содержащий в себе тестовую задачу для вычисления числа Pi с помощью Spark (здесь
Авторизуемся в кластере OKD с помощью консольной утилиты (здесь
Получим токен текущего пользователя для авторизации в Docker Registry:
Авторизуемся во внутреннем Docker Registry кластера OKD (в качестве пароля используем токен, полученный с помощью предыдущей команды):
Загрузим собранный образ Docker в Docker Registry OKD:
Проверим, что собранный образ доступен в OKD. Для этого откроем в браузере URL со списком образов соответствующего проекта (здесь
Для запуска задач должен быть создан сервисный аккаунт с привилегиями запуска подов под root (в дальнейшем обсудим этот момент):
Выполним команду spark-submit для публикации задачи Spark в кластере OKD, указав созданный сервисный аккаунт и образ Docker:
—name — имя задачи, которое будет участвовать в формировании имени подов Kubernetes;
—class — класс исполняемого файла, вызываемый при запуске задачи;
—conf — конфигурационные параметры Spark;
spark.executor.instances — количество запускаемых экзекьюторов Spark;
spark.kubernetes.authenticate.driver.serviceAccountName — имя служебной учётной записи Kubernetes, используемой при запуске подов (для определения контекста безопасности и возможностей при взаимодействии с API Kubernetes);
spark.kubernetes.namespace — пространство имён Kubernetes, в котором будут запускаться поды драйвера и экзекьютеров;
spark.submit.deployMode — способ запуска Spark (для стандартного spark-submit используется «cluster», для Spark Operator и более поздних версий Spark «client»);
spark.kubernetes.container.image — образ Docker, используемый для запуска подов;
spark.master — URL API Kubernetes (указывается внешний так обращение происходит с локальной машины);
local:// — путь до исполняемого файла Spark внутри образа Docker.
Переходим в соответствующий проект OKD и изучаем созданные поды — https://
Для упрощения процесса разработки может быть использован ещё один вариант, при котором создаётся общий базовый образ Spark, используемый всеми задачами для запуска, а снэпшоты исполняемых файлов публикуются во внешнее хранилище (например, Hadoop) и указываются при вызове spark-submit в виде ссылки. В этом случае можно запускать различные версии задач Spark без пересборки образов Docker, используя для публикации образов, например, WebHDFS. Отправляем запрос на создание файла (здесь
При этом будет получен ответ вида (здесь
Загружаем исполняемый файл Spark в HDFS (здесь
После этого можем делать spark-submit с использованием файла Spark, загруженного на HDFS (здесь
При этом надо заметить, что для доступа к HDFS и обеспечения работы задачи может потребоваться изменить Dockerfile и скрипт entrypoint.sh — добавить в Dockerfile директиву для копирования зависимых библиотек в директорию /opt/spark/jars и включить файл конфигурации HDFS в SPARK_CLASSPATH в entrypoint.sh.
Второй вариант использования — Apache Livy
Далее, когда задача разработана и требуется протестировать полученный результат, возникает вопрос её запуска в рамках процесса CI/CD и отслеживания статусов её выполнения. Конечно, можно запускать её и с помощью локального вызова spark-submit, но это усложняет инфраструктуру CI/CD поскольку требует установку и конфигурацию Spark на агентах/раннерах CI сервера и настройки доступа к API Kubernetes. Для данного случая целевой реализацией выбрано использование Apache Livy в качестве REST API для запуска задач Spark, размещённого внутри кластера Kubernetes. С его помощью можно запускать задачи Spark на кластере Kubernetes используя обычные cURL запросы, что легко реализуемо на базе любого CI решения, а его размещение внутри кластера Kubernetes решает вопрос аутентификации при взаимодействии с API Kubernetes.
Выделим его в качестве второго варианта использования — запуск задач Spark в рамках процесса CI/CD на кластере Kubernetes в тестовом контуре.
Немного про Apache Livy — он работает как HTTP сервер, предоставляющий Web интерфейс и RESTful API, позволяющий удалённо запустить spark-submit, передав необходимые параметры. Традиционно он поставлялся в составе дистрибутива HDP, но также может быть развёрнут в OKD или любой другой инсталляции Kubernetes с помощью соответствующего манифеста и набора образов Docker, например, этого — github.com/ttauveron/k8s-big-data-experiments/tree/master/livy-spark-2.3. Для нашего случая был собран аналогичный образ Docker, включающий в себя Spark версии 2.4.5 из следующего Dockerfile:
Созданный образ может быть собран и загружен в имеющийся у вас репозиторий Docker, например, внутренний репозиторий OKD. Для его развёртывания используется следующий манифест (
После его применения и успешного запуска пода графический интерфейс Livy доступен по ссылке: http://
Выполним первый запрос из коллекции, перейдём в интерфейс OKD и проверим, что задача успешно запущена — https://
Теперь покажем механизм работы Livy. Для этого изучим журналы контейнера Livy внутри пода с сервером Livy — https://
Третий вариант использования — Spark Operator
Теперь, когда задача протестирована, встаёт вопрос её регулярного запуска. Нативным способом для регулярного запуска задач в кластере Kubernetes является сущность CronJob и можно использовать её, но в данный момент большую популярность имеет использование операторов для управления приложениями в Kubernetes и для Spark существует достаточно зрелый оператор, который, в том числе, используется в решениях Enterprise уровня (например, Lightbend FastData Platform). Мы рекомендуем использовать его — текущая стабильная версия Spark (2.4.5) имеет достаточно ограниченные возможности по конфигурации запуска задач Spark в Kubernetes, при этом в следующей мажорной версии (3.0.0) заявлена полноценная поддержка Kubernetes, но дата её выхода остаётся неизвестной. Spark Operator компенсирует этот недостаток, добавляя важные параметры конфигурации (например, монтирование ConfigMap с конфигурацией доступа к Hadoop в поды Spark) и возможность регулярного запуска задачи по расписанию.
Выделим его в качестве третьего варианта использования — регулярный запуск задач Spark на кластере Kubernetes в продуктивном контуре.
Spark Operator имеет открытый исходный код и разрабатывается в рамках Google Cloud Platform — github.com/GoogleCloudPlatform/spark-on-k8s-operator. Его установка может быть произведена 3 способами:
Если оператор установлен корректно, то в соответствующем проекте появится активный под с оператором Spark (например, cloudflow-fdp-sparkoperator в пространстве Cloudflow для установки Cloudflow) и появится соответствующий тип ресурсов Kubernetes с именем «sparkapplications». Изучить имеющиеся приложений Spark можно следующей командой:
Для запуска задач с помощью Spark Operator требуется сделать 3 вещи:
В данном манифесте указана сервисная учётная запись, для которой требуется до публикации манифеста создать необходимые привязки ролей, предоставляющие необходимые права доступа для взаимодействия приложения Spark с API Kubernetes (если нужно). В нашем случае приложению нужны права на создание Pod’ов. Создадим необходимую привязку роли:
Также стоит отметить, что в спецификации данного манифеста может быть указан параметр «hadoopConfigMap», который позволяет указать ConfigMap с конфигурацией Hadoop без необходимости предварительного помещения соответствующего файла в образ Docker. Также он подходит для регулярного запуска задач — с помощью параметра «schedule» может быть указано расписание запуска данной задачи.
После этого сохраняем наш манифест в файл spark-pi.yaml и применяем его к нашему кластеру Kubernetes:
При этом создастся объект типа «sparkapplications»:
При этом будет создан под с приложением, статус которого будет отображаться в созданном «sparkapplications». Его можно посмотреть следующей командой:
По завершении задачи POD перейдёт в статус «Completed», который также обновится в «sparkapplications». Логи приложения можно посмотреть в браузере или с помощью следующей команды (здесь
Также управление задачами Spark может быть осуществлено с помощью специализированной утилиты sparkctl. Для её установки клонируем репозиторий с её исходным кодом, установим Go и соберём данную утилиту:
Изучим список запущенных задач Spark:
Создадим описание для задачи Spark:
Запустим описанную задачу с помощью sparkctl:
Изучим список запущенных задач Spark:
Изучим список событий запущенной задачи Spark:
Изучим статус запущенной задачи Spark:
В заключение хотелось бы рассмотреть обнаруженные минусы эксплуатации текущей стабильной версии Spark (2.4.5) в Kubernetes:
Русские Блоги
[Режим работы Spark]
Обычно используются следующие параметры при использовании Spark-Submit для отправки задач Spark:
Режим развертывания предназначен для кластера, который относится к режиму развертывания кластера и разделен на два метода в зависимости от того, где размещен основной процесс драйвера: клиент и кластер, по умолчанию это клиент
1. Клиентский режим
1. В режиме клиента процесс Driver выполняется на главном узле, а не на узле Worker, поэтому по сравнению с кластером Worker, участвующим в реальных вычислениях, драйвер эквивалентен стороннему «клиенту»
2. Поскольку процесс Driver не находится на узле Worker, он независим и не будет потреблять ресурсы кластера Worker.
3. В режиме клиента узлы Master и Worker должны находиться в одной и той же локальной сети, так как Drive должен взаимодействовать с Executorr. Например, Drive должен распространять пакет Jar для Executor через Netty HTTP. Driver должен назначать задачи для Executor и т. Д.
4. В клиентском режиме отсутствует контролируемый механизм перезапуска. Если процесс Driver зависает, требуется дополнительный перезапуск программы.
два, cluster mode
2. Драйвер программы занимает ресурсы рабочего
4. Главный узел и рабочий узел, как правило, не находятся в одной и той же локальной сети в режиме кластера, поэтому пакет Jar нельзя распределить по каждому рабочему, поэтому в режиме кластера необходимо, чтобы пакет Jar был заранее помещен в каталог, соответствующий каждому рабочему.
Вы выбираете режим клиента или режим кластера?
Вообще говоря, если узел, отправляющий задачу (т. Е. Master) и кластер Worker, находятся в одной сети, клиентский режим является более подходящим
Если узел, отправляющий задачу, находится далеко от рабочего кластера, режим кластера будет использоваться для минимизации задержки в сети между драйвером и исполнителем.
Режим работы Spark: кластер и клиент
1. client mode, childMainClass = mainClass
2. standalone cluster mde, childMainClass = org.apache.spark.deploy.Client
3. yarn cluster mode, childMainClass = org.apache.spark.deploy.yarn.Client
Интеллектуальная рекомендация
Многослойная презентацияViewController Jap
Распечатать список с конца до головы
В случае, когда таблица цепи не может изменять дисплей, данные хранения стека могут рассматриваться с рекурсивным методом. Разрешить модификацию структуры ссылки.
Типы данных и переменные
тип данных Компьютерная программа может обрабатывать различные значения. Однако компьютеры могут обрабатывать гораздо больше, чем числовые значения. Они также могут обрабатывать различные данные, таки.
оглавление 1. Одно место 2. Случайное расположение 3. Добавьте баллы для оценки 4. Получение файла 5. Установите уровень сложности. 6. Срок завершения 7. Выберите заполнение пропусков. 1. Одно место Н.
Практика использования Spark SQL, или Как не наступить на грабли
Если вы работаете с SQL, то вам это будет нужно очень скоро. Apache Spark – это один из инструментов, входящих в экосистему Hadoop, который обрабатывает данные в оперативной памяти. Одним из его расширений является Spark SQL, позволяющий выполнять SQL-запросы над данными. Spark SQL удобно использовать для работы посредством SQL-запросов с большими объемами данных и в системах с высокой нагрузкой.
Ниже вы найдёте некоторые нехитрые приёмы по работе со Spark SQL:
Spark SQL помогает в обработке данных многим компаниям из Global 100. При простоте разработки Spark SQL даёт на выходе высокую производительность, если использовать его правильно. Spark SQL можно использовать в различных ETL процессах при обработке и загрузке данных. Альтернативами Spark SQL при работе с данными можно назвать использование Impala SQL или Hive (c обработкой SQL-запросов в mapreduce). Если же выходить за рамки SQL, то альтернатива – использование в Spark языка DSL. Не приходится сомневаться в перспективности языка SQL, существующего десятилетия и имеющего миллионы пользователей. Немалые перспективы развития имеет и Spark SQL. Например, с новыми версиями Spark появляется возможность использовать новые оптимизационные хинты, способы обработки данных становятся эффективнее. Spark развивается от версии к версии в погоне за повышением эффективности обработки данных и интеграцией с DeepLearning. Меняются в сторону улучшения подходы: RDD, DataFrame, DataSet. Тем самым, изучение и применение Spark SQL – актуально и многообещающе. Для компаний же использование Spark SQL ведёт в конечном итоге к накоплению знаний о клиентах, их обработке и построении новых бизнесов на новых знаниях.
Для более тонкой настройки загрузки данных посредством Spark потребуется выйти за рамки Spark SQL и использовать, например, DSL.
Уточним, что речь в статье идёт о работе с версией Apache Spark 2.3, загрузка данных осуществляется из Hadoop в Hadoop, управление ресурсами осуществляется посредством Yarn, используется операционная система Linux, а также СУБД Hive с сохранением данных в hdfs в формате parquet.
Код приложения, запускающего переданный ему в качестве параметра файл с командой SQL полностью приводить не будем, напишем лишь, что в приложении нужно открыть переданный файл, загрузить его содержимое в переменную sqlStr и вызвать:
Получится приложение *.jar, которому можно передавать в качестве параметра файл с командой SQL и выполнять его посредством spark-submit:
При этом в передаваемом файле sqlFile можно указывать команды SQL, например:
Сбор и просмотр статистики для Spark, хинт BROADCAST
Как известно, одной из наиболее сложных операций для Spark является соединение (join) таблиц или датасетов. При этом в Spark существуют различные алгоритмы реализации join-ов: SortMergeJoin, BroadcastHashJoin, CartesianProduct и др. Чтобы помочь Spark автоматически понять, большие или маленькие таблицы он соединяет и какой тип соединения оптимален, нужно собрать статистику по этим таблицам. Для этого посредством spark-submit вызываем команду вида:
Эту команду сбора статистики мы можем передать вышеописанному модулю sqlrunner.jar в качестве параметра вместо команды SQL. Для выполнения этой команды требуются права на чтение и запись в таблицу, по которой собирается статистика.
Проверить, что статистика собрана, можно в среде hive командой вида:
Нужно посмотреть, появились ли в конце описания в блоке TBLPROPERTIES свойства ‘spark.sql.statistics.numRows’ и ‘spark.sql.statistics.totalSize’:
Таким образом, целесообразно иметь собранную статистику по таблицам-источникам, по используемым в расчётах промежуточным таблицам, а также по таблицам витрин, которые тоже могут быть использованы в соединениях в пользовательских запросах.
Теперь скажем пару слов о том, что такое broadcast в Spark. При присоединении маленькой таблицы к большому датасету часто оказывается целесообразным не распределять маленькую таблицу на различные ноды по ключам соединения в виде порций данных, а поместить её целиком на все ноды кластера, использующиеся в приложении. При этом говорят, что осуществляется broadcast маленькой таблицы на все ноды и тип присоединения маленькой таблицы к основному массиву данных – broadcast join.
Зачастую, broadcast join нужен в Spark SQL в тех случаях, когда в реляционных базах данных требуется nested loop join. Особенно необходим broadcast при неравномерной статистике распределения данных в участвующих в соединении ключах, например, при присоединении маленького справочника валют к большой таблице проводок, в которой 99% строк относится к рублям.
Для того, чтобы указать Spark, что надо сделать broadcast какой-то небольшой таблицы или датасета (до
1Gb), можно в SQL-запросе указывать хинт /*+ BROADCAST(t)*/, где t – алиас таблицы или датасета.
В том числе, если по какой-то таблице статистика не собрана или мы имеем дело с подзапросом, результат которого формируется “на лету” и Spark заранее не знает, большой ли он, то хинт /*+ BROADCAST(t)*/ особенно целесообразен, если в таблице или подзапросе мало данных.
Если broadcast какого-то подзапроса завершается позже чем, через 5 минут с начала выполнения запроса, то стоит вызывать spark-submit с нижеприведённым параметром (здесь 36000 – время в секундах), потому как выделенных по умолчанию 5 минут может не хватить:
Например, такой параметр пригодится в нижеприведённом запросе, где делается broadcast результата подзапроса /*+ BROADCAST(small) */, который становится готов не с самого начала работы запроса, а на более поздних этапах:
Убедиться, что хинт учтён оптимизатором и broadcast реально осуществлён можно, посмотрев в Yarn по ссылке в столбце Tracking UI у соответствующего приложения Spark план выполнения запроса на закладке SQL:
Отметим, что в Spark SQL есть и другие хинты, в т.ч. с версии 2.4 появляются хинты /*+ COALESCE(n) */, где n – количество партиций, на которые будет разбит результат, и /* + REPARTITION (n) */, где n – количество партиций при repartition.
Обход SKEWED JOIN явным указанием SKEWED ключей
Рассмотрим две таблицы:
Рассмотрим случай, когда значения ключей, которые встречаются в одной из таблиц, распределены неравномерно.
Пусть в таблице B на значения ключей BID 4089, 4107, 4468 и 6802 приходится в сумме 70% строк, в то время как на миллион прочих значений ключей BID приходится лишь оставшиеся 30% строк.
При этом будем считать, что в таблице A значения ключей AID распределены более-менее равномерно, при этом значения 4089, 4107, 4468 и 6802 встречаются по одному разу.
В таком случае это соединение таблиц называется skewed join, т.е. “перекошенное соединение”, а слишком частые ключи называются skewed keys.
По умолчанию задача соединения двух таблиц, выполненная посредством spark-submit, будет разбита на 200 партиций. Изменить эту настройку по умолчанию можно, задав другое значение конфигурации spark.sql.shuffle.partitions, например:
Как Spark будет обрабатывать skewed join?
Spark без подсказки не знает, что данные распределены неравномерно, и распределит задачу соединения таблиц так: большинство значений ключей попадёт в “лёгкие” партиции, которые быстро обработают небольшой объём данных; при этом обработка skewed keys попадёт в “тяжёлые” партиции.
Картинки ниже показывают пример такого случая: 999 из 1000 партиций отработали быстро, выдав небольшое количество итоговых строк, а одна партиция, в которую попали почти все данные, работала несколько часов.
Как сделать распределение соединяемых данных по партициям более равномерным?
Как вариант, нужно соединять таблицы не по одному ключу A.AID = B.BID, а по паре ключей, при этом распределение данных по паре ключей сделать уже более равномерным.
Сделаем допущение, что в таблице B имеется также числовое поле BID2, данные в котором распределены равномерно (в отличие от BID). Если такового равномерно распределенного числового поля нет, то его можно сгенерировать работающей в hive и spark функцией hash() из конкатенации других полей. В таком случае функция hash() выдаст равномерно распределённые данные.
Нижеприведённый работающий под spark запрос решает задачу. Этот запрос целесообразно применить вместо “select A.*, B.* from A inner join B on A.AID = B.BID;”
В секции with приведён пример вышеописанных данных для таблиц A и B. Имеет место перекошенность данных в ключе BID. Формируется подзапрос ANTISKEW, в котором перекошенные значения ключей 4089, 4107, 4468 и 6802 размножены до 10 экземпляров каждое. Данные из таблицы А соединяются с подзапросом ANTISKEW, тем самым сформировав в массиве A_ для ключей AID, равных 4089, 4107, 4468 и 6802, по 10 экземпляров с различными значениями ANTISKEWKEY от 0 до 9. При этом для прочих значений AID в массиве A_ будет по одному экземпляру с ANTISKEWKEY = 0. Далее массив A_ соединяется с таблицей B не только по условию A_.AID = B.BID, но и по второй паре ключей. А именно, для перекошенных значений ключей BID многочисленные записи из таблицы B соединятся с одним из значений ANTISKEWKEY от 0 до 9 в зависимости от остатка от деления BID2 на 10. Неперекошенные значения BID соединятся один к одному с ANTISKEWKEY = 0. Тем самым будет достигнуто более равномерное распределение перекошенных ключей BID по партициям – они пойдут не в одну, а в 10 партиций.
Для ещё более равномерного распределения данных нужно подбирать в подзапросе ANTISKEW, на сколько именно партиций хочется разбить каждый ключ. Описанный метод требует знания статистики – по каким перекошенным ключам сколько процентов строк следует ожидать.
Партиционирование помогает сделать BROADCAST
Ниже описан способ, как в Spark SQL делать broadcast join вместо shuffle join в случае, если размер меньшей таблицы слишком велик для broadcast.
Допустим, нужно соединить таблицу big_table (100Gb)
с таблицей small_table (4Gb)
При этом предполагается, что размер таблицы small_table слишком велик, чтобы сделать её broadcast, о чём выдаётся ошибка.
Целью является заполнить результирующую таблицу:
Соединение по ключу id при этом предполагается нормально распределённым.
Сделаем обе соединяемые таблицы партиционированными:
При заполнении обеих партиционированных таблиц big_table и small_table ключ партиционирования part_mod сделаем равным остатку от деления id на 4:
Заведём псевдотаблицу, смотрящую своим location на одну партицию big_table (part_mod = 0):
Также заведём псевдотаблицу, смотрящую своим location на одну партицию small_table (part_mod = 0):
Теперь мы можем записать в результирующую таблицу одну четверть требуемых данных из соединения вышеописанных псевдотаблиц.
При этом проверено, что хинт BROADCAST успешно срабатывает:
Следующими тремя шагами мы можем аналогично дописать в результирующую таблицу оставшиеся три четверти данных с part_mod = 1, part_mod = 2, part_mod = 3.
Итоговая продолжительность загрузки результирующей таблицы «методом поэтапного broadcast партиций» обычно получается на одинаковых ресурсах меньше, чем методом shuffle join. Однако такой метод требует, чтобы таблицы с исходными данными были заранее партиционированы одинаковым образом, т.е. по остатку от деления id на какое-либо число, по которому соединяемся. Если id не целочисленное, то можно брать остаток от деления hash(id) на какое-либо число. Особенно легко организовать, чтобы заранее партиционированными были промежуточные (стейджинговые) таблицы, дизайн которых находится на усмотрении разработчика.
Мониторинг ресурсов
Как известно, при работе с озером данных важно правильно выделять ресурсы задачам по загрузке данных. В противном случае имеются риски падения задачи от нехватки ресурсов, слишком длительного выполнения задачи, либо неоправданно высокого выделения ресурсов в ущерб другим задачам.
Как правильно выделять ресурсы – отдельная тема. В этой же статье мы поговорим о том, как понять:
Сначала опишем первый вид мониторинга (“мониторинг yarn”).
Строки в этой таблице имеют вид:
Затем можно на том же Spark SQL написать запрос к этой текстовой таблице, в котором распарсить значения показателей использованных ресурсов (MB-seconds, vcore-seconds) на каждый момент осуществления мониторинга. Далее в этом запросе можно визуализировать эти показатели в виде графиков в формате векторной графики *.svg:
Результат запроса можно сохранить в файл *.svg и посмотреть в браузере. Пример картинки с результатами мониторинга ресурсов, использованных группой наблюдаемых приложений Yarn, приведен ниже. Красным цветом показана динамика использованной памяти, синим цветом — динамика использованных процессорных ядер:
На основании такой визуализации можно принимать решения об изменении расписания выполнения определённых приложений Yarn с целью более равномерного использования ресурсов в течение суток.
Теперь опишем второй вид мониторинга (“мониторинг spark”).
Итак, интересна ситуация, когда Yarn выделил задаче Spark, например, 200 ядер и не отдаёт эти ресурсы другим задачам. При этом в реальности 199 из 200 ядер уже не используются в приложении Spark, а работу реально продолжает только одно ядро. Этот вид мониторинга среди прочего нацелен на поиск таких ситуаций.
Считаем, что подвергаемые мониторингу приложения Spark вызываются из некого управляющего механизма. В случае Сбербанка это, например, Oozie или Informatica BDM. Таким образом, управляющий механизм может по завершению работы приложения Spark скопировать его лог в hdfs и запустить маленькое приложение на Spark SQL по парсингу этого лога (или группы логов) и визуализации динамики реально использовавшихся Spark ядер в формате *.svg.
Итак, с помощью команды нижеприведённого вида мы:
В результате в hdfs получаем строки вида:
При парсинге в логе приложения Spark ищутся события начала использования и высвобождения ядер кластера тем или иным контейнером, т.е. мы ищем время записи в лог событий по шаблонам:
Таким образом, на любой момент времени известно, сколько процессорных ядер было активно при работе приложения Spark, т.е. начали и еще не завершили свою работу.
Для парсинга собранных в hdfs логов формируется таблица spark_monitoring в текстовом формате:
Как было написано выше, парсинг логов осуществляется силами Spark SQL. Т.е. написан запрос к этой таблице spark_monitoring, который на выходе даёт картинку в формате *.svg с динамикой реального использования ядер кластера приложением Spark.
Результат запроса можно сохранить в файл *.svg и посмотреть в браузере. Пример картинки приведен ниже:
На основании таких графиков можно понять, эффективно ли приложение Spark использует выделенные ему процессорные ядра кластера. Можно смотреть приложения массово. Резкие скачки вверх-вниз, видные на графиках – это переходы между этапами (stages) приложений Spark. Когда заканчивается какой-то этап по чтению или соединению массивов данных и по одному освобождаются все ядра, получается снижение графика к нулю. А вначале выполнения новых этапов идёт резкий рост используемых ядер. Ситуация, когда график притянут к максимуму выделенных ядер и скачки между этапами резкие – это признак оптимального использования ресурсов. Медленные же снижения могут свидетельствовать, например, о skew joins, о которых рассказывалось в этой статье выше.
Автор: Михаил Гричик, эксперт профессионального сообщества Сбербанка SberProfi DWH/BigData.
Профессиональное сообщество SberProfi DWH/BigData отвечает за развитие компетенций в таких направлениях, как экосистема Hadoop, Teradata, Oracle DB, GreenPlum, а также BI инструментах Qlik, SAP BO, Tableau и др.