NestJS is a progressive Node.js framework that heavily embraces Observables to handle asynchronous tasks. Observables are particularly useful in NestJS Microservices for:
β
Inter-Service Communication (using Kafka, Redis, RabbitMQ, etc.)
β
Streaming Data (WebSockets, gRPC, etc.)
β
Handling Long-running Tasks (e.g., background jobs)
Letβs dive into real-world examples of how Observables can be leveraged in NestJS Microservices. π₯
1οΈβ£ Observables in NestJS Microservices
NestJS uses RxJS Observables as a core part of its design for handling async operations. The framework provides built-in support for Microservices and encourages the use of Observables for request-response patterns.
Example: NestJS Microservices Setup
Letβs say we have two microservices:
- Orders Service (publishes an event when an order is placed)
- Inventory Service (listens and updates stock levels)
Orders Microservice (Publisher)
import { Controller, Post } from '@nestjs/common';
import { Client, ClientProxy } from '@nestjs/microservices';
import { Observable } from 'rxjs';
@Controller('orders')
export class OrdersController {
@Client({ transport: Transport.REDIS, options: { host: 'localhost', port: 6379 } })
private client: ClientProxy;
@Post('create')
createOrder(): Observable<string> {
return this.client.send('order_created', { productId: 1, quantity: 2 });
}
}
πΉ Here, client.send()
returns an Observable that emits a response when the Inventory Service processes the event.
Inventory Microservice (Listener)
import { Controller } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
@Controller()
export class InventoryController {
@MessagePattern('order_created')
handleOrderCreated(data: { productId: number; quantity: number }): Observable<string> {
console.log('Updating inventory for:', data);
return new Observable((subscriber) => {
// Simulate processing
setTimeout(() => {
subscriber.next('Inventory Updated β
');
subscriber.complete();
}, 2000);
});
}
}
πΉ This service listens for order_created
messages and responds with an Observable.
Response in Orders Service
createOrder().subscribe((response) => console.log(response));
π’ Output:
Updating inventory for: { productId: 1, quantity: 2 }
Inventory Updated β
2οΈβ£ Observables in HTTP Services
If you're building a REST API inside a microservice, Observables can be used with HTTP clients like Axios (wrapped in from()
to convert Promises to Observables).
Example: Fetching Data from Another Microservice
import { Injectable } from '@nestjs/common';
import { HttpService } from '@nestjs/axios';
import { Observable, map } from 'rxjs';
@Injectable()
export class ProductService {
constructor(private httpService: HttpService) {}
getProductDetails(productId: number): Observable<any> {
return this.httpService.get(`http://inventory-service/products/${productId}`).pipe(
map((response) => response.data) // Transform response
);
}
}
πΉ This helps us keep the reactive approach even for HTTP calls.
3οΈβ£ Streaming Data Using Observables
NestJS supports WebSockets & gRPC, both of which work well with Observables.
Example: Real-time Stock Updates using WebSockets
1οΈβ£ Gateway (WebSocket Server)
import { WebSocketGateway, SubscribeMessage, WebSocketServer } from '@nestjs/websockets';
import { Observable, interval, map } from 'rxjs';
@WebSocketGateway()
export class StockGateway {
@WebSocketServer() server;
@SubscribeMessage('stock_updates')
stockUpdates(): Observable<{ stock: number }> {
return interval(2000).pipe(
map(() => ({ stock: Math.floor(Math.random() * 100) })) // Random stock value
);
}
}
2οΈβ£ Client (WebSocket Frontend)
const socket = io('http://localhost:3000');
socket.emit('stock_updates');
socket.on('stock_updates', (data) => {
console.log('Live Stock:', data);
});
πΉ The stock updates stream continuously using an Observable interval.
4οΈβ£ Handling Long-running Tasks with Observables
In some cases, we might need to process large data asynchronously. Instead of blocking the request, we can return an Observable that emits data progressively.
Example: Streaming Large Report Data
import { Controller, Get } from '@nestjs/common';
import { Observable, interval, take, map } from 'rxjs';
@Controller('reports')
export class ReportsController {
@Get('generate')
generateReport(): Observable<string> {
return interval(1000).pipe(
take(5), // Emit 5 values (simulate processing)
map((count) => `Processing chunk ${count + 1}...`)
);
}
}
π’ Client Output (after hitting /reports/generate
):
Processing chunk 1...
Processing chunk 2...
Processing chunk 3...
Processing chunk 4...
Processing chunk 5...
πΉ This prevents blocking and streams responses incrementally.
Why Use Observables in NestJS? π€
β Better async handling β Unlike Promises, Observables allow multiple values over time.
β Reactive programming β Works great with real-time updates (WebSockets, Kafka, etc.).
β Powerful Operators β map()
, filter()
, mergeMap()
make async transformations easier.
β Built-in NestJS Support β Microservices, WebSockets, and gRPC all use Observables by default.
Conclusion π―
Observables in NestJS shine when working with Microservices, real-time applications, and long-running processes. They provide a powerful way to handle asynchronous tasks efficiently, making your app more reactive and scalable.
π₯ If youβre building NestJS Microservices, start using Observables today! π
Top comments (0)