Commit 197d7975 authored by Tobias Steiner

Use pipes to handle streams

parent b09daeb4
-import {Linker} from './neo4j-linker';
 import {Core} from 'geolinker-common';
-import Timer = NodeJS.Timer;
+import Neo4jStreamWriter from './neo4j-stream-writer';
+import {v1 as neo4j} from 'neo4j-driver';
 /**
  * load sourcemap and config
  */
 const core = Core.init('linker', __dirname + '/../config.json');
-/**
- * Init Linker
- * @type {Linker}
- */
-const linker = new Linker(core.getNconf(), core.getLogger(), core.getUriBuilder());
 /**
  * This consumer listens to the linker topic and saves the data into neo4j
  */
@@ -20,47 +15,23 @@ kafka.init().then(() => {
   /**
    * Get the consumer subscribed to the topic linker
    */
-  kafka.getConsumer().then(async (consumer) => {
-    core.getLogger().info('Got consumer, need a connection');
-    /**
-     * Wait for connection
-     */
-    await consumer.connect();
-    core.getLogger().info('Consumer is connected');
+  kafka.getConsumerStream().then(async (consumer) => {
+    core.getLogger().info('Got streamConsumer');
     /**
-     * Get next from kafka
+     * Connect to neo4j
+     * @type {Driver}
      */
-    let consuming = false;
-    const next = () => {
-      consumer.consume(1);
-      consuming = false;
-    };
+    const driver = neo4j.driver(
+      core.getNconf().get('neo4j:config:uri'),
+      neo4j.auth.basic(
+        core.getNconf().get('neo4j:config:user'),
+        core.getNconf().get('neo4j:config:password'),
+      ));
     /**
-     * On data, write stuff to neo4j
+     * pipe the stream from kafka to neo4j
      */
-    consumer.on('data', (rawData) => {
-      consuming = true;
-      core.getReporter().setDataIn(1);
-      // create new node
-      // rawData.parsed.provider = 'geonames';
-      // todo: we need to profile this error
-      // https://stackoverflow.com/questions/30403504/neo4j-merge-performance-vs-create-set
-      linker.connect(rawData.parsed).then(() => {
-        next();
-      }, (error) => {
-        core.getLogger().error('Could not save data to neo4j ' + JSON.stringify(error));
-        next();
-      });
-    });
-    setTimeout(() => {
-      if (consuming === false) {
-        next();
-      }
-    }, 100);
+    consumer.pipe(new Neo4jStreamWriter({}, driver));
   });
 }, (err) => {
   core.getLogger().error('Error while connecting to kafka');
......
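The commit replaces the manual `consume(1)`/`next()` bookkeeping with `consumer.pipe(...)`, so flow control comes from Node's stream machinery itself: the destination's `_write` callback signals readiness, and `pipe` pauses the source while writes are pending. A minimal, self-contained sketch of that mechanic with generic streams (illustrative names and delays only, not part of this commit):

```ts
import {Readable, Writable} from 'stream';

// Illustrative only: the sink delays its callback, and pipe() stops
// pulling from the source until the callback fires.
let produced = 0;
const source = new Readable({
  objectMode: true,
  read() {
    this.push(produced < 5 ? {n: ++produced} : null);
  },
});

const slowSink = new Writable({
  objectMode: true,
  highWaterMark: 1, // keep the buffer tiny so backpressure is visible
  write(chunk, _encoding, callback) {
    setTimeout(() => {
      console.log('wrote', chunk);
      callback(); // readiness signal; the next chunk flows only now
    }, 200);
  },
});

source.pipe(slowSink);
```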
/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/
import {Writable} from 'stream';
import Session from 'neo4j-driver/types/v1/session';
import Driver from 'neo4j-driver/types/v1/driver';
class Neo4jStreamWriter extends Writable {
  private neo4j: Driver;
  private timeout = 500;

  constructor(options = {}, neo4j: Driver) {
    super(options);
    this.neo4j = neo4j;
  }

  public async _write(chunk, encoding, callback) {
    const query = this.buildQuery(chunk);
    const neo4jSession: Session = this.neo4j.session();
    try {
      await neo4jSession.run(query);
      neo4jSession.close();
      setImmediate(callback);
    } catch (e) {
      neo4jSession.close();
      // Just delay this thing a bit and pass the params
      // backpressure will get exerted this way.
      // todo: check the error first
      // neo4j.error.SERVICE_UNAVAILABLE
      setTimeout(() => {
        this._write(chunk, encoding, callback);
      }, this.timeout);
    }
  }

  /**
   * Build the query
   * todo: we should move this into a transformer
   * @param connection
   */
  private buildQuery(connection) {
    let query = `MERGE (o:Place {uri:'${connection.from}'}) `;
    connection.to.forEach((current, i) => {
      query += ` MERGE (t${i}:Place {uri:'${current}'}) `;
      query += ` MERGE (o)-[:${connection.relation.type} {author:'${connection.relation.author}'}]->(t${i}) `;
    });
    return query;
  }
}

export default Neo4jStreamWriter;
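A small, hypothetical usage sketch of Neo4jStreamWriter on its own, assuming object-mode chunks shaped like the connection objects `buildQuery` expects (`from`, `to`, `relation`); the URI, credentials, and sample data below are made up:

```ts
import {Readable} from 'stream';
import {v1 as neo4j} from 'neo4j-driver';
import Neo4jStreamWriter from './neo4j-stream-writer';

// Hypothetical connection object matching what buildQuery reads.
// For this input it produces a single Cypher statement along the lines of:
//   MERGE (o:Place {uri:'http://example.org/place/1'})
//   MERGE (t0:Place {uri:'http://example.org/place/2'})
//   MERGE (o)-[:SAME_AS {author:'example'}]->(t0)
const connection = {
  from: 'http://example.org/place/1',
  to: ['http://example.org/place/2'],
  relation: {type: 'SAME_AS', author: 'example'},
};

const driver = neo4j.driver('bolt://localhost', neo4j.auth.basic('neo4j', 'secret'));

// objectMode is assumed here because _write reads fields straight off the chunk.
const writer = new Neo4jStreamWriter({objectMode: true}, driver);

const source = new Readable({objectMode: true, read() {}});
source.pipe(writer);
source.push(connection);
source.push(null);

writer.on('finish', () => driver.close());
```

Since `buildQuery` interpolates values directly into the Cypher string, this sketch assumes trusted input; passing values as parameters via `session.run(query, parameters)` is the usual way to avoid that, which is one reason the "move this into a transformer" todo is worth keeping.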