Skip to content
Snippets Groups Projects
Commit 973a37d4 authored by Tobias Steiner's avatar Tobias Steiner
Browse files

Update counter-reporter usage

parent f035aced
No related branches found
Tags 0.2
No related merge requests found
import {Core} from 'geolinker-common';
import ProducerStream = require('node-rdkafka/lib/producer-stream');
import request = require('request');
import zlib = require('zlib');
import ProducerStream = require('node-rdkafka/lib/producer-stream');
import DecodeJson from './decode-json-transformer';
import StreamFilter from './stream-filter-transfomer';
import ReporterCounter from './reporter-counter';
import StreamFilter from './stream-filter-transfomer';
const core = Core.init('Wikidump-producer', __dirname + '/../config.json');
......@@ -56,7 +54,6 @@ const wikidataFilter = (chunk, encoding) => {
if (typeof obj.claims === 'undefined' || typeof obj.claims.P31 === 'undefined') {
return [null];
}
core.getReporter().setDataIn(1);
// Check for geographical instances
const instances = [];
const validIndex = obj.claims.P31.findIndex((instanceOf) => {
......@@ -93,8 +90,9 @@ kafka.init()
const stream = request('https://dumps.wikimedia.org/wikidatawiki/entities/latest-all.json.gz')
.pipe(zlib.createGunzip())
.pipe(new DecodeJson({}))
.pipe(new ReporterCounter({}, core.getReporter(), 'read'))
.pipe(new StreamFilter({}, wikidataFilter))
.pipe(new ReporterCounter({}, core.getReporter()))
.pipe(new ReporterCounter({}, core.getReporter(), 'write'))
.pipe(new ProducerStream(producer, {topic: core.getNconf().get('producer:topic')}));
stream.on('end', () => {
core.getLogger().info('Job is done');
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment