Dies ist Teil 3 einer Blog-Serie über reaktive Architekturen mit RxJava. Einführung und Übersicht finden sich hier. Die Grundlagen Reaktiver Systeme wurden in Teil 1: Grundlagen reaktiver Systeme behandelt, während Teil 2 Programmiermodelle in Java zur asynchronen Programmierung betrachtet.
Reaktive Programmierung mit RxJava
RxJava – Reactive Extensions for the JVM – ist eine Bibliothek zur Erzeugung, Komposition, Transformation und Konsumierung von Event-Folgen. Neben RxJava gibt es Implementierungen der Reactive Extensions für eine Vielzahl von anderen Sprachen. ReactiveX (www.reactivex.io) bildet hierbei die übergreifende Plattform und stellt eine hervorragende Dokumentation bereit.
RxJava basiert auf dem Observer-Pattern (siehe Abbildung 1): Ein Observer registriert sich bei einem Observable mit den Callbacks OnNext, OnError und OnComplete, um eine Sequenz von Daten zu erhalten. Das Observable ruft für jeden neue Datensatz T OnNext(T), OnError(Throwable) bei einer Exception und OnComplete() nach Übermittlung aller Daten auf.
Abbildung 1: Anwendung des Observer-Patterns in RxJava
Beispiel 1 illustriert die Verwendung von Observables: Im ersten Schritt wird die Funktion customerService.findByName aufgerufen, die ein Observable-Objekt zurückliefert. Dabei wird die eigentliche Logik der Kundensuche noch nicht ausführt. Erst nachdem im zweiten Schritt die Funktionen zur Datenannahme (onNext), zur Fehlerbehandlung (onError) und zur Benachrichtigung über den Abschluss der Sequenz (onCompleted) registriert sind, wird die Abfrage ausgeführt. Das Observable ruft dann für jeden zurückgelieferten Kundendatensatz die OnNext-Funktion aus und zum Abschluss OnCompleted.
//asynchrone Methode liefert ein Observable zurück
Observable<Customer> obs = customerService.findByName("Kundenname");
//Anmeldung durch den Observer
obs.subscribe(customer -> System.out.println(customer),
error -> System.out.println(error),
() -> System.out.println("completed"));
Beispiel 1: Konsumierung eines Observables mit RxJava
Die Eleganz des Ansatzes ergibt sich aus der Möglichkeit, auf Basis von Observables Funktionsketten zur Transformation, Filterung und Kombination von Daten zu bilden. Dies wird in Beispiel 2 demonstriert. Die Kundendaten werden zunächst gefiltert, so dass nur Kunden mit dem Vornamen Hans weiterverarbeitet werden. Danach wird der Kundendatensatz über die Map-Funktion auf seine Id abgebildet und in der Konsole ausgegeben.
//Asynchrone Methode liefert ein Observable zurück
customerService.findByName("Kundenname ");
.filter(customer -> customer.getVorname().equals("Hans"))
.map(customer -> customer.getId())
.subscribe(customerId -> System.out.println(customerId),
error -> System.out.println(error),
() -> System.out.println("completed"));
Beispiel 2: Filterung und Transformation mit RxJava
Da Vert.x eine RxJava-Integration bietet, lässt sich das Beispiel 2 aus Reaktive Architekturen mit RxJava Teil 2 ganz elegant ohne Schachtelungen umsetzen, wie in Code-Beispiel 3 dargestellt. Die Funktion sendObservable liefert ein Observable zurück, auf dessen Grundlage die Funktionskette mittels der map-Funktion die Antwort des Serviceaufrufs auf das Kundenobjekt abbildet und die Adressdaten aktualisiert. Danach erfolgt mittels flatMap ein weiterer asynchroner Aufruf, um die Daten zurückzuschreiben. Da sendObservables ein Observable zurückliefert und bei der Verarbeitung mehrerer Events auch mehrere Observables erzeugt würden, vereinigt FlatMap die Events zu einem Observable.
vertx.eventBus().sendObservable("CustomerService.findById", customerId)
//Customer aus der Antwort holen
.map(asyncResult -> (Customer) asyncResult.body())
//aktualisere Kundendaten
.map(customer -> ... )
//Speichere Kundendaten
.flatMap(customer -> vertx.eventBus().sendObservable("CustomerService.update", customer))
//Konsumiere Ergebnis und Fehler
.subscribe(customer -> ..., error -> ...);
Beispiel 3: Aktualisierung eines Kundendatensatzes mit RxJava
Die Entwicklungshistorie von RxJava
RxJava ist eine Java Umsetzung von Rx.NET, das 2009 als Teil des .NET Frameworks 4 erschienen ist. Nach Einführung von LINQ als Sprache zur Abfrage und Transformation von Daten mit dem .Net-Frameworks 3.5, folgte Rx.NET als eine asynchrone Variante zur Datenabfrage. Rx.NET war somit eine Erweiterung der LINQ-Funktionalität um das Konzept von Observables und Schedulern.
RxJava wurde 2013 durch Netflix im Rahmen der Einführung eines API-Gateways initiiert, das Client-spezifische Service-Kompositionen als REST-Endpunkte zur Verfügung stellen sollte. Die Bereitstellung dieser Dienstkompositionen erforderte die parallelen Aufrufe unterschiedlicher Services und die Aggregation der resultierenden Daten. Um ein vereinfachtes und einheitliches Programmiermodel zum Umgang mit nebenläufigen Abfragen nutzen zu können, wurde in die Entwicklung von RxJava investiert.
Abbildung 2: Entwicklungshistorie von RxJava
Aus den reaktiven Frameworks RxJava, Reactor und Akka ist seit 2013 die Reactive-Streams Initiative hervorgegangen. Das Ziel von Reactive Streams ist die Standardisierung des Datenaustausches über asynchronen Grenzen hinweg unter der Gewährleistung, dass die empfangende Seite nicht gezwungen ist, beliebig viele Daten zu puffern. Somit ist Backpressure ein wesentlicher Bestandteil dieses Konzeptes. Am 30. April 2015 hat die Initiative neben einer Spezifikation auch eine Java API bereitgestellt, die als Flow API in Java 9 einfließen wird. Die Flow API definiert Interfaces für das Observable (java.util.concurrent.Flow.Publisher) und den Observer (java.util.concurrent.Flow.Subscriber).
Am 29. Oktober 2016 wurde RxJava 2.0 freigegeben, das auf Basis der Reactive-Streams-Spezifikation von Grund auf neu geschrieben wurde. Die große Neuerung ist die Unterstützung von Backpressure.
Mit der Version 2.0 entfernt sich RxJava allerdings ein wenig von seinen ReactiveX-Schwesterprojekten, um sich besser mit dem Reactive Streams Standard und Java 9 zu integrieren.
Das Einsatzszenario entscheidet
Die Frage nach der Einsetzbarkeit von RxJava hängt nicht zuletzt von der Platzform ab. Bei Einsatz von Vert.x ist RxJava sicherlich die natürliche Wahl, um die Codebasis wartbar und lesbar zu halten. Sollen hingegen Akka oder Spring 5 zum Einsatz kommen, dann sind Akka Streams respektive Reactor die natürlich Wahl zur Gestaltung der Schnittstellen. Beispielsweise bietet Reactor eine alternative Neuimplementierung von RxJava mit weitgehender Schnittstellenkompatibilität – aber ohne Altlasten – und ist die Standard-Bibliothek für reaktive Services mit Spring 5.
Tabelle 1: Übersicht reaktive Frameworks
RxJava lässt sich aber auch leicht in Kombination mit leichtgewichtigen, nicht-blokierenden Server-Frameworks wie Netty oder Undertow nutzen. Für Netty gibt es mit RxNetty bereits ein Projekt mit fertiger RxJava-Integration. Damit lassen sich hoch-performante Anwendungen bauen, die Daten über WebSockets auch direkt zu Web-Frontends pushen können.
In jedem Fall ist bei der Verwendung reaktiver Frameworks die Notwendigkeit der durchgehenden Nutzung nicht-blockierender Bibliotheken und Frameworks bis zur Datenbankschicht zu beachten. Die Durchführung langlaufender, blockierender Datenbankabfragen mit JDBC ist im Kontext von reaktiven Systeme wohl wenig sinnhaft und würde zu einer Blockierung der Event-Loop führen.
Abbildung 3: Umsetzung von Service-Kompositionen mit RxJava
Zu guter Letzt eignet sich RxJava aber auch für die Umsetzung von Nebenläufigkeit im Kontext klassischer Thread-basierter Anwendungen. Durch den großen Umfang an Funktionen zur Datentransformation bietet sich RxJava als Alternative zu CompletableFuture an. Ähnlich zum Netflix-Szenario lassen sich so Service-Kompositionen einfach umsetzen und Daten aggregieren.
Weitere Beispiele zur Gestaltung von asynchronen Schnittstellen mit RxJava sind im Projekt rxjava-katas in GitHub verfügbar.