Commit 055c7bca authored by Tobinsk's avatar Tobinsk

Merge branch '3-kafka-several-consumer-and-producer' into 'master'

Resolve "Kafka several consumer and producer"

Closes #3

See merge request !4
parents ad85436b 7ce78108
# Geolinker common
This simple lib bundles modules for the Geolinker. Through a core instance you get access to kafka-consumers, kafka-producers, the reporter, the logger, the config and the uriBuilder. These tools are used in most of the streaming apps, so we can stop replicating code - at least a bit.
## Config
All streaming apps provide a common config file. In this file you configure the different modules and then access their instances through the core. The config makes use of the `nconf` module. A typical config looks like this:
```json
{
  "kafka": {
    "broker": "localhost:29092",
    "schema-registry": "http://localhost:8081",
    "fetchAllVersions": true
  },
  "producer": {
    "config": {
      "batch.num.messages": 100,
      "message.send.max.retries": 5,
      "retry.backoff.ms": 5000,
      "message.max.bytes": 10485760
    },
    "topics": "wikidata-small",
    "partition": -1,
    "author": "bot#1"
  },
  "consumer": {
    "config": {
      "group.id": "wikidata-normalizer",
      "socket.keepalive.enable": true,
      "enable.auto.commit": true,
      "queued.max.messages.kbytes": 100
    },
    "topics": {"topics": "wikidata-geo"},
    "stream": {
      "request.required.acks": 1
    }
  },
  "log-dir": "/tmp",
  "reporter": {
    "url": "http://dashboard-api"
  }
}
```
### general kafka config
```json
{
  "kafka": {
    "broker": "localhost:29092",
    "schema-registry": "http://localhost:8081",
    "fetchAllVersions": true
  }
}
```
* broker = A string or an array of Kafka brokers
* schema-registry = The schema registry URL including the protocol
* fetchAllVersions = Fetch all versions of the schema or just the latest one
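These values are what the core hands to the `kafka-avro` module on init. As a rough sketch of the equivalent direct `kafka-avro` setup (the option names follow the kafka-avro README and are an assumption here):
```js
const KafkaAvro = require('kafka-avro');

// assumed kafka-avro option names; the core builds this object from the config above
const kafkaAvro = new KafkaAvro({
  kafkaBroker: 'localhost:29092',
  schemaRegistry: 'http://localhost:8081',
  fetchAllVersions: true,
});
```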
### kafka producer config
You can configure several producers in one file by nesting each config under the name of the producer. The following example configures an `example` producer.
```json
{
  "producer": {
    "example": {
      "config": {
        "batch.num.messages": 100,
        "message.send.max.retries": 5,
        "retry.backoff.ms": 5000,
        "message.max.bytes": 10485760
      },
      "topics": "wikidata-small",
      "partition": -1,
      "author": "bot#1"
    }
  }
}
```
If you just need a single producer for the app (the common case), you can leave out the name and go with the following config:
```json
{
  "producer": {
    "config": {
      "batch.num.messages": 100,
      "message.send.max.retries": 5,
      "retry.backoff.ms": 5000,
      "message.max.bytes": 10485760
    },
    "topics": "wikidata-small",
    "partition": -1,
    "author": "bot#1"
  }
}
```
* config = An object with producer config options from librdkafka
  * batch.num.messages = Number of messages in a batch. If this is too big, the app will run out of memory
  * message.send.max.retries = Number of retries for unsuccessful writes
  * retry.backoff.ms = Backoff time between retries in milliseconds
  * message.max.bytes = Maximum size of a message in bytes
* topics = A string or an array of topic names
* partition = The number of the partition to write to. -1 will autoselect the partition
* author = The name of the author of a message (rarely used)
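A named producer is requested by name from the Kafka service (see the Kafka section below). A minimal sketch, assuming the `example` producer from above; the app name and config path are placeholders:
```js
const core = Core.init('my-app', './config.json');
const kafka = core.getKafkaAvro();

kafka.init().then(async () => {
  // the default producer reads the unprefixed "producer" config
  const defaultProducer = await kafka.getProducer();
  // a named producer reads its config from the "producer:example" namespace
  const exampleProducer = await kafka.getProducer('example');
});
```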
### kafka consumer
You can configure several consumers in one file by nesting each config under the name of the consumer. The following example configures an `example` consumer.
```json
{
  "consumer": {
    "example": {
      "config": {
        "group.id": "wikidata-normalizer",
        "socket.keepalive.enable": true,
        "enable.auto.commit": true,
        "queued.max.messages.kbytes": 100
      },
      "topics": {"topics": "wikidata-geo"},
      "stream": {
        "request.required.acks": 1
      }
    }
  }
}
```
If you just need a single consumer for the app (the common case), you can leave out the name and go with the following config:
```json
{
  "consumer": {
    "config": {
      "group.id": "wikidata-normalizer",
      "socket.keepalive.enable": true,
      "enable.auto.commit": true,
      "queued.max.messages.kbytes": 100
    },
    "topics": {"topics": "wikidata-geo"},
    "stream": {
      "request.required.acks": 1
    }
  }
}
```
* config = An object with consumer config options from librdkafka
  * group.id = The id of the consumer group
  * socket.keepalive.enable = Keep the socket open
  * enable.auto.commit = Commit the offset automatically (true) or manually (false)
  * queued.max.messages.kbytes = Maximum number of kilobytes of pre-fetched messages queued locally
* topics = An object with the topics to subscribe to (we should refactor this)
* stream = Config for the consumer stream
  * request.required.acks = The number of acknowledgments the producer requires from the broker
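Named consumers work the same way; a minimal sketch, assuming the `example` consumer from above (app name and config path are placeholders):
```js
const core = Core.init('my-app', './config.json');
const kafka = core.getKafkaAvro();

kafka.init().then(async () => {
  // the default consumer reads the unprefixed "consumer" config
  const consumer = await kafka.getConsumer();
  // a named consumer stream reads its config from the "consumer:example" namespace
  const exampleStream = await kafka.getConsumerStream('example');
});
```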
### Log (winston) config
As a logger we use `winston`. We use two transports to deliver logs: one to stdout and one into a file. For the file transport we need a configured `log-dir`.
```json
{
"log-dir": "/tmp"
}
```
### Reporter config
The status reporter needs an address from the config to report to.
```json
{
  "reporter": {
    "url": "http://dashboard-api"
  }
}
```
* url = The URL of the dashboard API the reporter reports to
## Core
The `core` is a container with instances and factories that provide the services. You can inject the core into a class to get access to the services, so you have one instance of each service across the project.
```js
const core = Core.init(name, configFilePath);
```
To init the core you need to provide the name of the app and the path to the config file.
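The core also exposes the parsed `nconf` config, so a service can read any value from the config file. A minimal sketch (app name and config path are placeholders):
```js
const core = Core.init('my-app', './config.json');
// nconf uses ":" as the key separator
const broker = core.getNconf().get('kafka:broker');
```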
## Reporter
The reporter helps us collect some basic metrics about the app, which helps us debug and see the app's throughput.
```js
const core = Core.init(name, configFilePath);
// report the amount of incoming messages
core.getReporter().setDataIn(10);
// report the amount of outgoing messages
core.getReporter().setDataOut(5);
```
If you need more than one reporter for the app, you can get a named instance. This can be helpful if you have one Readable and two Writable streams.
```js
const core = Core.init(name, configFilePath);
// init a reporter with the name example and a report timeout of 10s. Then report the amount of incoming messages
core.getReporter(10000, 'example').setDataIn(10);
// report the amount of outgoing messages
core.getReporter('example').setDataOut(5);
```
## Kafka
The Kafka class uses the `kafka-avro` module to provide Kafka together with Avro schemas. We just unified the way Kafka, consumers and producers are configured and initialized.
The class gets the configuration from the nconf file, where we can configure all details for each consumer and producer.
```js
const core = Core.init(name, configFilePath);
// get a kafka instance with some basic config
const kafka = core.getKafkaAvro();
// init kafka: connect to the schema registry and the brokers
kafka.init().then(async () => {
  try {
    // get a consumer ready to consume
    const consumer = await kafka.getConsumer();
    // get a producer ready to produce
    const producer = await kafka.getProducer();
  } catch (error) {
    console.log('Error: Cannot open a consumer or producer');
  }
});
```
## Logger
We use `winston` as the logger module. To get access to the same logger instance across the app, we init and provide it in the core.
```js
const core = Core.init(name, configFilePath);
// get the logger instance
core.getLogger().error('This is an error.');
core.getLogger().warn('This is a warning.');
core.getLogger().info('This is an info.');
core.getLogger().debug('This is a debug message.');
```
## Stream utilities
We provide some common stream utilities. Those can be used in all projects.
### Stream ReporterCounter
A Transform stream that reports throughput to the reporter.
### Stream StreamProducerPreparer
A Transform stream that prepares a message for kafka-avro.
### Stream Debug
A Transform stream that logs the content of the stream to the console.
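A sketch of how the utilities can be wired into a pipeline. The import path and the constructor arguments are assumptions, since their exact signatures are not documented here:
```js
// import path is an assumption
const {ReporterCounter, StreamProducerPreparer, Debug} = require('geolinker-common/dist/streams');

const core = Core.init('my-app', './config.json');
const kafka = core.getKafkaAvro();

kafka.init().then(async () => {
  const consumerStream = await kafka.getConsumerStream();
  consumerStream
    .pipe(new ReporterCounter(core))      // hypothetical: count messages for the reporter
    .pipe(new Debug())                    // hypothetical: log each chunk to the console
    .pipe(new StreamProducerPreparer());  // hypothetical: shape chunks for the kafka producer
});
```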
## Tests
There are some tests (`npm run test`) for the lib, but we need to improve them and let them cover the full lib.
@@ -6,6 +6,7 @@ import {LoggerInstance} from 'winston';
import UriBuilder from './uri-builder';
import {InterfaceReporter, InterfaceReporterCollection, Reporter} from './reporter';
import fs = require('fs');
export interface InterfaceCore {
name: string;
@@ -102,11 +103,18 @@ class Core {
/**
* Get a singleton reporter instance with a specific name
* @param {number} interval
* @param {number | string} nameOrInterval The interval or a shortcut for the name
* @param {string} name
* @returns {Reporter}
*/
public getReporter(interval: number = 10000, name: string = 'default'): InterfaceReporter {
public getReporter(nameOrInterval: number | string = 10000, name: string = 'default'): InterfaceReporter {
let interval = 10000;
if (typeof nameOrInterval === 'number') {
interval = nameOrInterval;
} else {
name = nameOrInterval;
}
let reporter = this.reporter.find((r) => r.name === name);
if (typeof reporter === 'undefined') {
reporter = {name, reporter: new Reporter(this, interval)};
......
@@ -3,9 +3,9 @@ import {InterfaceCore} from './core';
export interface InterfaceKafkaGeoname {
init();
getProducer();
getConsumerStream();
getConsumer();
getProducer(name?: string);
getConsumerStream(name?: string);
getConsumer(name?: string);
getKafkaAvro();
}
@@ -17,16 +17,16 @@ export default class KafkaGeoname implements InterfaceKafkaGeoname {
private kafkaAvro = null;
/**
* A producer instanance or null
* An object with a collection of producer instances
* @type {null}
*/
private producer = null;
private producer = {};
/**
* A consumer stream or null
* An object with a collection of consumer streams
* @type {null}
*/
private consumerStream = null;
private consumerStream = {};
/**
* Name of the instance. Is used in error logging
@@ -44,6 +44,7 @@ export default class KafkaGeoname implements InterfaceKafkaGeoname {
/**
*
* todo: better DI
* @param {string} name
* @param core
*/
@@ -51,6 +52,7 @@ export default class KafkaGeoname implements InterfaceKafkaGeoname {
this.name = name;
this.core = core;
}
/**
* Init kafka. Connect to the schema registry and get all the schemas
* This is an async operation
@@ -77,31 +79,41 @@ export default class KafkaGeoname implements InterfaceKafkaGeoname {
* Get a kafka producer. The config is provided by nconf
* @returns {Promise<any>}
*/
public async getProducer() {
if (this.producer === null) {
return this.createProducer();
public async getProducer(name: string = 'default') {
if (!this.producer[name]) {
return this.createProducer(name);
}
return this.producer;
return this.producer[name];
}
/**
* Get a consumer stream
* @returns {Promise<any>}
*/
public async getConsumerStream() {
if (this.consumerStream === null) {
return this.createConsumerStream();
public async getConsumerStream(name: string = 'default') {
if (!this.consumerStream[name]) {
return this.createConsumerStream(name);
}
return this.consumerStream;
return this.consumerStream[name];
}
/**
* get a classic consumer, subscribed to a topic
* @returns {Promise<any>}
*/
public async getConsumer() {
return this.kafkaAvro.getConsumer(this.core.getNconf().get('consumer:config'),
this.core.getNconf().get('consumer:topics:config'))
public async getConsumer(name: string = 'default') {
/**
* Define a namespace prefix for the consumer config
* For the default consumer there is no prefix
* @type {string}
*/
let prefix = 'consumer';
if (name !== 'default') {
prefix = `consumer:${name}`;
}
return this.kafkaAvro.getConsumer(this.core.getNconf().get(`${prefix}:config`),
this.core.getNconf().get(`${prefix}:topics:config`))
// the "getConsumer()" method will return a bluebird promise.
.then((consumer) => {
// Perform a consumer.connect()
@@ -121,7 +133,7 @@ export default class KafkaGeoname implements InterfaceKafkaGeoname {
})
.then( (consumer) => {
// subscribe to topic
consumer.subscribe( this.core.getNconf().get('consumer:topics:topics'));
consumer.subscribe( this.core.getNconf().get(`${prefix}:topics:topics`));
return consumer;
});
}
@@ -146,7 +158,18 @@ export default class KafkaGeoname implements InterfaceKafkaGeoname {
* Get a promise with a producer
* @returns {Promise<any>}
*/
private async createProducer() {
private async createProducer(name: string) {
/**
* Define a namespace prefix for the producer config
* For the default producer there is no prefix
* @type {string}
*/
let prefix = 'producer';
if (name !== 'default') {
prefix = `producer:${name}`;
}
/**
* Check if kafka is inited
*/
@@ -154,20 +177,20 @@ export default class KafkaGeoname implements InterfaceKafkaGeoname {
/**
* Get the producer
*/
this.producer = await this.kafkaAvro.getProducer(this.core.getNconf().get('producer:config'));
this.producer[name] = await this.kafkaAvro.getProducer(this.core.getNconf().get(`${prefix}:config`));
/**
* Disconnected
*/
this.producer.on('disconnected', (arg) => {
this.core.getLogger().error(`Producer (${this.name}) disconnected. ${JSON.stringify(arg)}`);
this.producer[name].on('disconnected', (arg) => {
this.core.getLogger().error(`Producer (${this.name} -> ${name}) disconnected. ${JSON.stringify(arg)}`);
});
/**
* Stream is ready
*/
this.producer.on('ready', () => {
this.core.getLogger().info(`The producer (${this.name}) is ready`);
this.producer[name].on('ready', () => {
this.core.getLogger().info(`The producer (${this.name} -> ${name}) is ready`);
});
/**
@@ -179,27 +202,38 @@ export default class KafkaGeoname implements InterfaceKafkaGeoname {
/**
* Error in the producer
*/
this.producer.on('error', (err) => {
this.producer[name].on('error', (err) => {
// Here's where we'll know if something went wrong sending to Kafka
this.core.getLogger().warn(`Error in our kafka (${this.name}) stream ${JSON.stringify(err)}`);
this.core.getLogger().warn(`Error in our kafka (${this.name} -> ${name}) stream ${JSON.stringify(err)}`);
});
/**
* Terminated producer
*/
this.producer.on('end', () => {
this.core.getLogger().error(`The producer stream (${this.name}) ended`);
this.producer[name].on('end', () => {
this.core.getLogger().error(`The producer stream (${this.name} -> ${name}) ended`);
throw Error('The producer ended');
});
return this.producer;
return this.producer[name];
}
/**
* Create a consumer stream and register error handling
* @returns {Promise<any>}
*/
private async createConsumerStream() {
private async createConsumerStream(name: string) {
/**
* Define a namespace prefix for the config.
* For the default consumer there is no prefix
* @type {string}
*/
let prefix = 'consumer';
if (name !== 'default') {
prefix = `consumer:${name}`;
}
/**
* Check if kafka is inited
*/
@@ -209,16 +243,16 @@ export default class KafkaGeoname implements InterfaceKafkaGeoname {
* create the consumer stream
* @type {Promise<any> | any}
*/
this.consumerStream = await this.kafkaAvro.getConsumerStream(
this.core.getNconf().get('consumer:config'),
this.core.getNconf().get('consumer:stream'),
this.core.getNconf().get('consumer:topics'),
this.consumerStream[name] = await this.kafkaAvro.getConsumerStream(
this.core.getNconf().get(`${prefix}:config`),
this.core.getNconf().get(`${prefix}:stream`),
this.core.getNconf().get(`${prefix}:topics`),
);
/**
* On a general error -> exit
*/
this.consumerStream.on('error', (err) => {
this.consumerStream[name].on('error', (err) => {
if (err) {
this.core.getLogger().error('An error while consuming the stream');
this.core.getLogger().error(JSON.stringify(err));
@@ -229,11 +263,11 @@ export default class KafkaGeoname implements InterfaceKafkaGeoname {
/**
* Just a stream error
*/
this.consumerStream.consumer.on('event.error', (err) => {
this.consumerStream[name].consumer.on('event.error', (err) => {
this.core.getLogger().warn('An error while consuming the stream');
this.core.getLogger().warn(JSON.stringify(err));
});
return this.consumerStream;
return this.consumerStream[name];
}
}