I'm not sure which RxJS operator to use to solve the following problem:

In my music application, I have a submission page (this is like a music album). To load the submission, I use the following query:

this.submissionId = parseInt(params['album']);

if (this.submissionId) {
  this.submissionGQL.watch({
    id: this.submissionId
  }).valueChanges.subscribe((submission) => {
      //submission loaded here!
  });
}

Easy enough! However, once I've loaded the submission, I have to load some auxiliary information such as the current user (to check if they are the artist of the submission) and comments. In order to avoid nested subscriptions, I can modify the above query to use switchMap to switch the query stream to user and comments observables once the submission resolves:

// stream to query for the submission and then switch query to user
this.submissionGQL.watch({
    id: this.submissionId
  }).valueChanges.pipe(
  switchMap(submission => {
    this.submission = submission;
    return this.auth.user$
  })
).subscribe((user) => {
  // needs value of submission here
  if (user.id == this.submission.user.id) {
    //user is owner of submission
  }
})

// stream to query for the submission and then switch query to comments
this.submissionGQL.watch({
    id: this.submissionId
  }).valueChanges.pipe(
  switchMap(submission => {
    this.comments$ = this.commentsGQL.watch({
      submissionId: submission.id //needs submission response here
    })
    return this.comments$.valueChanges
  })
).subscribe((comments) => {
  this.comments = comments;
})

Great! I've avoided the nested subscription issue, but now the first part of each submission request is identical. Basically, once the submission is queried, I want to launch two parallel queries:

  • a query for the user
  • a query for the comments

Which RxJS operator can perform such an operation? I suppose the subscribe at the end would emit an array response like:

.subscribe(([user, comments]) => {
    // check if user == submission.user.id here
    // also assign comments to component variable here
})

I believe mergeMap is sort of what I need but I'm not sure how to implement that properly. Or is this a case where I should share() the submission query and then build off my parallel queries separately? I'm very curious! Please let me know, thanks!

2 Answers

1

You can use the RxJS forkJoin operator for this scenario. As stated in the documentation:

When all observables complete, emit the last emitted value from each.

const userQuery$ = this.submissionGQL.watch({
    id: this.submissionId
  }).valueChanges.pipe(
  switchMap(submission => {
    this.submission = submission;
    return this.auth.user$
  })
)

// stream to query for the submission and then switch query to comments
const commentsQuery$ = this.submissionGQL.watch({
    id: this.submissionId
  }).valueChanges.pipe(
  switchMap(submission => {
    this.comments$ = this.commentsGQL.watch({
      submissionId: submission.id //needs submission response here
    })
    return this.comments$.valueChanges
  })
)

// note: forkJoin emits only once, after both source observables have completed
forkJoin([userQuery$, commentsQuery$]).subscribe(([user, comments]) => {
  // check if user == submission.user.id here
  // also assign comments to component variable here
})
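One caveat worth checking before relying on forkJoin: it emits nothing until every source completes. A watched query's valueChanges and a long-lived user$ stream typically stay open, so something like take(1) may be needed. Below is a minimal sketch using stand-in observables; submission$, user$ and commentsFor are hypothetical placeholders, not the real Apollo or auth services from the question.

```typescript
import { BehaviorSubject, forkJoin, of } from 'rxjs';
import { map, switchMap, take } from 'rxjs/operators';

// Hypothetical stand-ins for the streams in the question.
const user$ = new BehaviorSubject({ id: 7 });        // long-lived: never completes on its own
const submission$ = of({ id: 1, user: { id: 7 } });  // stands in for submissionGQL.watch(...).valueChanges
const commentsFor = (submissionId: number) =>
  of([`first comment on ${submissionId}`]);          // stands in for commentsGQL.watch(...).valueChanges

const results: Array<{ isOwner: boolean; comments: string[] }> = [];

submission$.pipe(
  switchMap(submission =>
    forkJoin([
      user$.pipe(take(1)),         // take(1) completes the stream so forkJoin is able to emit
      commentsFor(submission.id),  // of(...) completes by itself
    ]).pipe(
      // keep the submission in scope instead of storing it on the component
      map(([user, comments]) => ({
        isOwner: user.id === submission.user.id,
        comments,
      }))
    )
  )
).subscribe(result => results.push(result));
```

Without the take(1), the inner forkJoin in this sketch would wait forever, because the BehaviorSubject never completes.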

2 Comments

I agree that forkJoin would work in the above scenario. However, I guess I was trying to see if it would be possible to not have to subscribe to submissionGQL twice. Does that make sense? Is that even feasible?
@JordanLewallen Hmm, technically speaking you are only subscribing to it once; there are just two different pipelines handling the process. I am not sure how we could join them into a single pipeline, though. Nonetheless, I do believe that using forkJoin or combineLatest would be the most elegant solution here.
0

Try:

  this.submissionGQL.watch({
    id: this.submissionId
  }).valueChanges.pipe(
   switchMap(submission => {
     this.submission = submission;
     const user$ = this.auth.user$;
     this.comments$ = this.commentsGQL.watch({
       submissionId: submission.id 
     });
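     // this.comments$ is the query ref returned by watch(); combine its valueChanges observable
     return combineLatest([user$, this.comments$.valueChanges]);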
   }),
   // maybe put a takeUntil to remove subscription and not cause memory leaks
  ).subscribe(([user, comments]) => {
    // check if user == submission.user.id here
    // also assign comments to component variable here
 });
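The takeUntil suggestion in the comment above could look roughly like this. It is only a sketch: destroy$ and source$ are hypothetical names, and in an Angular component the destroy$.next() call would normally live in ngOnDestroy.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Hypothetical notifier; in a component, emit on it from ngOnDestroy().
const destroy$ = new Subject<void>();
// Hypothetical stand-in for the long-lived query stream.
const source$ = new Subject<number>();

const events: string[] = [];

source$.pipe(
  takeUntil(destroy$) // completes the subscription as soon as destroy$ emits
).subscribe({
  next: value => events.push(`next:${value}`),
  complete: () => events.push('complete'),
});

source$.next(1);
source$.next(2);

// What ngOnDestroy() would do:
destroy$.next();
destroy$.complete();

source$.next(3); // not delivered, the subscription has already ended
```

Placing takeUntil last in the pipe is the usual advice, so that inner observables created by earlier operators such as switchMap are torn down as well.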

Something you should consider is eliminating instance variables with the help of Angular's async pipe (https://malcoded.com/posts/angular-async-pipe/). It subscribes to the observable, renders the emitted value in the view, and automatically unsubscribes when the view is destroyed.

So, using that, we can get rid of this.submission = submission by declaring:

submission$: Observable<ISubmission>; // assuming an ISubmission interface exists; if not, use any

// then, once this.submissionId is defined
this.submission$ = this.submissionGQL.watch({
  id: this.submissionId
}).valueChanges;

// then in your view you can use {{ submission$ | async }}

The same can be done for this.comments$. All of this is optional, though. I try to minimize instance variables as much as possible when using RxJS because too many of them lead to confusion.

Then you can build on the this.submission$ observable and subscribe to it for the other main stream.

 this.submission$.pipe(
  switchMap(submission => ..... // everything else being the same
 )
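If the same submission observable feeds both the template and the component pipeline, one way to make sure the underlying source is subscribed only once (regardless of any caching the GraphQL client does) is shareReplay. This is a sketch under assumptions: source$, user$ and commentsFor are hypothetical stand-ins, and the counter exists only to show how many times the source is subscribed.

```typescript
import { Observable, combineLatest, of } from 'rxjs';
import { map, shareReplay, switchMap } from 'rxjs/operators';

interface Submission { id: number; user: { id: number }; }

let sourceSubscriptions = 0;

// Hypothetical stand-in for submissionGQL.watch(...).valueChanges.
// It stays open (never completes) and counts how often it is subscribed.
const source$ = new Observable<Submission>(subscriber => {
  sourceSubscriptions++;
  subscriber.next({ id: 1, user: { id: 7 } });
});

// Share one source subscription and replay the latest value to late subscribers.
const submission$ = source$.pipe(
  shareReplay({ bufferSize: 1, refCount: true })
);

// Hypothetical stand-ins for auth.user$ and the comments query.
const user$ = of({ id: 7 });
const commentsFor = (submissionId: number) => of([`comment on ${submissionId}`]);

// Consumer 1: roughly what `submission$ | async` does in the template.
const seenByTemplate: number[] = [];
submission$.subscribe(submission => seenByTemplate.push(submission.id));

// Consumer 2: the component pipeline that loads user and comments in parallel.
const results: Array<{ isOwner: boolean; comments: string[] }> = [];
submission$.pipe(
  switchMap(submission =>
    combineLatest([user$, commentsFor(submission.id)]).pipe(
      map(([user, comments]) => ({
        isOwner: user.id === submission.user.id,
        comments,
      }))
    )
  )
).subscribe(result => results.push(result));
```

Both consumers receive the submission, while the stand-in source is subscribed a single time.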

I chose the combineLatest operator, but you can use zip or forkJoin as you see fit. They all have subtle differences (https://scotch.io/tutorials/rxjs-operators-for-dummies-forkjoin-zip-combinelatest-withlatestfrom).
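To make those differences concrete, here is a small sketch that feeds the same two synchronous sources into each function. The sources a$ and b$ are made up for illustration; real query streams are asynchronous and usually do not complete, which matters most for forkJoin.

```typescript
import { combineLatest, forkJoin, of, zip } from 'rxjs';

// Two made-up sources that emit synchronously and then complete.
const a$ = of(1, 2, 3);
const b$ = of('x', 'y');

const combined: Array<[number, string]> = [];
const zipped: Array<[number, string]> = [];
const joined: Array<[number, string]> = [];

// combineLatest: emits the latest value of every source whenever any source emits,
// once each source has emitted at least once.
combineLatest([a$, b$]).subscribe(pair => combined.push(pair));

// zip: pairs values by index (first with first, second with second, ...).
zip(a$, b$).subscribe(pair => zipped.push(pair));

// forkJoin: emits a single array of the last values, only after all sources complete.
forkJoin([a$, b$]).subscribe(pair => joined.push(pair));

// Because of(...) emits synchronously, a$ has already finished before b$ is subscribed:
// combined -> [[3, 'x'], [3, 'y']]
// zipped   -> [[1, 'x'], [2, 'y']]
// joined   -> [[3, 'y']]
```

For the user and comments case, combineLatest keeps reacting to later updates from either stream, whereas forkJoin gives one result and then finishes.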

7 Comments

thanks for answering the other question too! Trying to implement both now and will get back
Also, in terms of instance variables, I guess in this case renaming to this.submission$ wouldn't be smart...right? because my observable is returning a combineLatest of user and comments, not a submission object.
It depends; you can do as you see fit. When I say you can get rid of this.submission = submission, I assume this.submission is used in the HTML. My whole tangent about using the async pipe is my "political view" of what I think is cleaner code. My "political view" changes over time, and I think that is good; it should for everybody. Also, I may not have understood your question entirely. It can be tough over text.
oh wait, so if I understand correctly, you are creating an instance variable of this.submission$ and assigning it to the submissionsGQL observable. This request is triggered when using the async in html like: submission$ | async. BUT also... in my component, you are saying to pipe this.submission$ to use with switchMap to get the data for comments and user. Wouldn't this still mean that we would still have two queries of submissionGQL? (one triggered from the async html pipe and the other from the switchMap)? Or am i thinking about this wrong
ok so i just tested my theory above and surprisingly...it doesn't trigger the submissionGQL twice and...I don't quite know why. My understanding of switchMap is that it triggers the observable (in this case the submissionGQL) and then kills it before switching to the new observable (in this case the combineLatest). I'm using Apollo for GraphQL, so I'm not sure if it's because it's batching the request as one or this is just how observables are supposed to work