Rheinwerk Computing < openbook >


 
Inhaltsverzeichnis
Materialien
Vorwort
1 Java ist auch eine Sprache
2 Imperative Sprachkonzepte
3 Klassen und Objekte
4 Arrays und ihre Anwendungen
5 Der Umgang mit Zeichenketten
6 Eigene Klassen schreiben
7 Objektorientierte Beziehungsfragen
8 Ausnahmen müssen sein
9 Geschachtelte Typen
10 Besondere Typen der Java SE
11 Generics<T>
12 Lambda-Ausdrücke und funktionale Programmierung
13 Architektur, Design und angewandte Objektorientierung
14 Java Platform Module System
15 Die Klassenbibliothek
16 Einführung in die nebenläufige Programmierung
17 Einführung in Datenstrukturen und Algorithmen
18 Einführung in grafische Oberflächen
19 Einführung in Dateien und Datenströme
20 Einführung ins Datenbankmanagement mit JDBC
21 Bits und Bytes, Mathematisches und Geld
22 Testen mit JUnit
23 Die Werkzeuge des JDK
A Java SE-Module und Paketübersicht
Stichwortverzeichnis


Download:

- Listings, ca. 2,7 MB


Buch bestellen
Ihre Meinung?



Spacer
<< zurück
Java ist auch eine Insel von Christian Ullenboom

Einführung, Ausbildung, Praxis
Buch: Java ist auch eine Insel


Java ist auch eine Insel

Pfeil 16 Einführung in die nebenläufige Programmierung
Pfeil 16.1 Nebenläufigkeit und Parallelität
Pfeil 16.1.1 Multitasking, Prozesse und Threads
Pfeil 16.1.2 Threads und Prozesse
Pfeil 16.1.3 Wie nebenläufige Programme die Geschwindigkeit steigern können
Pfeil 16.1.4 Was Java für Nebenläufigkeit alles bietet
Pfeil 16.2 Existierende Threads und neue Threads erzeugen
Pfeil 16.2.1 Main-Thread
Pfeil 16.2.2 Wer bin ich?
Pfeil 16.2.3 Die Schnittstelle Runnable implementieren
Pfeil 16.2.4 Thread mit Runnable starten
Pfeil 16.2.5 Runnable parametrisieren
Pfeil 16.2.6 Die Klasse Thread erweitern
Pfeil 16.3 Thread-Eigenschaften und Zustände
Pfeil 16.3.1 Der Name eines Threads
Pfeil 16.3.2 Die Zustände eines Threads *
Pfeil 16.3.3 Schläfer gesucht
Pfeil 16.3.4 Wann Threads fertig sind
Pfeil 16.3.5 Einen Thread höflich mit Interrupt beenden
Pfeil 16.3.6 Unbehandelte Ausnahmen, Thread-Ende und UncaughtExceptionHandler
Pfeil 16.3.7 Der stop() von außen und die Rettung mit ThreadDeath *
Pfeil 16.3.8 Ein Rendezvous mit join(…) *
Pfeil 16.3.9 Arbeit niederlegen und wieder aufnehmen *
Pfeil 16.3.10 Priorität *
Pfeil 16.4 Der Ausführer (Executor) kommt
Pfeil 16.4.1 Die Schnittstelle Executor
Pfeil 16.4.2 Glücklich in der Gruppe – die Thread-Pools
Pfeil 16.4.3 Threads mit Rückgabe über Callable
Pfeil 16.4.4 Erinnerungen an die Zukunft – die Future-Rückgabe
Pfeil 16.4.5 Mehrere Callable-Objekte abarbeiten
Pfeil 16.4.6 CompletionService und ExecutorCompletionService
Pfeil 16.4.7 ScheduledExecutorService: wiederholende Aufgaben und Zeitsteuerungen
Pfeil 16.4.8 Asynchrones Programmieren mit CompletableFuture (CompletionStage)
Pfeil 16.5 Zum Weiterlesen
 

Zum Seitenanfang

16.4    Der Ausführer (Executor) kommt Zur vorigen ÜberschriftZur nächsten Überschrift

Zur nebenläufigen Ausführung eines Runnable ist immer ein Thread notwendig. Obwohl die nebenläufige Abarbeitung von Programmcode ohne Threads nicht möglich ist, sind doch beide in der bisherigen Programmierung stark verbunden, und es wäre gut, wenn das Runnable von dem tatsächlich abarbeitenden Thread etwas getrennt wäre. Das hat mehrere Gründe:

  • Schon beim Erzeugen eines Thread-Objekts muss das Runnable-Objekt im Thread-Konstruktor übergeben werden. Es ist nicht möglich, das Thread-Objekt aufzubauen, dann später über einen Setter das Runnable-Objekt zuzuweisen und anschließend den Thread mit start() zu starten.

  • Wird start() auf dem Thread-Objekt zweimal aufgerufen, so führt der zweite Aufruf zu einer Ausnahme. Ein erzeugter Thread kann also ein Runnable durch zweimaliges Aufrufen von start() nicht gleich zweimal abarbeiten. Für eine erneute Abarbeitung eines Runnable ist also mit unseren bisherigen Mitteln immer ein neues Thread-Objekt nötig. Mit anderen Worten: Ein existierender Thread kann nicht einfach ein neues Runnable abarbeiten.

  • Der Thread beginnt mit der Abarbeitung des Runnable-Programmcodes sofort nach dem Aufruf von start(). Die Implementierung des Runnable selbst müsste geändert werden, wenn der Programmcode nicht sofort, sondern später (zur nächsten Tagesschau) oder wiederholt (immer an Weihnachten) ausgeführt werden soll.

Wünschenswert ist eine Abstraktion, die das Ausführen des Runnable-Programmcodes von der technischen Realisierung (etwa den Threads) trennt.

 

Zum Seitenanfang

16.4.1    Die Schnittstelle Executor Zur vorigen ÜberschriftZur nächsten Überschrift

Anstatt das Runnable direkt an einen Thread und somit an seinen Ausführer zu binden, gibt es eine Abstraktion für alle »Abarbeiter«. Die Schnittstelle Executor schreibt eine Methode vor:

interface java.util.concurrent.Executor
  • void execute(Runnable command)

    Wird später von Klassen implementiert, die ein Runnable abarbeiten können.

Jeder, der nun Befehle über Runnable abarbeitet, ist Executor.

Konkrete Executoren

Von dieser Schnittstelle gibt es bisher zwei wichtige Implementierungen:

  • ThreadPoolExecutor: Die Klasse baut eine Sammlung von Threads auf, den Thread-Pool. Ausführungsanfragen werden von den freien Threads übernommen.

  • ScheduledThreadPoolExecutor. Eine Erweiterung von ThreadPoolExecutor um die Fähigkeit, zu bestimmten Zeiten oder mit bestimmten Wiederholungen Befehle abzuarbeiten.

Die beiden Klassen haben nicht ganz so triviale Konstruktoren, und eine Utility-Klasse vereinfacht den Aufbau dieser speziellen Executor-Objekte.

class java.util.concurrent.Executors
  • static ExecutorService newCachedThreadPool()

    Liefert einen Thread-Pool mit wachsender Größe.

  • static ExecutorService newFixedThreadPool(int nThreads)

    Liefert einen Thread-Pool mit maximal nThreads. Mehr dazu folgt im nächsten Abschnitt.

  • static ScheduledExecutorService newSingleThreadScheduledExecutor()

  • static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

    Geben spezielle Executor-Objekte zurück, um Wiederholungen festzulegen. Mehr dazu folgt in Abschnitt 16.4.7, »ScheduledExecutorService: wiederholende Aufgaben und Zeitsteuerungen«.

Es gibt über 20 Methoden in Executors. Diese Aufzählung hier zeigt nur die, die für uns in den nächsten Abschnitten relevant sind.

ExecutorService ist eine Schnittstelle, die Executor erweitert (siehe Anzahl der Teil-Strings einer Zeichenkette *). Unter anderem sind hier Operationen zu finden, die die Ausführer herunterfahren. Im Falle von Thread-Pools ist das nützlich, da die Threads ja sonst nicht beendet würden, weil sie auf neue Aufgaben warten.

[»]  Hinweis

Natürlich lassen sich auch eigene »Ausführer« schreiben, zum Beispiel einer, der ein Runnable von dem Swing-GUI-Thread ausführen lässt:

Executor executor = runnable -> SwingUtilities.invokeLater( runnable );

Beziehungsweise kürzer:

Executor executor = SwingUtilities::invokeLater;
Die Schnittstelle »ExecutorService«, die »Executor« erweitert

Abbildung 16.6    Die Schnittstelle »ExecutorService«, die »Executor« erweitert

 

Zum Seitenanfang

16.4.2    Glücklich in der Gruppe – die Thread-Pools Zur vorigen ÜberschriftZur nächsten Überschrift

Es kostet Zeit, Threads aufzubauen, vom Betriebssystem verwalten zu lassen und dann wieder abzubauen und aus den internen Tabellen herauszunehmen. Wenn es bei einem Server zum Beispiel darum geht, eine Anfrage direkt ohne Verzögerung zu beantworten, kann die Zeit zum Aufbau eines Threads störend sein. Auch sind mit einem Thread Ressourcen wie Stack-Speicher verbunden, die vom Betriebssystem reserviert werden müssen.

Eine Optimierung besteht darin, Threads aufzubauen, und im Pool lebendig zu halten. Es gibt dann drei Schritte:

  1. Gibt es eine Anfrage zur Abarbeitung eines Runnable, wird ein freier Thread aus dem Pool genommen.

  2. Der Thread arbeitet die run()-Methode des Runnable ab.

  3. Nach der Abarbeitung legt sich der Thread wieder in den Pool zurück.

Thread-Pools haben den weiteren Vorteil, dass sie die Auslastung und parallele Verarbeitung beschränken können. Verwaltet ein Thread-Pool nur eine feste Anzahl von Threads und sind alle Threads in Arbeit, kann das System neue Anfragen ablehnen oder zum Warten zwingen. Das beugt effektiv Denial-of-Service-Angriffen vor. Wenn zum Beispiel bei einem Webserver jede eingehende Verbindung einen Thread aufmacht, kann ein Server leicht mit Anfragen bombardiert werden.

Executors.newCachedThreadPool(…)

Eine wichtige statische Methode der Klasse Executors ist newCachedThreadPool(…). Dahinter verbirgt sich ein ThreadPoolExecutor-Konstruktor-Aufruf. Das Ergebnis ist ein ExecutorService-Objekt, also eine Implementierung der Schnittstelle Executor mit der Methode execute(Runnable):

Listing 16.9    src/main/java/com/tutego/insel/thread/concurrent/ThreadPoolDemo.java, main(), Teil 1

Runnable r1 = () -> {

System.out.println( "1.1 " + Thread.currentThread().getName() );

System.out.println( "1.2 " + Thread.currentThread().getName() );

};



Runnable r2 = () -> {

System.out.println( "2.1 " + Thread.currentThread().getName() );

System.out.println( "2.2 " + Thread.currentThread().getName() );

};

Jetzt lässt sich der Thread-Pool als ExecutorService beziehen, und die beiden Befehlsobjekte lassen sich als Runnable über execute(…) ausführen:

Listing 16.10    src/main/java/com/tutego/insel/thread/concurrent/ThreadPoolDemo.java, main(), Teil 2

ExecutorService executor = Executors.newCachedThreadPool();



executor.execute( r1 );

executor.execute( r2 );



Thread.sleep( 500 );



executor.execute( r1 );

executor.execute( r2 );



executor.shutdown();

Die Ausgabe zeigt sehr schön die Wiederverwendung der Threads:

1.1 pool-1-thread-1

2.1 pool-1-thread-2

2.2 pool-1-thread-2

1.2 pool-1-thread-1

2.1 pool-1-thread-2

2.2 pool-1-thread-2

1.1 pool-1-thread-1

1.2 pool-1-thread-1

Am Namen des Threads ist abzulesen, dass hier zwei Threads von einem Thread-Pool pool-1 verwendet werden: thread-1 und thread-2. Nach dem Ausführen der beiden ersten Aufträge r1 und r2 und der kleinen Warterei sind die Threads pool-1-thread-1 und pool-1-thread-2 wieder frei, sodass r1 und r2 wieder von diesen beiden Threads abgearbeitet werden. Interessant sind die folgenden drei Operationen zur Steuerung des Pool-Endes:

interface java.util.concurrent.ExecutorService

extends Executor
  • void shutdown()

    Fährt den Thread-Pool herunter. Laufende Threads werden nicht abgebrochen, aber neue Anfragen werden nicht angenommen.

  • boolean isShutdown()

    Wurde der Executor schon heruntergefahren?

  • List<Runnable> shutdownNow()

    Gerade ausführende Runnables werden zum Stoppen angeregt. Die Rückgabe ist eine Liste der zu beendenden Runnables.

 

Zum Seitenanfang

16.4.3    Threads mit Rückgabe über Callable Zur vorigen ÜberschriftZur nächsten Überschrift

Der nebenläufige Thread kann nur über Umwege Ergebnisse zurückgeben. In einer eigenen Klasse, die Runnable erweitert, lässt sich im Konstruktor zum Beispiel eine Datenstruktur übergeben, in die der Thread ein berechnetes Ergebnis hineinlegt. Die Datenstruktur kann dann vom Aufrufer auf Änderungen hin untersucht werden.

Die Java-Bibliothek bietet noch einen anderen Weg, denn während run() in Runnable als Rückgabe void hat, übermittelt call() einer anderen Schnittstelle Callable (siehe Abbildung 16.7) eine Rückgabe und kann auch eine Ausnahme auslösen. Zum Vergleich:

interface java.lang.

Runnable

  • void run()

    Diese Methode enthält den nebenläufig auszuführenden Programmcode.

interface java.util.concurrent.

Callable<V>

  • V call() throws Exceptio

    Diese Methode enthält den nebenläufig auszuführenden Programmcode und liefert eine Rückgabe vom Typ V.

Tabelle 16.3    Methoden in »Runnable« und »Callable«

Die einfache Schnittstelle »Callable« mit einer Operation

Abbildung 16.7    Die einfache Schnittstelle »Callable« mit einer Operation

Beispiel: Felder sortieren über Callable

Wir wollen ein Beispiel implementieren, das ein Feld sortiert. Das Sortieren soll ein Callable im Hintergrund übernehmen. Ist die Operation beendet, soll der Verweis auf das sortierte Feld zurückgegeben werden. Das Sortieren erledigt wie üblich Arrays.sort(…):

Listing 16.11    src/main/java/com/tutego/insel/thread/concurrent/SorterCallable.java, SorterCallable

class SorterCallable implements Callable<byte[]> {



private final byte[] b;



SorterCallable( byte[] b ) {

this.b = b;

}



@Override public byte[] call() {

Arrays.sort( b );

return b;

}

}

Natürlich bringt es wenig, das Callable-Objekt aufzubauen und selbst call() aufzurufen, denn ein Thread soll die Aufgabe im Hintergrund erledigen. Dazu ist jedoch nicht die Klasse Thread selbst zu verwenden, sondern ein ExecutorService, den wir etwa über Executors. newCachedThreadPool() bekommen:

Listing 16.12    src/main/java/com/tutego/insel/thread/concurrent/CallableGetDemo.java, main() Ausschnitt

byte[] b = new byte[ 4000000 ];

new Random().nextBytes( b );

Callable<byte[]> c = new SorterCallable( b );

ExecutorService executor = Executors.newCachedThreadPool();

Future<byte[]> result = executor.submit( c );

Der ExecutorService bietet eine submit(Callable)-Methode, die unser Callable annimmt und einen Thread für die Abarbeitung aussucht. Die Rückgabe ist ein mysteriöses Future

ExecutorService führt aus: Callable und ein Runnable mit Zukunft

Aus Gründen der Symmetrie gibt es neben submit(Callable) noch zwei submit(…)-Methoden, die ebenfalls ein Runnable annehmen. Zusammen ergeben sich:

interface java.util.concurrent.ExecutorService

extends Executor
  • <T> Future<T> submit(Callable<T> task)

    Der ExecutorService soll die Aufgabe abarbeiten und Zugriff auf das Ergebnis über die Rückgabe geben.

  • Future<?> submit(Runnable task)

    Der ExecutorService arbeitet das Runnable ab und ermöglicht es, über das Future-Objekt zu erfragen, ob die Ausgabe schon abgearbeitet wurde oder nicht. get() liefert am Ende null.

  • <T> Future<T> submit(Runnable task, T result)

    Wie submit(task), nur: Die get(…)-Anfrage über Future liefert result.

Um ein Runnable in ein Callable umzuwandeln, gibt es noch einige Hilfsmethoden in der Klasse Executors. Dazu zählen die statische Methode callable(Runnable task), die ein Callable<Object> liefert, und die Methode callable(Runnable task, T result), die ein Callable<T> zurückgibt.

 

Zum Seitenanfang

16.4.4    Erinnerungen an die Zukunft – die Future-Rückgabe Zur vorigen ÜberschriftZur nächsten Überschrift

Weil das Ergebnis asynchron ankommt, liefert submit(Callable) ein Future-Objekt zurück, über das wir herausfinden können, ob das Ergebnis schon da ist oder ob wir noch warten müssen. Eigentlich ist nach einem submit(…) die beste Zeit, noch andere nebenläufige Aufgaben anzustoßen, um dann später mit get(…) das Ergebnis einzusammeln. Das Programmiermuster ist immer gleich: Erst Arbeit an den ExecutorService übergeben, dann etwas anderes machen und später zurückkommen. Da wir in unserem Beispiel jedoch in der Zwischenzeit nichts anderes zu tun haben, als ein Bytefeld zu sortieren, setzen wir das Callable ab und warten mit get() sofort auf das sortierte Feld:

Listing 16.13    src/main/java/com/tutego/insel/thread/concurrent/CallableGetDemo.java, main()

byte[] b = new byte[ 4000000 ];

new Random().nextBytes( b );

Callable<byte[]> c = new SorterCallable( b );

ExecutorService executor = Executors.newCachedThreadPool();

Future<byte[]> result = executor.submit( c );

// Jetzt werden erst einmal andere Dinge gemacht, und später …

try {

byte[] bs = result.get();

System.out.printf( "%d, %d, %d%n",

bs[0], bs[1], bs[bs.length - 1] ); // -128, -128, 127

}

catch ( InterruptedException | ExecutionException e ) {

e.printStackTrace();

}

Da das Feld sortiert ist und der Wertebereich eines Bytes mit –128 bis +127 sehr klein ist, ist vermutlich bei 4.000.000 Werten das kleinste Element der Zufallszahlen –128 und das größte 127.

Die Operationen der Schnittstelle Future im Einzelnen sind:

interface java.util.concurrent.Future<V>
  • V get() throws InterruptedException, ExecutionException

    Wartet auf das Ergebnis und gibt es dann zurück. Die Methode blockiert so lange, bis das Ergebnis da ist. Es kann zu Ausnahmen kommen: CancellationException, wenn die Berechnung abgebrochen wurde, ExecutionException, wenn die Berechnung eine Ausnahme auslöste, InterruptedException, wenn der aktuelle Thread beim Warten unterbrochen wurde.

  • V get(long timeout, TimeUnit unit)

    throws InterruptedException, ExecutionException, TimeoutException

    Wartet eine gegebene Zeit auf das Ergebnis und gibt es dann zurück. Kommt das Ergebnis in der vorgegebenen Dauer nicht, gibt es eine TimeoutException.

  • boolean isDone()

    Wurde die Arbeit beendet oder sogar abgebrochen?

  • boolean cancel(boolean mayInterruptIfRunning)

    Bricht die Arbeit ab.

  • boolean isCancelled()

    Wurde die Arbeit vor dem Ende abgebrochen?

Warten mit Zeitbeschränkung

Nicht immer ist das potenziell unendliche Blockieren erwünscht. Für diesen Fall ermöglicht die überladene Methode von get(…) eine Parametrisierung mit einer Wartezeit und Zeiteinheit:

Listing 16.14    src/main/java/com/tutego/insel/thread/concurrent/CallableGetTimeUnitDemo.java, Ausschnitt

byte[] bs = result.get( 2, TimeUnit.SECONDS );

Ist das Ergebnis nicht innerhalb von 2 Sekunden verfügbar, löst die Methode eine TimeoutException aus, die so aussehen wird:

java.util.concurrent.TimeoutException

at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)

at java.util.concurrent.FutureTask.get(FutureTask.java:91)

at com.tutego.insel.thread.concurrent.CallableDemo.main(CallableDemo.java:27)
[»]  Hinweis

Wenn zwischen dem Absenden der Aufgabe und dem Abholen des Ergebnisses genügend Zeit vergeht, sodass es beim Abholen zu keiner blockierenden Wartesituation kommt, ist das perfekt. Ungünstig ist es, wenn kurz nach dem submit(…) schon das get(…) kommt, aber das Ergebnis auf sich warten lässt – dann wäre nicht viel über das Future gewonnen. Eine interessante Lösung bietet eine Implementierung von Future, das CompletableFuture, das Aufgaben in eine Abfolge setzt. Die Idee ist einfach: Damit es zu keiner Wartezeit kommt, wird das Ergebnis, wenn es von einem Schritt berechnet wurde, zum nächsten Verarbeitungsschritt direkt weitergeleitet.

Callable oder Runnable mit FutureTask ummanteln

Wenn wir genau auf den Stack-Aufruf von eben achten, fällt der Typ java.util.concurrent.FutureTask ins Auge. Die Klasse implementiert Future, Runnable und RunnableFuture und wird intern von der Java-Bibliothek verwendet, wenn wir mit submit(…) etwas beim ExecutorService absetzen. Auch wir können den Typ direkt als Wrapper um ein Callable oder Runnable nutzen, denn es gibt praktische Callback-Methoden, die wir überschreiben können, etwa done(), wenn eine Berechnung fertig ist.

Dazu ein Beispiel: Ein Callable liefert den Namen des Benutzers. Ein FutureTask legt sich um dieses Callable, bekommt mit, wann das Callable fertig ist, modifiziert dann den Benutzernamen und gibt außerdem eine Meldung aus.

Listing 16.15    src/main/java/com/tutego/insel/thread/concurrent/WrappedUsername.java, Ausschnitt

Callable<String> username = () -> System.getProperty( "user.name" );

FutureTask<String> wrappedUsername = new FutureTask<>( username ) {

@Override protected void done() {

try {

System.out.printf( "done: isDone=%s, isCancelled=%s%n", isDone(), isCancelled() );

System.out.println( "done: get=" + get() );

}

catch ( InterruptedException | ExecutionException e ) { /* Ignore */ }

}

@Override protected void set( String v ) {

System.out.println( "set: " + v );

super.set( v.toUpperCase() );

}

};

ExecutorService scheduler = Executors.newCachedThreadPool();

scheduler.submit( wrappedUsername );

System.out.println( "main: " + wrappedUsername.get() );

scheduler.shutdown();

Wichtig in der Nutzung ist, nicht die Rückgabe vom submit(…) auszuwerten, was wir normalerweise machen, sondern das übergebene FutureTask zu erfragen.

Die Ausgaben vom Programm sind oft ein wenig durcheinander:

set: Christian

done: isDone=true, isCancelled=false

done: get=CHRISTIAN

main: CHRISTIAN

Die Reihenfolge in den Aufrufen ist immer so: Der FutureTask stellt die Fertigstellung des Callable fest und ruft set(…) auf. Anschließend wird done() ausgeführt.

 

Zum Seitenanfang

16.4.5    Mehrere Callable-Objekte abarbeiten Zur vorigen ÜberschriftZur nächsten Überschrift

Die Methode submit(Callable) des ExecutorService nimmt genau ein Callable an und führt es aus:

  • <T> Future<T> submit(Callable<T> task)

Muss eine Anwendung mehrere Callable-Objekte abarbeiten, kann es natürlich mehrere Aufrufe von submit(Callable) geben. Doch ein ExecutorService kann von sich aus mehrere Callable-Objekte abarbeiten. Dabei gibt es zwei alternative Varianten:

  • Alle Callable-Objekte einer Liste werden ausgeführt, und das Ergebnis ist eine Liste von Future-Objekten.

  • Alle Callable-Objekte einer Liste werden ausgeführt, doch das erste, das mit der Arbeit fertig wird, ergibt das Resultat.

Das ergibt zwei Methoden, und da sie zusätzlich mit einer Zeitbeschränkung ausgestattet werden, sind es vier:

interface java.util.concurrent.ExecutorService

extends Executor
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

    throws InterruptedException

    Führt alle Aufgaben aus. Liefert eine Liste von Future-Objekten, die die Ergebnisse repräsentieren.

  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,

    long timeout, TimeUnit unit) throws InterruptedException

    Führt alle Aufgaben aus und wird die Ergebnisse als Liste von Future-Objekten liefern, solange die Zeit timeout in der gegebenen Zeiteinheit nicht überschritten wird.

  • <T> T invokeAny(Collection<? extends Callable<T>> tasks)

    throws InterruptedException, ExecutionException

    Führt alle Aufgaben aus, aber liefert das Ergebnis eines Ausführers, der als Erster fertig ist. Ein get(…) wird also nie warten müssen.

  • <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)

    throws InterruptedException, ExecutionException, TimeoutException

    Führt alle Aufgaben aus, gilt aber nur für eine beschränkte Zeit. Das erste Ergebnis eines Callable-Objekts, das in der Zeit fertig wird, gibt invokeAny(…) zurück.

[+]  Tipp

Die get(long timeout, TimeUnit unit)-Methode von Future sendet dem Thread kein Interrupt, wenn er es nicht schafft, in der Zeit ein Ergebnis zu produzieren. Das ist der Vorteil der genannten xxxAny(…, TimeUnit)-Methoden, die ein Interrupt auslösen.

 

Zum Seitenanfang

16.4.6    CompletionService und ExecutorCompletionService Zur vorigen ÜberschriftZur nächsten Überschrift

Die invokeAll(…)-Methoden aus dem ExecutorService sind praktisch, wenn es darum geht, mehrere Aufgaben nebenläufig abzusenden und später die Ergebnisse einzusammeln. Allerdings ist die Rückgabe vom Typ List<Future<T>> und wir werden nicht informiert, wenn ein Ergebnis vorliegt. Wir können zwar die Liste immer wieder ablaufen und jedes Future-Objekt mit isDone() fragen, ob es fertig ist, aber das ist keine ideale Lösung.

Mit java.util.concurrent.CompletionService gibt es eine weitere Java-Schnittstelle (die keinen Basistyp erweitert), mit der wir ein Callable oder Runnable arbeiten lassen können und später nacheinander die Ergebnisse einsammeln können, die fertig sind. Die Java-Bibliothek bringt mit ExecutorCompletionService eine Implementierung der Schnittstelle mit, die intern die fertigen Ergebnisse in einer Queue sammelt, und wir können die Queue abfragen. Schauen wir uns das in einem Beispiel an.

Listing 16.16    src/main/java/com/tutego/insel/thread/concurrent/ExecutorCompletionServiceDemo.java, Ausschnitt

ExecutorService executor = Executors.newCachedThreadPool();

CompletionService<Integer> completionService =

new ExecutorCompletionService<>( executor );

List.of( 4, 3, 2, 1 ).forEach( duration -> completionService.submit( () -> {

TimeUnit.SECONDS.sleep( duration );

return duration;

} ) );



for ( int i = 0; i < 4; i++ ) {

try {

System.out.println( completionService.take().get() );

}

catch ( InterruptedException | ExecutionException e ) {

e.printStackTrace();

}

}



executor.shutdown();

Der Typ ExecutorCompletionService erwartet im Konstruktor einen Executor, der den Code ausführen soll; wir setzen einen Thread-Pool ein. CompletionService hat zwei submit(…)-Methoden:

  • Future<V> submit(Runnable task, V result)

  • Future<V> submit(Callable<V> task)

Abgesendet werden vier Callable-Exemplare, die 4, 3, 2, 1 Sekunden warten und ihre Wartezeit am Ende zurückgeben. Natürlich wird als Erstes das Callable mit der Rückgabe 1 fertig, dann 2 usw.

Für die Rückgaben interessiert sich unser Programm nicht, denn es nutzt die take()-Methode. Insgesamt hat CompletionService drei Entnahme-Methoden:

  • Future<V> take()

    Liefert das Ergebnis von der ersten abgeschlossenen Aufgabe und entfernt es von der internen Queue. Liegt kein Ergebnis an, wartet die Methode.

  • Future<V> poll()

    Liefert das Ergebnis von der ersten abgeschlossenen Aufgabe und entfernt es von der internen Queue. Liegt kein Ergebnis ist, wartet poll() nicht, sondern liefert null.

  • Future<V> poll (long timeout, TimeUnit unit)

    Wartet take() auf ein Ergebnis, doch kommt es nach dem Ablauf von timeout nicht, liefert die Methode wie poll() als Rückgabe null.

Was der Schnittstelle fehlt, ist eine Methode, die die verbleibende Anzahl liefert. Wir müssen in unserem Code daher einen Zähler als extra Variable einführen.

 

Zum Seitenanfang

16.4.7    ScheduledExecutorService: wiederholende Aufgaben und Zeitsteuerungen Zur vorigen ÜberschriftZur nächsten Überschrift

Die Klasse ScheduledThreadPoolExecutor ist eine weitere Klasse neben ThreadPoolExecutor, die die Schnittstellen Executor und ExecutorService implementiert. Die wichtige Schnittstelle, die diese Klasse außerdem implementiert, ist aber ScheduledExecutorService, ein direkter Untertyp von ExecutorService. Hier sind mehrere scheduleXXX(…)-Operationen deklariert, die ein Runnable oder Callable zu bestimmten Zeiten und Wiederholungen ausführen. (Zwar gibt es mit dem java.util.Timer etwas Ähnliches, doch der ScheduledThreadPoolExecutor nutzt Threads aus dem Pool.)

Executors bietet mehrere statische Methoden, die uns fertig konfigurierte ScheduledExecutorService-Objekte liefern, zum Beispiel newScheduledThreadPool(int corePoolSize).

Das folgende Beispiel führt nach einer Startzeit von 1 Sekunde alle 2 Sekunden eine Ausgabe aus:

Listing 16.17    src/main/java/com/tutego/insel/thread/concurrent/ScheduledExecutorServiceDemo.java. main()

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( 1 );



scheduler.scheduleAtFixedRate(

() -> System.out.println( "Tata" ),

1 /* initial delay */,

2 /* period */,

TimeUnit.SECONDS );

Nach 1 Sekunde Startverzögerung bekommen wir jede zweite Sekunde ein »Tata«.

 

Zum Seitenanfang

16.4.8    Asynchrones Programmieren mit CompletableFuture (CompletionStage) Zur vorigen ÜberschriftZur nächsten Überschrift

Das Schöne an Future-Objekten ist die Abarbeitung im Hintergrund und die Möglichkeit, später abzufragen, ob das Ergebnis schon da ist. Allerdings fehlt der Future-Schnittstelle eine Methode, um automatisch nach der Fertigstellung einen Folgeauftrag abzuarbeiten. Dafür bietet die Java-Bibliothek eine spezielle Unterklasse CompletableFuture. Die Klasse implementiert die Schnittstelle CompletionStage, die vermutlich die größte Anzahl von Operationen in der gesamten Java SE hat. Der Typname drückt aus, dass es um die Fertigstellung (engl. completion) von Abschnitten (engl. stage) geht.

Ein Beispiel für einen trinkfesten mutigen Piraten:

Listing 16.18    src/main/java/com/tutego/insel/thread/concurrent/Pirat.java

package com.tutego.insel.thread.concurrent;



import java.time.LocalTime;

import java.util.concurrent.CompletableFuture;

import java.util.concurrent.TimeUnit;

import java.util.logging.Logger;



class Pirate {



public static void main( String[] arg ) throws Throwable {



System.setProperty( "java.util.logging.SimpleFormatter.format",

"-> %2$s: %5$s %6$s%n");



String result =

CompletableFuture.supplyAsync( Pirate::newName )

.thenApply( Pirate::swear )

.thenCombine( drinkRum(), Pirate::combinePiratAndDrinks )

.thenCombine( drinkRum(), Pirate::combinePiratAndDrinks )

.exceptionally( e -> "Pirat Guybrush hat den Todesfluch '" +

e.getCause().getMessage() + "' nicht überlebt" )

.get();

System.out.println( result );

// Pirat Guybrush flucht und trinkt dann 10 Flaschen Rum und trinkt dann

// 11 Flaschen Rum

// Pirat Guybrush hat den Todesfluch 'Avada Kedavra' nicht überlebt

}



static String newName() {

Logger.getGlobal().info( Thread.currentThread().getName() );

return "Pirat Guybrush";

}



static String swear( String pirate ) {

Logger.getGlobal().info( Thread.currentThread().getName() );

if ( Math.random() < 0.4 )

throw new IllegalStateException( "Avada Kedavra" );

return pirate + " flucht";

}



static CompletableFuture<Integer> drinkRum() throws InterruptedException {

Logger.getGlobal().info( Thread.currentThread().getName() );

TimeUnit.SECONDS.sleep( 1 );

return CompletableFuture.supplyAsync( () -> LocalTime.now().getSecond() );

}



static String combinePiratAndDrinks( String pirat, int bottlesOfRum ) {

Logger.getGlobal().info( Thread.currentThread().getName() );

return pirat + " und trinkt dann " + bottlesOfRum + " Flaschen Rum";

}

}

Die Ausgabe ist ohne Ausnahme:

-> com.tutego.insel.thread.concurrent.Pirate drinkRum: main

-> com.tutego.insel.thread.concurrent.Pirate newName: ForkJoinPool.commonPool-worker-1

-> com.tutego.insel.thread.concurrent.Pirate swear: ForkJoinPool.commonPool-worker-1

-> com.tutego.insel.thread.concurrent.Pirate combinePiratAndDrinks: main

-> com.tutego.insel.thread.concurrent.Pirate drinkRum: main

-> com.tutego.insel.thread.concurrent.Pirate combinePiratAndDrinks: main

Pirat Guybrush flucht und trinkt dann 55 Flaschen Rum und trinkt dann 56 Flaschen Rum

Zum Programm:

  1. supplyAsync(…): Zunächst muss die Kette von Abschnitten aufgebaut werden. Das kann entweder mit dem Standard-Konstruktor geschehen oder mit statischen Methoden. In unserem Fall nutzen wir supplyAsync(Supplier<U> supplier). Die Methode nimmt sich einen freien Thread aus dem ForkJoinPool.commonPool() und lässt den Thread den supplier abarbeiten. Das Ergebnis ist über die Rückgabe, ein CompletableFuture, abrufbar.

  2. thenApply(…): Als Nächstes wenden wir thenApply(Function<? super T,? extends U> fn) an, das vergleichbar ist mit einer map(…)-Operation eines Streams.

  3. thenCombine(…): Interessant wird es bei thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn). Diese Methode verbindet das Ergebnis der eigenen CompletionStage über eine Bi-Funktion (combinePiratAndDrinks(…)) mit einer anderen CompletionStage (geliefert von drinkRum(…)), die wir in unserem Fall auch wieder mit supplyAsync(…) aufbauen. So kombinieren wir zwei unabhängige CompletionStages miteinander und synchronisieren das Ergebnis. Wir können gut an der Ausgabe ablesen, dass drinkRum ganz am Anfang schon ausgeführt wird, und zwar vom Thread[main,5,main], nicht vom ForkJoinPool, weil es unabhängig von den anderen läuft.

  4. exceptionally(…): Es gibt mehrere Methoden, die mit möglichen Ausnahmen während der Verarbeitungskette umgehen können. Eine davon ist exceptionally(Function<Throwable,? extends T> fn), die eine Exception auffängt und einen Standardwert zurückliefert. Unsere Behandlung nimmt aus der intern weitergeleiteten java.util.concurrent.CompletionException den Grund mit getCause() heraus, und so bekommen wir die IllegalStateException, die eigentlich ausgelöst wurde.

Die Ausgabe im Fehlerfall zeigt die fehlende Abarbeitung von combinePiratAndDrinks(…):

-> com.tutego.insel.thread.concurrent.Pirate drinkRum: main

-> com.tutego.insel.thread.concurrent.Pirate newName: ForkJoinPool.commonPool-worker-1

-> com.tutego.insel.thread.concurrent.Pirate swear: ForkJoinPool.commonPool-worker-1

-> com.tutego.insel.thread.concurrent.Pirate drinkRum: main

Pirat Guybrush hat den Todesfluch 'Avada Kedavra' nicht überlebt

 


Ihre Meinung?

Wie hat Ihnen das Openbook gefallen? Wir freuen uns immer über Ihre Rückmeldung. Schreiben Sie uns gerne Ihr Feedback als E-Mail an kommunikation@rheinwerk-verlag.de

<< zurück
 Zum Rheinwerk-Shop
Zum Rheinwerk-Shop: Java ist auch eine Insel Java ist auch eine Insel

Jetzt Buch bestellen


 Buchempfehlungen
Zum Rheinwerk-Shop: Captain CiaoCiao erobert Java

Captain CiaoCiao erobert Java




Zum Rheinwerk-Shop: Java SE 9 Standard-Bibliothek

Java SE 9 Standard-Bibliothek




Zum Rheinwerk-Shop: Algorithmen in Java

Algorithmen in Java




Zum Rheinwerk-Shop: Objektorientierte Programmierung

Objektorientierte Programmierung




 Lieferung
Versandkostenfrei bestellen in Deutschland, Österreich und in die Schweiz

InfoInfo



 

 


Copyright © Rheinwerk Verlag GmbH 2021

Für Ihren privaten Gebrauch dürfen Sie die Online-Version natürlich ausdrucken. Ansonsten unterliegt das Openbook denselben Bestimmungen, wie die gebundene Ausgabe: Das Werk einschließlich aller seiner Teile ist urheberrechtlich geschützt.

Alle Rechte vorbehalten einschließlich der Vervielfältigung, Übersetzung, Mikroverfilmung sowie Einspeicherung und Verarbeitung in elektronischen Systemen.

 

[Rheinwerk Computing]



Rheinwerk Verlag GmbH, Rheinwerkallee 4, 53227 Bonn, Tel.: 0228.42150.0, Fax 0228.42150.77, service@rheinwerk-verlag.de



Cookie-Einstellungen ändern