Commit 664f5bf5 authored by Tobias Steiner's avatar Tobias Steiner
Browse files

Add reporter

parent 197d7975
......@@ -31,7 +31,12 @@ kafka.init().then(() => {
/**
* pipe the stream from kafka fo neo4j
*/
consumer.pipe(new Neo4jStreamWriter({}, driver));
consumer
.pipe(new Neo4jStreamWriter({}, driver, core));
consumer.on('data', () => {
core.getReporter().setDataIn(1);
});
});
}, (err) => {
core.getLogger().error('Error while connecting to kafka');
......
......@@ -10,13 +10,16 @@
import {Writable} from 'stream';
import Session from 'neo4j-driver/types/v1/session';
import Driver from 'neo4j-driver/types/v1/driver';
import {InterfaceCore} from 'geolinker-common/dist/core';
class Neo4jStreamWriter extends Writable {
private neo4j;
private timeout = 500;
constructor(options = {}, neo4j: Driver) {
private core;
constructor(options = {}, neo4j: Driver, core: InterfaceCore) {
super(options);
this.neo4j = neo4j;
this.core = core;
}
public async _write(chunk, encoding, callback) {
......@@ -26,6 +29,7 @@ class Neo4jStreamWriter extends Writable {
try {
await neo4jSession.run(query);
neo4jSession.close();
this.core.getReporter().setDataOut(1);
setImmediate(callback);
} catch (e) {
neo4jSession.close();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment