Commit 07c34d93 authored by Tobinsk

Merge branch '14-turn-into-libraray' into 'master'

14 turn into libraray

See merge request !1
parents 3a0cbab0 68175d3d
services:
  - docker:dind
image: source.dodis.ch:4577/histhub/node-kafka-docker-base:latest
variables:
  IMAGE_TAG: $CI_REGISTRY/$CI_IMAGE:$CI_COMMIT_REF_NAME
  LATEST_TAG: $CI_REGISTRY/$CI_IMAGE:latest
stages:
  - build
  - test
cache:
  paths:
    - node_modules/
build:
  only:
    - master
  image: docker:latest
  services:
    - docker:dind
install_dependencies:
  stage: build
  script:
    - docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $CI_REGISTRY
    - docker build -t $IMAGE_TAG .
    - docker tag $IMAGE_TAG $LATEST_TAG
    - docker push $IMAGE_TAG
    - docker push $LATEST_TAG
    - npm install
  artifacts:
    paths:
      - node_modules/
test:
testing_testing:
  stage: test
  only:
    - sowhereinthefuture
  image: $LATEST_TAG
  before_script:
    - export NODE_ENV=dev && npm install
  script:
    - npm run test
\ No newline at end of file
script: npm run test
FROM node:8-alpine
# add basic libs
RUN apk --no-cache add \
bash \
g++ \
ca-certificates \
lz4-dev \
musl-dev \
cyrus-sasl-dev \
openssl-dev \
make \
python \
bash \
git
RUN apk add --no-cache --virtual .build-deps gcc zlib-dev libc-dev bsd-compat-headers py-setuptools libexecinfo libexecinfo-dev
# Create app directory
RUN mkdir -p /usr/local/app
# Move to the app directory
WORKDIR /usr/local/app
# copy app to the container
COPY package.json package-lock.json config.json tsconfig.json /usr/local/app/
COPY src /usr/local/app/src
COPY test/ /usr/local/app/test
# Install dependencies
RUN npm install
# build stuff
RUN npm run build
# run app
CMD node dist/index.js
\ No newline at end of file
# wikidata-normalizer
This streaming app consumes data from the `wikidata-geo` topic and normalizes and analyses it into a common schema. The app decouples producing the stream from the normalizing and analysing process.
## Normalizer
The Readable stream has two Writable streams subscribed: one analyses the data and builds links, the other normalises the data into a common schema. The normaliser sends the data to the `wikidata-small` topic.
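A minimal sketch of that fan-out with plain Node.js object-mode streams (the names and the item shape are illustrative only; in the real app the source is the Kafka consumer stream and the sinks produce back to Kafka):

```typescript
import { Readable, Writable } from 'stream';

// Illustrative source: in the real app this is the Kafka consumer stream.
const source = new Readable({ objectMode: true, read() { /* items are pushed manually below */ } });

// Sink 1: normalise to the common schema and produce to `wikidata-small`.
const normaliser = new Writable({
  objectMode: true,
  write(item, _encoding, callback) {
    console.log('normalise', item.id);
    callback();
  },
});

// Sink 2: build links and produce to the `linker` topic.
const analyser = new Writable({
  objectMode: true,
  write(item, _encoding, callback) {
    console.log('analyse', item.id);
    callback();
  },
});

source.pipe(normaliser);
source.pipe(analyser);

source.push({ id: 'Q64' }); // hypothetical wikidata item
source.push(null);
```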
## Analyser
The analyser extracts specific wikidata properties and sends a concordance of links to the `linker` topic.
## Docker
To build the image, use the following command. The image fetches data from a wikidata topic and streams the result back into Kafka. The container is based on Alpine Linux.
```bash
docker build -t source.dodis.ch:4577/histhub/wikidata-normalizer .
# Upload to the registry
docker push source.dodis.ch:4577/histhub/wikidata-normalizer
```
## CD
We have a build pipeline in GitLab, so manually building the image is no longer necessary.
## Deploy to k8
We execute a job on k8 to stream the dump into kafka
```bash
kubectl create -f wikidata-normalizer-deployment.yaml
```
\ No newline at end of file
# wikidata library
The wikidata library provides a set of stream transformers and utils to work with raw data from wikidata (dump and feed). Originally the normalizer was a pod running in k8 that consumed messages from and produced messages to Kafka. During a refactoring we decided to use the transformers directly inside the dump-producer and the live-feed, so we save the round trip to Kafka as well as some disk storage and $$$.
## wikidata-normalizer-transformer
This transformer takes raw data from wikidata, normalizes it to the geolinker default format and then prepares a message for Kafka.
## wikidata-analyzer-transformer
This transformer analyses wikidata's raw data and extracts information about links between items, e.g. links to other interesting resources. We extract those links and prepare them as a message for the linker.
## wikidata-geofilter-transformer
This transformer tries to guess the type of a wikidata item. If it belongs to a defined set of classes it forwards the message; if not, it simply drops the message. We use it to filter out all documents related to geography (e.g. locations, places, cities and so on).
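As an illustration of the filtering idea, here is a rough sketch (not the library's actual implementation; the item shape is simplified and the class set is truncated):

```typescript
import { Transform } from 'stream';

// Sketch of the geofilter idea: forward an item when one of its "instance of"
// classes is in a known set of geographic classes, otherwise drop it.
// NOTE: the item shape is simplified; real wikidata JSON nests the QID much deeper.
const geoClasses = new Set(['Q2221906', 'Q515', 'Q486972']); // truncated set, for illustration

const geofilter = new Transform({
  objectMode: true,
  transform(item: any, _encoding, callback) {
    const instanceOf: string[] = (item && item.instanceOf) || [];
    if (instanceOf.some((qid) => geoClasses.has(qid))) {
      this.push(item); // keep geographic items
    }
    callback(); // anything else is dropped silently
  },
});
```

The normalizer and analyzer transformers described above follow the same object-mode `Transform` pattern.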
## wikidata-utils
Simple utils that help to work with wikidata raw format.
### timeToDate()
This function transforms a wikidata time value into the date format used by the geolinker.
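A usage sketch based on the function's behaviour (the import path is hypothetical):

```typescript
import { timeToDate } from 'wikidata-normalizer'; // hypothetical import path

// a full wikidata time value
timeToDate('+1953-01-01T00:00:00Z');           // Date for 1953-01-01
// claim datavalue.value objects are accepted too; reduced precision
// (month/day set to 00) falls back to January 1st of that year
timeToDate({ time: '+1953-00-00T00:00:00Z' }); // Date for 1953-01-01
// negative (BCE) years are padded to ISO 8601 expanded notation
timeToDate('-0500-01-01T00:00:00Z');           // Date in the year -500
```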
### WikidataProperties.getProperties(property: string, query: any = {brief: true})
This method queries the wikidata SPARQL endpoint and extracts properties from the result. We use it to get all classes and subclasses of location.
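A usage sketch (the import path is hypothetical; both call patterns appear in this repository):

```typescript
import { WikidataProperties } from 'wikidata-normalizer'; // hypothetical import path

async function example() {
  // Q2221906 = "geographic location": returns a de-duplicated list of Q-numbers
  const geoClasses = await WikidataProperties.getProperties('Q2221906', { brief: true });

  // with P1647 ("subproperty of") the same call walks a property hierarchy instead
  const endLike = await WikidataProperties.getProperties('P582', { brief: true, property: ['P1647'] });

  return { geoClasses, endLike };
}
```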
### WikidataProperties.init(props: IProperty[])
This method gets a list of all subclasses for a set of properties. For example, you can find all properties expressing the "end" of something.
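A usage sketch (the import path is hypothetical; the property list is the one used in this repository):

```typescript
import { WikidataProperties } from 'wikidata-normalizer'; // hypothetical import path

async function loadTimeProperties() {
  // resolve P580 ("start time") and P582 ("end time") together with their subproperties
  const collection = await WikidataProperties.init([
    { name: 'start', qid: 'P580' },
    { name: 'end', qid: 'P582' },
  ]);
  // the returned PropertyCollection maps the friendly name to the resolved identifiers
  return collection.get('end'); // a list that includes 'P582' itself
}
```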
## Tests
The transformers are tested; the utils are not.
{
"kafka": {
"broker": "localhost:29092",
"schema-registry": "http://localhost:8081",
"fetchAllVersions": true
},
"producer": {
"normalizer": {
"config": {
"batch.num.messages": 1000,
"message.send.max.retries": 5,
"retry.backoff.ms": 5000,
"message.max.bytes": 10485760
},
"topics": "wikidata-small",
"partition": -1,
"author": "bot#1"
},
"analyser": {
"config": {
"batch.num.messages": 1000,
"message.send.max.retries": 5,
"retry.backoff.ms": 5000,
"message.max.bytes": 10485760
},
"topics": "linker",
"partition": -1,
"author": "bot#1"
}
},
"consumer": {
"config": {
"group.id": "wikidata-normalizer",
"socket.keepalive.enable": true,
"enable.auto.commit": true,
"queued.max.messages.kbytes": 100
},
"topics": {
"topics": "wikidata-geo"
},
"stream": {
"request.required.acks": 1
}
},
"log-dir": "/tmp",
"reporter": {
"url": "http://dashboard-api"
}
}
\ No newline at end of file
{
"name": "wikidata-normalizer",
"version": "1.0.0",
"description": "Small tool to map beteen wikidata to histhub schema",
"description": "Collection of stream transformers and utils to help working with wikidata",
"main": "dist/index.js",
"types": "dist/index.d.js",
"scripts": {
@@ -15,28 +15,27 @@
"author": "",
"license": "ISC",
"dependencies": {
"@types/chai-as-promised": "^7.1.0",
"@types/node": "^9.6.18",
"@types/winston": "^2.3.9",
"@types/chai-as-promised": "^7.1.2",
"@types/node": "^10.14",
"bunyan-format": "^0.2.1",
"chai-as-promised": "^7.1.1",
"concurrently": "^3.5.1",
"concurrently": "^5.0",
"geolinker-common": "git+https://gitlab+deploy-token-1:vnsdCm_t84QGVA2U4kw4@source.dodis.ch/histhub/geolinker-common.git",
"node-cleanup": "^2.1.2",
"node-rdkafka": "^2.3.3",
"julian-gregorian": "^1.0.0",
"juliandate": "^1.0.0",
"moment": "^2.24",
"ngeohash": "^0.6.3",
"ramda": "^0.25.0",
"source-map": "^0.7.3",
"typescript": "^2.8.3",
"wikidata-taxonomy": "^0.6.6"
"typescript": "^3.6",
"wikidata-taxonomy": "^0.6.7"
},
"devDependencies": {
"@types/chai": "^4.1.3",
"@types/mocha": "^5.2.1",
"chai": "^4.1.2",
"kafka-avro-stub": "git+https://git@github.com/tobinski/kafka-avro-stub.git",
"mocha": "5.2",
"sinon": "^6.0.0",
"ts-node": "^6.1.0",
"tslint": "^5.9.1"
"@types/chai": "^4.2.3",
"@types/mocha": "^5.2.7",
"chai": "^4.2.0",
"mocha": "^6",
"sinon": "^6.3.5",
"ts-node": "^8.4",
"tslint": "^5.20.0"
}
}
import { queryTaxonomy } from 'wikidata-taxonomy';
import util = require('util');
export interface InterfaceProperty {
name: string;
qid: string;
}
export class BootstrapWikidata {
/**
* An array of properties to load from wikidata
* @type {any[]}
*/
private properties = [];
/**
* Load a list of properties with subclasses from wikidata
* @param {InterfaceProperty[]} properties
* @returns {Promise<void>}
*/
public async init(properties: InterfaceProperty[]) {
// iterate sequentially so every wikidata lookup is awaited before we continue
for (const property of properties) {
const subclasses = await this.getProperties(property.qid);
// add the main property to the list
subclasses.push(property.qid);
this.properties.push({name: property.name, properties: subclasses});
}
}
/**
* Get a collection of subclasses for a named property
* @param {string} name
* @returns {string[]}
*/
public get(name: string): string[] {
const property = this.properties.find((p) => {
return p.name === name;
});
return property.properties;
}
/**
* Get a property with its subclasses from wikidata
* @param {string} property
* @returns {Promise<any>}
*/
private async getProperties(property: string) {
// P1647 = subproperty
return await queryTaxonomy(property, {brief: true, property: ['P1647']})
.then((taxonomy) => {
const properties = [];
taxonomy.concepts.forEach((concept) => {
const qid = concept.notation[0];
properties.push(qid);
});
return properties.filter((d, i, a) => {
return a.indexOf(d) === i;
});
})
.catch((error) => {
console.log('Error while collecting QIDs');
console.log(error);
});
}
}
import fmt = require('bunyan-format');
import {Core} from 'geolinker-common';
import ReporterCounter from 'geolinker-common/dist/stream/reporter-counter';
import StreamProducerPreparer from 'geolinker-common/dist/stream/stream-producer-preparer';
import * as KafkaAvro from 'kafka-avro';
import nodeCleanup = require('node-cleanup');
import ProducerStream = require('node-rdkafka/lib/producer-stream');
import AnalyserTransformable from './analyser-transformable';
import {BootstrapWikidata, InterfaceProperty} from './bootstrap-wikidata';
import WikiNormalizer from './wiki-normalizer';
/**
* load sourcemap and config
*/
const core = Core.init('wikidata-normalizer', __dirname + '/../config.json');
/**
* To trigger this you need to set the KAFKA_AVRO_LOG_LEVEL
* f.e. env KAFKA_AVRO_LOG_LEVEL=debug node dist/index.js
*/
const kafkaLog = KafkaAvro.getLogger();
kafkaLog.addStream({
type: 'stream',
stream: fmt({
outputMode: 'short',
levelInString: true,
}),
level: 'debug',
});
nodeCleanup((exitCode, signal) => {
core.getLogger().info('Signal ' + signal);
core.getLogger().info('ExitCode ' + exitCode);
setTimeout(() => {
process.exit();
}, 100);
});
/**
* A list of properties to load from wikidata
* @type {InterfaceProperty[]}
*/
const properties = [
{name: 'start', qid: 'P580'},
{name: 'end', qid: 'P582'},
];
const wikidataProperties = new BootstrapWikidata();
const wpPromise = wikidataProperties.init(properties);
/**
* Init KafkaInstance
* @type {}
*/
const kafka = core.getKafkaAvro();
kafka.init().then(async () => {
// Wait for the promises to resolve
try {
await wpPromise;
const analyseProducer = await kafka.getProducer('analyser');
const normProducer = await kafka.getProducer('normalizer');
const consumerStream = await kafka.getConsumerStream();
core.getLogger().info('Producer and consumer are ready to go');
/**
* On data (flow mode) transform data and send them to the data topic
*/
consumerStream
.pipe(new ReporterCounter({objectMode: true}, core.getReporter(10000, 'normalizer'), 'read'))
.pipe(new WikiNormalizer({objectMode: true}, core.getLogger(), wikidataProperties))
.pipe(new ReporterCounter({objectMode: true}, core.getReporter(10000, 'normalizer'), 'write'))
.pipe(new StreamProducerPreparer({objectMode: true, topic: core.getNconf().get('producer:normalizer:topics'), partition: -1}))
.pipe(new ProducerStream(normProducer, {objectMode: true, topic: core.getNconf().get('producer:normalizer:topics')}));
// todo: we need a second consumer and we need a second reporter. So we need to update common libs
consumerStream
.pipe(new ReporterCounter({objectMode: true}, core.getReporter(10000, 'analyser'), 'read'))
.pipe(new AnalyserTransformable({objectMode: true}, core.getUriBuilder(), core.getNconf()))
.pipe(new ReporterCounter({objectMode: true}, core.getReporter(10000, 'analyser'), 'write'))
.pipe(new StreamProducerPreparer({objectMode: true, topic: core.getNconf().get('producer:analyser:topics'), partition: -1}))
.pipe(new ProducerStream(analyseProducer, {objectMode: true, topic: core.getNconf().get('producer:analyser:topics')}));
core.getLogger().info('Pipes are registered');
} catch (error) {
core.getLogger().error('Error in retrieving consumer or producer');
core.getLogger().error(error);
}
})
.catch((err) => {
core.getLogger().error('Unable to connect to Kafka');
throw err;
});
export interface IPropertyCollection {
get(name: string): string[];
add(name: string, subclass: string[]);
}
/**
* Simple class to manage a collection of properties
*/
export class PropertyCollection implements IPropertyCollection{
/**
* An array of named properties, each with its subclasses
* @type {any[]}
*/
private properties = [];
/**
* Get a collection of subclasses for a named property
* @param {string} name
* @returns {string[]}
*/
public get(name: string): string[] {
const property = this.properties.find((p) => {
return p.name === name;
});
return property.subclasses;
}
/**
* Add a collection of subclasses for a named property
* @param {string} name
* @param {string[]} subclasses
* @returns {void}
*/
public add(name: string, subclasses: string[]): void {
this.properties.push({name, subclasses});
}
}
import {queryTaxonomy} from 'wikidata-taxonomy';
import {PropertyCollection} from './property-collection';
export interface IProperty {
name: string;
qid: string;
}
export class WikidataProperties {
/**
* Get a property with its subclasses from wikidata
* @param {string} property
* @param {{}} query f.e. P1647 = subproperty
* @returns {Promise<any>}
*/
public static async getProperties(property: string, query: any = {brief: true}) {
try {
const taxonomy = await queryTaxonomy(property, query);
const props = [];
taxonomy.concepts.forEach((concept) => {
const qid = concept.notation[0];
props.push(qid);
});
return props.filter((d, i, a) => {
return a.indexOf(d) === i;
});
} catch (error) {
console.log('Error while collecting Q-numbers from wikidata');
console.log(error);
return [];
}
}
/**
* Load a list of properties with subclasses from wikidata
* @param {IProperty[]} props
* @returns {Promise<PropertyCollection>}
*/
public static async init(props: IProperty[]) {
const properties = new PropertyCollection();
for (const property of props) {
const subclasses = await WikidataProperties.getProperties(property.qid, {brief: true, property: ['P1647']});
// add the main property to the list
subclasses.push(property.qid);
properties.add(property.name, subclasses);
}
return properties;
}
}
import {IProperty, WikidataProperties} from './wikidata-properties';
/**
* Grab all the subclasses of start and end properties from wikidata.
* We want to find all possible annotations and return an instance
*/
const properties: IProperty[] = [
{name: 'start', qid: 'P580'},
{name: 'end', qid: 'P582'},
];
export const wikidataTimePropertiesLoader = WikidataProperties.init(properties);
/**
* Get all subclasses of geographical entities from wikidata (Q2221906 = geographic location)
* and combine them with the manual list of lena
* @type {{brief: boolean}}
*/
export const wikidataGeoInstances: Promise<string[]> = new Promise(async (resolve, reject) => {
// get basic taxonomies for the filter (from lena)
const baseGeoInstances = [
'Q2221906',
'Q17334923',
'Q515',
'Q486972',
'Q1187811',
'Q1549591',
'Q5119',
'Q14788575',
'Q35145743',
'Q27096235',
'Q618123',
'Q3957',
];
const options = { brief: true };
const collectedGeoInstances = await WikidataProperties.getProperties('Q2221906', options);
resolve([...baseGeoInstances, ...collectedGeoInstances]);
});
/**
* We got this module from
* https://github.com/maxlath/wikidata-sdk/blob/master/lib/helpers/wikidata_time_to_date_object.js
*/
interface IWikidataTime {
time: string;
}
export const timeToDate = (wikidataTime: string | IWikidataTime) => {
// Also accept claim datavalue.value objects
if (typeof wikidataTime === 'object') {
wikidataTime = wikidataTime.time;
}
const sign = wikidataTime[0];
const rest = wikidataTime.slice(1);
const date = fullDateData(sign, rest);
if (date.toString() === 'Invalid Date') {
return parseInvalidDate(sign, rest);
} else {
return date;
}
};
const fullDateData = (sign, rest) => {
return sign === '-' ? negativeDate(rest) : positiveDate(rest);
};
const positiveDate = (rest) => new Date(rest);
const negativeDate = (rest) => {
const year = rest.split('-')[0];
let date;
// Using ISO8601 expanded notation for negative years: adding 2 leading zeros
// when needed. Can't find the documentation again, but testing
// with `new Date(date)` gives a good clue of the implementation
if (year.length === 4) {
date = `-00${rest}`;
} else if (year.length === 5) {
date = `-0${rest}`;
} else {
date = `-${rest}`;
}
return new Date(date);
};
const parseInvalidDate = (sign, rest) => {
// This is probably a date of insufficient precision,
// such as 1953-00-00T00:00:00Z, which is invalid.
// It should at least have a year, so let's fall back to ${year}-01-01
const year = rest.split('T')[0].split('-')[0];
return fullDateData(sign, year);
};
@@ -5,7 +5,7 @@ import {Transform} from 'stream';
/**
* Interface for links
*/
export interface InterfaceLink {
export interface ILink {
from: string;
to: string[];
relation: {
@@ -19,10 +19,10 @@ export interface InterfaceLink {
* Decode a stream of a json array.
* emit single json objects
*/
class AnalyserTransformable extends Transform {
class WikidataAnalyserTransformer extends Transform {
/**
* Properties to extract from wiki data structure from lena
* PropertyCollection to extract from wiki data structure from lena
* todo: get this dynamic
* @type {string[]}
*/
@@ -73,7 +73,7 @@ class AnalyserTransformable extends Transform {
'P998'];
/**
* A chunk of stings to build an object
* A chunk of strings to build an object
* @type {string}
*/
private urlBuilder: UrlBuilder;
@@ -86,13 +86,11 @@ class AnalyserTransformable extends Transform {
/**
* @inheritDoc
* @param {{}} options
* @param uriBuilder UrlBuilder
* @param nconf