Commit 54bc65c4 authored by Tobias Steiner's avatar Tobias Steiner
Browse files

Add the provider to the key. So we cant double them in kafka

parent fa6004bb
......@@ -80,7 +80,7 @@ kafka.init().then(async () => {
.pipe(new ReporterCounter({objectMode: true}, core.getReporter(10000, 'analyser'), 'read'))
.pipe(new AnalyserTransformable({objectMode: true}, core.getUriBuilder(), core.getNconf()))
.pipe(new ReporterCounter({objectMode: true}, core.getReporter(10000, 'analyser'), 'write'))
.pipe(new StreamProducerPreparer({objectMode: true, topic: core.getNconf().get('producer:analyser:topics'), partition: -1, key: (d) => d.id}))
.pipe(new StreamProducerPreparer({objectMode: true, topic: core.getNconf().get('producer:analyser:topics'), partition: -1, key: (d) => `${d.provider}${d.id}`}))
.pipe(new ProducerStream(analyseProducer, {objectMode: true, topic: core.getNconf().get('producer:analyser:topics')}));
core.getLogger().info('Pipes are registered');
} catch (error) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment