Mein Icon

RxJS in Angular: Die wichtigsten Funktionen und Praxisbeispiele

01. Juli 2025·15 Minuten zu lesen
rxjs-angular-wichtigste-funktionen

RxJS ist das Herzstück von Angulars reaktiver Programmierung. Um effizient mit Observables zu arbeiten, solltest du die Kernfunktionen und Operatoren von RxJS gut kennen. Dieses Cheat Sheet gibt dir einen kompakten Überblick über die wichtigsten Konzepte – mit praxisnahen Beispielen.

🎯 Observable Creation – Observables erzeugen

of() – Werte als Observable ausgeben

import { of } from 'rxjs';

of(1, 2, 3).subscribe(value => console.log(value)); 
// Ausgabe: 1, 2, 3

from() – Array, Promise, etc. in Observable umwandeln

import { from } from 'rxjs';

from([10, 20, 30]).subscribe(console.log);
// Ausgabe: 10, 20, 30

interval() – periodisch Werte aussenden

import { interval } from 'rxjs';

interval(1000).subscribe(val => console.log(val)); 
// Ausgabe: 0, 1, 2, ... jede Sekunde

timer() – einmalig oder wiederholt nach Verzögerung

import { timer } from 'rxjs';

timer(2000).subscribe(() => console.log('2 Sekunden später!'));

🧪 Pipeable Operators – Werte transformieren und filtern

Operators werden innerhalb von .pipe() angewendet.

🔄 Transformation

  • map() – Werte transformieren
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

of(1, 2, 3)
  .pipe(map(x => x * 10))
  .subscribe(console.log); // 10, 20, 30
  • pluck() – Eigenschaft aus Objekt extrahieren
import { of } from 'rxjs';
import { pluck } from 'rxjs/operators';

of({name: 'Angular'}, {name: 'RxJS'})
  .pipe(pluck('name'))
  .subscribe(console.log); // "Angular", "RxJS"
  • tap() – Nebenwirkung (z.B. Loggen) ohne Wert zu verändern
import { of } from 'rxjs';
import { tap } from 'rxjs/operators';

of(1, 2, 3)
  .pipe(tap(val => console.log('Wert:', val)))
  .subscribe();

🧼 Filtering

  • filter() – Werte filtern
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';

from([1, 2, 3, 4])
  .pipe(filter(x => x % 2 === 0))
  .subscribe(console.log); // 2, 4
  • take(n) – Nur die ersten n Werte nehmen
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

interval(1000)
  .pipe(take(3))
  .subscribe(console.log); // 0, 1, 2
  • takeUntil() – Observable beenden, wenn anderes Observable emit
import { interval, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const stop$ = new Subject();

interval(1000)
  .pipe(takeUntil(stop$))
  .subscribe(val => console.log(val));

// Irgendwann später
stop$.next(); // stoppt den Stream
  • first() / last() – Erster bzw. letzter Wert
import { of } from 'rxjs';
import { first, last } from 'rxjs/operators';

of(1, 2, 3).pipe(first()).subscribe(console.log); // 1
of(1, 2, 3).pipe(last()).subscribe(console.log);  // 3
  • distinctUntilChanged() – Nur wenn Wert sich ändert
import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

of(1, 1, 2, 2, 3, 1)
  .pipe(distinctUntilChanged())
  .subscribe(console.log); // 1, 2, 3, 1
  • debounceTime(ms) – Verzögerung nach letztem Event
import { fromEvent } from 'rxjs';
import { debounceTime } from 'rxjs/operators';

fromEvent(document, 'keyup')
  .pipe(debounceTime(300))
  .subscribe(() => console.log('User stoppt Tippen seit 300ms'));

🔁 Flattening – Wenn Observable Observables liefert

  • switchMap() – Vorheriges abbrechen, neues starten
import { fromEvent, of } from 'rxjs';
import { switchMap, delay } from 'rxjs/operators';

const clicks$ = fromEvent(document, 'click');

clicks$
  .pipe(
    switchMap(() => of('HTTP Request').pipe(delay(1000)))
  )
  .subscribe(console.log);
  • mergeMap() – Alle inneren Observables parallel ausführen
import { of } from 'rxjs';
import { mergeMap, delay } from 'rxjs/operators';

of(1, 2, 3)
  .pipe(mergeMap(x => of(x * 10).pipe(delay(1000))))
  .subscribe(console.log);
  • concatMap() – Inner Observables nacheinander ausführen
import { of } from 'rxjs';
import { concatMap, delay } from 'rxjs/operators';

of(1, 2, 3)
  .pipe(concatMap(x => of(x * 10).pipe(delay(1000))))
  .subscribe(console.log);
  • exhaustMap() – Ignoriert neue, wenn eins läuft
import { fromEvent, interval } from 'rxjs';
import { exhaustMap, take } from 'rxjs/operators';

fromEvent(document, 'click')
  .pipe(exhaustMap(() => interval(1000).pipe(take(3))))
  .subscribe(console.log);

⏱️ Timing & Control

  • delay(ms) – Emission verzögern
import { of } from 'rxjs';
import { delay } from 'rxjs/operators';

of('Hallo')
  .pipe(delay(1000))
  .subscribe(console.log);
  • throttleTime(ms) – Werte begrenzen
import { fromEvent } from 'rxjs';
import { throttleTime } from 'rxjs/operators';

fromEvent(document, 'mousemove')
  .pipe(throttleTime(1000))
  .subscribe(() => console.log('Mausbewegung (max. 1/s)'));
  • retry(n) – Bei Fehler neu versuchen
import { throwError, of } from 'rxjs';
import { retry, catchError } from 'rxjs/operators';

throwError('Fehler')
  .pipe(
    retry(2),
    catchError(() => of('Fallback Wert'))
  )
  .subscribe(console.log);
  • catchError() – Fehler behandeln
import { throwError, of } from 'rxjs';
import { catchError } from 'rxjs/operators';

throwError('Fehler')
  .pipe(catchError(err => of('Fehler abgefangen: ' + err)))
  .subscribe(console.log);
  • finalize() – Aufräumarbeiten bei Abschluss
import { of } from 'rxjs';
import { finalize } from 'rxjs/operators';

of(1, 2, 3)
  .pipe(finalize(() => console.log('Stream abgeschlossen')))
  .subscribe(console.log);

🧩 Kombination von Observables

  • combineLatest() – Letzte Werte von mehreren Observables
import { combineLatest, of } from 'rxjs';

const obs1$ = of(1, 2);
const obs2$ = of('A', 'B');

combineLatest([obs1$, obs2$])
  .subscribe(([num, char]) => console.log(num, char));
  • withLatestFrom() – Neu auslösen, aber mit letztem Wert eines anderen
import { interval } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';

const timer1$ = interval(1000);
const timer2$ = interval(500);

timer1$
  .pipe(withLatestFrom(timer2$))
  .subscribe(console.log);
  • forkJoin() – Warten bis alle Observables fertig sind
import { forkJoin, of } from 'rxjs';
import { delay } from 'rxjs/operators';

forkJoin([
  of('A').pipe(delay(1000)),
  of('B').pipe(delay(2000))
]).subscribe(console.log); // ['A', 'B']
  • concat() – Observables nacheinander ausführen
import { concat, of } from 'rxjs';

concat(
  of(1, 2),
  of(3, 4)
).subscribe(console.log); // 1, 2, 3, 4
  • merge() – Observables parallel ausführen
import { merge, of } from 'rxjs';
import { delay } from 'rxjs/operators';

merge(
  of(1).pipe(delay(2000)),
  of(2).pipe(delay(1000))
).subscribe(console.log); // 2, 1
  • zip() – Werte paarweise zusammenführen
import { zip, of } from 'rxjs';

zip(
  of(1, 2, 3),
  of('A', 'B', 'C')
).subscribe(console.log); // [1, 'A'], [2, 'B'], [3, 'C']

🧠 Best Practices in Angular

  • Nutze den async-Pipe im Template: {{ data$ | async }}
  • Vermeide unnötiges .subscribe() in Komponenten – lieber reaktive Verkettungen
  • Nutze takeUntil oder first für das Aufräumen von Subscriptions
  • Denke in Datenströmen, nicht in Callbacks

Mit diesem Cheat Sheet bist du bestens gerüstet, um RxJS in Angular gezielt einzusetzen und deine Anwendungen reaktiver, sauberer und effizienter zu gestalten. Viel Erfolg beim Coden! 🚀

© Felix Stiffel 2025 | Alle Rechte vorbehalten