Commit 544fa910 authored by Tobias Steiner's avatar Tobias Steiner
Browse files

Add the possibility to generate more than one producer and consumer

parent ad85436b
......@@ -3,9 +3,9 @@ import {InterfaceCore} from './core';
export interface InterfaceKafkaGeoname {
init();
getProducer();
getConsumerStream();
getConsumer();
getProducer(name?: string);
getConsumerStream(name?: string);
getConsumer(name?: string);
getKafkaAvro();
}
......@@ -17,16 +17,16 @@ export default class KafkaGeoname implements InterfaceKafkaGeoname {
private kafkaAvro = null;
/**
* A producer instanance or null
* An object with a collection of producer instances
* @type {null}
*/
private producer = null;
private producer = {};
/**
* A consumer stream or null
* An object with a collection of consumer streams
* @type {null}
*/
private consumerStream = null;
private consumerStream = {};
/**
* Name of the instance. Is used in error logging
......@@ -44,6 +44,7 @@ export default class KafkaGeoname implements InterfaceKafkaGeoname {
/**
*
* todo: better DI
* @param {string} name
* @param core
*/
......@@ -51,6 +52,7 @@ export default class KafkaGeoname implements InterfaceKafkaGeoname {
this.name = name;
this.core = core;
}
/**
* Init kafka. Connect to the schema registry and get all the schemas
* This is an async operation
......@@ -77,31 +79,41 @@ export default class KafkaGeoname implements InterfaceKafkaGeoname {
* Get a kafka producer. The config is provided by nconf
* @returns {Promise<any>}
*/
public async getProducer() {
if (this.producer === null) {
return this.createProducer();
public async getProducer(name: string = 'default') {
if (this.producer[name] === null) {
return this.createProducer(name);
}
return this.producer;
return this.producer[name];
}
/**
* Get a consumer stream
* @returns {Promise<any>}
*/
public async getConsumerStream() {
if (this.consumerStream === null) {
return this.createConsumerStream();
public async getConsumerStream(name: string = 'default') {
if (this.consumerStream[name] === null) {
return this.createConsumerStream(name);
}
return this.consumerStream;
return this.consumerStream[name];
}
/**
* get a classic consumer, subscribed to a topic
* @returns {Promise<any>}
*/
public async getConsumer() {
return this.kafkaAvro.getConsumer(this.core.getNconf().get('consumer:config'),
this.core.getNconf().get('consumer:topics:config'))
public async getConsumer(name: string = 'default') {
/**
* Define a namespace prefix for the producer
* For the default consumer there is no prefix
* @type {string}
*/
let prefix = 'consumer';
if (name !== 'default') {
prefix = `consumer:${name}`;
}
return this.kafkaAvro.getConsumer(this.core.getNconf().get(`${prefix}:config`),
this.core.getNconf().get(`${prefix}:topics:config`))
// the "getConsumer()" method will return a bluebird promise.
.then((consumer) => {
// Perform a consumer.connect()
......@@ -121,7 +133,7 @@ export default class KafkaGeoname implements InterfaceKafkaGeoname {
})
.then( (consumer) => {
// subscribe to topic
consumer.subscribe( this.core.getNconf().get('consumer:topics:topics'));
consumer.subscribe( this.core.getNconf().get(`${prefix}:topics:topics`));
return consumer;
});
}
......@@ -146,7 +158,18 @@ export default class KafkaGeoname implements InterfaceKafkaGeoname {
* Get a promise with a producer
* @returns {Promise<any>}
*/
private async createProducer() {
private async createProducer(name: string) {
/**
* Define a namespace prefix for the producer
* For the default consumer there is no prefix
* @type {string}
*/
let prefix = 'producer';
if (name !== 'default') {
prefix = `producer:${name}`;
}
/**
* Check if kafka is inited
*/
......@@ -154,20 +177,20 @@ export default class KafkaGeoname implements InterfaceKafkaGeoname {
/**
* Get the producer
*/
this.producer = await this.kafkaAvro.getProducer(this.core.getNconf().get('producer:config'));
this.producer[name] = await this.kafkaAvro.getProducer(this.core.getNconf().get(`${prefix}:config`));
/**
* Disconnected
*/
this.producer.on('disconnected', (arg) => {
this.core.getLogger().error(`Producer (${this.name}) disconnected. ${JSON.stringify(arg)}`);
this.producer[name].on('disconnected', (arg) => {
this.core.getLogger().error(`Producer (${this.name} -> ${name}) disconnected. ${JSON.stringify(arg)}`);
});
/**
* Stream is ready
*/
this.producer.on('ready', () => {
this.core.getLogger().info(`The producer (${this.name}) is ready`);
this.producer[name].on('ready', () => {
this.core.getLogger().info(`The producer (${this.name} -> ${name}) is ready`);
});
/**
......@@ -179,27 +202,38 @@ export default class KafkaGeoname implements InterfaceKafkaGeoname {
/**
* Error in the producer
*/
this.producer.on('error', (err) => {
this.producer[name].on('error', (err) => {
// Here's where we'll know if something went wrong sending to Kafka
this.core.getLogger().warn(`Error in our kafka (${this.name}) stream ${JSON.stringify(err)}`);
this.core.getLogger().warn(`Error in our kafka (${this.name} -> ${name}) stream ${JSON.stringify(err)}`);
});
/**
* Terminated producer
*/
this.producer.on('end', () => {
this.core.getLogger().error(`The producer stream (${this.name}) ended`);
this.producer[name].on('end', () => {
this.core.getLogger().error(`The producer stream (${this.name} -> ${name}) ended`);
throw Error('The producer ended');
});
return this.producer;
return this.producer[name];
}
/**
* Create a consumer stream and register error handling
* @returns {Promise<any>}
*/
private async createConsumerStream() {
private async createConsumerStream(name: string) {
/**
* Define a namespace prefix for the config.
* For the default consumer there is no profix
* @type {string}
*/
let prefix = 'consumer';
if (name !== 'default') {
prefix = `consumer:${name}`;
}
/**
* Check if kafka is inited
*/
......@@ -209,16 +243,16 @@ export default class KafkaGeoname implements InterfaceKafkaGeoname {
* crete consumer stream
* @type {Promise<any> | any}
*/
this.consumerStream = await this.kafkaAvro.getConsumerStream(
this.core.getNconf().get('consumer:config'),
this.core.getNconf().get('consumer:stream'),
this.core.getNconf().get('consumer:topics'),
this.consumerStream[name] = await this.kafkaAvro.getConsumerStream(
this.core.getNconf().get(`${prefix}:config`),
this.core.getNconf().get(`${prefix}:stream`),
this.core.getNconf().get(`${prefix}:topics`),
);
/**
* On a general error -> exit
*/
this.consumerStream.on('error', (err) => {
this.consumerStream[name].on('error', (err) => {
if (err) {
this.core.getLogger().error('An error while consuming the stream');
this.core.getLogger().error(JSON.stringify(err));
......@@ -229,11 +263,11 @@ export default class KafkaGeoname implements InterfaceKafkaGeoname {
/**
* Just a stream error
*/
this.consumerStream.consumer.on('event.error', (err) => {
this.consumerStream[name].consumer.on('event.error', (err) => {
this.core.getLogger().warn('An error while consuming the stream');
this.core.getLogger().warn(JSON.stringify(err));
});
return this.consumerStream;
return this.consumerStream[name];
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment