1

I am working with Angular and NodeJs (with Axios) using RxJs, and currently find it very challenging to achieve this task. I will explain my problem based on a scenario.

I have an array of objects like this, with possibly even more than 100 objects:

let objArr = [{name: 'john', id: '123', country: 'usa'}, {name: 'doe', id: '456', country: 'china'}....]

Then I have another 4-5 validation APIs that can be called for different params e.g. id, name and country based on each object:

api_1 = "validate_step_1/:id"
api_2 = "validate_step_2/:id/:name"
api_3 = "validate_step_3/:id/:country"
api_4 = "validate_step_4/:id:/:name/:country" 

These API calls should strictly happen one after another in a sequential pattern, e.g. api_2 should only be called if api_1 returns true and so on.

What I want:

I would like to execute for-loop on the array that should run in parallel, and each object should then sequentially validate itself based on these 4 API calls. Something like sequential API calls based on each item in for-loop in parallel for all 100 objects.

Is this even possible? Also, any solutions to achieve this on the Node side are also welcomed.

What I tried

Right now I am using this method, but it's very slow, even sometimes resulting in timeout errors in Axios:

  of(...this.objArr).pipe(
    concatMap((obj: any) => this.service.api_1(id)),
    concatMap((obj: any) => this.service.api_2(id, name)),
    concatMap((obj: any) => this.service.api_3(id, country)),
    concatMap((obj: any) => this.service.api_4(id, name, country)),
    catchError( error => this.handleError(error))
  ).subscribe(
    success => {
      console.log("validation succeed", success);
    },
    errorData => {
      console.log("validation failure: ", errorData);
    }
  )

3 Answers 3

2

Switch map your data array into a stream of parallel pipes that join together.

const dataArray: myData[];

of(dataArray).pipe(
  // switch and listen to the returned pipe
  switchMap(arrayOfObjects => {
    // a normal map (not an RxJS map), returns an array of pipes
    const nParallelPipes = arrayOfObjects.map(data => {
      const test1 = this.service.api_1(data.id);
      const test2 = this.service.api_2(data.id, data.name);
      const test3 = this.service.api_3(data.id, data.country);
      const test4 = this.service.api_4(data.id, data.name, data.country);
      return concat(test1, test2, test3, test4).pipe(
        catchError(error => this.handleError(error))
      )
    });
    // listen to all pipes in parallel, and only return when all pipes have completed
    return forkJoin(nParallelPipes) // or combineLatest(nParallelPipes).pipe(take(1))
  })
).subscribe(
  success => {
    console.log("validation succeed", success);
  },
  errorData => {
    console.log("validation failure: ", errorData);
  }
);

nParallelPipes contains n synchronous sequential validation pipes, where n is the amount of data in your initial array. ForkJoining them together (or combineLatest) will fire them off in parallel.

I hope that helps.

Happy coding.


Is it possible to apply certain conditional checks on the response of API before moving to the next?

Yes. It is possible.

const dataArray: string[];
const checkTest1 = (value) => true;
const checkTest2 = (value) => true;
const checkTest3 = (value) => true;
const checkTest4 = (value) => true;

of(dataArray).pipe(
  // switch and listen to the returned pipe
  switchMap(arrayOfObjects => {
    // a normal map (not an RxJS map), returns an array of pipes
    const nParallelPipes = (arrayOfObjects as string[]).map(data => {
      const test1 = this.service.api_1(data.id);
      const test2 = this.service.api_2(data.id, data.name);
      const test3 = this.service.api_3(data.id, data.country);
      const test4 = this.service.api_4(data.id, data.name, data.country);
      const startTesting = concatMap(value => iif(() => checkTest1(value), test1Passed, throwError('T1 Failed')))
      const test1Passed = test2.pipe(concatMap(value => iif(() => checkTest2(value), test2Passed, throwError('T2 Failed'))));
      const test2Passed = test3.pipe(concatMap(value => iif(() => checkTest3(value), test3Passed, throwError('T3 Failed'))));
      const test3Passed = test4.pipe(concatMap(value => iif(() => checkTest4(value), of('success'), throwError('T4 Failed'))))
      return test1.pipe(startTesting);
    });
    // listen to all pipes in parallel, and only return when all pipes have completed
    return forkJoin(nParallelPipes) // or combineLatest(parallelCalls).pipe(take(1))
  })
).subscribe(
  success => {
    console.log("validation succeed", success);
  },
  errorData => {
    console.log("validation failure: ", errorData);
  }
);
Sign up to request clarification or add additional context in comments.

2 Comments

This is just what I want. Thanks. It runs really well but I just have few questions - Is it possible to apply certain conditional checks on the response of API before moving to next, e.g. before we move to API-2 we first apply some checks on API-1 resp and only proceed if it satisfies the criteria. is it possible?
second question, upon success I get to see list of all the observables like this [0: Observable {source: Observable, operator: ƒ}, 1: 0: Observable {source: Observable, operator: ƒ}] Can I use pipe to simplify them and know their contents?
1

I would adopt this approach.

First of all create a function that build an Observable that runs all the validations in parallel and handle any error we may encounter (more on errors later). Something like this

function validate(id, name, country) {
  return concat(
    this.service.api1(id),
    this.service.api2(id, name),
    this.service.api3(id, country),
    this.service.api4(id, name, country)
  ).pipe(
    catchError(err => of(err.message))
  );
}

Then I would use the from function from the rxjs library to turn the array of objects into a stream and then apply the mergeMap operator to launch all the validations in parallel, like this

from(objArr)
  .pipe(mergeMap((obj) => validate(obj.id, obj.name, obj.country)))
  .subscribe((v) => console.log(v));

Why use mergeMap over forkJoin in this case. The main reason is that with mergeMap you can control the level of concurrency you want to have. If you do not specify anything, all the validations run in parallel. But, for instance, if you want to limit the number of parallel validations, you can use the optional concurrent parameter of mergeMap like this

const concurrent = 3 // whatever limit 
from(objArr)
   .pipe(mergeMap((obj) => validate(obj.id, obj.name, obj.country), concurrent))
   .subscribe((v) => console.log(v));

If you want to proceed in the validations only if a certain criteria is met, you can simply use the tap operator and throw an error if the condition is not met. The error will be caught by the catchError operator which we have added at the end of the pipe in the validate function. So, a validation API would look like this

api3(id, country) {
  return this.invokeApi3(id, country).pipe(
    tap(() => {
      if (// check the criteria for which you do not want to continue) {
        throw new Error('Error in API 3');
      }
    })
  );
}

You can look at this stackblitz for an examp.le

15 Comments

Thanks. I was able to run this fine, however I am not able to understand how can I catch response of api3() using Tap and apply some conditions to its response, and then based on that criteria decide whether to proceed to api4() or throw error and complete the process.
The example you showed checks for the country that is passed in argument, so i tried with tap((resp) => {if resp.data === criteria}) to see if it fulfils criteria, but its only returning observable.
@Dhruvify can you explain in more details under which circumstances you want to stop the validations?
sure, so for an instance during the sequential api calls if somehow we can get hold of the response for this.service.api2(id, name) and if the response returns any wrong type of data e.g. if resp.data !== validCriteria we would like to halt our execution and avoid calling this.service.api3(id, country), and the rest of APIs mentioned in concatmap. Instead we just throw the error out and cancel the rest of the process. Can this be done?
Also by cancelling the rest of the process I mean that the process only needs to be cancelled for that particular object in an array for which api2 resulted validation failure, whereas, the validations can continue for other individual objects in parallel like what you shown above.
|
0

you can use forkJoin operator to call multiple endpoints in case you're only interested in the final response of all the endpoints and no more future emissions, it takes an array of observables and gives tou an array of responses then completes...something like this:

const obs1$ = this.service.api1;
const obs2$ = this.service.api2;
const subscription = forkJoin([obs1$,obs2$].subscribe(([response1, response2]) => console.log(response1, response2))
 

but in case you'll be expecting more emissions from one or more observables you can use combineLatest or preferably zip operator.

1 Comment

Hi, in your case we are calling APIs in parallel using forkJoin but in my case it can be only called in sequence like Api-1, then API-2 and so on. The only parallel operation I want to achieve is as per my question, that based on array of objects how to call set of 4 sequential APIs in parallel for each object so we don't have to wait for so long until the last object inside array.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.