Commit b09daeb4 authored by Tobias Steiner's avatar Tobias Steiner
Browse files

Managing backpressure

parent f35bfe58
......@@ -31,28 +31,36 @@ kafka.init().then(() => {
/**
* Get next from kafka
*/
consumer.consume();
let consuming = false;
const next = () => {
consumer.consume(1);
consuming = false;
};
/**
* On data, write stuff to neo4j
*/
consumer.on('data', (rawData) => {
core.getLogger().info('Consumed some data');
consuming = true;
core.getReporter().setDataIn(1);
// create new node
// rawData.parsed.provider = 'geonames';
// todo: we need to profile this error
// https://stackoverflow.com/questions/30403504/neo4j-merge-performance-vs-create-set
linker.connect(rawData.parsed).then(() => {
next();
}, (error) => {
core.getLogger().error('Could not save data to neo4j ' + JSON.stringify(error));
consumer.disconnect();
setTimeout(() => {
consumer.connect();
consumer.consume();
}, 1000);
next();
});
});
setTimeout(() => {
if (consuming === false) {
next();
}
}, 100);
});
}, (err) => {
core.getLogger().error('Error while connecting to kafka');
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment