# Geolinker common
This simple lib just bundles modules for the geolinker. Through a core instance you get access to kafka-consumers, kafka-producers, reporter, logger, config and uriBuilder. These tools are used in most of the streaming apps, so we can stop replicating code - at least a bit.
## Config
All streaming apps provide a common config file. Through this file you can configure the different modules and then access an instance via the core. The config makes use of the `nconf` module. A typical config looks like this:
```json
{
  "kafka": {
    "broker": "localhost:29092",
    "schema-registry": "http://localhost:8081",
    "fetchAllVersions": true
  },
  "producer": {
    "config": {
      "batch.num.messages": 100,
      "message.send.max.retries": 5,
      "retry.backoff.ms": 5000,
      "message.max.bytes": 10485760
    },
    "topics": "wikidata-small",
    "partition": -1,
    "author": "bot#1"
  },
  "consumer": {
    "config": {
      "group.id": "wikidata-normalizer",
      "socket.keepalive.enable": true,
      "enable.auto.commit": true,
      "queued.max.messages.kbytes": 100
    },
    "topics": {"topics": "wikidata-geo"},
    "stream": {
      "request.required.acks": 1
    }
  },
  "log-dir": "/tmp",
  "reporter": {
    "url": "http://dashboard-api"
  }
}
```
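Under the hood the config values are read with `nconf`. As a minimal sketch (the exact wiring inside the core is an implementation detail, and `config.json` is just an example path), reading such a file and accessing nested values looks like this:
```js
const nconf = require('nconf');

// load the app config file (path is only an example)
nconf.argv().env().file({ file: 'config.json' });

// nested keys are addressed with the ':' separator
const broker = nconf.get('kafka:broker');             // "localhost:29092"
const registry = nconf.get('kafka:schema-registry');  // "http://localhost:8081"
```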
### General Kafka config
```json
{
  "kafka": {
    "broker": "localhost:29092",
    "schema-registry": "http://localhost:8081",
    "fetchAllVersions": true
  }
}
```
* broker = A string or an array of Kafka brokers
* schema-registry = The schema registry URL including the protocol
* fetchAllVersions = Fetch all versions of the schema or just the latest one
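These values end up in the underlying `kafka-avro` instance. As a rough sketch of that mapping (the option names follow the `kafka-avro` module; how the core wires them in is an assumption):
```js
const KafkaAvro = require('kafka-avro');

// assumption: the core maps the "kafka" config block onto kafka-avro's options
const kafkaAvro = new KafkaAvro({
  kafkaBroker: 'localhost:29092',
  schemaRegistry: 'http://localhost:8081',
  fetchAllVersions: true,
});

// connects to the schema registry and the brokers
kafkaAvro.init().then(() => console.log('kafka-avro ready'));
```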
### Kafka producer config
You can configure several producers in one file. To do this, nest the config under the name of the producer. In the following example you configure an `example` producer.
```json
{
  "producer": {
    "example": {
      "config": {
        "batch.num.messages": 100,
        "message.send.max.retries": 5,
        "retry.backoff.ms": 5000,
        "message.max.bytes": 10485760
      },
      "topics": "wikidata-small",
      "partition": -1,
      "author": "bot#1"
    }
  }
}
```
If you just need a single producer for the app (the common case), you can omit the name and just go with the following config:
```json
{
  "producer": {
    "config": {
      "batch.num.messages": 100,
      "message.send.max.retries": 5,
      "retry.backoff.ms": 5000,
      "message.max.bytes": 10485760
    },
    "topics": "wikidata-small",
    "partition": -1,
    "author": "bot#1"
  }
}
```
* config = An object with producer config options from librdkafka
  * batch.num.messages = Number of messages in a batch. If this is too big, the app will run out of memory
  * message.send.max.retries = Number of retries for unsuccessful writes
  * retry.backoff.ms = Backoff time between retries
  * message.max.bytes = Maximum length of a message in bytes
* topics = A string or an array of topic names
* partition = The number of the partition. -1 will auto-select the partition
* author = The name of the author of a message (rarely used)
### Kafka consumer config
You can configure several consumers in one file. To do this, nest the config under the name of the consumer. In the following example you configure an `example` consumer.
```json
{
  "consumer": {
    "example": {
      "config": {
        "group.id": "wikidata-normalizer",
        "socket.keepalive.enable": true,
        "enable.auto.commit": true,
        "queued.max.messages.kbytes": 100
      },
      "topics": {"topics": "wikidata-geo"},
      "stream": {
        "request.required.acks": 1
      }
    }
  }
}
```
If you just need a single consumer for the app (the common case), you can omit the name and just go with the following config:
```json
{
  "consumer": {
    "config": {
      "group.id": "wikidata-normalizer",
      "socket.keepalive.enable": true,
      "enable.auto.commit": true,
      "queued.max.messages.kbytes": 100
    },
    "topics": {"topics": "wikidata-geo"},
    "stream": {
      "request.required.acks": 1
    }
  }
}
```
* config = An object with consumer config options from librdkafka
  * group.id = The id of the consumer group
  * socket.keepalive.enable = Keep the socket open
  * enable.auto.commit = Whether to commit the offset automatically or manually
  * queued.max.messages.kbytes = Maximum kilobytes of messages in the local consumer queue
* topics = An object of topics (we should refactor this)
* stream
  * request.required.acks = Whether the producer should wait for an acknowledgement from Kafka
### Log (winston) config
As a logger we use `winston`. We use two transports to deliver logs: one to stdout and one into a file. For the file transport we need a configured log dir.
```json
{
"log-dir": "/tmp"
}
```
### Reporter config
The status reporter needs an address from the config to report to.
```json
{
"reporter": {
"url": "http://dashboard-api"
}
}
```
* url = The URL the reporter sends its reports to
## Core
The `core` is a container with instances or factories that provide the services. You can inject the core into a class to get access to the services, so there is a single instance across the project.
```js
const core = Core.init(name, configFilePath);
```
To init the core you need to provide the name of the app and the path to the config file.
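A small sketch of that injection pattern (the `Normalizer` class below is only an illustration, not part of the lib):
```js
const core = Core.init('wikidata-normalizer', './config.json');

// illustration only: a class that gets the core injected and uses its services
class Normalizer {
  constructor(core) {
    this.logger = core.getLogger();
  }

  run() {
    this.logger.info('Normalizer started.');
  }
}

new Normalizer(core).run();
```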
## Reporter
The reporter helps us collect some basic metrics about the app, which is useful for debugging and for seeing the throughput of the app.
```js
const core = Core.init(name, configFilePath);
// report the amount of incoming messages
core.getReporter().setDataIn(10);
// report the amount of outgoing messages
core.getReporter().setDataOut(5);
```
If you need more than one reporter for the app, you can get a named instance. This can be helpful if you have one Readable and two Writable streams.
```js
const core = Core.init(name, configFilePath);
// init a reporter with the name example and a report timeout of 10s. Then report the amount of incoming messages
core.getReporter(10000, 'example').setDataIn(10);
// report the amount of outgoing messages
core.getReporter('example').setDataOut(5);
```
## Kafka
The Kafka class uses the `kafka-avro` module to provide Kafka together with Avro schemas. We just unified the way to configure and init Kafka, consumers and producers.
The class gets its configuration from the nconf file. There we can configure all the details for each consumer and producer.
```js
const core = Core.init(name, configFilePath);
// get a kafka instance with some basic config
const kafka = core.getKafkaAvro();
// init kafka: connect to schema-registry and brokers
kafka.init().then(async () => {
  try {
    // get a consumer that is ready to consume
    const consumer = await kafka.getConsumer();
    // get a producer that is ready to produce
    const producer = await kafka.getProducer();
  } catch (error) {
    console.log('Error: Cannot open a consumer or producer');
  }
});
```
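Once consumer and producer are ready, they behave like the `kafka-avro` wrappers around `node-rdkafka`. The following continues the example above and is only a sketch under that assumption; the message content is made up:
```js
kafka.init().then(async () => {
  const consumer = await kafka.getConsumer();
  const producer = await kafka.getProducer();

  // assumption: the consumer emits node-rdkafka 'data' events and kafka-avro
  // puts the deserialized Avro payload on message.parsed
  consumer.on('data', (message) => {
    console.log('received', message.parsed);

    // assumption: kafka-avro's produce(topic, partition, value, key) signature;
    // -1 lets librdkafka pick the partition
    producer.produce('wikidata-small', -1, { id: 'Q42' }, 'Q42');
  });
});
```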
## Logger
We use `winston` as the logger module. To get access to the same logger instance across the app, we init it and provide it through the core.
```js
const core = Core.init(name, configFilePath);
// get the logger instance
core.getLogger().error('This is an error.');
core.getLogger().warn('This is a warning.');
core.getLogger().info('This is an info.');
core.getLogger().debug('This is a debug message.');
```
## Stream utilities
We provide some common stream utilities. Those can be used in all projects.
### Stream ReporterCounter
A Transform stream that reports throughput to the reporter.
### Stream StreamProducerPreparer
A Transform stream that prepares messages for kafka-avro.
### Stream Debug
A Transform stream that logs the content of the stream to the console.
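To illustrate the pattern behind these utilities, here is a minimal sketch of a debug-style Transform stream in plain Node.js (not the lib's actual implementation):
```js
const { Transform } = require('stream');

// pass-through stream that logs every chunk it sees
const debug = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    console.log('stream chunk:', chunk);
    // pass the chunk on unchanged
    callback(null, chunk);
  },
});

// example pipeline: source -> debug -> sink
// someReadable.pipe(debug).pipe(someWritable);
```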
## Tests
There are some tests (`npm run test`) for the lib, but we need to improve them and let them cover the full lib.