Commit bd997439 authored by Tobias Steiner's avatar Tobias Steiner
Browse files

Translate linker-consumer js code to tyescript

parent 2b8d6e84
......@@ -12,8 +12,7 @@
},
"topic": {
"config": {
"topic": "tbd",
"partition" : -1
"topics": "tbd"
}
},
"stream": {
......
......@@ -24,7 +24,7 @@
"lint": "node_modules/.bin/tslint -c tslint.json 'src/**/*.ts'",
"build": "node_modules/.bin/tsc",
"watch": "concurrently -k -p \"[{name}]\" -n \"TypeScript,Node\" -c \"yellow.bold,cyan.bold,green.bold\" \"npm run watch-ts\" \"npm run watch-node\"",
"watch-node": "nodemon dist/index.js",
"watch-node": "nodemon dist/linker-consumer.js",
"watch-ts": "tsc -w"
},
"author": "",
......
import * as KafkaAvro from 'kafka-avro';
import nconf = require('nconf');
import logger from 'geolinker-common/dist/logger';
/**
* Load config for the streamer
*/
nconf.argv()
.env()
.file({file: './config.json'});
const kafkaAvroInstance = new KafkaAvro({
kafkaBroker: 'localhost:29092',
schemaRegistry: 'http://localhost:8081',
kafkaBroker: nconf.get('kafka:broker'),
schemaRegistry: nconf.get('kafka:schema-registry'),
});
// Wait until kafka is initialised
// Wait until kafka is initialised then return an instance
export default async () => {
return await kafkaAvroInstance.init()
.then( function() {
console.log("Ready to consume");
return kafkaAvroInstance;
});
return await kafkaAvroInstance.init()
.then(() => {
logger.info('Ready to consume');
return kafkaAvroInstance;
});
};
......@@ -2,7 +2,7 @@ import neo4jLinker from './neo4j-linker';
import kafkaAvro from './kafka';
import nconf = require('nconf');
import sourcemap = require('source-map-support');
import logger from 'geolinker-common/src/logger'
import logger from 'geolinker-common/dist/logger';
/**
* Init inline source map
......@@ -21,12 +21,15 @@ nconf.argv()
*/
kafkaAvro().then((avro) => {
avro.getConsumerStream(
nconf.get('consumer.config'),
nconf.get('stream.config'),
nconf.get('topic.config')
nconf.get('consumer:config'),
nconf.get('stream:config'),
nconf.get('topic:config'),
)
.then(function (stream) {
stream.on('error', function (err) {
.then((stream) => {
/**
* On a general error -> exit
*/
stream.on('error', (err) => {
if (err) {
logger.error('An error while consuming the stream');
logger.error(JSON.stringify(err));
......@@ -34,12 +37,18 @@ kafkaAvro().then((avro) => {
process.exit(1);
});
stream.consumer.on('event.error', function (err) {
/**
* Just a stream error
*/
stream.consumer.on('event.error', (err) => {
logger.warn('An error while consuming the stream');
logger.warn(JSON.stringify(err));
})
});
stream.on('data', function (rawData) {
/**
* On data, write stuff to neo4j
*/
stream.on('data', (rawData) => {
// create new node
// rawData.parsed.provider = 'geonames';
neo4jLinker.connect(rawData.parsed);
......@@ -48,5 +57,5 @@ kafkaAvro().then((avro) => {
}, (error) => {
logger.error('Error while connecting to kafka');
logger.error(JSON.stringify(error));
process.exit(1);
});
import {v1 as neo4j} from 'neo4j-driver';
import uriBuilder from 'geolinker-common/src/uri-builder';
import builder from 'geolinker-common/dist/uri-builder';
import nconf = require('nconf');
import logger from 'geolinker-common/src/logger'
import logger from 'geolinker-common/dist/logger';
/**
* Load config for the streamer
......@@ -10,7 +10,13 @@ nconf.argv()
.env()
.file({file: './config.json'});
const driver = neo4j.driver(nconf.get('uri'), neo4j.auth.basic(nconf.get('user'), nconf.get('password')));
const driver = neo4j.driver(
nconf.get('neo4j:config:uri'),
neo4j.auth.basic(
nconf.get('neo4j:config:user'),
nconf.get('neo4j:config:password'),
),
);
export default {
/**
......@@ -21,15 +27,14 @@ export default {
newNode: (node) => {
// todo: from topic!
const provider = 'geonames';
const uri = uriBuilder.geoname(node.id);
return driver.session().run(`MERGE (n:Place { uri: '${uri}'})
ON CREATE SET n.id: '${node.id}', n.provider: ${provider}
const uri = builder.geoname(node.id);
return driver.session().run(`MERGE (n:Place { uri: '${uri}'})
ON CREATE SET n.id: '${node.id}', n.provider: ${provider}
ON MATCH SET n.id: '${node.id}', n.provider: ${provider}`);
},
/**
* Write connection int the db. Create nodes if they do not exists
* MERGE (o:Place {uri:'exampe.com/1'}) exampe.com/2 MERGE (t1:Place {uri:'exampe.com/3'}) MERGE (o)-[:same_as {author:'bot#1'}]->(t1)
* @param connection
*/
connect: (connection) => {
......@@ -42,5 +47,5 @@ export default {
logger.error('Error while saving a connection to neo4j');
logger.error(JSON.stringify(err));
});
}
},
};
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment