Параллелизм при императивной обработке данных
Класс System.Threading.Tasks.Parallel позволяет распараллеливать циклы и последовательность блоков кода. Эта функциональность реализована как набор статических методов For(), ForEach() и Invoke().
Методы Parallel.For() и Parallel.ForEach() являются параллельными аналогами циклов for и foreach. Их использование корректно в случае независимости итераций цикла, то есть, если ни в одной итерации не используется результаты работы предыдущих итераций.
Существует несколько перегруженных вариантов метода Parallel.For(), однако любой из них подразумевает указание начального и конечного значения счётчика (тип int или long) и тела цикла в виде объекта делегата. В качестве примера использования Parallel.For() приведём метод, выполняющий перемножение двух квадратных матриц.
public void Multiply(int size, int[,] m1, int[,] m2, int[,] result)
{
Parallel.For(0, size, i =>
{
for (int j = 0; j < size; j++)
{
result[i, j] = 0;
for (int k = 0; k < size; k++)
{
result[i, j] += m1[i, k] * m2[k, j];
}
}
});
}
Метод Parallel.ForEach() имеет множество перегрузок. Простейший вариант предполагает указание коллекции, реализующей IEnumerable, и объекта делегата Action, описывающего тело цикла:
Parallel.ForEach(Directory.GetFiles(path, "*.jpg"), img => Process(img));
Статический метод Parallel.Invoke() позволяет распараллелить исполнение блоков операторов. Часто в приложениях существуют такие последовательности операторов, для которых не имеет значения порядок выполнения операторов внутри них. В таких случаях вместо последовательного выполнения операторов одного за другим, возможно их параллельное выполнение, позволяющее сократить время решения задачи. В базовом варианте Invoke() принимает параметр-список объектов делегата Action:
Parallel.Invoke(DoSomeWork, // DoSomeWork - это некий метод
DoAnotherWork, // DoAnotherWork - некий метод
() => Console.WriteLine("Working..."));
В заключение заметим, что каждый из методов For(), ForEach() и Invoke() может принимать аргумент типа ParallelOptions, используемый для настройки поведения метода.
Параллелизм при декларативной обработке данных
PLINQ (Parallel Language-Integrated Query) параллельная реализация LINQ, в которой запросы выполняются параллельно, используя все доступные ядра и процессоры. PLINQ полностью поддерживает все операторы запросов, имеющиеся в LINQ to Objects, и имеет минимальное влияние на существующую модель LINQ-операторов.
Рассмотрим простой пример использования PLINQ. Предположим, что имеется медленный метод, который проверяет делимость целого числа на 5:
public static bool IsDivisibleBy5(int x)
{
Thread.Sleep(100);
return x % 5 == 0;
}
Подсчитаем количество чисел, которые делятся на 5, в заданном интервале при помощи обычного LINQ:
var numbers = Enumerable.Range(1, 100);
int count = numbers.Where(IsDivisibleBy5).Count();
Console.WriteLine(count);
Чтобы распараллелить этот запрос средствами PLINQ, достаточно применить к источнику данных (numbers) метод расширения AsParallel():
var numbers = Enumerable.Range(1, 100);
int count = numbers.AsParallel().Where(IsDivisibleBy5).Count();
Console.WriteLine(count);
Почему работоспособен приведённый выше код? В пространстве имён System.Linq содержится статический класс ParallelEnumerable. Он имеет набор методов расширения, аналогичный набору класса Enumerable, но расширяющих класс ParallelQuery. Метод расширения AsParallel() просто «конвертирует» коллекцию IEnumerable в объект ParallelQuery.
Кроме AsParallel(), класс ParallelEnumerable содержит ещё несколько особых методов:
-
AsSequential() конвертирует объект ParallelQuery в коллекцию IEnumerable так, что все запросы выполняются последовательно;
-
AsOrdered() при параллельной обработке заставляет сохранять в ParallelQuery порядок элементов;
-
AsUnordered() при параллельной обработке позволяет игнорировать в ParallelQuery порядок элементов;
-
WithCancellation() устанавливает для ParallelQuery указанное значение токена отмены;
-
WithDegreeOfParallelism() устанавливает для ParallelQuery целочисленное значение степени параллелизма (число ядер процессоров);
-
WithExecutionMode() задаёт опции выполнения параллельных запросов в виде перечисления ParallelExecutionMode.
int count = numbers.AsParallel().
AsOrdered().
WithExecutionMode(ParallelExecutionMode.ForceParallelism).
Where(IsDivisibleBy5).Count();
Обработка исключений и отмена выполнения задач
Создание параллельных приложений требует особых подходов при обработке исключительных ситуаций и прерывании работы задач. Исключительные ситуации могут возникнуть одновременно в разных потоках, а обработка исключительных ситуаций может выполняться в отдельном потоке. Задачи могут образовывать цепочку выполнения, а, значит, отмена одной задачи должна вести к отмене всех следующих задач цепочки.
В библиотеке параллельных расширений используются следующие принципы работы с исключительными ситуациями:
-
При возникновении исключения в задаче (как созданной явно, так и порожденной неявно, например, методом Parallel.For()) это исключение обрабатывается средствами библиотеки (если перехват не был предусмотрен программистом) и перенаправляется в ту задачу, которая ожидает завершения данной;
-
При параллельном возникновении нескольких исключений все они собираются в единое исключение System.AggregateException, которое переправляется дальше по цепочке вызовов задач;
-
Если возникла в точности одна исключительная ситуация, то на её основе будет создан объект AggregateException в целях единообразной обработки всех исключительных ситуаций.
Исключительные ситуации типа AggregateException могут возникать при работе со следующими конструкциями библиотеки параллельных расширений:
-
Класс Task исключения, возникшие в теле задачи, будут повторно возбуждены в месте вызова метода Wait() данной задачи. Кроме того, исключение доступно через свойство Exception объекта Task.
-
Класс Task исключения, возникшие в теле задачи, будут повторно возбуждены в месте вызова метода Wait() или в месте обращения к экземплярному свойству Task.Result.
-
Класс Parallel исключения могут возникнуть в параллельно исполняемых итерациях циклов Parallel.For() и Parallel.ForEach() или в параллельных блоках кода при работе с Parallel.Invoke().
-
PLINQ из-за отложенного характера исполнения запросов PLINQ, исключения обычно возникают на этапе перебора элементов, полученных по запросу.
В библиотеке параллельных расширений применяется особый подход для выполнения отмены задач. Отметим, что многие методы задач, упоминавшиеся выше, принимают в качестве аргумент значение типа CancellationToken. Это так называемый токен отмены – своеобразный маркер того, что задачу можно отменить. Класс System.Threading.CancellationTokenSource содержит свойство Token для получения токенов отмены и метод Cancel() для отмены выполнения всех задач, использующих общий токен.
В следующем фрагменте кода демонстрируется типичный сценарий использования токенов отмены: вначале создаётся объект CancellationToken, затем его токен назначается задачам, а потом вызывается метод Cancel(), прерывающий выполнение задач:
var ct = new CancellationTokenSource();
var task = new Task(work, ct.Token);
task.Start();
Task.Factory.StartNew(() =>
{
Thread.Sleep(2000);
Console.WriteLine("Done");
}, ct.Token);
ct.Cancel(); // в нужный момент отменяем обе задачи
Поделитесь с Вашими друзьями: |