Stream api forkjoinpool как связаны что это такое

API Java 8 Streams как дружественный фасад ForkJoinPool

Одна из функций, которые мне больше всего нравятся в Java 8, — это API потоков. Наконец, он устраняет практически все циклы из кода и позволяет вам писать код, который будет намного более выразительным и целенаправленным.

Проблема: Исполнители Бойлерплейт

Допустим, мы хотим запустить несколько задач параллельно. Ничего особенного, скажем, каждый из них просто выводит имя исполняющего потока (так что мы можем видеть, что он работает параллельно). Мы хотим возобновить выполнение после того, как они все сделали.

Теперь это много кода! Но мы можем сделать лучше.

Решение: Stream API

В конце концов я придумал эту утилиту:

Многоразовые и все. Назовите это как:

Этот распечатывает следующее. Обратите внимание, что он на самом деле также использует основной поток — так как он все равно остается заложником и не может возобновить работу до завершения выполнения.

Другой пример: параллельные вычисления

Вот еще один пример. Вместо того, чтобы делать то же самое N раз, мы можем использовать потоковый API для параллельной обработки ряда различных задач. Мы можем создать («затравить») поток с любой коллекцией или набором значений, иметь функцию, выполняемую на них параллельно, и, наконец, агрегировать результаты (собрать в коллекцию, уменьшить до одного значения и т. Д.)

Давайте посмотрим, как мы можем вычислить сумму первых 45 чисел Фибоначчи:

Он многого достигает в одной строке кода. Сначала он создает поток с описаниями всех задач, которые мы хотим выполнять параллельно. Затем он вызывает функцию для всех из них параллельно. Наконец, он возвращает сумму всех этих результатов.

Это не все, что придумано. Я легко могу представить создание потока с произвольными значениями (включая богатые объекты Java) и выполнение нетривиальной операции над ними. Это не имеет значения, организуя все, что будет выглядеть одинаково.

Когда это сделать?

Я думаю, что это решение довольно хорошо для всех случаев, когда вы знаете загрузку заранее, и вы хотите разветвить выполнение на несколько потоков и продолжить после того, как все они будут выполнены. Я нуждался в этом для некоторого тестового кода, но он, вероятно, хорошо работал бы во многих других сценариях разветвления / соединения или «разделяй и властвуй».

Очевидно, что это не работает, если вы хотите запустить что-то в фоновом режиме и возобновить выполнение или если вы хотите, чтобы фоновый исполнитель работал в течение длительного периода времени.

Источник

Java и сопутствующие товары

пятница, 11 ноября 2016 г.

Stream API и Fork/Join. Параллельные вычисления. Java 8 [10 min reading]

Stream. Что это?

Определим Stream как свободную последовательность элементов, которая не хранит никаких данных и использует коллекции как ресурс. Соответственно Stream не предоставляет непосредственного доступа к данным, а дает возможность применить к ресурсу данных вычислительные операции, которые могут быть выполнены последовательно либо параллельно. Эти вычисления в свою очередь «ленивые» и будут выполнены только после вызова терминальной операции.

Все вместе(ресурс данных, последовательность агрегирующих операций и терминальная операция) это называется stream pipeline.

Stream api forkjoinpool как связаны что это такое. Смотреть фото Stream api forkjoinpool как связаны что это такое. Смотреть картинку Stream api forkjoinpool как связаны что это такое. Картинка про Stream api forkjoinpool как связаны что это такое. Фото Stream api forkjoinpool как связаны что это такое

Stream и параллельные вычисления

В разрезе данной темы для нас важным моментом является как раз возможность выполнять вычисления параллельно. И это не требует от нас ни единой строчки многопоточного кода. Или parallel(), когда мы работаем с готовым stream’ом или parallelStream(), когда создаем его сами.

Но все же интересно, как это работает.

Когда вы запускаем код на многоядерном процессоре, Java 8 «распараллеливает» наш stream, на несколько stream’ов, каждый из которых в отдельном потоке выполняет свою подзадачу и результаты объединяются вместе. За это отвечает наш Fork/Join Framework из прошлого поста.

Пример. Сумма элементов в листе с reduce

Рассмотрим простой пример:
Этот метод считает последовательно сумму элементов. Выполним его:
Вывод:

Последовательное вычисление

Stream api forkjoinpool как связаны что это такое. Смотреть фото Stream api forkjoinpool как связаны что это такое. Смотреть картинку Stream api forkjoinpool как связаны что это такое. Картинка про Stream api forkjoinpool как связаны что это такое. Фото Stream api forkjoinpool как связаны что это такое

Параллельное вычисление

Теперь вызовем parallelStream():
В этом раз вывод другой:
Посмотрим на имплементацию reduce в Java 8 в классе ReferencePipeline:
Мы видим, что наш accumulator используется еще и как combiner. Но вызывается combiner только в случае параллельных вычислений.

Давайте передадим свой combiner непосредственно через вызов друго метода reduce:
Из доки мы видим требование:

combiner function for combining two values, which must compatible with the accumulator function

combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)

В нашем случае это означает, что нам нужно правильно смерджить аккумуляторы из разных потоков, т.е. просто их сложить, и можно было оставить все как есть и не заморачиваться, но для наглядности немного переделаем наш код:
Вывод:
О чем это нам говорит? О том, что наш задача была fork’нута на четыре подзадачи, каждая из которых выполнилась в своем потоке, а после этого результаты были объединены(join):

Stream api forkjoinpool как связаны что это такое. Смотреть фото Stream api forkjoinpool как связаны что это такое. Смотреть картинку Stream api forkjoinpool как связаны что это такое. Картинка про Stream api forkjoinpool как связаны что это такое. Фото Stream api forkjoinpool как связаны что это такое

Вывод

Stream API дает нам возможность выполнять вычисления параллельно, используя для этого Fork/Join Framework, который выбирает подходящий вариант разбивки задачи и объединения результата. При этом, мы работаем с тем же самым common ForkJoinPool, которые мы использовали в прошлом посте:

Это мы можем увидеть в дебаггере:

Stream api forkjoinpool как связаны что это такое. Смотреть фото Stream api forkjoinpool как связаны что это такое. Смотреть картинку Stream api forkjoinpool как связаны что это такое. Картинка про Stream api forkjoinpool как связаны что это такое. Фото Stream api forkjoinpool как связаны что это такое

Мы видим текущий поток(1), вызов нашего метода(2) из подзадачи(3), которая запущена из общего ForkJoinPool(4).

Источник

Незаслуженно забытый ForkJoinPool

Stream api forkjoinpool как связаны что это такое. Смотреть фото Stream api forkjoinpool как связаны что это такое. Смотреть картинку Stream api forkjoinpool как связаны что это такое. Картинка про Stream api forkjoinpool как связаны что это такое. Фото Stream api forkjoinpool как связаны что это такое

Всем известно о новых функциях, которыми нас порадовал JDK 8, и, вероятно, трудно найти Java-разработчика, который не знает, что такое Java Streams, лямбды или CompletableFutures. Итак, все эти приятности появились несколько лет назад вместе с JDK 8, но что произошло немного раньше, когда состоялся выпуск JDK 7?

Даже если мы заглянем в раздел “Новые функции в релизе JDK 7”, то все еще будет непросто определить одну из самых важных представленных функций. Чтобы увидеть наконец то, что мы ищем, придется перейти в раздел “Утилиты параллелизма”.

Само собой, мы говорим о выпуске в JDK7 фреймворка ForkJoinPool, что, на мой взгляд, не получило заслуживаемой огласки. Я бы сказал, что многие Java-разработчики сейчас вообще незнакомы с ForkJoinPool и где его применять.

В этой статье мы рассмотрим внутренние компоненты фреймворка ForkJoinPool, объясним, почему он так важен, и воздадим должное этой забытой части JDK.

Вступление

Что же из себя представляет фреймворк ForkJoinPool? Это детализированный фреймворк для эффективного распараллеливания выполнения задач. Как уже говорилось, он был представлен как часть релиза JDK 7.

Что же такого интересного в ForkJoinPool? Что он дает нам, чего не могут дать существующие исполнители задач? На этот вопрос можно ответить двумя словами: кража работы (work stealing)!

Дизайн ForkJoinPool основан на фреймворке work-stealing, созданном для Cilk. Если вас интересует оригинальный дизайн, вы можете прочитать об этом здесь.

Как работает ForkJoinPool

Дизайн ForkJoinPool на самом деле прост и в то же время очень эффективен. Он основан на алгоритме “Разделяй и властвуй”: каждая задача разбивается на подзадачи по максимуму, затем они выполняются параллельно, и как только все из них завершаются, происходит объединение результатов.

Звучит знакомо? Да, параллельные потоки Java выполняются очень похожим образом.

Итак, у нас есть фреймворк, который следует вычислительной модели делимых задач, где каждая из них соответствует шаблону, подобному этому фрагменту псевдокода:

Stream api forkjoinpool как связаны что это такое. Смотреть фото Stream api forkjoinpool как связаны что это такое. Смотреть картинку Stream api forkjoinpool как связаны что это такое. Картинка про Stream api forkjoinpool как связаны что это такое. Фото Stream api forkjoinpool как связаны что это такое

Эту задачу можно рекурсивно разделять на несколько подзадач до тех пор, пока не будет достигнуто заранее определенное состояние, когда уже можно счесть, что задача достаточно мала и ее можно выполнять. Примерно это и называется моделью Fork-Join.

Модель Fork-Join

Как мы сказали только что, модель fork-join — это метод, в котором мы разделяем каждую задачу (fork), а затем ждем объединения (join) всех получившихся подзадач и получаем результат.

Stream api forkjoinpool как связаны что это такое. Смотреть фото Stream api forkjoinpool как связаны что это такое. Смотреть картинку Stream api forkjoinpool как связаны что это такое. Картинка про Stream api forkjoinpool как связаны что это такое. Фото Stream api forkjoinpool как связаны что это такое

На рисунке выше видно, как каждая задача разделяется всякий раз, когда вызывается fork. Точно так же, когда все задачи завершаются, они объединяются посредством join для получения конечного результата.

Stream api forkjoinpool как связаны что это такое. Смотреть фото Stream api forkjoinpool как связаны что это такое. Смотреть картинку Stream api forkjoinpool как связаны что это такое. Картинка про Stream api forkjoinpool как связаны что это такое. Фото Stream api forkjoinpool как связаны что это такое

Теперь, когда мы понимаем, как работает эта модель, перейдем к самой важной части: как ForkJoinPool выполняет эти задачи внутри себя?

Внутреннее устройство ForkJoinPool

Вы также можете создать свой собственный ForkJoinPool, указав, сколько потоков вам нужно. Только помните, что пул с числом потоков, превосходящим количество доступных процессоров, для задач с интенсивным использованием процессора полезен не будет. Однако, если у вас задачи интенсивного ввода-вывода (то есть им придется часто ждать завершения операций ввода-вывода), большой пул все-таки может пригодиться.

Каждый рабочий поток имеет собственную двухстороннюю рабочую очередь (deques) типа WorkQueue.

Stream api forkjoinpool как связаны что это такое. Смотреть фото Stream api forkjoinpool как связаны что это такое. Смотреть картинку Stream api forkjoinpool как связаны что это такое. Картинка про Stream api forkjoinpool как связаны что это такое. Фото Stream api forkjoinpool как связаны что это такое

Таким образом, каждый из рабочих потоков продолжает сканировать доступные для выполнения подзадачи. Основная цель в том, чтобы как можно больше нагружать рабочие потоки и максимизировать использование ядер процессора. Поток блокируется только тогда, когда нет доступных для выполнения подзадач.

Что происходит, когда рабочий поток не может найти задачи для запуска в своей собственной очереди? Он будет пытаться “украсть” задачи у тех процессоров, которые загружены сильнее!

Вот тут-то и возникает интересный вопрос: как фреймворк гарантирует, что владелец очереди и “похититель” не будут мешать друг другу, если они попытаются перехватить задачу в одно и то же время?

Чтобы свести к минимуму конкуренцию и сделать ее более эффективной, как владелец очереди, так и “похитители” захватывают задачи из разных частей очереди.

Stream api forkjoinpool как связаны что это такое. Смотреть фото Stream api forkjoinpool как связаны что это такое. Смотреть картинку Stream api forkjoinpool как связаны что это такое. Картинка про Stream api forkjoinpool как связаны что это такое. Фото Stream api forkjoinpool как связаны что это такое

Здесь представлен метод LIFO — Last In, First Out (“последним вошел — первым вышел”). Почему именно так? Разве не разумнее было бы сначала обработать задачи, которые дольше пробыли в очереди?

Нет, не совсем. Основная причина — повышение производительности. Всегда, выбирая самую последнюю задачу, мы увеличиваем шансы на то, что ресурсы задачи все еще будут распределены в кэшах процессора, а это значительно повысит производительность. Это обычно называют локальностью ссылок.

В этом случае мы уже следуем подходу FIFO (“первым вошел — первым вышел”). Это в основном служит для уменьшения конкуренции, необходимой для синхронизации как владельцу очереди, так и “похитителю”.

Еще одна хорошая причина в том, что из-за природы самих делимых задач более старые задачи в очереди, скорее всего, будут больше по объему, поскольку еще не подверглись разделению.

Stream api forkjoinpool как связаны что это такое. Смотреть фото Stream api forkjoinpool как связаны что это такое. Смотреть картинку Stream api forkjoinpool как связаны что это такое. Картинка про Stream api forkjoinpool как связаны что это такое. Фото Stream api forkjoinpool как связаны что это такое

Методы push и pop вызываются только владельцем очереди, а метод poll вызывается только процессором, пытающимся “украсть” работу у другого процессора.

Методы push и pop — это операции CAS (Compare-and-Swap, сравнение с обменом) без ожидания, так что они весьма эффективны. Однако метод poll не всегда свободен от блокировки. Он блокируется в тех случаях, когда очередь почти пуста, поскольку требуется некоторая синхронизация для гарантии, что только владелец или похититель выберет данную задачу, но не оба сразу.

У этого фреймворка действительно любопытный дизайн.

Сводим все воедино

Учитывая то, что мы рассмотрели, становится совершенно очевидно, что для использования этой структуры нам понадобятся задачи, которые легко могут разделяться.

Вот где более чем уместны коллекции Java. Начиная с JDK 8, Java-коллекции можно легко делить, поэтому Java Streams и ForkJoinPool — идеальные партнеры!

Благодаря фреймворку ForkJoinPool параллельные потоки Java могут работать очень эффективно, оптимизируя использование доступных ядер!

Но что если для обработки нужного типа задачи неприменимы потоки Java? В этом случае можно создавать свои собственные “делимые” задачи, просто расширяя класс ForkJoinTask.

Что такое ForkJoinTask? ForkJoinTask — это Java-класс, который ведет себя аналогично потоку Java, но он гораздо более легкий. В основном потому, что ему не приходится поддерживать свой собственный стек времени выполнения или счетчики программ.

Существует три подтипа ForkJoinTask: RecursiveAction, RecursiveTask и CountedCompleter. Выбор того или иного подтипа будет зависеть от типа задач, которые вы пишете. Изучите документацию, чтобы понять, какой из них лучше всего соответствует вашим потребностям.

Стоит отметить, что эти три класса не являются функциональными интерфейсами, главным образом потому, что ForkJoinPool был выпущен в JDK7. Поэтому, к сожалению, лямбда-выражениями воспользоваться будет нельзя. Однако платформа параллельного потока Java 8 предоставляет функциональный API для прозрачного использования ForkJoinPool.

Давайте попробуем реализовать простую задачу с помощью ForkJoinTask. Напишем вычисление чисел Фибоначчи.

Вот и все. Наша задача будет разделена на несколько подзадач, и рабочие потоки ForkJoinPool будут действовать вместе, чтобы решить их все до одной.

Обратите внимание: в примере нам пришлось использовать BigInteger ради возможности обрабатывать большие числа, но это всего лишь одна из потенциальных реализаций ForkJoinTask.

Очень хорошая практика, которую стоит применять при написании ForkJoinTasks, — это всегда писать их как чистые функции, не разделяя состояние и избегая мутации объектов. Это лучший способ гарантировать, что подзадачи выполняются безопасно и независимо.

Также имейте в виду, что ForkJoinPool позволяет отправлять не только ForkJoinTasks, но также вызываемые (Callable) или выполняемые (Runnable) задачи, поэтому вы можете применять ForkJoinPool таким же образом, как и другие существующие исполнители. Единственное отличие в том, что ваша задача не будет разделяться сама по себе, но вы можете извлечь выгоду из повышения производительности кражи работы, если будет отправлено несколько задач и у некоторых потоков будет меньше загрузка, чем у других.

Если вам хочется лучше понять параллелизм и многопоточность, а также то, почему сейчас мы делаем всё именно таким образом, а не по-другому, я бы рекомендовал прочитать книгу “Параллелизм Java на практике” Брайана Гетца.

Заключение

Как мы убедились сегодня, модель Fork Join — эффективный способ обработки задач с использованием метода “Разделяй и властвуй”. Вместе с функцией “кражи работы” этот метод делает ForkJoinPool мощным инструментом для распараллеливания Java-кода.

В наше время ForkJoinPool несложно и прозрачно применяется вместе с Java Streams или CompletableFutures, поэтому, скорее всего, нам разве что изредка придется писать собственные делимые задачи. В любом случае всегда полезно понимать, как работает этот функционал, и знать, что такая возможность есть.

Источник

Stream API & ForkJoinPool

Продолжаем серию полезностей, которыми мы делимся с вами. Теперь уже вновь по Java.

Если вы уже знакомы со Stream API и использовали его, то знаете, что это удобный способ обработки данных. С помощью различных встроенных операций, таких как map, filter, sort и других можно преобразовать входящие данные и получить результат. До появления стримов разработчик был вынужден императивно описывать процесс обработки, то есть создавать цикл for по элементам, затем сравнивать, анализировать и сортировать при необходимости. Stream API позволяет декларативно описать, что требуется получить без необходимости описывать, как это делать. Чем-то это напоминает SQL при работе с базами данных.

Stream api forkjoinpool как связаны что это такое. Смотреть фото Stream api forkjoinpool как связаны что это такое. Смотреть картинку Stream api forkjoinpool как связаны что это такое. Картинка про Stream api forkjoinpool как связаны что это такое. Фото Stream api forkjoinpool как связаны что это такое

Стримы сделали Java-код компактнее и читаемее. Еще одной идеей при создании Stream API было предоставить разработчику простой способ распараллеливания задач, чтобы можно было получить выигрыш в производительности на многоядерных машинах. При этом нужно было избежать сложности, присущей многопоточному программированию. И это удалось сделать, в Stream API есть методы BaseStream::parallel и Collection.parallelStream(), которые возвращают параллельный стрим.

То есть, если у нас был код:

то его легко распараллелить, если изменить один вызов

либо в общем случае для произвольного stream:

Как и за всяким простым API, за parallelStream() скрывается сложный механизм распараллеливания операций. И разработчику придется столкнуться с тем, что использование параллельного стрима может не улучшить производительность, а даже ухудшить её, поэтому важно понимать, что происходит за вызовом parallelStream(). Есть статья Doug Lea о том, в каких случаях использование параллельных стримов даст положительный эффект. Следует обратить внимание на следующие факторы:

F — операция, которая будет применяться к каждому элементу стрима. Она должна быть независимой — то есть не оказывает влияние на другие элементы, кроме текущего и не зависит от других элементов (stateless non-interfering function)

S — источник данных (коллекция) эффективно разделима (efficiently splittable). Например, ArrayList — это эффективно разделимый источник, легко вычислить индексы и интервалы, которые можно обрабатывать параллельно. Также эффективно обрабатывать HashMap. BlockingQueue, LinkedList и большинство IO-источников это плохие кандидаты для параллельной обработки.

Оценка преимущества параллельной обработки. На современных машинах имеет смысл распараллеливать задачи, время выполнения которых превышает 100 микросекунд.

Таким образом, прежде чем использовать этот инструмент, нужно понять, насколько ваша задача укладывается в описанные ограничения.

Экспериментируя с parallel() наткнулись ещё на один интересный момент, связанный с текущей реализацией. Parallel() пытается исполнять ваш код в несколько потоков и становится интересно, кто эти потоки создаёт и как ими управляет.

Попробуем запустить такой код:

Уже интересно, оказывается, по умолчанию parallel stream используют ForkJoinPool.commonPool. Этот пул создается статически, то есть при первом обращении к ForkJoinPool, он не реагирует на shutdown()/shutdownNow() и живет, пока не будет вызван System::exit. Если задачам не указывать конкретный пул, то они будут исполняться в рамках commonPool.

Попробуем выяснить, каков же размер commonPool и посмотрим в исходники jdk1.8.0_111. Для читаемости убраны некоторые вызовы, которые не относятся к parallelism.

Из того же класса константа:

Нас интересует parallelism, который отвечает за количество воркеров в пуле. По-умолчанию, размер пула равен Runtime.getRuntime().availableProcessors() — 1, то есть на 1 меньше, чем количество доступных ядер. Когда вы создаете кастомный FJPool, то можно установить желаемый уровень параллелизма через конструктор. А для commonPool можно задать уровень через параметры JVM:

Сверху свойство ограничено числом 32767 (0x7fff);

Это может быть полезно, если вы не хотите отдавать все ядра под задачи ForkJoinPool, возможно, ваше приложение в обычном режиме утилизирует 4 из 8 CPU, тогда имеет смысл отдать под FJ оставшиеся 4 ядра.

Появляется вопрос, почему количество воркеров на 1 меньше количества ядер. Ответ можно увидеть в документации к ForkJoinPool.java:

When external threads submit to the common pool, they can perform subtask processing (see externalHelpComplete and related methods) upon joins. This caller-helps policy makes it sensible to set common pool parallelism level to one (or more) less than the total number of available cores, or even zero for pure caller-runs

То есть, когда некий тред отправляет задачу в common pool, то пул может использовать вызывающий тред (caller-thread) в качестве воркера. Вот почему в выводе программы мы видели main! Разгадка найдена, ForkJoinPool пытается загрузить своими задачами и вызывающий тред. В коде выше это main, но если вызовем код из другого треда, то увидим, что это работает и для произвольного потока:

Теперь мы знаем немного больше об устройстве ForkJoinPool и parallel stream. Оказывается, что количество воркеров parallel stream ограничено и эти воркеры общего назначения, то есть могут быть использованы любыми другими задачами, которые запускаются на commonPool. Попробуем понять, чем это чревато для нас при разработке.

В коде происходит следующее: мы пытаемся полностью занять пул, отправив туда parallelism + 1 задачу (то есть 3 штуки в данном случае). После этого запускаем параллельную обработку стрима из первого примера. По логам видно, что parallel стрим исполняется в один поток, так как все ресурсы пула исчерпаны. Не зная о такой особенности будет сложно понять, если в вашей программе вырастет время обработки какого то запроса через BaseStream::parallel.

Что же делать, если вы хотите быть уверены, что ваш код действительно будет распараллелен? Есть решение, нужно запустить parallel() на кастомном пуле, для этого нам придётся немного модифицировать код из примера выше и запустить код обработки данных, как Runnable на кастомном FJPool:

Окей, теперь мы добились своей цели и уверены, что наши вычисления под контролем и никто не может повлиять на них со стороны.

Прежде чем применять любой, даже самый простой инструмент необходимо выяснить его особенности и ограничения. Для parallel stream таких особенностей много и необходимо учитывать, насколько ваша задача подходит для распараллеливания. Parallel stream хорошо работают, если операции независимы и не хранят состояние, источник данных может быть легко разделен на сегменты для параллельной обработки и задачу действительно имеет смысл выполнять параллельно. Помимо этого нужно учесть особенности реализации и убедиться, что для важных вычислений вы используете отдельный пул потоков, а не делите с общим пулом приложения.

Вопросы и предложения, как всегда приветствуются, т.к. это является частью нашего курса по Java и нам интересно мнение по материалу.

Источник

Вилкой в глаз, или ForkJoinPool в Java

Всем привет. Сегодня я хотел бы поговорить о многопоточности. Вернее, не о многопоточности вообще, а о таком её механизме как ForkJoinPool. Нельзя сказать, что данная технология является новой (она появилась ещё в Java 7), или что в сети нельзя найти материалы по данной теме. Информации хватает. Например, для глубокого погружения могу порекомендовать лекцию блистательного Алексея Шипилёва, которую можно без труда найти на YouTube. Но лично мне большинство этих материалов показались либо слишком сложными, либо наоборот – поверхностными. Так же некоторые из них содержат явные ошибки, что вносит ещё большую неразбериху в данную тему. Судя по тому, что в комментариях под одной из этих статей я нашёл вот такую картинку, подобные проблемы были не только у меня.

Stream api forkjoinpool как связаны что это такое. Смотреть фото Stream api forkjoinpool как связаны что это такое. Смотреть картинку Stream api forkjoinpool как связаны что это такое. Картинка про Stream api forkjoinpool как связаны что это такое. Фото Stream api forkjoinpool как связаны что это такое

Если вдруг и вы перелопатили всё, что нашли в сети по поводу ForkJoinPool, а просветления так и не достигли, добро пожаловать под кат. Попробуем на максимально простом примере разобраться в данной теме, с котиками, картинками, всё как вы любите.

Для понимания всего, что будет изложено ниже, крайне желательно быть знакомым с основами многопоточности (Thread, Runnable, Callable, Future и т.д.).

Если вы уже что-то копали по теме ForkJoin, то должны знать, что в основе данной технологии лежит старый как мир принцип «разделяй и властвуй». Легко нагуглить, что если у нас есть какая-то задача, с помощью ForkJoinPool мы сначала делим её на подзадачи, выполняем их, потом объединяем результаты и делаем это всё рекурсивно…

Stream api forkjoinpool как связаны что это такое. Смотреть фото Stream api forkjoinpool как связаны что это такое. Смотреть картинку Stream api forkjoinpool как связаны что это такое. Картинка про Stream api forkjoinpool как связаны что это такое. Фото Stream api forkjoinpool как связаны что это такое

Заполнить массив можно любыми числами (в данном случае это не важно). Скорее всего System.out.println(new Date()) – не самый оптимальный способ измерить скорость выполнения кода, но весьма простой и для нашего примера сгодится. Thread.sleep(1) добавлен для того, чтобы сымитировать задачу, которая при работе в одном потоке вызывает значительную загрузку процессора. У меня на выполнение данного кода ушло 17 секунд. Таким образом, мы имеем некую большую задачу, существенно замедляющую работу нашей программы. Очевидно, что запуск её в параллельном потоке проблемы не решит. Что же делать? Конечно же, разбить эту задачу на подзадачи. Допустим, мы разделим наш массив пополам, суммирование первой части массива запустим в одном потоке, суммирование второй части массива – в другом, а потом сложим получившиеся результаты. Проблема в том, что если задача достаточно большая, то обе её половинки также могут получиться достаточно большого размера, что не лучшим образом скажется на производительности. Следовательно, возможно и их нужно будет поделить на части и продолжать данную операцию до достижения некоего оптимального размера. Когда условие будет достигнуто, каждый из этих кусочков мы отдадим отдельному потоку, а потом соберём получившиеся результаты воедино. Чувствуете? В воздухе отчётливо запахло рекурсией, и мы всё ближе приближаемся к ForkJoinPool.

Stream api forkjoinpool как связаны что это такое. Смотреть фото Stream api forkjoinpool как связаны что это такое. Смотреть картинку Stream api forkjoinpool как связаны что это такое. Картинка про Stream api forkjoinpool как связаны что это такое. Фото Stream api forkjoinpool как связаны что это такое

Допустим, что в деле изучения ForkJoinPool вы уже миновали стадию гнева и находитесь на стадии отрицания, тогда у вас может возникнуть вполне резонный вопрос: «Ну, и зачем нам нужен этот ForkJoin, да ещё и с какой-то рекурсией? Разве нельзя всё сделать проще?» В каком-то смысле можно. Напомню, что у нас есть интерфейс Callable, метод которого call() возвращает некое значение и запускается асинхронно в отдельном потоке. Ничто не мешает нам создать класс, имплементирующий данный интерфейс и содержащий в качестве поля числовой массив. Мы можем поделить наш огромный массив на 100500 маленьких массивов, создать 100500 экземпляров такого класса, создать 100500 отдельных потоков, собрать их в одну коллекцию, запустить их в цикле, потом ещё в одном цикле получить из них значения. Но вы уверены, что хотите построить ещё один велосипед из костылей, а не воспользоваться уже готовым решением, пусть и несколько сложным? Кроме того, описанное решение, обладает ещё одним существенным недостатком. Создание отдельных потоков – операция весьма тяжеловесная и ресурсозатратная. Рассчитывая получить прирост в производительности, и создавая 100500 потоков, мы рискуем получить прямо противоположный результат. Именно по этой причине и был придуман пул потоков, одним из видов которого является ForkJoinPool.

Итак, в основе своей ForkJoinPool – это пул потоков, преимущество которого состоит в том, что он работает на основе принципа WorkStealing, что дословно можно перевести как «кража работы». Когда один из потоков ForkJoinPool заканчивает свою работу, он не идёт пить кофе или чилить в ютубчике, он проявляет «сознательность» и берёт из общей очереди работ новую задачу. Это продолжается до тех пор, пока задачи не кончатся.

Ещё одной особенностью ForkJoinPool является то, что в него нельзя подать Callable или Runnable задачу. У него есть своя иерархия задач, наследуемая от абстрактного класса ForkJoinTask. Основные реализации – RecursiveTask и RecursiveAction. У каждого из них есть абстрактный метод compute(), который и надо реализовывать при наследование. RecursiveTask. compute() возвращает некое значение, RecursiveAction. compute() возвращает void.

Не знаю, как у вас, но у меня при первом знакомстве с данными классами по спине пробежали мурашки. «Раз они recursive, значит в них обязательно надо применить чёрную магию рекурсии…» Как я понял, на самом деле не обязательно (если я не прав, напишите в комментариях). Такой код вполне легален и будет работать.

Если мы передадим экземпляр такого класса на выполнение в ForkJoinPool, то получим обычную строку без всякой рекурсии. Судя по всему, (возможно, я ошибаюсь), создатели данных классов добавили в название слово Recursive в качестве некой рекомендации, а не обязательного требования.

Следующий не вполне очевидный вопрос: как запустить задачу в ForkJoinPool на исполнение? Для этого есть метод T invoke(ForkJoinTask task)

Здесь мы вообще обошлись без всякого ForkJoinPool. Задача выполнила сама себя! Это никуда не годится. Для того, чтобы понять, какой именно метод нужно использовать и в чём разница между ними, обратимся к официальной документации. С методом invoke() всё ясно, он «выполняет данную задачу, возвращая результат по завершении». А вот метод fork() работает немного сложнее, он «организует асинхронное выполнение этой задачи в пуле, в котором выполняется текущая задача». Для полного духовного просветления изменим наш класс.

и видим в консоле: «I am work in thread: ForkJoinPool-1-worker-1» и «I am just innocent simple class».

и видим: «I am work in thread: main» и «I am just innocent simple class». То есть при вызове метода fork() задача не «выполнила сама себя» магическим образом, а была выполнена в том же потоке из которого и был вызван данный метод. Вызов метода ForkJoinPool.invoke() передал задачу на выполнение в один из потоков данного пула. Важно отметить, что метод fork() отправляет задачу в какой-либо поток, но при этом не запускает её выполнения. Для получения результата служит метод join().

Разобравшись с основными классами и методами, вернёмся к нашему массиву и попробуем применить на практике, полученный духовный опыт.

Сначала мы будем рекурсивно делить наш массив на всё более мелкие части, пока не получим массивы, состоящие всего из 2 элементов. Почему именно из 2? Потому что наш условный «слабенький» процессор может условно «быстро» выполнить именно такую условно «маленькую» задачу. Почему рекурсивно? Просто потому, что применение рекурсии в данном случае действительно удобно. Это позволяет сначала выполнить некую работу по подготовке, а потом получить результат. Если вам не нравится рекурсия, то, наверное, можно попробовать найти какой-то другой способ, её применение, судя по всему, не является обязательным.

После того, как мы получим «100500» маленьких массивов, состоящих всего из 2 элементов, мы запустим «100500» маленьких задач на выполнение и суммируем их результаты. И для этого нам не придётся создавать 100500 отдельных нитей выполнения.

При создании экземпляра класса ValueSumCounter мы передаём в него массив. В методе compute() сначала проверяется длинна массива, и если он «слишком большой», то разбивается пополам на 2 части, на основе каждой из которых в свою очередь создаётся своя задача и отправляется на выполнение путём вызова метода fork(). Когда разбивка будет закончена, наступает время «собирать камни», метод join() запускает каждую задачу на выполнение и возвращает полученный результат. Выполнение данной задачи с помощью ForkJoinPool заняло у меня на компьютере 3 секунды. Напомню, что эта же задача, выполненная с помощью цикла в одном потоке, ранее заняла 17 секунд.

Нам осталось убедиться, что при использовании ForkJoinPool не создаётся «100500» отдельных потоков. Для этого добавим в метод compute() всего одну строку

Запустив код на выполнение, мы увидим, что для выполнения большого количества задач используется несколько одних и тех же потоков (в моём случае 4).

Вот, пожалуй, и всё, что хотелось бы рассказать о данной технологии. Если вдруг в коде или описании обнаружатся ошибки, просьба не кидать в автора тапками, а написать в комментариях об этом. Надеюсь, данный материал поможет духовному просветлению юных падаванов, ищущих знаний. Да пребудет с вами сила Java.

Источник

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *