Commit 57798fb2 authored by Tobias Steiner's avatar Tobias Steiner

Remove logs and add key for geolinker topic

parent d6626aef
......@@ -24,10 +24,7 @@ class Extractor extends Transform {
* @private
*/
public _transform(chunk, encoding, callback) {
console.log("Messages");
const data = JSON.parse(chunk.parsed.doc.res.body);
console.log(data);
try {
const res = {
id: data.data.id.toString(),
......@@ -39,7 +36,6 @@ class Extractor extends Transform {
location: this.buildLocation(data.data),
modification_date:this.getDate()
};
console.log(res);
return callback(null, res);
} catch (error) {
......@@ -52,7 +48,7 @@ class Extractor extends Transform {
if((obj.longitude == null)|| (obj.latitude == null)) {
return null;
}
return `${obj.latitude}, ${obj.longitude}`;
return `${obj.latitude},${obj.longitude}`;
}
private getDate() {
......
......@@ -52,12 +52,13 @@ kafka.init().then(async () => {
/**
* On data (flow mode) extract data from magpie and send them to the geolinker
* todo: use the right partition or a different topic
*/
extractorStreamConsumer
.pipe(new ReporterCounter({objectMode: true}, core.getReporter(10000, ), 'read'))
.pipe(new Extractor({objectMode: true}, core.getLogger()))
.pipe(new ReporterCounter({objectMode: true}, core.getReporter(), 'write'))
.pipe(new StreamProducerPreparer({objectMode: true, topic: core.getNconf().get('producer:geolinker:topics'), partition: (chunk) => chunk.provider}))
.pipe(new StreamProducerPreparer({objectMode: true, topic: core.getNconf().get('producer:geolinker:topics'), partition: (chunk) => chunk.provider, key: (d) => `${d.provider}${d.id}`}))
.pipe(new ProducerStream(geolinkerProducer, {objectMode: true, topic: core.getNconf().get('producer:geolinker:topics')}));
core.getLogger().info('Pipes are registered');
} catch (error) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment