
Room đź”— RxJava
Doing queries in Room with RxJava
Less boilerplate code, compile-time checked SQL queries, and on top of this, the power of asynchronous and observable queries — how does that sound? All of these are possible with Room, the persistence library from the Architecture Components. Async queries return LiveData or RxJava’s Maybe, Single or Flowable. The queries returning LiveData and Flowable are observable queries. They allow you to get automatic updates whenever the data changes to make sure your UI reflects the latest values from your database. If you’re already working with RxJava 2 in your app, then using Room together with Maybe, Single and Flowable will be a breeze.
Later edit: starting with `2.0.0-beta01`, Room also supports
Observable
Later edit 2: starting with Room 2.1.0-alpha01, DAO methods annotated with
@Insert,@Deleteor@Updatesupport Rx return typesCompletable,Single<T>andMaybe<T>.
Let’s consider the following UI: the user is able to see and edit their username. This, together with other info about the user, is saved in the database. Here’s how to insert, update, delete and query the user.
Insert
The Room integration with RxJava allows the following corresponding return types for insert:
Completable— whereonCompleteis called as soon as the insertion was doneSingle<Long>orMaybe<Long>— where the value emitted ononSuccessis the row id of the item insertedSingle<List<Long>>orMaybe<List<Long>>— where the value emitted ononSuccessis the list of row ids of the items inserted
In case of error inserting the data, Completable, Single and Maybe will emit the exception in onError.
@Insert
Completable insert(User user);// or
@Insert
Maybe<Long> insert(User user);// or
@Insert
Single<Long> insert(User[] user);// or
@Insert
Maybe<List<Long>> insert(User[] user);// or
@Insert
Single<List<Long>> insert(User[] user);
Use the observeOn operator to specify the Scheduler on which an Observer will observe the Observable and subscribeOn to specify the Scheduler on which the Observable will operate.
Update/Delete
The Room integration with RxJava allows the following corresponding return types for update/delete:
Completable— whereonCompleteis called as soon as the update/delete was doneSingle<Integer>orMaybe<Integer>— where the value emitted on onSuccess is the number of rows affected by update/delete
@Update
Completable update(User user);// or
@Update
Single<Integer> update(User user);// or
@Update
Single<Integer> updateAll(User[] user);// or
@Delete
Single<Integer> deleteAll(User[] user);// or
@Delete
Single<Integer> deleteAll(User[] user);
Use the observeOn operator to specify the Scheduler on which an Observer will observe the Observable and subscribeOn to specify the Scheduler on which the Observable will operate.
Query
To get the user from the database, we could write the following query in the data access object class (UserDao):
@Query(“SELECT * FROM Users WHERE id = :userId”)
User getUserById(String userId);This approach has two disadvantages:
- It is a blocking, synchronous call
- We need to manually call this method every time our user data is modified
Room provides the option of observing the data in the database and performing asynchronous queries with the help of RxJava Maybe, Single and Flowable objects.
If you’re worried about threads, Room keeps you at ease and ensures that observable queries are done off the main thread. It’s up to you to decide on which thread the events are emitted downstream, by setting the Scheduler in the observeOn method.
For queries that return Maybe or Single, make sure you’re calling subscribeOn with a different Scheduler than AndroidSchedulers.mainThread().
To start using Room with RxJava 2, just add the following dependencies to your build.gradle file:
// RxJava support for Room
implementation “android.arch.persistence.room:rxjava2:1.0.0-alpha5”// Testing support
androidTestImplementation “android.arch.core:core-testing:1.0.0-alpha5”
Maybe
@Query(“SELECT * FROM Users WHERE id = :userId”)
Maybe<User> getUserById(String userId);Here’s what happens:
- When there is no user in the database and the query returns no rows,
Maybewill complete. - When there is a user in the database,
Maybewill triggeronSuccessand it will complete. - If the user is updated after
Maybewas completed, nothing happens.
Single
@Query(“SELECT * FROM Users WHERE id = :userId”)
Single<User> getUserById(String userId);Here are some scenarios:
- When there is no user in the database and the query returns no rows,
Singlewill triggeronError(EmptyResultSetException.class) - When there is a user in the database,
Singlewill triggeronSuccess. - If the user is updated after
Singlewas completed, nothing happens.
Flowable/Observable
@Query(“SELECT * FROM Users WHERE id = :userId”)
Flowable<User> getUserById(String userId);Here’s how the Flowable/Observable behaves:
- When there is no user in the database and the query returns no rows, the
Flowablewill not emit, neitheronNext, noronError. - When there is a user in the database, the
Flowablewill triggeronNext. - Every time the user data is updated, the
Flowableobject will emit automatically, allowing you to update the UI based on the latest data.
Testing
Testing a query that returns a Maybe/Single/Flowable is not very different from testing its synchronous equivalent. In the UserDaoTest, we make sure that we use an in-memory database, since the information stored here is automatically cleared when the process is killed.
@RunWith(AndroidJUnit4.class)
public class UserDaoTest {
…
private UsersDatabase mDatabase;@Before
public void initDb() throws Exception {
mDatabase = Room.inMemoryDatabaseBuilder(
InstrumentationRegistry.getContext(),
UsersDatabase.class)
// allowing main thread queries, just for testing
.allowMainThreadQueries()
.build();
}
@After
public void closeDb() throws Exception {
mDatabase.close();
}
Add the InstantTaskExecutorRule rule to your test, to make sure that Room executes all the database operations instantly.
@Rule
public InstantTaskExecutorRule instantTaskExecutorRule =
new InstantTaskExecutorRule();Let’s implement a test that subscribes to the emissions of getUserById and checks that indeed when the user was inserted, the correct data is emitted by the Flowable.
@Test
public void insertAndGetUserById() {
// Given that we have a user in the data source
mDatabase.userDao().insertUser(USER); // When subscribing to the emissions of user
mDatabase.userDao()
.getUserById(USER.getId())
.test()
// assertValue asserts that there was only one emission
.assertValue(new Predicate<User>() {
@Override
public boolean test(User user) throws Exception {
// The emitted user is the expected one
return user.getId().equals(USER.getId()) &&
user.getUserName().equals(USER.getUserName());
}
});
}
That’s it! If you use RxJava 2 in your app, make your database reactive too and ensure that your UI always shows the latest data. Check out a sample app using Room and RxJava here.








