Commit 2b8d6e84 authored by Tobias Steiner's avatar Tobias Steiner
Browse files

Add the linker code

parent ef7e2f66
{
"kafka": {
"broker": "localhost:29092",
"schema-registry": "http://localhost:8081"
},
"consumer": {
"config": {
"group.id": "linker",
"socket.keepalive.enable": true,
"enable.auto.commit": true
}
},
"topic": {
"config": {
"topic": "tbd",
"partition" : -1
}
},
"stream": {
"config": {
"request.required.acks": 1
}
},
"neo4j": {
"config": {
"uri": "bolt://localhost:7687",
"user": "neo4j",
"password": "tester"
}
},
"log-dir": "../log/"
}
\ No newline at end of file
import * as KafkaAvro from 'kafka-avro';
const kafkaAvroInstance = new KafkaAvro({
kafkaBroker: 'localhost:29092',
schemaRegistry: 'http://localhost:8081',
});
// Wait until kafka is initialised
export default async () => {
return await kafkaAvroInstance.init()
.then( function() {
console.log("Ready to consume");
return kafkaAvroInstance;
});
};
import neo4jLinker from './neo4j-linker';
import kafkaAvro from './kafka';
import nconf = require('nconf');
import sourcemap = require('source-map-support');
import logger from 'geolinker-common/src/logger'
/**
* Init inline source map
*/
sourcemap.install();
/**
* Load config for the streamer
*/
nconf.argv()
.env()
.file({file: './config.json'});
/**
* This consumer listen to the linker topic and save the data into neo4j
*/
kafkaAvro().then((avro) => {
avro.getConsumerStream(
nconf.get('consumer.config'),
nconf.get('stream.config'),
nconf.get('topic.config')
)
.then(function (stream) {
stream.on('error', function (err) {
if (err) {
logger.error('An error while consuming the stream');
logger.error(JSON.stringify(err));
}
process.exit(1);
});
stream.consumer.on('event.error', function (err) {
logger.warn('An error while consuming the stream');
logger.warn(JSON.stringify(err));
})
stream.on('data', function (rawData) {
// create new node
// rawData.parsed.provider = 'geonames';
neo4jLinker.connect(rawData.parsed);
});
});
}, (error) => {
logger.error('Error while connecting to kafka');
logger.error(JSON.stringify(error));
});
import {v1 as neo4j} from 'neo4j-driver';
import uriBuilder from 'geolinker-common/src/uri-builder';
import nconf = require('nconf');
import logger from 'geolinker-common/src/logger'
/**
* Load config for the streamer
*/
nconf.argv()
.env()
.file({file: './config.json'});
const driver = neo4j.driver(nconf.get('uri'), neo4j.auth.basic(nconf.get('user'), nconf.get('password')));
export default {
/**
* Create a new node in the db if they do not exists
* @param node
* @returns {Result}
*/
newNode: (node) => {
// todo: from topic!
const provider = 'geonames';
const uri = uriBuilder.geoname(node.id);
return driver.session().run(`MERGE (n:Place { uri: '${uri}'})
ON CREATE SET n.id: '${node.id}', n.provider: ${provider}
ON MATCH SET n.id: '${node.id}', n.provider: ${provider}`);
},
/**
* Write connection int the db. Create nodes if they do not exists
* MERGE (o:Place {uri:'exampe.com/1'}) exampe.com/2 MERGE (t1:Place {uri:'exampe.com/3'}) MERGE (o)-[:same_as {author:'bot#1'}]->(t1)
* @param connection
*/
connect: (connection) => {
let query = `MERGE (o:Place {uri:'${connection.from}'}) `;
connection.to.forEach((current, i) => {
query += ` MERGE (t${i}:Place {uri:'${current}'}) `;
query += ` MERGE (o)-[:${connection.relation.type} {author:'${connection.relation.author}'}]->(t${i}) `;
});
driver.session().run(query).then((err) => {
logger.error('Error while saving a connection to neo4j');
logger.error(JSON.stringify(err));
});
}
};
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment