RxJS in Angular: Die wichtigsten Funktionen und Praxisbeispiele
01. Juli 2025·15 Minuten zu lesen

RxJS ist das Herzstück von Angulars reaktiver Programmierung. Um effizient mit Observables zu arbeiten, solltest du die Kernfunktionen und Operatoren von RxJS gut kennen. Dieses Cheat Sheet gibt dir einen kompakten Überblick über die wichtigsten Konzepte – mit praxisnahen Beispielen.
🎯 Observable Creation – Observables erzeugen
of()
– Werte als Observable ausgeben
import { of } from 'rxjs';
of(1, 2, 3).subscribe(value => console.log(value));
// Ausgabe: 1, 2, 3
from()
– Array, Promise, etc. in Observable umwandeln
import { from } from 'rxjs';
from([10, 20, 30]).subscribe(console.log);
// Ausgabe: 10, 20, 30
interval()
– periodisch Werte aussenden
import { interval } from 'rxjs';
interval(1000).subscribe(val => console.log(val));
// Ausgabe: 0, 1, 2, ... jede Sekunde
timer()
– einmalig oder wiederholt nach Verzögerung
import { timer } from 'rxjs';
timer(2000).subscribe(() => console.log('2 Sekunden später!'));
🧪 Pipeable Operators – Werte transformieren und filtern
Operators werden innerhalb von .pipe()
angewendet.
🔄 Transformation
map()
– Werte transformieren
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
of(1, 2, 3)
.pipe(map(x => x * 10))
.subscribe(console.log); // 10, 20, 30
pluck()
– Eigenschaft aus Objekt extrahieren
import { of } from 'rxjs';
import { pluck } from 'rxjs/operators';
of({name: 'Angular'}, {name: 'RxJS'})
.pipe(pluck('name'))
.subscribe(console.log); // "Angular", "RxJS"
tap()
– Nebenwirkung (z.B. Loggen) ohne Wert zu verändern
import { of } from 'rxjs';
import { tap } from 'rxjs/operators';
of(1, 2, 3)
.pipe(tap(val => console.log('Wert:', val)))
.subscribe();
🧼 Filtering
filter()
– Werte filtern
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';
from([1, 2, 3, 4])
.pipe(filter(x => x % 2 === 0))
.subscribe(console.log); // 2, 4
take(n)
– Nur die ersten n Werte nehmen
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';
interval(1000)
.pipe(take(3))
.subscribe(console.log); // 0, 1, 2
takeUntil()
– Observable beenden, wenn anderes Observable emit
import { interval, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
const stop$ = new Subject();
interval(1000)
.pipe(takeUntil(stop$))
.subscribe(val => console.log(val));
// Irgendwann später
stop$.next(); // stoppt den Stream
first()
/last()
– Erster bzw. letzter Wert
import { of } from 'rxjs';
import { first, last } from 'rxjs/operators';
of(1, 2, 3).pipe(first()).subscribe(console.log); // 1
of(1, 2, 3).pipe(last()).subscribe(console.log); // 3
distinctUntilChanged()
– Nur wenn Wert sich ändert
import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';
of(1, 1, 2, 2, 3, 1)
.pipe(distinctUntilChanged())
.subscribe(console.log); // 1, 2, 3, 1
debounceTime(ms)
– Verzögerung nach letztem Event
import { fromEvent } from 'rxjs';
import { debounceTime } from 'rxjs/operators';
fromEvent(document, 'keyup')
.pipe(debounceTime(300))
.subscribe(() => console.log('User stoppt Tippen seit 300ms'));
🔁 Flattening – Wenn Observable Observables liefert
switchMap()
– Vorheriges abbrechen, neues starten
import { fromEvent, of } from 'rxjs';
import { switchMap, delay } from 'rxjs/operators';
const clicks$ = fromEvent(document, 'click');
clicks$
.pipe(
switchMap(() => of('HTTP Request').pipe(delay(1000)))
)
.subscribe(console.log);
mergeMap()
– Alle inneren Observables parallel ausführen
import { of } from 'rxjs';
import { mergeMap, delay } from 'rxjs/operators';
of(1, 2, 3)
.pipe(mergeMap(x => of(x * 10).pipe(delay(1000))))
.subscribe(console.log);
concatMap()
– Inner Observables nacheinander ausführen
import { of } from 'rxjs';
import { concatMap, delay } from 'rxjs/operators';
of(1, 2, 3)
.pipe(concatMap(x => of(x * 10).pipe(delay(1000))))
.subscribe(console.log);
exhaustMap()
– Ignoriert neue, wenn eins läuft
import { fromEvent, interval } from 'rxjs';
import { exhaustMap, take } from 'rxjs/operators';
fromEvent(document, 'click')
.pipe(exhaustMap(() => interval(1000).pipe(take(3))))
.subscribe(console.log);
⏱️ Timing & Control
delay(ms)
– Emission verzögern
import { of } from 'rxjs';
import { delay } from 'rxjs/operators';
of('Hallo')
.pipe(delay(1000))
.subscribe(console.log);
throttleTime(ms)
– Werte begrenzen
import { fromEvent } from 'rxjs';
import { throttleTime } from 'rxjs/operators';
fromEvent(document, 'mousemove')
.pipe(throttleTime(1000))
.subscribe(() => console.log('Mausbewegung (max. 1/s)'));
retry(n)
– Bei Fehler neu versuchen
import { throwError, of } from 'rxjs';
import { retry, catchError } from 'rxjs/operators';
throwError('Fehler')
.pipe(
retry(2),
catchError(() => of('Fallback Wert'))
)
.subscribe(console.log);
catchError()
– Fehler behandeln
import { throwError, of } from 'rxjs';
import { catchError } from 'rxjs/operators';
throwError('Fehler')
.pipe(catchError(err => of('Fehler abgefangen: ' + err)))
.subscribe(console.log);
finalize()
– Aufräumarbeiten bei Abschluss
import { of } from 'rxjs';
import { finalize } from 'rxjs/operators';
of(1, 2, 3)
.pipe(finalize(() => console.log('Stream abgeschlossen')))
.subscribe(console.log);
🧩 Kombination von Observables
combineLatest()
– Letzte Werte von mehreren Observables
import { combineLatest, of } from 'rxjs';
const obs1$ = of(1, 2);
const obs2$ = of('A', 'B');
combineLatest([obs1$, obs2$])
.subscribe(([num, char]) => console.log(num, char));
withLatestFrom()
– Neu auslösen, aber mit letztem Wert eines anderen
import { interval } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';
const timer1$ = interval(1000);
const timer2$ = interval(500);
timer1$
.pipe(withLatestFrom(timer2$))
.subscribe(console.log);
forkJoin()
– Warten bis alle Observables fertig sind
import { forkJoin, of } from 'rxjs';
import { delay } from 'rxjs/operators';
forkJoin([
of('A').pipe(delay(1000)),
of('B').pipe(delay(2000))
]).subscribe(console.log); // ['A', 'B']
concat()
– Observables nacheinander ausführen
import { concat, of } from 'rxjs';
concat(
of(1, 2),
of(3, 4)
).subscribe(console.log); // 1, 2, 3, 4
merge()
– Observables parallel ausführen
import { merge, of } from 'rxjs';
import { delay } from 'rxjs/operators';
merge(
of(1).pipe(delay(2000)),
of(2).pipe(delay(1000))
).subscribe(console.log); // 2, 1
zip()
– Werte paarweise zusammenführen
import { zip, of } from 'rxjs';
zip(
of(1, 2, 3),
of('A', 'B', 'C')
).subscribe(console.log); // [1, 'A'], [2, 'B'], [3, 'C']
🧠 Best Practices in Angular
- Nutze den
async
-Pipe im Template:{{ data$ | async }}
- Vermeide unnötiges
.subscribe()
in Komponenten – lieber reaktive Verkettungen - Nutze
takeUntil
oderfirst
für das Aufräumen von Subscriptions - Denke in Datenströmen, nicht in Callbacks
Mit diesem Cheat Sheet bist du bestens gerüstet, um RxJS in Angular gezielt einzusetzen und deine Anwendungen reaktiver, sauberer und effizienter zu gestalten. Viel Erfolg beim Coden! 🚀