Commit 6e804582 authored by Tobias Steiner's avatar Tobias Steiner
Browse files

Delete and rename old files

parent b5a591ee
import {Linker} from './neo4j-linker';
import {Core} from 'geolinker-common';
/**
 * Load sourcemap support and configuration for the linker service.
 */
const core = Core.init('linker', __dirname + '/../config.json');
/**
 * Init Linker.
 * NOTE(review): core.getNconf() is passed twice here — the second argument
 * looks like it should be a different dependency (e.g. a logger or driver);
 * confirm against the Linker constructor signature.
 * @type {Linker}
 */
const linker = new Linker(core.getNconf(), core.getNconf(), core.getUriBuilder());
/**
 * This consumer listens to the linker topic and saves the data into neo4j.
 */
const kafka = core.getKafkaAvro();
kafka.init().then(() => {
  // Reuse the already-initialised instance instead of fetching it again.
  kafka.getConsumer().then((consumer) => {
    consumer.connect();
    /**
     * Request the next message from kafka — one at a time, so neo4j writes
     * apply back-pressure on consumption.
     */
    const next = () => {
      consumer.consume(1);
    };
    /**
     * On data, write the parsed record to neo4j, then pull the next message.
     */
    consumer.on('data', (rawData) => {
      // todo: we need to profile this error
      // https://stackoverflow.com/questions/30403504/neo4j-merge-performance-vs-create-set
      linker.connect(rawData.parsed).then(() => {
        next();
      }, (error) => {
        core.getLogger().error(JSON.stringify(error));
        next();
      });
    });
  }).catch((err) => {
    // Previously this rejection was silently dropped; surface it and exit.
    core.getLogger().error('Error while creating the kafka consumer');
    core.getLogger().error(JSON.stringify(err));
    process.exit(1);
  });
}, (err) => {
  core.getLogger().error('Error while connecting to kafka');
  throw err;
});
import * as KafkaAvro from 'kafka-avro';
import nconf = require('nconf');
import logger from 'geolinker-common/dist/logger';
/**
 * Load config for the streamer (CLI args > env vars > config file).
 */
nconf.argv()
  .env()
  .file({file: './config.json'});
/**
 * Shared KafkaAvro instance, configured from nconf.
 */
const kafkaAvroInstance = new KafkaAvro({
  kafkaBroker: nconf.get('kafka:broker'),
  schemaRegistry: nconf.get('kafka:schema-registry'),
});
/**
 * Wait until kafka is initialised, then resolve with the shared instance.
 * Note: every caller triggers init() on the same instance; callers share it.
 * @returns {Promise<KafkaAvro>} the initialised KafkaAvro instance
 */
export default async () => {
  // Straight await instead of the redundant `return await ...then(...)` mix.
  await kafkaAvroInstance.init();
  logger.info('Ready to consume');
  return kafkaAvroInstance;
};
import neo4jLinker from './neo4j-linker';
import kafkaAvro from './kafka';
import nconf = require('nconf');
import sourcemap = require('source-map-support');
import logger from 'geolinker-common/dist/logger';
/**
 * Init inline source map support for readable stack traces.
 */
sourcemap.install();
/**
 * Load config for the streamer (CLI args > env vars > config file).
 */
nconf.argv()
  .env()
  .file({file: './config.json'});
/**
 * This consumer listens to the linker topic and saves the data into neo4j.
 */
kafkaAvro().then((avro) => {
  avro.getConsumerStream(
    nconf.get('consumer:config'),
    nconf.get('stream:config'),
    nconf.get('topic:config'),
  )
  .then((stream) => {
    /**
     * On a general stream error -> log and exit.
     */
    stream.on('error', (err) => {
      if (err) {
        logger.error('An error while consuming the stream');
        logger.error(JSON.stringify(err));
      }
      process.exit(1);
    });
    /**
     * Just a stream error (non-fatal, keep consuming).
     */
    stream.consumer.on('event.error', (err) => {
      logger.warn('An error while consuming the stream');
      logger.warn(JSON.stringify(err));
    });
    /**
     * On data, write the parsed record to neo4j.
     */
    stream.on('data', (rawData) => {
      // The write was previously a floating promise; log failures so they
      // are not silently dropped.
      neo4jLinker.connect(rawData.parsed).catch((err) => {
        logger.error('Error while writing to neo4j');
        logger.error(JSON.stringify(err));
      });
    });
  })
  .catch((err) => {
    // Previously an unhandled rejection if the consumer stream failed to start.
    logger.error('Error while creating the consumer stream');
    logger.error(JSON.stringify(err));
    process.exit(1);
  });
}, (error) => {
  logger.error('Error while connecting to kafka');
  logger.error(JSON.stringify(error));
  process.exit(1);
});
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment