// Rascal NPM

/**
* CUSTOM LIBS SETUP RASCAL FOR RABBITMQ BY RESTU WAHYU SAPUTRA
**/

import 'dotenv/config'
import { CustomError } from '@helpers/helper.error'
import Broker, { PublicationSession, SubscriberSessionAsPromised } from 'rascal'
import os from 'os'

/** Identifiers from which the Rascal publication, subscription, exchange and queue names are derived. */
interface SetterConfig {
	/** Middle segment of the `rabbitmq_pub:<key>:<prefix>` and `rabbitmq_sub:<key>:<prefix>` names. */
	key: string
	/** Last segment of those names; also the suffix of the `rabbitmq_ex:<prefix>` exchange and `rabbitmq_eq:<prefix>` queue. */
	prefix: string
}

/**
 * Small wrapper around a Rascal broker exposing exactly one publication and one
 * subscription, both named after the `key`/`prefix` pair given to the constructor.
 *
 * Connection settings are read from the `RABBITMQ_*` environment variables when the
 * instance is constructed. The broker itself is created lazily on the first
 * `publisher()` / `subscriber()` call and then reused, so repeated calls do not
 * open additional broker connections.
 */
export class RabbitMQ {
	/** Routing key used for both the publication and the exchange -> queue binding. */
	private static readonly routingKey = 'a.b.c'

	private readonly brokerConfig: Broker.BrokerConfig
	private readonly key: string
	private readonly prefix: string
	/** In-flight or resolved broker, shared by every call on this instance. */
	private broker?: Promise<Broker.BrokerAsPromised>

	/**
	 * @param key middle segment of the publication / subscription names
	 * @param prefix last segment of those names and suffix of the exchange and queue names
	 */
	constructor(key: string, prefix: string) {
		this.key = key
		this.prefix = prefix
		this.brokerConfig = this.buildConfig({ key, prefix })
	}

	/** Name of the publication registered for a key/prefix pair. */
	private static publicationName(key: string, prefix: string): string {
		return `rabbitmq_pub:${key}:${prefix}`
	}

	/** Name of the subscription registered for a key/prefix pair. */
	private static subscriptionName(key: string, prefix: string): string {
		return `rabbitmq_sub:${key}:${prefix}`
	}

	/** Normalises an arbitrary thrown value into an `Error`. */
	private static toError(err: unknown): Error {
		return err instanceof Error ? err : new Error(String(err))
	}

	/** Builds the Rascal broker configuration from the environment and the given identifiers. */
	private buildConfig(config: SetterConfig): Broker.BrokerConfig {
		// One name is used for the vhost entry AND for the publication/subscription that
		// refer to it; previously the entry was hard-coded to '/' while the references used
		// RABBITMQ_VHOST. `||` (not `??`) so an empty variable also falls back to '/'.
		const vhostName = process.env.RABBITMQ_VHOST || '/'
		const exchange = `rabbitmq_ex:${config.prefix}`
		const queue = `rabbitmq_eq:${config.prefix}`
		const publicationName = RabbitMQ.publicationName(config.key, config.prefix)
		const subscriptionName = RabbitMQ.subscriptionName(config.key, config.prefix)

		// Environment variables are strings; the connection port is passed as a number.
		const rawPort = process.env.RABBITMQ_PORT
		const parsedPort = rawPort ? Number(rawPort) : Number.NaN
		const port = Number.isInteger(parsedPort) ? parsedPort : undefined

		const pubConfig: Broker.PublicationConfig = {
			vhost: vhostName,
			exchange,
			routingKey: RabbitMQ.routingKey,
			autoCreated: true,
			options: {
				persistent: true,
				// NOTE(review): the message priority is the number of CPU cores of the
				// publishing host, and the queue below is declared by name only — confirm
				// this priority value is intended.
				priority: os.cpus().length
			}
		}

		const subConfig: Broker.SubscriptionConfig = {
			vhost: vhostName,
			queue,
			autoCreated: true
		}

		return {
			vhosts: {
				[vhostName]: {
					connectionStrategy: 'random',
					connection: {
						hostname: process.env.RABBITMQ_HOST,
						user: process.env.RABBITMQ_USERNAME,
						password: process.env.RABBITMQ_PASSWORD,
						port,
						protocol: process.env.RABBITMQ_PROTOCOL
					},
					exchanges: [exchange],
					queues: [queue],
					bindings: [`${exchange}[${RabbitMQ.routingKey}] -> ${queue}`],
					publications: { [publicationName]: pubConfig },
					subscriptions: { [subscriptionName]: subConfig }
				}
			},
			// NOTE(review): these entries are keyed by the publication / subscription names and
			// are kept exactly as they were; confirm against the Rascal recovery documentation
			// that they have the intended effect.
			recovery: {
				[publicationName]: { requeue: true, strategy: 'ack', xDeathFix: true },
				[subscriptionName]: { strategy: 'nack' }
			}
		}
	}

	/** Returns the broker configuration built in the constructor. */
	private getConfig(): Broker.BrokerConfig {
		return this.brokerConfig
	}

	/** Creates the broker and routes its `error` events to `console.error`. */
	private async createBroker(): Promise<Broker.BrokerAsPromised> {
		const broker = await Broker.BrokerAsPromised.create(this.getConfig())
		broker.on('error', console.error)
		return broker
	}

	/**
	 * Returns the shared broker, creating it on first use.
	 *
	 * Rejects with the original creation error (instead of resolving with an error
	 * object that callers would then try to use as a broker). A failed attempt is
	 * forgotten so that the next call retries.
	 */
	private connection(): Promise<Broker.BrokerAsPromised> {
		if (this.broker !== undefined) return this.broker

		const pending = this.createBroker()
		this.broker = pending
		pending.catch(() => {
			if (this.broker === pending) this.broker = undefined
		})
		return pending
	}

	/**
	 * Publishes `data` through this instance's publication.
	 *
	 * @param data message payload, or an array of payload objects
	 * @returns `true` once the message has been handed to the broker, or a `CustomError`
	 *          carrying the failure message when connecting or publishing failed.
	 *          Failures are returned, not thrown.
	 */
	async publisher(data: Record<string, any> | Record<string, any>[]): Promise<any> {
		try {
			const connection = await this.connection()
			const publication: PublicationSession = await connection.publish(
				RabbitMQ.publicationName(this.key, this.prefix),
				data
			)

			console.info('RabbitMQ publisher is called')

			publication.on('success', (jobId: string) => console.log(`job ${jobId} is success`))
			publication.on('error', (err: Error, jobId: string) => {
				console.log(`job ${jobId} is error`)
				console.error(err)
				// NOTE(review): purging on a publish error is kept from the original code —
				// confirm it is intended. The returned promise is now observed so a failed
				// purge cannot surface as an unhandled rejection.
				connection.purge().catch(console.error)
			})

			return true
		} catch (err: unknown) {
			return new CustomError(RabbitMQ.toError(err).message)
		}
	}

	/**
	 * Subscribes to this instance's queue and forwards each message's content to `cb`.
	 *
	 * The message is acknowledged only after `cb` has finished (its promise is awaited
	 * when it returns one). When `cb` throws or rejects, the error is passed to
	 * `ackOrNack` so the message is not acknowledged as processed.
	 *
	 * @param cb receives `(content)` for each message, or `(null, error)` when the
	 *           subscription could not be set up
	 */
	async subscriber(cb: (content: any, error?: Error) => any): Promise<void> {
		try {
			const connection = await this.connection()
			const subscription: SubscriberSessionAsPromised = await connection.subscribe(
				RabbitMQ.subscriptionName(this.key, this.prefix)
			)

			console.info('RabbitMQ subscriber is called')

			subscription
				.on('message', (_message: any, content: any, ackOrNack: Broker.AckOrNack): void => {
					void RabbitMQ.deliver(cb, content, ackOrNack)
				})
				.on('error', console.error)
		} catch (err: unknown) {
			cb(null, RabbitMQ.toError(err))
		}
	}

	/** Runs the consumer callback for one message, then acks on success or nacks with the failure. */
	private static async deliver(
		cb: (content: any, error?: Error) => any,
		content: any,
		ackOrNack: Broker.AckOrNack
	): Promise<void> {
		let failure: Error | undefined
		try {
			await cb(content)
		} catch (err: unknown) {
			failure = RabbitMQ.toError(err)
		}

		if (failure) {
			ackOrNack(failure)
		} else {
			ackOrNack()
		}
	}
}


// publisher demo here
import { RabbitMQ } from '@libs/lib.rabbitmq'
;(() => {
	try {
		const broker = new RabbitMQ('message:text', 'google')

		// Publish one message every five seconds. The error handling lives inside the
		// interval callback: a try/catch around setInterval itself never sees failures
		// raised later by the asynchronous callback.
		setInterval(async () => {
			try {
				const res = await broker.publisher({ message: `hello wordl from publisher:${new Date().getTime()}` })
				// `res` is `true` on success, or the error object returned by publisher().
				console.log(res)
			} catch (err) {
				console.error(err)
			}
		}, 5000)
	} catch (err) {
		console.error(err)
	}
})()

// subscriber demo here
import { RabbitMQ } from '@libs/lib.rabbitmq'
;(async () => {
	try {
		const broker = new RabbitMQ('message:text', 'google')

		// Awaited so that a rejection from subscriber() reaches the catch below.
		// `error` is optional: it is only supplied when the subscription could not be set up.
		await broker.subscriber((content: string, error?: Error) => {
			if (error) {
				console.error(error)
				return
			}
			console.log(content)
		})
	} catch (err) {
		console.error(err)
	}
})()
// Author: Restu Wahyu Saputra