Commit 10419744 authored by tobinski

Refactor transformer and update readme

parent 2e7945be
FROM source.dodis.ch:4577/histhub/node-kafka-docker-base:latest
# copy app to the container
COPY package.json package-lock.json config.json tsconfig.json /usr/local/app/
# Install dependencies
RUN npm install
COPY src /usr/local/app/src
COPY test/ /usr/local/app/test
# Build the TypeScript sources
RUN npm run build
# run app
CMD node dist/index.js
# wikidata-normalizer
This streaming app consumes data from the `wikidata-geo` topic and normalizes and analyses it into a common schema. The app decouples producing the stream from the normalizing and analysing process.
## Normalizer
The readable stream has two writable streams subscribed: one for analysing the data and building links, and one for normalising the data into a common schema. The normaliser sends the data to the `wikidata-small` topic.
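A minimal sketch of this fan-out is shown below; the stream variables are placeholders for the Kafka consumer stream and the processing chains that are actually wired up in `src/index.ts`.

```typescript
import { PassThrough } from 'stream';

// Placeholder streams standing in for the real Kafka consumer stream,
// the two transformers and the two producer streams from src/index.ts.
const consumerStream = new PassThrough({ objectMode: true });
const normalizer = new PassThrough({ objectMode: true });         // normalises items to the common schema
const analyser = new PassThrough({ objectMode: true });           // extracts links between items
const normalizerProducer = new PassThrough({ objectMode: true }); // producer for `wikidata-small`
const linkerProducer = new PassThrough({ objectMode: true });     // producer for `linker`

// One readable stream with two subscribed writable chains:
// both pipelines receive every consumed message.
consumerStream.pipe(normalizer).pipe(normalizerProducer);
consumerStream.pipe(analyser).pipe(linkerProducer);
```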
## Analyser
The analyser extracts specific wikidata properties and sends a concordance of links to the `linker` topic.
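For illustration, a message on the `linker` topic roughly looks like this; the `from`/`to` fields mirror the link interface in `src/analyser-transformable.ts`, while the concrete values and the `relation` payload here are invented.

```typescript
// Illustrative payload only; the values are made up.
const linkMessage = {
  from: 'https://www.wikidata.org/entity/Q72',  // the analysed wikidata item
  to: ['https://www.geonames.org/2657896'],     // resources extracted from its claims
  relation: {},                                 // relation metadata, see the link interface
};
```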
## Docker
To build the image, use the following command. The image fetches data from a wikidata topic and streams the result back into Kafka. The container is based on Alpine Linux.
```bash
docker build -t source.dodis.ch:4577/histhub/wikidata-normalizer .
# Upload to the registry
docker push source.dodis.ch:4577/histhub/wikidata-normalizer
```
## CD
We have a build pipeline in GitLab, so building the image manually is no longer necessary.
## Deploy to k8
We execute a job on k8 to stream the dump into Kafka.
```bash
kubectl create -f wikidata-normalizer-deployment.yaml
```
# wikidata library
The wikidata library provides a set of stream transformers and utils to work with raw data from wikidata (dump and feed). Originally the normalizer was a pod running in k8 that consumed and produced Kafka messages. During a refactoring we decided to use the transformations directly inside the dump-producer and the live-feed, so we save the round trip to Kafka and some disk storage.
## wikidata-normalizer-transformer
This transformer takes raw data from wikidata and normalizes it to the geolinker default format. It then prepares the data to be sent to Kafka.
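A minimal sketch of running the transformer inside a dump pipeline, assuming placeholder source and sink streams; the constructor arguments (options, logger, resolved wikidata time properties) follow the existing usage in this repo's code and tests, and `console` merely stands in for the winston logger.

```typescript
import { PassThrough, pipeline } from 'stream';
import WikidataNormalizerTransformer from './wikidata-normalizer-transformer';
import { WikidataProperties, wikidataTimePropertiesLoader } from './wikidata-utils';

async function run() {
  // Resolve the start/end time properties (P580/P582 and their subproperties).
  const timeProperties = await wikidataTimePropertiesLoader as WikidataProperties;

  // Placeholders for the dump-producer source and the Kafka producer sink.
  const rawItems = new PassThrough({ objectMode: true });
  const kafkaSink = new PassThrough({ objectMode: true });

  pipeline(
    rawItems,
    // console stands in for the winston logger used in the real pipeline
    new WikidataNormalizerTransformer({ objectMode: true }, console as any, timeProperties),
    kafkaSink,
    (err) => { if (err) { console.error(err); } },
  );
}

run();
```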
## wikidata-analyzer-transformer
This transformer analyses wikidata's raw data and extracts information about links between items, e.g. links to other interesting resources. We extract those links and prepare them as a message for the linker.
## wikidata-time-to-date-object
This script transforms dates from wikidata into the format used by the geolinker.
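For example, a wikidata time string (or a claim `datavalue.value` object with a `time` field) converts like this; the import path assumes the refactored `wikidata-utils` module, and the concrete dates are only illustrative.

```typescript
import { timeToDate } from './wikidata-utils';

// Wikidata time strings carry an explicit sign and a trailing 'Z'.
const founded: Date = timeToDate('+1291-08-01T00:00:00Z');

// Claim datavalue.value objects are accepted as well.
const fromClaim: Date = timeToDate({ time: '+1848-09-12T00:00:00Z' });
```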
## wikidata-utils
Simple utils used to grab a set of properties from wikidata or to transform time statements.
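A short sketch of loading such a property set; the P580/P582 ids are the start/end time properties used elsewhere in this library, the rest is illustrative.

```typescript
import { WikidataProperties } from './wikidata-utils';

async function loadTimeProperties(): Promise<string[]> {
  // Load P580/P582 (start/end time) together with their subproperties.
  const props = new WikidataProperties();
  await props.init([
    { name: 'start', qid: 'P580' },
    { name: 'end', qid: 'P582' },
  ]);
  // All property ids that count as a "start" statement.
  return props.get('start');
}
```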
# Tests
There are some tests, but only a few. We should add more!
{
"kafka": {
"broker": "localhost:29092",
"schema-registry": "http://localhost:8081",
"fetchAllVersions": true
},
"producer": {
"normalizer": {
"config": {
"batch.num.messages": 1000,
"message.send.max.retries": 5,
"retry.backoff.ms": 5000,
"message.max.bytes": 10485760
},
"topics": "geolinker",
"partition": -1,
"author": "bot#1"
},
"analyser": {
"config": {
"batch.num.messages": 1000,
"message.send.max.retries": 5,
"retry.backoff.ms": 5000,
"message.max.bytes": 10485760
},
"topics": "linker",
"partition": -1,
"author": "bot#1"
}
},
"consumer": {
"config": {
"group.id": "wikidata-normalizer",
"socket.keepalive.enable": true,
"enable.auto.commit": true,
"queued.max.messages.kbytes": 100
},
"topics": {
"topics": "wikidata-geo"
},
"stream": {
"request.required.acks": 1
}
},
"log-dir": "/tmp",
"reporter": {
"url": "http://dashboard-api"
}
}
import { queryTaxonomy } from 'wikidata-taxonomy';
export interface InterfaceProperty {
name: string;
qid: string;
}
export class BootstrapWikidata {
/**
* An array of properties to load from wikidata
* @type {any[]}
*/
private properties = [];
/**
* Load a list of properties with subclasses from wikidata
* @param {InterfaceProperty[]} properties
* @returns {Promise<void>}
*/
public async init(properties: InterfaceProperty[]) {
for (const property of properties) {
const subclasses = await this.getProperties(property.qid);
// add the main property to the list
subclasses.push(property.qid);
this.properties.push({name: property.name, properties: subclasses});
}
}
/**
* Get a collection of subclasses for a named property
* @param {string} name
* @returns {string[]}
*/
public get(name: string): string[] {
const property = this.properties.find((p) => {
return p.name === name;
});
return property.properties;
}
/**
* Get a property with its subclasses from wikidata
* @param {string} property
* @returns {Promise<any>}
*/
private async getProperties(property: string) {
// P1647 = subproperty
return await queryTaxonomy(property, {brief: true, property: ['P1647']})
.then((taxonomy) => {
const properties = [];
taxonomy.concepts.forEach((concept) => {
const qid = concept.notation[0];
properties.push(qid);
});
return properties.filter((d, i, a) => {
return a.indexOf(d) === i;
});
})
.catch((error) => {
console.log('Error while collecting QIDs');
console.log(error);
// fall back to an empty list so callers can still push the main property
return [];
});
}
}
import fmt = require('bunyan-format');
import {Core} from 'geolinker-common';
import ReporterCounter from 'geolinker-common/dist/stream/reporter-counter';
import StreamProducerPreparer from 'geolinker-common/dist/stream/stream-producer-preparer';
import * as KafkaAvro from 'kafka-avro';
import nodeCleanup = require('node-cleanup');
import ProducerStream = require('node-rdkafka/lib/producer-stream');
import AnalyserTransformable from './analyser-transformable';
import {BootstrapWikidata, InterfaceProperty} from './bootstrap-wikidata';
import WikiNormalizer from './wiki-normalizer';
import Debug from 'geolinker-common/dist/stream/debug';
/**
* load sourcemap and config
*/
const core = Core.init('wikidata-normalizer', __dirname + '/../config.json');
/**
* To enable this logger you need to set KAFKA_AVRO_LOG_LEVEL,
* e.g. env KAFKA_AVRO_LOG_LEVEL=debug node dist/index.js
*/
const kafkaLog = KafkaAvro.getLogger();
kafkaLog.addStream({
type: 'stream',
stream: fmt({
outputMode: 'short',
levelInString: true,
}),
level: 'debug',
});
nodeCleanup((exitCode, signal) => {
core.getLogger().info('Signal ' + signal);
core.getLogger().info('ExitCode ' + exitCode);
setTimeout(() => {
process.exit();
}, 100);
});
/**
* A list of properties to load from wikidata
* @type {InterfaceProperty[]}
*/
const properties = [
{name: 'start', qid: 'P580'},
{name: 'end', qid: 'P582'},
];
const wikidataProperties = new BootstrapWikidata();
const wpPromise = wikidataProperties.init(properties);
/**
* Init KafkaInstance
* @type {}
*/
const kafka = core.getKafkaAvro();
kafka.init().then(async () => {
// Wait for the promises to resolve
try {
await wpPromise;
const analyseProducer = await kafka.getProducer('analyser');
const normProducer = await kafka.getProducer('normalizer');
const consumerStream = await kafka.getConsumerStream();
core.getLogger().info('Producer and consumer are ready to go');
/**
* On data (flow mode) transform data and send them to the data topic
*/
consumerStream
.pipe(new ReporterCounter({objectMode: true}, core.getReporter(10000, 'normalizer'), 'read'))
.pipe(new WikiNormalizer({objectMode: true}, core.getLogger(), wikidataProperties))
.pipe(new ReporterCounter({objectMode: true}, core.getReporter('normalizer'), 'write'))
.pipe(new StreamProducerPreparer({objectMode: true, topic: core.getNconf().get('producer:normalizer:topics'), partition: -1, key: (d) => `${d.provider}${d.id}`}))
.pipe(new Debug({objectMode: true}, '---'))
.pipe(new ProducerStream(normProducer, {objectMode: true, topic: core.getNconf().get('producer:normalizer:topics')}));
// todo: we need a second consumer and we need a second reporter. So we need to update common libs
consumerStream
.pipe(new ReporterCounter({objectMode: true}, core.getReporter(10000, 'analyser'), 'read'))
.pipe(new AnalyserTransformable({objectMode: true}, core.getUriBuilder(), core.getNconf()))
.pipe(new ReporterCounter({objectMode: true}, core.getReporter(10000, 'analyser'), 'write'))
.pipe(new StreamProducerPreparer({objectMode: true, topic: core.getNconf().get('producer:analyser:topics'), partition: -1, key: (d) => `${d.provider}${d.id}`}))
.pipe(new ProducerStream(analyseProducer, {objectMode: true, topic: core.getNconf().get('producer:analyser:topics')}));
core.getLogger().info('Pipes are registered');
} catch (error) {
core.getLogger().error('Error in retrieving consumer or producer');
core.getLogger().error(error);
}
})
.catch((err) => {
core.getLogger().error('Unable to connect to Kafka');
throw err;
});
@@ -5,7 +5,7 @@ import {Transform} from 'stream';
/**
* Interface for links
*/
export interface InterfaceLink {
export interface ILink {
from: string;
to: string[];
relation: {
@@ -19,7 +19,7 @@ export interface InterfaceLink {
* Decode a stream of a json array.
* emit single json objects
*/
class AnalyserTransformable extends Transform {
class WikidataAnalyserTransformer extends Transform {
/**
* Properties to extract from the wikidata structure (list from Lena)
@@ -73,7 +73,7 @@ class AnalyserTransformable extends Transform {
'P998'];
/**
* A chunk of stings to build an object
* A chunk of strings to build an object
* @type {string}
*/
private urlBuilder: UrlBuilder;
@@ -86,13 +86,11 @@ class AnalyserTransformable extends Transform {
/**
* @inheritDoc
* @param {{}} options
* @param uriBuilder UrlBuilder
* @param nconf
*/
constructor(options = {}, uriBuilder: UrlBuilder, nconf) {
constructor(options: any = {}) {
super(options);
this.urlBuilder = uriBuilder;
this.nconf = nconf;
this.urlBuilder = options.uriBuilder;
this.nconf = options.nconf;
}
/**
@@ -118,7 +116,7 @@ class AnalyserTransformable extends Transform {
}
// prepare things
const result: InterfaceLink = {
const result: ILink = {
from: this.urlBuilder.urlResolver('wikidata', obj.id),
to,
relation: {
@@ -157,4 +155,4 @@ class AnalyserTransformable extends Transform {
// }
*/
}
export default AnalyserTransformable;
export default WikidataAnalyserTransformer;
import {Transform} from 'stream';
/**
* Filter all geographical items from a stream of wikidata items
*/
class WikidataGeofilterTransformer extends Transform {
/**
* All valid instances of a geographical entity
*/
private geoInstances: string[];
constructor(options) {
super(options);
this.geoInstances = options.geoInstances;
}
/**
* @inheritDoc
* @param chunk
* @param encoding
* @param callback
* @private
*/
public _transform(chunk, encoding, callback) {
// check if instance of exists
let obj;
try {
obj = JSON.parse(chunk.toString());
} catch (err) {
return callback(null);
}
if (typeof obj.claims === 'undefined' || typeof obj.claims.P31 === 'undefined') {
return callback(null);
}
// Check for geographical instances
const instances = [];
const validIndex = obj.claims.P31.findIndex((instanceOf) => {
try {
const found = this.geoInstances.indexOf(instanceOf.mainsnak.datavalue.value.id) > -1;
if (found) {
instances.push(instanceOf.mainsnak.datavalue.value.id);
return true;
}
// tslint:disable-next-line:no-empty
} catch (err) {}
});
if (validIndex === -1) {
return callback(null);
}
return callback(null, chunk);
}
}
export default WikidataGeofilterTransformer;
import converter from 'julian-gregorian';
import * as geohash from 'ngeohash';
import * as ramda from 'ramda';
import {Transform} from 'stream';
import {LoggerInstance} from 'winston';
import {BootstrapWikidata} from './bootstrap-wikidata';
import toDateObject from './wikidata_time_to_date_object';
import * as geohash from 'ngeohash';
import {timeToDate, WikidataProperties} from './wikidata-utils';
class WikiNormalizer extends Transform {
class WikidataNormalizerTransformer extends Transform {
/**
* Languages for alternative names
* @type {string[]}
@@ -18,14 +17,14 @@ class WikiNormalizer extends Transform {
* We fetch them from wikidata
* @type {null}
*/
private properties: BootstrapWikidata;
private properties: WikidataProperties;
/**
* Logger
*/
private logger: LoggerInstance;
constructor(options, logger: LoggerInstance, wikidataProperties: BootstrapWikidata) {
constructor(options, logger: LoggerInstance, wikidataProperties: WikidataProperties) {
super(options);
this.logger = logger;
this.properties = wikidataProperties;
@@ -130,7 +129,7 @@ class WikiNormalizer extends Transform {
private convertTimestamp(time: string | null, calendar: string) {
if (time !== null) {
// build timestamp
let date: Date = toDateObject(time);
let date: Date = timeToDate(time);
// wikidata uses julian and gregorian calendar. We need to transform the dates into a common timestamp
// more info: https://www.wikidata.org/wiki/Help:Dates
// Q1985727 = gregorianCalendar
@@ -164,6 +163,7 @@ class WikiNormalizer extends Transform {
try {
return typeof instanceOf.mainsnak.datavalue.value.id === 'undefined' ?
null : instanceOf.mainsnak.datavalue.value.id;
// tslint:disable-next-line:no-empty
} catch (err) {}
}).filter((instance) => typeof instance !== 'undefined');
}
@@ -218,4 +218,4 @@ class WikiNormalizer extends Transform {
}
}
export default WikiNormalizer;
export default WikidataNormalizerTransformer;
import { queryTaxonomy } from 'wikidata-taxonomy';
export interface IProperty {
name: string;
qid: string;
}
export class WikidataProperties {
/**
* Get a property with its subclasses from wikidata
* @param {string} property
* @param {{}} query e.g. P1647 = subproperty
* @returns {Promise<any>}
*/
public static async getProperties(property: string, query: any = {brief: true}) {
try {
const taxonomy = await queryTaxonomy(property, query);
const props = [];
taxonomy.concepts.forEach((concept) => {
const qid = concept.notation[0];
props.push(qid);
});
return props.filter((d, i, a) => {
return a.indexOf(d) === i;
});
} catch (error) {
console.log('Error while collecting Q-numbers from wikidata');
console.log(error);
return [];
}
}
/**
* An array of properties to load from wikidata
* @type {any[]}
*/
private properties = [];
/**
* Load a list of properties with subclasses from wikidata
* @param {IProperty[]} props
* @returns {Promise<void>}
*/
public async init(props: IProperty[]) {
for (const property of props) {
const subclasses = await WikidataProperties.getProperties(property.qid, {brief: true, property: ['P1647']});
// add the main property to the list
subclasses.push(property.qid);
this.properties.push({name: property.name, properties: subclasses});
}
}
/**
* Get a collection of subclasses for a named property
* @param {string} name
* @returns {string[]}
*/
public get(name: string): string[] {
const property = this.properties.find((p) => {
return p.name === name;
});
return property.properties;
}
/**
* Set a list of properties, e.g. manually collected data
* @param props
*/
public setProperties(props: string[]) {
this.properties = [...this.properties, ...props];
}
}
/**
* Grab all the subclasses of start and end properties from wikidata.
* We want to find all possible time annotations and return a WikidataProperties instance
*/
export const wikidataTimePropertiesLoader = new Promise(async (resolve) => {
const properties = [
{name: 'start', qid: 'P580'},
{name: 'end', qid: 'P582'},
];
const wikidataProperties = new WikidataProperties();
await wikidataProperties.init(properties);
resolve(wikidataProperties);
});
/**
* Get all subclasses of geographical entities from wikidata (Q2221906 = geographic location)
* and combine them with the manually curated list from Lena
* @type {Promise<string[]>}
*/
export const wikidataGeoInstances: Promise<string[]> = new Promise(async (resolve, reject) => {
// get basic taxonomies for the filter (from lena)
const baseGeoInstances = [
'Q2221906',
'Q17334923',
'Q515',
'Q486972',
'Q1187811',
'Q1549591',
'Q5119',
'Q14788575',
'Q35145743',
'Q27096235',
'Q618123',
'Q3957',
];
const options = { brief: true };
const collectedGeoInstances = await WikidataProperties.getProperties('Q2221906', options);
resolve([...baseGeoInstances, ...collectedGeoInstances]);
});
/**
* We got this module from
* https://github.com/maxlath/wikidata-sdk/blob/master/lib/helpers/wikidata_time_to_date_object.js
*/
interface InterfaceWikidataTime {
interface IWikidataTime {
time: string;
}
const timeToDate = (wikidataTime: string | InterfaceWikidataTime) => {
export const timeToDate = (wikidataTime: string | IWikidataTime) => {
// Also accept claim datavalue.value objects
if (typeof wikidataTime === 'object') {
wikidataTime = wikidataTime.time;
@@ -50,5 +164,3 @@ const parseInvalideDate = (sign, rest) => {
const year = rest.split('T')[0].split('-')[0];
return fullDateData(sign, year);
};
export default timeToDate;
@@ -6,9 +6,8 @@ import ReporterCounter from 'geolinker-common/dist/stream/reporter-counter';
import converter from 'julian-gregorian';
import {describe, it} from 'mocha';
import nconf = require('nconf');
import {BootstrapWikidata} from '../src/bootstrap-wikidata';
import WikiNormalizer from '../src/wiki-normalizer';
import toDateObject = require('../src/wikidata_time_to_date_object.js');
import WikidataNormalizerTransformer from '../src/wikidata-normalizer-transformer';
import {timeToDate, wikidataTimePropertiesLoader} from '../src/wikidata-utils';
import MockReadable from './mock-readable';
import MockReporter from './mock-reporter';
import MockWriteable from './mock-writeable';
@@ -19,29 +18,21 @@ nconf.set('log-dir', './log');
process.env.NODE_ENV = 'test';
const coreMock: InterfaceCore = new Core('test', 'none');
/**
* A list of properties to load from wikidata
* @type {InterfaceProperty[]}
*/
const properties = [
{name: 'start', qid: 'P580'},
{name: 'end', qid: 'P582'},
];
const wikidataProperties = new BootstrapWikidata();
const wpPromise = wikidataProperties.init(properties);
let wikidataTimeProperties;
describe('Test WikiNormalizer', () => {
before((done) => {
chai.should();
chai.use(chaiAsPromised);
wpPromise.then(() => {
wikidataTimePropertiesLoader.then((instance) => {
wikidataTimeProperties = instance;
done();
});
});
it('Try to transform data', (done) => {
const normalizer = new WikiNormalizer({objectMode: true}, coreMock.getLogger(), wikidataProperties);
const normalizer = new WikidataNormalizerTransformer({objectMode: true}, coreMock.getLogger(), wikidataTimeProperties);