15.3 Die TPL (Task Parallel Library)
Mit dem .NET Framework 4.0 ist die Entwicklung multithread-fähiger Anwendungen um die Bibliothek TPL (Task Parallel Library) ergänzt worden. Dabei muss der Entwickler sich nicht mehr um jeden Thread einzeln kümmern, sondern für ihn steht primär die Aufgabenstellung im Vordergrund, Tasks zu definieren und zu synchronisieren. Das Erstellen der Threads wird zur Laufzeit automatisch erfolgen, und das sogar unter optimaler Ausnutzung der Hardwareausstattung. Das bedeutet, wenn eine Anwendung auf einem Dual-Core-Prozessor ausgeführt wird, wird der Task in zwei Threads abgearbeitet. Läuft die Anwendung hingegen auf einem Quad-Core-Prozessor, werden vier Threads erzeugt.
Das hört sich alles bereits sehr verlockend an. Allerdings sollte bereits am Anfang darauf hingewiesen werden, dass die parallele Programmierung mit TPL nicht automatisch dazu führt, dass die Anwendungen schneller ausgeführt werden. Ganz im Gegenteil, unter Umständen können Anwendungen sogar langsamer werden. Was sich paradox anhört, ist sehr einfach zu erklären. Mit dem Verteilen über mehrere Prozessorkerne ist ein gewaltiger Overhead verbunden: Daten müssen in die Kerne eingelesen werden, Daten müssen die Kerne auch wieder verlassen und zwischengespeichert werden. Stimmt das Verhältnis zwischen reiner Rechenzeit und den Verwaltungsoperationen nicht, kann die Performance durch Nutzen der Parallelisierung darunter leiden.
Es lässt sich nicht eindeutig sagen, wann die Parallelverarbeitung Vorteile bringt. Als Daumenregel können Sie nehmen: Je mehr Prozessorkerne genutzt werden können, desto aufwendiger und zeitintensiver sollten die Operationen sein, um von der parallelen Verarbeitung profitieren zu können. Im Zweifelsfall sollten Sie das Programm testen – auch auf Rechnern mit unterschiedlicher Ausstattung.
15.3.1 Allgemeines zur Parallelisierung mit der TPL
Die Klassen zur parallelen Programmierung mit der TPL sind im Namespace System.Threading.Tasks zu finden. Allerdings gibt es auch weitere Klassen in anderen Namespaces, die im Zuge der TPL eingeführt worden sind und die Parallelisierung nutzen. Der Namespace System.Collections.Concurrent mit seinen neuen Auflistungsklassen sei an dieser Stelle erwähnt.
Setzen Sie die Task Parallel Library ein, werden Sie es hauptsächlich mit zwei Klassen zu tun haben:
- Die Klasse Parallel: Mit dieser Klasse werden in erster Linie Schleifen parallel ausgeführt. Dazu wird die zu verarbeitende Datenmenge in Teilmengen aufgeteilt. Wie viele Teilmengen verarbeitet werden, entscheidet die Klasse selbst. Dabei kann es aber durchaus vorkommen, dass keine Teilmengen gebildet werden, also keine parallele Verarbeitung stattfindet. Dieses Verhalten zeugt von einer ausgeprägten Optimierung der Klasse hinsichtlich des Performancegewinns. Die notwendigen Threads bezieht die TPL übrigens aus dem Threadpool.
- Die Klasse Task (und deren Ableitung Task<TResult>): Diese Klasse unterstützt die parallele Abarbeitung von Methoden. Liefert ein Task einen Wert zurück, verwenden Sie die Klasse Task<TResult>. Die Task-Klasse bietet Möglichkeiten, wie sie auch von der Klasse Thread her bekannt sind, allerdings unter Verwendung des Threadpools. Im Vergleich zu Thread ist Task aber einfacher zu handhaben.
15.3.2 Die Klasse »Parallel«
Die statische Klasse Parallel unterstützt die Parallelisierung von Codebereichen und Schleifen. Dafür werden insgesamt nur drei Methoden angeboten:
- For
- ForEach
- Invoke
Alle drei Methoden sind überladen (und natürlich statisch). Damit können Sie gewissermaßen ein Feintuning der parallelen Verarbeitung steuern, beispielsweise um die Anzahl der benutzten Prozessorkerne zu beschränken.
Die Methode »Parallel.Invoke«
Die einfachste Art, parallele Operationen anzustoßen, ist die Methode Invoke der Klasse Parallel. Die Methode definiert ein Paramater-Array, dem Sie Action-Objekte übergeben. Die Syntax lautet:
public static void Invoke(params Action[] action)
Action ist ein Delegate und kapselt eine Methode, die über keine Parameter verfügt und keinen Wert zurückgibt. Erst wenn alle Methoden abgearbeitet sind, wird die Programmausführung mit der auf Invoke folgenden Anweisung fortgesetzt. Invoke blockiert also den Programmablauf und hat synchronen Charakter.
Sie müssen sicherstellen, dass innerhalb der Methoden nicht dieselben Variablen benutzt werden. Ansonsten erleben Sie unliebsame Überraschungen.
Im folgenden Beispielprogramm werden mit Task1, Task2 und Task3 drei Methoden beschrieben. Sie leisten nichts Besonderes und geben nur innerhalb einer Schleife Konsolenmeldungen aus, die mit Thread.Sleep noch ein wenig gebremst werden.
// Beispiel: ..\Kapitel 15\ParallelInvokeSample
class Program {
static void Main(string[] args) {
Parallel.Invoke(Task1, Task2, Task3);
Console.ReadLine();
}
static void Task1() {
for (int i = 0; i < 10; i++) {
Thread.Sleep(50);
Console.Write(" #1 ");
}
}
static void Task2() {
for (int i = 0; i < 10; i++) {
Thread.Sleep(50);
Console.Write(" #2 ");
}
}
static void Task3() {
for (int i = 0; i < 10; i++) {
Thread.Sleep(50);
Console.Write(" #3 ");
}
}
}
Listing 15.21 Methoden auf mehrere Prozessorkerne verteilen
Abbildung 15.9 beweist, dass alle Tasks parallel ausgeführt werden. Besser gesagt, ich kann das hiermit nur für Task1 und Task2 beweisen. Der dritte Task wird am Ende komplett auf einmal abgearbeitet – daran können Sie sehen, dass diese Abbildung auf einer Dual-Core-Maschine erstellt worden ist.
Abbildung 15.9 Ausgabe des Beispielprogramms »ParallelInvokeSample«
Im Task-Manager können Sie auch beobachten, dass bei der Ausführung einer parallelen Operation die einzelnen CPUs ihre Arbeit verrichten. Allerdings werden Sie wahrscheinlich praktisch keine Reaktion sehen, wenn Sie unser Beispielprogramm laufen lassen. Dafür wird die Fähigkeit einer CPU von unserem Beispiel viel zu wenig herausgefordert – die CPU langweilt sich, ist praktisch beschäftigungslos. Um in Abbildung 15.10 wenigstens etwas Aktivität deutlich zu machen, wurde die Anzahl der Schleifendurchläufe des Beispielprogramms auf 100000 erhöht.
Nicht unüblich ist die Angabe der Tasks durch Lambda-Ausdrücke. Der Code ist kürzer, aber auch zunächst schlechter lesbar.
static void Main(string[] args) {
Parallel.Invoke(() => { for (int i = 0; i < 10; i++) {
Thread.Sleep(50);
Console.Write(" #1 ");
} },
() => { for (int i = 0; i < 10; i++) {
Thread.Sleep(50);
Console.Write(" #2 ");
} },
() => { for (int i = 0; i < 10; i++) {
Thread.Sleep(50);
Console.Write(" #3 ");
} });
Console.ReadLine();
}
Listing 15.22 Verwendung von Lambda-Ausdrücken
Abbildung 15.10 Anzeige der CPU-Auslastung im Task-Manager
Schleifen mit »Parallel.For«
Mit Parallel.For und Parallel.Foreach lassen sich einfache Schleifendurchläufe parallelisieren. Der Effekt der Performancesteigerung wird sich sicher nicht bei 10 oder 100 Schleifendurchläufen bemerkbar machen, die Anzahl sollte schon deutlich darüber liegen und hängt auch von der Anzahl der beteiligten Prozessorkerne ab.
Die Syntax der For-Methode ähnelt sehr der for-Schleife:
public static ParallelLoopResult For(int fromInclusive,int toExclusive,
Action<int> body )
Der erste Parameter erwartet den Startindex (inclusive), der zweite den Endindex (exklusive). Der letzte Parameter ist wieder ein Delegate, der auf den Code zeigt, der innerhalb der Schleife ausgeführt wird. Der Rückgabewert vom Typ ParallelLoopResult gibt Auskunft, ob die Schleife komplett ausgeführt worden ist oder vorzeitig beendet wurde. Zudem können Sie den Index abrufen, bei dem die Schleife abgebrochen worden ist.
Im nächsten Beispielprogramm wird Parallel.For nicht nur demonstriert, sondern auch die Zeitspanne zwischen einer herkömmlichen Schleife und einer parallelisierten in Millisekunden gemessen. Zur Messung der Zeitspanne, die die Laufzeit zur Abarbeitung der Schleifen braucht, greifen wir auf die Klasse StopWatch aus dem Namespace System.Diagnostics zurück.
// Beispiel: ..\Kapitel 15\ParallelForSample
class Program {
static void Main(string[] args) {
Stopwatch watch = new Stopwatch();
watch.Start();
ParallelTest();
watch.Stop();
Console.WriteLine(watch.ElapsedMilliseconds);
watch.Reset();
watch.Start();
SynchTest();
watch.Stop();
Console.WriteLine(watch.ElapsedMilliseconds);
Console.ReadLine();
}
static void SynchTest() {
double[] arr = new double[1000000];
for(int i = 0; i < 1000000; i++)
arr[i] = Math.Pow(i, 0.333) * Math.Sqrt(Math.Sin(i));
}
static void ParallelTest() {
double[] arr = new double[1000000];
Parallel.For(0, 1000000, i =>
{
arr[i] = Math.Pow(i, 0.333) * Math.Sqrt(Math.Sin(i));
});
}
}
Listing 15.23 Parallel verarbeitete Schleife
Testen Sie das Programm auf Ihrem Rechner, haben Sie mit Sicherheit andere Werte. Aber die Tendenz dürfte ähnlich sein: Die parallele Ausführung dauert im Schnitt 140 ms, die synchrone 268. Das bedeutet fast 50 % Performancegewinn. Je geringer Sie die Anzahl der Schleifendurchläufe festlegen, desto geringer ist auch der Vorteil der Parallelisierung.
Die Erhöhung des Schleifenzählers ist immer +1. Möchten Sie andere Schrittweiten festlegen, müssen Sie im Schleifenblock eine entsprechende Umrechnung vornehmen. Angenommen, Sie sind an allen ganzzahligen Werten zwischen 0 und 100 interessiert, könnten Sie im Schleifenkörper den folgenden Code schreiben:
Parallel.For(0, 50, i =>
{
int newIndex = i * 2;
Console.Write(" {0} ", newIndex);
});
Außerdem ist nur eine positive Iteration möglich.
Schleifenunterbrechung
In der parallelen Verarbeitung muss natürlich eine Schleife anders abgebrochen werden als in einer herkömmlichen Schleife. Die Klasse Parallel stellt dazu eine Überladung der Methode For zur Verfügung, deren Syntax wir uns zuerst ansehen:
public static ParallelLoopResult For(int fromInclusive,int toExclusive,
Action<int, ParallelLoopState> body )
Hier wird eine andere Variante des Delegaten Action benutzt. Sie geben hier ein Objekt vom Typ ParallelLoopState an, das unter anderem auch die Methode Stop zum Abbruch einer parallel ausgeführten Schleife bereitstellt.
Parallel.For(0, 1000000, (i, option) =>
{
arr[i] = Math.Pow(i, 0.333) * Math.Sqrt(Math.Sin(i));
if (i > 1000) option.Stop();
});
Es gibt neben der Methode Stop auch noch die Methode Break. Diese Methode gibt an, dass die Schleife nach der aktuellen Iteration beendet werden soll.
Die For-Methode hat noch mehr Überladungen als die beiden hier vorgestellten. Wir werden aber darauf nicht weiter eingehen.
Auswerten des Status der Schleifenoperation
Die Methode For liefert einen Rückgabewert vom Typ ParallelLoopResult. Mit IsCompleted und LowestBreakIteration hat die Klasse nur zwei Eigenschaften. In beiden Fällen bezieht sich die Auswertung auf einen möglichen vorzeitigen Schleifenabbruch.
IsCompleted gibt an, ob die Schleife bis zum Abschluss ausgeführt wurde. LowestBreakIteration gibt den Indexwert an, bei dem eine Schleife mit Break unterbrochen worden ist. Wurde eine Schleife mit Stop beendet, ist der Rückgabewert null.
Collection mit »ForEach« parallel durchlaufen
Die Parallelisierung der For-Schleife mit Parallel.For habe ich Ihnen gezeigt. Es gibt auch noch die sehr ähnliche Methode ForEach der Klasse Parallel, um auch auf Collections parallel zugreifen zu können.
Grundsätzlich ähnelt der Einsatz der ForEach-Methode dem der For-Methode. Daher soll an dieser Stelle auch nur kurz gezeigt werden, wie die Methode eingesetzt wird.
string[] namen = { "Peter", "Uwe", "Udo", "Willi",
"Pia", "Michael", "Conie" };
Parallel.ForEach(namen, name =>
{
Console.WriteLine(name);
});
Listing 15.24 Parallele Verarbeitung mit »ForEach«
Sie können übrigens nicht erwarten, dass die Namen in der Reihenfolge ausgegeben werden, in der sie in der Liste angegeben sind.
15.3.3 Die Klasse »Task«
Oben habe ich bereits erwähnt, dass die Klasse Task durchaus der Klasse Thread ähnelt. Widmen wir uns nun dieser Klasse.
Einen Task erstellen
Zunächst einmal gilt es, sich ein Objekt vom Typ Task zu besorgen. Dazu bieten sich zwei Möglichkeiten:
- Sie instanziieren in gewohnter Weise mit new und übergeben dabei ein Objekt vom Typ Action (Sie erinnern sich, es handelt sich dabei um einen Delegate).
- Sie rufen Task.factory.StartNew unter Übergabe eines Action-Objekts auf.
Der Unterschied zwischen diesen beiden Varianten ist, dass der Task im ersten Fall noch explizit gestartet werden muss, während im zweiten Fall der Task bereits automatisch gestartet wird. Im folgenden Listing werden beide Möglichkeiten gezeigt.
static void Main(string[] args) {
Task task1 = Task.Factory.StartNew(DoSomething);
Task task2 = new Task(Test);
task2.Start();
}
public static void DoSomething() {
Console.WriteLine("Task wird ausgeführt ...");
}
Listing 15.25 Einsatz der Klasse »Task«
Die durch das Objekt task1 beschriebene Operation wird sofort gestartet, während es des Aufrufs der Methode Start beim Objekt task2 bedarf.
Natürlich können Sie die Operation auch unter Verwendung eines Lambda-Ausdrucks mitteilen, z. B.:
Task task1 = Task.Factory.StartNew(() =>
{
Console.WriteLine("Task wird ausgeführt ...");
});
Daten an den Task übergeben
Manche parallel auszuführende Operationen werden eine Datenübergabe voraussetzen. Natürlich geht auch das. Sehen wir uns zuerst die entsprechende Überladung der Methode StartNew an:
public Task StartNew(Action<Object> action, Object state)
Jetzt wird es etwas komplexer, denn StartNew definiert zwei Parameter. Fangen wir mit dem zweiten an. An dieser Position geben wir unser Übergabeargument an, das als Object-Typ entgegengenommen wird. Der erste Parameter beschreibt nicht nur die parallele Operation (die, wie Sie vorher gesehen haben, »parameterlos« ist), sondern wir müssen hier einen beliebigen Parameter beschreiben, der innerhalb der Operation dazu dient, den tatsächlichen Typ des zweiten Übergabearguments zu bestimmen. Das hört sich kompliziert an, deshalb auch sofort ein einfaches Beispiel.
string name = "Andreas";
Task.Factory.StartNew((str) => { /* ... */ }, name);
Ich habe bewusst auf die Angabe jeglicher Operationen verzichtet, damit der Code gut lesbar bleibt. Das zweite Argument ist hier name, mit str wird das erforderliche erste Argument beschrieben.
Jetzt sehen wir uns ein konkretes Beispiel an. In diesem wird ein Integer-Array an die parallele Operation übergeben und dort in die Konsole geschrieben.
int[] arr = { 1, 2, 3, 4, 5 };
Task.Factory.StartNew((liste) =>
{
int[] p = (int[])liste;
foreach(int item in arr)
Console.WriteLine(item);
}, arr);
Listing 15.26 Datenübergabe an »Task«
Hier können Sie erkennen, wie der erste Parameter liste innerhalb der Methode in den tatsächlichen Typ int[] konvertiert wird und uns den nachfolgenden Zugriff auf die Elemente des Arrays ermöglicht.
Auf das Beenden eines Tasks warten
Oftmals hängt die weitere Programmausführung davon ab, ob der Task oder die Tasks die Bearbeitung beendet haben. In solchen Situation bieten sich mit Wait, WaitAny und WaitAll gleich mehrere Möglichkeiten, auf das Ende eines Tasks zu warten.
Warten Sie auf das Ende der Ausführungen eines bestimmten Tasks, bietet sich die Methode Wait an, die auf die Referenz des Task-Objekts aufgerufen wird.
Task task = Task.Factory.StartNew(() =>
{
Console.WriteLine("Lange Operation ....");
Thread.Sleep(5000);
Console.WriteLine("Ich bin fertig ...");
});
task.Wait();
Console.WriteLine("Task hat Arbeit beendet ...");
Listing 15.27 Auf das Beenden eines Tasks warten
Sie können Wait auch mitteilen, dass Sie nur für eine bestimmte Dauer warten möchten, und übergeben dann die entsprechende Zeitspanne in Millisekunden als Argument. Dieser Fall dürfte aber eher selten auftreten.
Sind mehrere Tasks gestartet, können Sie auf das Beenden eines bestimmten oder auch aller Tasks mit der statischen Methode WaitAll warten, der Sie die Referenzen auf die Tasks angeben, von denen der weitere Ablauf des Programms abhängt.
Task task1 = Task.Factory.StartNew(() =>
{
Thread.Sleep(10000);
Console.WriteLine("Task #1: fertig ...");
});
Task task2 = Task.Factory.StartNew(() =>
{
Thread.Sleep(3000);
Console.WriteLine("Task #2: fertig ...");
});
Task task3 = Task.Factory.StartNew(() =>
{
Thread.Sleep(6000);
Console.WriteLine("Task #3: fertig ...");
});
Task.WaitAll(task1, task2, task3);
Listing 15.28 Warten auf das Beenden mehrerer Tasks
Alternativ bietet sich bei mehreren laufenden Aufgaben auch WaitAny an. WaitAny erwartet eine Liste von Aufgaben. Das Programm wird nach WaitAny fortgesetzt, wenn eine der aufgeführten Aufgaben beendet wird.
Rückgabewerte auswerten
Tasks müssen nicht zwangsläufig vom Typ void sein, sie können auch Rückgabewerte haben. Für solche Aufgaben ist die Klasse Task nicht mehr geeignet. Stattdessen greifen wir auf Task<TResult> zurück, die eine Ableitung von Task ist. Der generische Typparameter beschreibt den Datentyp des Rückgabewerts.
Das Ergebnis der parallelen Operation holen wir uns mit der Eigenschaft Result des Task<TResult>-Objekts ab. Der Ergebniswert entspricht dem Typparameter der Aufgabe. Result wartet übrigens so lange, bis das Ergebnis vorliegt. Im folgenden Listing wird eine int-Variable an die Operation übergeben und damit eine einfache mathematische Berechnung angestellt. Um zeigen zu können, dass Result tatsächlich bis zum Eintreffen des Rückgabewerts wartet, wird die Dauer der Operation um drei Sekunden verzögert.
int value = 12;
Task<long> task = Task<long>.Factory.StartNew((wert) =>
{
int var = (int)wert;
Thread.Sleep(3000);
return var * var;
}, value);
Console.WriteLine("Ich warte ...");
Console.WriteLine("Resultat: {0}", task.Result);
Listing 15.29 Auswerten von Rückgabewerten
Abbruch einer parallelen Operation von außen
Innerhalb einer parallelen Operation können Sie mit return die Operation abbrechen. Von außerhalb ist schon etwas mehr Coding erforderlich. Im Mittelpunkt steht dabei ein Objekt vom Typ CancellationTokenSource, mit dem einem Task signalisiert wird, dass er seine parallele Ausführung beenden soll. Dem CancellationTokenSource-Objekt ist ein CancellationToken-Objekt zugeordnet, das für die Weitergabe einer Abbruchbenachrichtigung verantwortlich ist.
Der Abbruch einer parallelen Ausführung wird mit der Methode Cancel des CancellationTokenSource-Objekts eingeleitet. Das Token gibt in seiner Eigenschaft IsCancellationRequested Auskunft darüber, ob ein Abbruch gewünscht ist. Ist der Wert true, kann mit der Auslösung der Exception OperationCanceledException darauf reagiert werden. Sehr einfach geht die Auslösung der Exception durch den Aufruf der Methode ThrowIfCancellationRequested auf das Token-Objekt.
Das hört sich ziemlich komplex an, deshalb wollen wir uns auch dazu ein Beispiel ansehen.
// Beispiel: ..\Kapitel 15\CancellationSample
class Program {
static void Main(string[] args) {
var cts = new CancellationTokenSource();
CancellationToken token = cts.Token;
Task task = Task.Factory.StartNew(() =>
{
Thread.Sleep(1000);
while(true) {
if (token.IsCancellationRequested) {
token.ThrowIfCancellationRequested();
}
}
}, cts.Token);
cts.Cancel();
Console.WriteLine("Abbruch der parallelen Operation ...");
try {
task.Wait();
}
catch (Exception ex) {
Console.WriteLine("In catch: " + ex.InnerException.Message);
}
Console.ReadLine();
}
}
Nach der Definition des Tasks wird die Methode Cancel aufgerufen und in einem try-Block mit Wait auf das Beenden der parallelen Operation gewartet.
Ihre Meinung
Wie hat Ihnen das Openbook gefallen? Wir freuen uns immer über Ihre Rückmeldung. Schreiben Sie uns gerne Ihr Feedback als E-Mail an kommunikation@rheinwerk-verlag.de.