NestJS custom transport strategy for PostgreSQL Pub/Sub.
PostgreSQL can be used as a Pub/Sub message broker. Its functionality is similar to the Redis Pub/Sub, but has its own features and limitations.
The References section contains links that you may find useful to familiarize yourself with the PostgreSQL asynchronous notifications.
NestJS PG Notify implements Pub/Sub messaging paradigm using PostgreSQL as a NestJS custom transporter. It wraps the pg-listen library under the hood.
It can be used in microservice and hybrid NestJS applications. The example folder contains examples for both types of applications.
$ npm i nestjs-pg-notify pg
import { PgNotifyServer } from 'nestjs-pg-notify';
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
strategy: new PgNotifyServer({
/**
* Required parameter
* Corresponds to the "pg" library's connection config
*/
connection: {
host: 'localhost',
port: 5432,
database: 'pgnotify',
user: 'pgnotify',
password: 'pgnotify',
},
/**
* Optional parameter
* Contains retry-strategy config passing the data to the "pg-listen" library
*/
strategy: {
retryInterval: 1_000,
retryTimeout: Infinity,
},
/**
* Optional parameter
* Overrides default logger
*/
logger: new Logger(),
})
});
NestJS PG Notify offers two decorators to register message handlers:
@PgNotifyEventPattern()
@PgNotifyMessagePattern()
These are an alternative to standard decorators:
@EventPattern()
@MessagePattern()
Message handler's binding can be used only within controller classes.
import { PgNotifyContext, PgNotifyEventPattern, PgNotifyMessagePattern } from 'nestjs-pg-notify';
@Controller()
export class AppController {
@PgNotifyEventPattern({event: 'greeting'})
@UsePipes(new ValidationPipe())
onGreetingEvent(@Payload() payload: any, @Ctx() context: PgNotifyContext): void {
Logger.log(payload.message);
}
@PgNotifyMessagePattern('greeting')
@UsePipes(new ValidationPipe())
onGreetingRequest(@Payload() payload: any, @Ctx() context: PgNotifyContext): string {
Logger.log(payload.message);
return 'Hello!';
}
}
The standard decorator @Ctx()
allows access to the context of the incoming request. In our case, the context object is an instance of PgNotifyContext
.
The client proxy can be registered as a custom provider. The configuration is the same as the configuration of the PgNotifyServer
.
import { PgNotifyClient } from 'nestjs-pg-notify';
@Module({
providers: [
{
provide: 'PG_NOTIFY_CLIENT',
useFactory: (): ClientProxy => new PgNotifyClient({
connection: {
host: 'localhost',
port: 5432,
database: 'pgnotify',
user: 'pgnotify',
password: 'pgnotify',
},
strategy: {
retryInterval: 1_000,
retryTimeout: Infinity,
},
})
},
],
exports: [
'PG_NOTIFY_CLIENT',
]
})
export class AppModule {}
Then we can inject the client proxy.
import { PgNotifyResponse } from 'nestjs-pg-notify';
export class AppService {
constructor(
@Inject('PG_NOTIFY_CLIENT')
private readonly client: ClientProxy,
) {}
sendRequest(): Observable<PgNotifyResponse> {
// Send request and expect response
return this.client.send('greeting', {message: 'Hello!'}).pipe(
timeout(2_000),
tap(response => Logger.debug(response)),
);
}
emitEvent(): Observable<void> {
// Emit event
return this.client.emit({event: 'greeting'}, {message: 'Hello!'});
}
}
The client proxy generates request identifier when we send requests using client.send()
.
The request identifier in the context of the incoming request means that we need to prepare an error response for the client.
We can use the PgNotifyResponse.error()
factory in order to unify the structure of the response.
import { PgNotifyContext, PgNotifyResponse } from 'nestjs-pg-notify';
@Catch()
export class ExceptionFilter implements ExceptionFilter {
catch(error: Error, host: ArgumentsHost): Observable<PgNotifyResponse|void> {
const {status, message} = parseError(error);
const context = host.switchToRpc().getContext<PgNotifyContext>();
const requestId = context.getRequestId();
Logger.error(message, error.stack, 'PgNotifyExceptionFilter');
if (requestId) {
return of(PgNotifyResponse.error(message, status));
}
return of(undefined);
}
}
Then we can register the filter using the standard @UseFilters()
decorator. It supports method-scope and controller-scope modes.
@Controller()
@UseFilters(ExceptionFilter)
export class AppController {
// ...
}
import { PgNotifyContext } from 'nestjs-pg-notify';
@Injectable()
export class LoggingInterceptor implements NestInterceptor {
public intercept(context: ExecutionContext, next: CallHandler): Observable<void> {
const pgNotifyContext = context
.switchToRpc()
.getContext<PgNotifyContext>();
return next.handle().pipe(
tap(() => Logger.log(JSON.stringify(pgNotifyContext), LoggingInterceptor.name)),
);
}
}
To register interceptor we can use @UseInterceptors()
decorator. It also supports method-scope and controller-scope modes.
@Controller()
@UseInterceptors(LoggingInterceptor)
export class AppController {
// ...
}
API documentation is available here.
- PostgreSQL Documentation:
- PgBouncer Documentation:
- NestJS Documentation:
- Dependencies:
This project is licensed under the MIT License.