0

I have rxjava3 Flowable having Item objects and want to run async operation in batches only for elements matching a condition. Then I want to allow downstream operators to iterate over original Item objects stream in the original order.

Flowable<Item> source ...
Flowable<Item> updated = source.filter(i -> i.isCondition())
  .buffer(3)
  .flatMap(batch -> {
     return apiCall(batch).flatMapPublisher(apiRes -> {
        for (Item item : batch) {
           item.updateFrom(apiRes)
        }      
        return Flowable.fromIterable(batch);   
     }         
  }
// return Flowable from source in the same order
// but only after updated completed for the batch 
// or isCondition is false

Lets say I have list of ids and for ids bigger than 10 I want to get group name using remote service accepting list of Ids.

5
4
12
3
15
6

It should call getGroupIds(12, 15) and return:

5, null
4, null
12, sport
3, null
15, music
6, null
  

What is best option to achieve it for large data set? The error in apiCall should terminate the source flowable.

There is a potential of large number of elements between 12 and 15 in above example - it will increase size of buffer used to keep original order and thus it should call apiCall if i.isCondition() is true for one element and false for following n elements to let buffer to flush to the output after apiCall gets processed. It can be simplified by using source.buffer(1000) before filter gets applied.

2
  • Please edit your question to include a description why you use buffer(). It sounds like a simple map() call would be enough to either return null when the condition is not met and the actual API call result, when the condition is met. Commented Nov 29, 2024 at 18:38
  • Program - I use buffer because I want to call apiCall(batch) with a list of arguments instead of single argument to decrease numbers of calls to remote API. Commented Dec 5, 2024 at 12:36

1 Answer 1

0

You can buffer the elements as you already did, but not filter the elements with the filter() method. Instead, you can use a map() call and filter the elements inside the map() call. The filtered list will be used in an API call. After the API call, you change the original items in the list as you want and return the list again from the map() call. What you get is a Flowable of lists. Use an additional flatMap() call to convert the Flowable<List<Item>> back to a Flowable<Item>>. See the following example code:

public class Item {
    
    public Item(int v) {
        this.value = v;
    }
    
    public int value;
    
    public String additionalData;
}

public static void main(String[] args) {
    List<Item> itemSource = new ArrayList<>();
    itemSource.add(new Item(3));
    itemSource.add(new Item(10));
    itemSource.add(new Item(5));
    itemSource.add(new Item(12));
    itemSource.add(new Item(7));
    itemSource.add(new Item(6));
    itemSource.add(new Item(3));
    itemSource.add(new Item(11));
    itemSource.add(new Item(15));
    itemSource.add(new Item(4));
    itemSource.add(new Item(19));
    itemSource.add(new Item(8));
    
    Flowable<Item> source = Flowable.fromIterable(itemSource);

    source
        .buffer(3)
        .map(items -> {
            List<Item> filtered = new ArrayList<>();
            for (Item item : items) {
                if (item.value > 10) {
                    filtered.add(item);
                }
            }
            if (filtered.size() > 0) {
                List<String> result = fakeApiCall(filtered);
                for (int i=0; i<result.size(); i++) {
                    filtered.get(i).additionalData = result.get(i);
                }
            }
            return items;
        })
        .flatMap(items -> Flowable.fromIterable(items))
        .subscribe(it -> {
            System.out.println("item: (" + it.value + ", " + it.additionalData+")");
        
        });
}

public static List<String> fakeApiCall(List<Item> items) {
    List<String> result = new ArrayList<String>();
    for(Item item:items) {
        result.add("some data for "+item.value+" value");
    }
    return result;
}

This will generate the following output:

item: (3, null)
item: (10, null)
item: (5, null)
item: (12, some data for 12 value)
item: (7, null)
item: (6, null)
item: (3, null)
item: (11, some data for 11 value)
item: (15, some data for 15 value)
item: (4, null)
item: (19, some data for 19 value)
item: (8, null)

As you see, some items has been enriched with additional data and the items are emitted in the original order.

Sign up to request clarification or add additional context in comments.

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.