Commit f35bfe58 authored by Tobias Steiner's avatar Tobias Steiner
Browse files

Managing backpressure

parent 215ff526
......@@ -31,10 +31,7 @@ kafka.init().then(() => {
/**
* Get next from kafka
*/
const next = () => {
// start consuming
consumer.consume(1);
};
consumer.consume();
/**
* On data, write stuff to neo4j
......@@ -47,14 +44,15 @@ kafka.init().then(() => {
// todo: we need to profile this error
// https://stackoverflow.com/questions/30403504/neo4j-merge-performance-vs-create-set
linker.connect(rawData.parsed).then(() => {
next();
}, (error) => {
core.getLogger().error('Could not save data to neo4j ' + JSON.stringify(error));
throw error;
// next();
consumer.disconnect();
setTimeout(() => {
consumer.connect();
consumer.consume();
}, 1000);
});
});
next();
});
}, (err) => {
core.getLogger().error('Error while connecting to kafka');
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment