I have this simple test about a crawler that is supposed to call the repo 40 times:

@Test
fun testX() {
   // ... 
   runBlocking {
        crawlYelp.concurrentCrawl()
        // Thread.sleep(5000) // works if I un-comment
   }
   verify(restaurantsRepository, times(40)).saveAll(restaurants)
   // ...
}

and this implementation:

suspend fun concurrentCrawl() {
    cities.map { loc ->
        1.rangeTo(10).map { start ->
            GlobalScope.async {
                val rests = scrapYelp.scrap(loc, start * 10)
                restaurantsRepository.saveAll(rests)
            }
        }
    }
}

But... I get this:

Wanted 40 times:
-> at ....testConcurrentCrawl(CrawlYelpTest.kt:46)
But was 30 times:

(The actual count changes from run to run, so it seems the test is not waiting.)

Why does it pass when I add the sleep? The sleep should not be needed, given that I call it inside runBlocking.

BTW, I have a controller that is supposed to be kept asynchronous:

@PostMapping("crawl")
suspend fun crawl(): String {
    crawlYelp.concurrentCrawl()
    return "crawling" // this is supposed to be returned right away
}

Thanks

2 Answers

runBlocking waits for its own block and for the coroutines started as its children. GlobalScope.async, however, starts coroutines that are not children of the caller, so concurrentCrawl (and with it runBlocking) is done as soon as all jobs have been started, not after all of these jobs have finished.
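The race can be reproduced without Yelp or a repository. The following is only a minimal sketch, assuming kotlinx.coroutines is on the classpath; `startDetached` and the counter are hypothetical stand-ins for concurrentCrawl and the saveAll calls, not code from the question.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for concurrentCrawl(): starts n jobs in GlobalScope and returns at once.
fun startDetached(done: AtomicInteger, n: Int): List<Job> =
    (1..n).map {
        GlobalScope.launch {
            delay(200)             // simulated crawl
            done.incrementAndGet() // simulated saveAll call
        }
    }

fun main() {
    val done = AtomicInteger()

    // runBlocking only waits for its own block, which merely starts the jobs.
    val jobs = runBlocking { startDetached(done, 40) }
    println("right after runBlocking: ${done.get()}") // usually well below 40

    // Waiting for the jobs explicitly closes the race.
    runBlocking { jobs.joinAll() }
    println("after joinAll: ${done.get()}") // 40
}
```

Recent versions of kotlinx.coroutines flag GlobalScope as a delicate API, so the compiler may emit a warning for this sketch.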

You have to wait for all jobs started with GlobalScope.async to finish like this:

suspend fun concurrentCrawl() {
    cities.flatMap { loc ->
        1.rangeTo(10).map { start ->
            GlobalScope.async {
                val rests = scrapYelp.scrap(loc, start * 10)
                restaurantsRepository.saveAll(rests)
            }
        }
    }.awaitAll() // start the crawls for every city first, then suspend until all of them have finished
}

If you want to wait for the jobs outside of concurrentCrawl(), then you have to return the Deferred results to the calling function, as in the following example. In that case the suspend keyword can be removed from concurrentCrawl(), because the function itself no longer suspends.

fun concurrentCrawl(): List<Deferred<Unit>> {
    return cities.map { loc ->
        1.rangeTo(10).map { start ->
            GlobalScope.async {
                println("hallo world $start")
            }
        }
    }.flatten()
}


runBlocking {
    concurrentCrawl().awaitAll()
}

As mentioned in the comments: in this case the async block does not produce a result value, so it is better to use launch instead:

fun concurrentCrawl(): List<Job> {
    return cities.map { loc ->
        1.rangeTo(10).map { start ->
            GlobalScope.launch {
                println("hallo world $start")
            }
        }
    }.flatten()
}

runBlocking {
    concurrentCrawl().joinAll()
}
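
For comparison, the same waiting behavior can also be obtained without GlobalScope by launching the crawls as children of a coroutineScope block. This is a swapped-in technique (structured concurrency), not part of the answer above, and only a sketch: `crawlAll`, `scrape`, `save`, and `saveCalls` are hypothetical stand-ins for concurrentCrawl, scrapYelp.scrap, restaurantsRepository.saveAll, and the mock's call count.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for scrapYelp.scrap(...).
suspend fun scrape(city: String, offset: Int): List<String> {
    delay(20) // simulated network call
    return listOf("$city-$offset")
}

// Hypothetical stand-in for restaurantsRepository.saveAll(...); it only counts calls.
val saveCalls = AtomicInteger()
fun save(restaurants: List<String>) {
    saveCalls.incrementAndGet()
}

// coroutineScope returns only after every coroutine launched inside it has completed.
suspend fun crawlAll(cities: List<String>) = coroutineScope {
    for (city in cities) {
        for (start in 1..10) {
            launch(Dispatchers.Default) {
                save(scrape(city, start * 10))
            }
        }
    }
}

fun main() = runBlocking {
    crawlAll(listOf("a", "b", "c", "d"))
    println(saveCalls.get()) // 40
}
```

Because the launched coroutines are children of the caller here, a runBlocking in a test waits for them without any explicit awaitAll or joinAll.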

Comments

Thanks! But won't that make concurrentCrawl blocking? Or is that awaitAll in the context of the threads?
Yes, this makes concurrentCrawl wait (it suspends until all jobs are done), but the individual crawls will still be carried out in parallel. If you want to wait for the jobs outside of concurrentCrawl, you have to return the jobs to the calling function. I included an example of that in my answer.
Was the suspend needed? I'm not sure from the docs when it should be used. Thanks!
In the first case, awaitAll suspends so concurrentCrawl must be a suspend fun. In the second case it never suspends, therefore it shouldn't be a suspend fun. Also, don't use async when you aren't getting a result value from the coroutine. Basically, Deferred<Unit> is a code smell all of its own.
So suspending means it can run in parallel, or be suspended, without blocking the main flow, right? Regarding Deferred<Unit>, I suppose you mean I should use launch instead.

You could also just use MockK for this (and so much more).

MockK's verify has a timeout: Long parameter specifically for handling these races in tests.

You could leave your production code as it is and change your test to this (restaurantsRepository then has to be a MockK mock rather than a Mockito one):

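import io.mockk.verify
import kotlinx.coroutines.runBlocking

@Test
fun `test X`() = runBlocking<Unit> { // <Unit> keeps the test function's return type Unit
   // ...

   crawlYelp.concurrentCrawl()

   // Retries the verification until it passes, or fails once 5 seconds have elapsed.
   verify(exactly = 40, timeout = 5000L) {
      restaurantsRepository.saveAll(restaurants)
   }
   // ...
}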

If the verify is successful at any point before 5 seconds, it'll pass and move on. Else, the verify (and test) will fail.
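
A self-contained version of this idea might look like the sketch below. It assumes MockK and kotlinx.coroutines are on the classpath; the `RestaurantsRepository` interface and `startSaves` are hypothetical stand-ins for the real repository and for concurrentCrawl, which the question does not show in full.

```kotlin
import io.mockk.mockk
import io.mockk.verify
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

// Hypothetical repository interface standing in for restaurantsRepository.
interface RestaurantsRepository {
    fun saveAll(restaurants: List<String>)
}

// Hypothetical stand-in for concurrentCrawl(): fire-and-forget, returns before the saves happen.
fun startSaves(repo: RestaurantsRepository, restaurants: List<String>, n: Int) {
    repeat(n) {
        GlobalScope.launch {
            delay(50) // simulated crawl
            repo.saveAll(restaurants)
        }
    }
}

fun main() {
    // relaxed = true lets the mock accept saveAll without explicit stubbing.
    val repo = mockk<RestaurantsRepository>(relaxed = true)
    val restaurants = listOf("r1", "r2")

    startSaves(repo, restaurants, 40)

    // Waits up to 5 seconds for exactly 40 recorded calls; throws if that never happens.
    verify(exactly = 40, timeout = 5000L) { repo.saveAll(restaurants) }
    println("verified")
}
```

The production-style code stays fire-and-forget here, which matches the controller that is supposed to return right away; only the test waits.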
