Concurrent HTTP Fetcher
Build a concurrent URL fetcher that retrieves N URLs in parallel while capping how
many requests run at once and enforcing a per-request timeout — Kotlin’s answer to
a Go fan-out worker pool, but with structured concurrency instead of a sync.WaitGroup.
What you’ll build
A ConcurrentFetcher that fans out one coroutine per URL, gates them through a
shared Semaphore, times each one out, and fans the results back in as a typed list.
- A `ConcurrentFetcher` class configured with:
  - `maxConcurrency: Int`: the maximum number of simultaneous requests (the role a buffered channel plays as a semaphore in Go).
  - `timeoutMs: Long`: the per-request timeout.
  - `fetchAll(urls: List<String>): List<FetchResult>`: fetches every URL concurrently.
- A `FetchResult` sealed class with two variants:
  - `Success(url, body, durationMs)`
  - `Failure(url, error, durationMs)`
- Concurrency control with `Semaphore` (or `limitedParallelism`) to cap how many requests run at the same time.
- A per-request timeout via `withTimeoutOrNull`.
- Summary output: how many succeeded, how many failed, and the total elapsed time.
The shape maps cleanly from Go: instead of spawning a fixed pool of worker goroutines
that pull from a jobs channel, you launch one coroutine per job and let a Semaphore
throttle them. coroutineScope { ... } is the structured-concurrency replacement for
a WaitGroup — it suspends until every child finishes (and cancels the rest if one
throws).
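To make the WaitGroup comparison concrete, here is a minimal sketch, assuming kotlinx-coroutines-core is on the classpath. The helper name `runJobs` and the short delays are illustrative only and are not part of the worked solution.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: starts `jobs` child coroutines and returns how many
// had finished by the time coroutineScope itself returned.
suspend fun runJobs(jobs: Int): Int {
    val finished = AtomicInteger(0)
    coroutineScope {
        repeat(jobs) { i ->
            launch {
                delay(10L * (i + 1)) // stand-in for real work
                finished.incrementAndGet()
            }
        }
        // No explicit wait call: the scope suspends until every child is done.
    }
    return finished.get()
}

fun main() = runBlocking {
    println("finished = ${runJobs(5)}") // prints "finished = 5"
}
```

The count always equals the number of jobs because `coroutineScope` cannot return while a child is still running, which is the guarantee a Go program gets from calling `Wait()` on a WaitGroup.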
The worked solution
A single-module Gradle project: the entire thing lives in one Main.kt.

concurrent-fetcher/
- build.gradle.kts (coroutines deps + application plugin)
- settings.gradle.kts (project name)
- src/main/kotlin/
  - Main.kt (the whole program: types, fetcher, and main)

build.gradle.kts
The only thing this exercise adds over a plain Kotlin project is the coroutines
runtime. kotlinx-coroutines-core brings coroutineScope, async, Semaphore,
and withTimeoutOrNull; kotlinx-coroutines-test is there for testing time-based
code without real delay waits.

```kotlin
plugins {
    kotlin("jvm") version "2.1.0"
    application
}

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")

    testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.1")
    testImplementation(kotlin("test"))
}

application {
    mainClass.set("MainKt")
}

tasks.test {
    useJUnitPlatform()
}
```

settings.gradle.kts:

```kotlin
rootProject.name = "concurrent-fetcher"
```

The result type
FetchResult is a sealed class: every fetch ends as exactly one of two known
subtypes, so a when over them is exhaustive (no else branch needed — the compiler
proves you’ve covered every case). The shared url and durationMs fields are
declared abstract on the parent and overridden in each data class. If you’re
coming from TypeScript this is a discriminated union; from Go, it’s the closest
idiomatic equivalent to a tagged Result you’d otherwise hand-roll.
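The exhaustiveness claim can be tried in isolation. The sketch below uses a trimmed-down copy of the type (without the abstract properties) so that it compiles on its own; the `describe` helper is hypothetical and only exists for this illustration.

```kotlin
// Trimmed-down copy of FetchResult, for illustration only.
sealed class FetchResult {
    data class Success(val url: String, val body: String, val durationMs: Long) : FetchResult()
    data class Failure(val url: String, val error: String, val durationMs: Long) : FetchResult()
}

// Hypothetical helper. Because `when` is used as an expression here, the
// compiler rejects the function if any subtype is left uncovered.
fun describe(result: FetchResult): String = when (result) {
    is FetchResult.Success -> "OK ${result.url}: ${result.body.length} chars"
    is FetchResult.Failure -> "FAIL ${result.url}: ${result.error}"
}

fun main() {
    println(describe(FetchResult.Success("https://example.com", "hello", 12)))
    // prints "OK https://example.com: 5 chars"
    println(describe(FetchResult.Failure("https://example.com", "Timeout", 3000)))
    // prints "FAIL https://example.com: Timeout"
}
```

Adding a third subtype without updating `describe` turns into a compile error rather than a silent fall-through, which is the property the two-variant design relies on.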

```kotlin
sealed class FetchResult {
    abstract val url: String
    abstract val durationMs: Long

    data class Success(
        override val url: String,
        val body: String,
        override val durationMs: Long,
    ) : FetchResult()

    data class Failure(
        override val url: String,
        val error: String,
        override val durationMs: Long,
    ) : FetchResult()
}
```

The fetcher: concurrency control
This is the centerpiece. Three coroutine tools do all the work, and it’s worth seeing exactly which job each one owns.

```kotlin
// Imports for everything in Main.kt (place them at the top of the file).
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlinx.coroutines.withTimeoutOrNull

class ConcurrentFetcher(
    private val maxConcurrency: Int = 5,
    private val timeoutMs: Long = 5000,
) {
    private val semaphore = Semaphore(maxConcurrency)

    suspend fun fetchAll(urls: List<String>): List<FetchResult> = coroutineScope {
        urls.map { url ->
            async {
                semaphore.withPermit { fetchOne(url) }
            }
        }.awaitAll()
    }

    private suspend fun fetchOne(url: String): FetchResult {
        val start = System.currentTimeMillis()
        return try {
            val body = withTimeoutOrNull(timeoutMs) { doFetch(url) }
            val elapsed = System.currentTimeMillis() - start
            if (body != null) {
                FetchResult.Success(url, body, elapsed)
            } else {
                FetchResult.Failure(url, "Timeout after ${timeoutMs}ms", elapsed)
            }
        } catch (e: CancellationException) {
            // The batch itself was cancelled: propagate instead of reporting a Failure.
            throw e
        } catch (e: Exception) {
            val elapsed = System.currentTimeMillis() - start
            FetchResult.Failure(url, e.message ?: "Unknown error", elapsed)
        }
    }
}
```

How the three pieces fit together:
- `coroutineScope { ... }` is the structured-concurrency boundary. fetchAll doesn’t return until every child coroutine launched inside it completes, and if any child fails with an uncaught exception the scope cancels its siblings. It’s the WaitGroup you never have to remember to Wait() on: a child coroutine cannot outlive the call, so the leak you would get from a forgotten goroutine isn’t possible here.
- `async { ... }` + `awaitAll()` is the fan-out / fan-in. `urls.map { async { ... } }` launches one coroutine per URL immediately and collects a `List<Deferred<FetchResult>>`; `awaitAll()` suspends until all of them resolve and unwraps the list to `List<FetchResult>`, in the same order as the input URLs. A `Deferred<T>` is a coroutine’s typed promise, like a Go channel you read exactly one value from, or a JS `Promise<T>`.
- `semaphore.withPermit { ... }` is what actually caps concurrency. We construct `Semaphore(maxConcurrency)`, and each coroutine must acquire a permit before calling fetchOne. So even though all N coroutines are launched at once, only maxConcurrency of them are inside fetchOne at any moment; the rest suspend (they do not block a thread) while waiting for a permit. This is the buffered-channel-as-semaphore pattern from Go, made explicit. withPermit releases the permit automatically even if the body throws.

And the timeout, inside fetchOne:
- `withTimeoutOrNull(timeoutMs) { ... }` races the fetch against a clock. If doFetch finishes in time it returns the body; if the timeout fires first it cancels the work and returns null. Note the OrNull variant, which we prefer here over withTimeout precisely because we want to convert a timeout into a Failure rather than throw a TimeoutCancellationException. The surrounding try/catch turns any other exception (e.g. a connection error) into a Failure too, so a single bad URL never sinks the whole batch. The one exception it deliberately rethrows is CancellationException: if the caller cancels the batch, swallowing that signal would keep a cancelled coroutine running and report a misleading Failure.
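
The difference between the two timeout variants is easy to see side by side. This is a minimal sketch, assuming kotlinx-coroutines-core; `slowCall` is a hypothetical stand-in for a slow request, and the 50 ms and 2000 ms limits are arbitrary.

```kotlin
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical stand-in for a request that takes about half a second.
suspend fun slowCall(): String {
    delay(500)
    return "done"
}

fun main() = runBlocking {
    // OrNull variant: the timeout becomes an ordinary null value.
    val viaNull: String? = withTimeoutOrNull(50) { slowCall() }
    println("withTimeoutOrNull -> $viaNull") // prints "withTimeoutOrNull -> null"

    // Throwing variant: the same timeout surfaces as an exception.
    val viaThrow = try {
        withTimeout(50) { slowCall() }
    } catch (e: TimeoutCancellationException) {
        "timed out"
    }
    println("withTimeout -> $viaThrow") // prints "withTimeout -> timed out"

    // With enough time, the block's value is returned unchanged.
    println("in time -> ${withTimeoutOrNull(2_000) { slowCall() }}") // prints "in time -> done"
}
```

With the OrNull variant the timeout path is a plain `if (body != null)` check, which is why fetchOne can map it to a Failure without a dedicated catch branch.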

```mermaid
flowchart LR
    U["urls: List of String"] --> M["fetchAll: coroutineScope"]
    M -->|"async per url"| S["Semaphore: maxConcurrency permits"]
    S -->|"withPermit"| F1["fetchOne + withTimeoutOrNull"]
    S -->|"withPermit"| F2["fetchOne + withTimeoutOrNull"]
    S -->|"withPermit"| F3["fetchOne + withTimeoutOrNull"]
    F1 --> A["awaitAll"]
    F2 --> A
    F3 --> A
    A --> R["List of FetchResult"]
```

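
The Semaphore's gating role can also be checked empirically. The following is a sketch, not part of the worked solution: `peakConcurrency` is a hypothetical probe that counts how many coroutines are inside `withPermit` at the same moment, with a 50 ms delay standing in for the network call.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical probe: runs `tasks` coroutines behind a Semaphore with `cap`
// permits and returns the highest number seen inside withPermit at once.
suspend fun peakConcurrency(tasks: Int, cap: Int): Int {
    val semaphore = Semaphore(cap)
    val inFlight = AtomicInteger(0)
    val peak = AtomicInteger(0)
    coroutineScope {
        List(tasks) {
            async {
                semaphore.withPermit {
                    val now = inFlight.incrementAndGet()
                    peak.updateAndGet { seen -> maxOf(seen, now) }
                    delay(50) // stand-in for the network call
                    inFlight.decrementAndGet()
                }
            }
        }.awaitAll()
    }
    return peak.get()
}

fun main() = runBlocking {
    println("peak = ${peakConcurrency(tasks = 10, cap = 3)}") // prints "peak = 3"
}
```

All ten coroutines are launched up front, yet the peak never exceeds the permit count, because the other seven are suspended at `withPermit` rather than running.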
The simulated fetch
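To keep the exercise dependency-free, doFetch fakes a network call: a random delay
plus a 10% failure rate. The doc comment shows how to swap in a real JDK HttpClient
(java.net.http.HttpClient, JDK 11+) wrapped in withContext(Dispatchers.IO), so the
blocking send runs on the IO dispatcher’s thread pool instead of tying up the caller’s
thread. Keep in mind that coroutine cancellation is cooperative: a timeout cannot
interrupt a blocking send mid-call, so with a real client it is worth also setting the
request’s own timeout (HttpRequest.Builder.timeout).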

```kotlin
/**
 * Simulated HTTP fetch. Replace with real HTTP client for production use.
 *
 * Example with JDK HttpClient:
 *   val client = java.net.http.HttpClient.newHttpClient()
 *   val request = java.net.http.HttpRequest.newBuilder(java.net.URI(url)).build()
 *   val response = withContext(Dispatchers.IO) {
 *       client.send(request, java.net.http.HttpResponse.BodyHandlers.ofString())
 *   }
 *   return response.body()
 */
private suspend fun doFetch(url: String): String {
    // Simulate variable response times and occasional failures
    val delayMs = (100L..2000L).random()
    delay(delayMs)

    // Simulate 10% failure rate
    if ((1..10).random() == 1) {
        throw RuntimeException("Connection refused: $url")
    }

    return "<!DOCTYPE html><html><body>Response from $url (${delayMs}ms)</body></html>"
}
```

main: driving it
main is wrapped in runBlocking, the bridge from ordinary blocking code into the
coroutine world (a suspend function can only be called from a coroutine or another
suspend function, and runBlocking starts one at the program’s entry point). It builds a list of URLs,
fetches them with a cap of 3, then prints each result via an exhaustive when and a
final summary line.

```kotlin
fun main() = runBlocking {
    val urls = listOf(
        "https://example.com",
        "https://httpbin.org/get",
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://api.github.com",
        "https://httpbin.org/delay/1",
        // … six more URLs …
    )

    val fetcher = ConcurrentFetcher(maxConcurrency = 3, timeoutMs = 3000)

    println("Fetching ${urls.size} URLs with max concurrency 3...")
    println("=".repeat(60))

    val start = System.currentTimeMillis()
    val results = fetcher.fetchAll(urls)
    val totalTime = System.currentTimeMillis() - start

    results.forEach { result ->
        when (result) {
            is FetchResult.Success ->
                println("[OK] ${result.url} (${result.durationMs}ms) - ${result.body.take(50)}...")

            is FetchResult.Failure ->
                println("[FAIL] ${result.url} (${result.durationMs}ms) - ${result.error}")
        }
    }

    val successes = results.count { it is FetchResult.Success }
    val failures = results.count { it is FetchResult.Failure }
    println("=".repeat(60))
    println("Total: ${results.size} | Success: $successes | Failed: $failures | Time: ${totalTime}ms")
}
```

Because the cap is 3, the total wall-clock time is roughly ceil(N / 3) waves of
work, not the sum of all delays — that’s the speedup the Semaphore buys you over
fetching one URL at a time, while still being kinder to the remote hosts than firing
all N requests at once.
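
The "waves" estimate can be made more precise with a small model. The sketch below is an illustration under simplifying assumptions, not a measurement: it assumes permits are handed out in list order, ignores scheduling overhead, and uses made-up delays. The function name `estimateWallClock` is hypothetical.

```kotlin
import java.util.PriorityQueue

// Hypothetical model: each delay starts as soon as one of `cap` slots is free,
// in list order. Returns the time at which the last one finishes.
fun estimateWallClock(delaysMs: List<Long>, cap: Int): Long {
    val finishTimes = PriorityQueue<Long>() // finish times of the slots in use
    var lastFinish = 0L
    for (d in delaysMs) {
        // If every slot is busy, wait for the earliest one to free up.
        val startAt = if (finishTimes.size < cap) 0L else finishTimes.poll()
        val finishAt = startAt + d
        finishTimes.add(finishAt)
        lastFinish = maxOf(lastFinish, finishAt)
    }
    return lastFinish
}

fun main() {
    val delays = listOf(500L, 1000L, 1500L, 400L, 300L, 900L)
    println("sequential: ${delays.sum()}ms")                    // prints "sequential: 4600ms"
    println("cap of 3:   ${estimateWallClock(delays, 3)}ms")    // prints "cap of 3:   1900ms"
    println("unbounded:  ${delays.max()}ms")                    // prints "unbounded:  1500ms"
}
```

In this model the capped run lands between the two extremes: much faster than fetching one URL at a time, and only modestly slower than launching everything at once.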
Run and test
1. Run the fetcher (it uses simulated requests, so no network is required):

   ```shell
   ./gradlew run
   ```

2. Run the tests:

   ```shell
   ./gradlew test
   ```

Once the whole batch has completed, the per-URL lines print in input order, followed
by a summary like Total: 10 | Success: 9 | Failed: 1 | Time: 1893ms. The exact counts
and timing vary from run to run because of the simulated delays and 10% failure rate.