
I have the following problem. I want to subscribe to an observable using the async pipe in Angular. But the data in the pipe can originate from different data sources and the pipe can look different depending on the data source.

The way it is implemented now does not work, because the second assignment overwrites the first value of this.data$, so source A never gets subscribed to.

Is it possible to 'split' the pipe depending on a filter?

problem.component.ts:

// Data source A and logic A 
this.data$ = this.service.context.pipe(
  filter(context => context.flag === true),
  switchMap(context => this.dataSourceA(context.id)),
  map(data => this.processDataA(data))
);

// Data source B and logic B 
this.data$ = this.service.context.pipe(
  filter(context => context.flag === false),
  switchMap(context => this.dataSourceB(context.id)),
  map(data => this.processDataB(data))
);

problem.component.html

<pro-table [data]="data$ | async"></pro-table>
  • What stops you from using an if/else statement? Commented Dec 23, 2019 at 16:07

3 Answers


Move the logic to your switchMap()

this.data$ = this.service.context.pipe(
   switchMap(context => context.flag
      ? this.dataSourceA(context.id).pipe(map(d => this.processDataA(d)))
      : this.dataSourceB(context.id).pipe(map(d => this.processDataB(d)))
   )
);

3 Comments

How would you solve this problem if context.flag is not a boolean but for example an enum?
@mafus1, simply switch on the context, ensuring each branch returns an Observable.
@mafus1 expressing logic in an observable stream can make the source code very verbose. I have a few operators that let you express some logic in a declarative way. github.com/reactgular/observables
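
To illustrate the comment about switching on an enum, here is a minimal sketch. The `SourceKind` enum, the `Context` shape, the third data source and the stub implementations are all hypothetical stand-ins, not part of the original question; the only point is that every branch of the `switch` returns an Observable, so `switchMap` can flatten whichever one is chosen.

```typescript
import { EMPTY, Observable, of } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

// Hypothetical enum standing in for a non-boolean context.flag.
enum SourceKind { A, B, C }

interface Context { kind: SourceKind; id: number; }

// Stub data sources (assumptions); real ones would typically call a backend.
const dataSourceA = (id: number): Observable<string> => of(`a-${id}`);
const dataSourceB = (id: number): Observable<string> => of(`b-${id}`);
const dataSourceC = (id: number): Observable<string> => of(`c-${id}`);

// Every branch returns an Observable, each with its own processing step.
function selectSource(context: Context): Observable<string> {
  switch (context.kind) {
    case SourceKind.A:
      return dataSourceA(context.id).pipe(map(d => d.toUpperCase()));
    case SourceKind.B:
      return dataSourceB(context.id).pipe(map(d => `[${d}]`));
    case SourceKind.C:
      return dataSourceC(context.id);
    default:
      // Unknown kind: emit nothing instead of breaking the stream.
      return EMPTY;
  }
}

const context$: Observable<Context> = of(
  { kind: SourceKind.A, id: 1 },
  { kind: SourceKind.B, id: 2 },
  { kind: SourceKind.C, id: 3 },
);

// Collect the emissions so the behaviour can be inspected.
const results: string[] = [];
context$.pipe(switchMap(selectSource)).subscribe(v => results.push(v));
```

In a component, the same `switchMap(selectSource)` pipeline could be assigned to `data$` and consumed with the async pipe.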

That's just the wrong way to achieve the result you want. You need to switch to something called a "higher-order observable": Observable<Observable<T>>. Like a "higher-order function" (one that can produce another function based on its arguments), it can produce another Observable based on whatever parametrization you rely on.

this.service.context.pipe(
  switchMap(context => context.flag ? this.dataSourceA(context.id).pipe(...) : this.dataSourceB(context.id).pipe(...))
);

...it might seem a little bit esoteric at the beginning, but it actually isn't.


P.S. An important point to remember: higher-order observables let you parametrize your streams in a fancy way, by dealing with streams of streams.

4 Comments

@Reactgular, then what is it?
A higher order observable is one that emits another observable, and then is usually followed by a mergeAll. There is no higher order here in your example. Your code does answer the question. So I removed my down vote, but the description isn't correct.
@Reactgular, you are missing that switchMap is actually pipe(map(), switchAll()) in essence, where map is the exact point where Observable<Observable<T>> (i.e. a "stream of streams") is produced; that's what a "higher-order observable" is.
@Reactgular, it isn't "bad" in my opinion; I would say that it requires a slightly deeper understanding.
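
The claim in the comments that switchMap behaves like map followed by switchAll can be checked with a small sketch. The `source$` stream and the `inner` factory are made-up examples; `map` alone produces an `Observable<Observable<string>>`, and `switchAll` flattens it the same way `switchMap` does in one step.

```typescript
import { Observable, of } from 'rxjs';
import { map, switchAll, switchMap } from 'rxjs/operators';

// Made-up outer stream and inner-stream factory, for illustration only.
const source$ = of(1, 2, 3);
const inner = (n: number): Observable<string> => of(`${n}a`, `${n}b`);

// map alone yields a stream of streams (a higher-order observable).
const higherOrder$: Observable<Observable<string>> = source$.pipe(map(inner));

// Flatten the stream of streams with switchAll.
const viaSwitchAll: string[] = [];
higherOrder$.pipe(switchAll()).subscribe(v => viaSwitchAll.push(v));

// switchMap does the mapping and the flattening in a single operator.
const viaSwitchMap: string[] = [];
source$.pipe(switchMap(inner)).subscribe(v => viaSwitchMap.push(v));
```

Both arrays end up with the same values in the same order, since these synchronous inner streams complete before the next outer value arrives.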

You can add a Subject that collects the emissions of several data sources and subscribe to that:

    this.data$ = new Subject();
    // Arrow functions keep `this` bound to the component inside the observer.
    const observer = {
      next: data => this.data$.next(data),
      error: msg => { /* handle error */ }
    };

    // Data source A and logic A
    this.service.context.pipe(
      filter(context => context.flag === true),
      switchMap(context => this.dataSourceA(context.id)),
      map(data => this.processDataA(data))
    ).subscribe(observer);

    // Data source B and logic B
    this.service.context.pipe(
      filter(context => context.flag === false),
      switchMap(context => this.dataSourceB(context.id)),
      map(data => this.processDataB(data))
    ).subscribe(observer);

In this situation, though, it seems you can use the flag to keep everything in one pipeline:

this.service.context.pipe(
  switchMap(context => context.flag
    ? this.dataSourceA(context.id)
    : this.dataSourceB(context.id)
  ),
  map(data => this.processData(data)) // handle both cases
).subscribe(observer);
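
As a possible variation on the two-pipeline approach, and not something proposed in this answer, the two filtered streams could be combined with `merge` instead of a Subject, which leaves subscription management to the async pipe. This is only a sketch: the `Context` shape and the stub data sources are assumptions made for illustration.

```typescript
import { Observable, merge, of } from 'rxjs';
import { filter, map, switchMap } from 'rxjs/operators';

// Assumed shape of the context emitted by the service.
interface Context { flag: boolean; id: number; }

// Stub context stream and data sources, for illustration only.
const context$: Observable<Context> = of(
  { flag: true, id: 1 },
  { flag: false, id: 2 },
);
const dataSourceA = (id: number): Observable<number> => of(id * 10);
const dataSourceB = (id: number): Observable<number> => of(id * 100);

// Data source A and logic A
const fromA$ = context$.pipe(
  filter(context => context.flag),
  switchMap(context => dataSourceA(context.id)),
  map(data => `A:${data}`),
);

// Data source B and logic B
const fromB$ = context$.pipe(
  filter(context => !context.flag),
  switchMap(context => dataSourceB(context.id)),
  map(data => `B:${data}`),
);

// A single stream that could be bound in a template with the async pipe.
const data$: Observable<string> = merge(fromA$, fromB$);

const seen: string[] = [];
data$.subscribe(v => seen.push(v));
```

One difference from the single switchMap version: here a still-running request from source A is not cancelled when the context changes to source B, because each branch has its own switchMap.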
