Commit ece48f5e authored by Tobias Steiner's avatar Tobias Steiner
Browse files

Add key to the second producer

parent 851e2b6e
......@@ -71,7 +71,7 @@ kafka.init().then(async () => {
.pipe(new ReporterCounter({objectMode: true}, core.getReporter(10000, 'normalizer'), 'read'))
.pipe(new WikiNormalizer({objectMode: true}, core.getLogger(), wikidataProperties))
.pipe(new ReporterCounter({objectMode: true}, core.getReporter('normalizer'), 'write'))
.pipe(new StreamProducerPreparer({objectMode: true, topic: core.getNconf().get('producer:normalizer:topics'), partition: -1}))
.pipe(new StreamProducerPreparer({objectMode: true, topic: core.getNconf().get('producer:normalizer:topics'), partition: -1, key: (d) => `${d.provider}${d.id}`}))
.pipe(new Debug({objectMode: true}, '---'))
.pipe(new ProducerStream(normProducer, {objectMode: true, topic: core.getNconf().get('producer:normalizer:topics')}));
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment