Commit baec07db authored by Tobias Steiner

Initial commit

# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
# Runtime data
pids
*.pid
*.seed
*.pid.lock
# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov
# Coverage directory used by tools like istanbul
coverage
# nyc test coverage
.nyc_output
# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files)
.grunt
# Bower dependency directory (https://bower.io/)
bower_components
# node-waf configuration
.lock-wscript
# Compiled binary addons (https://nodejs.org/api/addons.html)
build/Release
# Dependency directories
node_modules/
jspm_packages/
# TypeScript v1 declaration files
typings/
# Optional npm cache directory
.npm
# Optional eslint cache
.eslintcache
# Optional REPL history
.node_repl_history
# Output of 'npm pack'
*.tgz
# Yarn Integrity file
.yarn-integrity
# dotenv environment variables file
.env
# next.js build output
.next
dist/
.idea/
\ No newline at end of file
services:
  - docker:dind

variables:
  IMAGE_TAG: $CI_REGISTRY/$CI_IMAGE:$CI_COMMIT_REF_NAME
  LATEST_TAG: $CI_REGISTRY/$CI_IMAGE:latest

cache:
  paths:
    - node_modules/

build:
  only:
    - master
  image: docker:latest
  services:
    - docker:dind
  stage: build
  script:
    - docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $CI_REGISTRY
    - docker build -t $IMAGE_TAG .
    - docker tag $IMAGE_TAG $LATEST_TAG
    - docker push $IMAGE_TAG
    - docker push $LATEST_TAG

test:
  stage: test
  only:
    - sowhereinthefuture
  image: $LATEST_TAG
  before_script:
    - export NODE_ENV=dev && npm install
  script:
    - npm run test
\ No newline at end of file
FROM node:8-alpine
# add basic libs
RUN apk --no-cache add \
bash \
g++ \
ca-certificates \
lz4-dev \
musl-dev \
cyrus-sasl-dev \
openssl-dev \
make \
python \
git
RUN apk add --no-cache --virtual .build-deps gcc zlib-dev libc-dev bsd-compat-headers py-setuptools libexecinfo libexecinfo-dev
# Create app directory
RUN mkdir -p /usr/local/app
# Move to the app directory
WORKDIR /usr/local/app
# copy app to the container
COPY package.json package-lock.json config.json tsconfig.json /usr/local/app/
COPY src /usr/local/app/src
COPY test/ /usr/local/app/test
# Install dependencies
RUN npm install
# build stuff
RUN npm run build
# run app
CMD node dist/index.js
\ No newline at end of file
# Dodis
This app collects all links from Dodis and extracts data from the fetched resources.
## Sitemap
The first step is to generate a sitemap of all Dodis resources. The links are published to the magpie topic.
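Each link ends up on the magpie topic as a small JSON message. A minimal sketch of that shape in TypeScript, based on what src/sitemap-generator.ts produces (the interface name and example values are illustrative only):

```typescript
// Sketch of a link message published to the `magpie` topic.
// Field names follow src/sitemap-generator.ts; the values are made up.
interface MagpieLink {
  url: string;                         // e.g. https://dodis.ch/G<id>
  provider: string;                    // always 'dodis' for this app
  queuetime: number;                   // epoch milliseconds when the link was queued
  headers: { [key: string]: string };  // request headers used when fetching
}

const example: MagpieLink = {
  url: 'https://dodis.ch/G123',
  provider: 'dodis',
  queuetime: Date.now(),
  headers: {'content-type': 'application/json'},
};
```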
## Extractor
The extractor takes the fetched resources from magpie and normalizes the Dodis data. The result is published to the geolinker topic.
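The normalized record pushed to the geolinker topic looks roughly like this. A sketch in TypeScript derived from src/Extractor.ts (the field names are the ones the extractor emits; the interface name and example values are illustrative):

```typescript
// Sketch of a normalized record sent to the `geolinker` topic.
// Field names follow src/Extractor.ts; the values are made up.
interface GeolinkerRecord {
  id: string;
  provider: string;            // 'dodis'
  url: string;
  name: string;
  alternative_name: string;
  country: string;
  location: string | null;     // 'latitude longitude', or null if coordinates are missing
  modification_date: string;   // year-month-day of extraction
}

const example: GeolinkerRecord = {
  id: '123',
  provider: 'dodis',
  url: 'https://dodis.ch/G123',
  name: 'Bern',
  alternative_name: 'Berne',
  country: 'CH',
  location: '46.948 7.447',
  modification_date: '2018-6-1',
};
```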
## Docker
To build the image, use the following command. The image fetches data from a Kafka topic and streams the result back into Kafka. The container is based on Alpine Linux.
```bash
docker build -t source.dodis.ch:4577/histhub/wikidata-normalizer .
# Upload to the registry
docker push source.dodis.ch:4577/histhub/wikidata-normalizer
```
## CD
We have a build pipeline in GitLab, so manually building the image is no longer necessary.
## Deploy to Kubernetes
We execute a job on Kubernetes to stream the dump into Kafka:
```bash
kubectl create -f wikidata-normalizer-deployment.yaml
```
\ No newline at end of file
{
"kafka": {
"broker": "localhost:29092",
"schema-registry": "http://localhost:8081",
"fetchAllVersions": true
},
"producer": {
"magpie" : {
"config": {
"batch.num.messages": 1000,
"message.send.max.retries": 5,
"retry.backoff.ms": 5000,
"message.max.bytes": 10485760
},
"topics": "magpie",
"partition": -1,
"author": "bot#1"
},
"geolinker" : {
"config": {
"batch.num.messages": 1000,
"message.send.max.retries": 5,
"retry.backoff.ms": 5000,
"message.max.bytes": 10485760
},
"topics": "geolinker",
"partition": -1,
"author": "bot#1"
}
},
"consumer": {
"config": {
"group.id": "magpie",
"socket.keepalive.enable": true,
"enable.auto.commit": true,
"queued.max.messages.kbytes": 100
},
"topics": {
"topics": "extract"
},
"stream": {
"request.required.acks": 1
}
},
"log-dir": "/tmp",
"reporter": {
"url": "http://dashboard-api"
}
}
\ No newline at end of file
{
"name": "magpie",
"version": "1.0.0",
"description": "Magpie get urls from kafka and fetchs them in the wild of the internet",
"main": "dist/index.js",
"types": "dist/index.d.js",
"scripts": {
"test": "node_modules/.bin/mocha -r ts-node/register test/**/*.spec.ts",
"lint": "node_modules/.bin/tslint -c tslint.json 'src/**/*.ts'",
"build": "node_modules/.bin/tsc",
"watch": "concurrently -k -p \"[{name}]\" -n \"TypeScript,Node\" -c \"yellow.bold,cyan.bold,green.bold\" \"npm run watch-ts\" \"npm run watch-node\"",
"watch-node": "nodemon dist/index.js",
"watch-ts": "tsc -w"
},
"author": "",
"license": "ISC",
"dependencies": {
"@types/node": "^9.6.18",
"bunyan-format": "^0.2.1",
"concurrently": "^3.5.1",
"geolinker-common": "git+https://gitlab+deploy-token-1:vnsdCm_t84QGVA2U4kw4@source.dodis.ch/histhub/geolinker-common.git",
"node-cleanup": "^2.1.2",
"node-spider": "^1.4.1",
"source-map": "^0.7.3",
"typescript": "^2.8.3"
},
"devDependencies": {
"ts-node": "^6.1.0",
"tslint": "^5.9.1"
}
}
/*
/// <reference types="node" />
declare module 'node-spider' {
export interface Logger {
write: (msg) => void;
}
export interface SpiderOpts {
concurrent: number,
// How long to wait after each request
delay: number,
// A stream to where internal logs are sent, optional
logs: Logger,
// Re-visit visited URLs, false by default
allowDuplicates: boolean,
// If `true` all queued handlers will be try-catch'd, errors go to `error` callback
catchErrors: boolean,
// If `true` the spider will set the Referer header automatically on subsequent requests
addReferrer: boolean,
// If `true` adds the X-Requested-With:XMLHttpRequest header
xhr: boolean,
// If `true` adds the Connection:keep-alive header and forever option on request module
keepAlive: boolean,
// Called when there's an error, throw will be used if none is provided
error: (err, url) => void;
// Called when there are no more requests
done: () => void,
//- All options are passed to `request` module, for example:
headers: any,
encoding: string
}
export class Spider {
queue(url: string, done);
full(): boolean;
log(status: string, url: string);
load(url, done, referrer);
constructor(opts: SpiderOpts);
}
}*/
import { Transform } from 'stream';
import {LoggerInstance} from "winston";
/**
* Prepare a value for kafka ProducerStream
*/
class Extractor extends Transform {
/**
* Logger instance
*/
private logger: LoggerInstance;
constructor(opts, logger: LoggerInstance) {
super(opts);
this.logger = logger;
}
/**
* @inheritDoc
* @param chunk
* @param {string} encoding
* @param {(err?: Error) => void} callback
* @private
*/
public _transform(chunk, encoding, callback) {
try {
// The fetched dodis document is delivered as a JSON string in the response body
const data = JSON.parse(chunk.parsed.doc.res.body);
const res = {
id: data.data.id.toString(),
provider: 'dodis',
url: chunk.parsed.url,
name: data.data.name,
alternative_name: data.data.comment,
country: data.data.countryCode,
location: this.buildLocation(data.data),
modification_date: this.getDate(),
};
return callback(null, res);
} catch (error) {
this.logger.error(error);
callback(error);
}
}
private buildLocation(obj) {
// Without both coordinates there is no usable location
if (obj.longitude == null || obj.latitude == null) {
return null;
}
return `${obj.latitude} ${obj.longitude}`;
}
private getDate() {
const date = new Date();
// getMonth() is zero-based, so add 1 to get the calendar month
return `${date.getFullYear()}-${date.getMonth() + 1}-${date.getDate()}`;
}
}
export default Extractor;
import {Transform} from 'stream';
import {LoggerInstance} from 'winston';
import {Spider} from 'node-spider';
/**
* Verbosity
* @type {boolean}
*/
const debug = false;
/**
* Small logger for the spider
* @type {{write: function(*=)}}
*/
const logger = {
write: (msg) => {
if(debug) {
console.log(msg);
}
}
};
/**
* A small writer to put result to stdout
* @type {{put: function(*=)}}
*/
const writer = {
put: (line) => {
console.log(line);
}
};
const spider = new Spider({
// How many requests can be run in parallel
concurrent: 10,
// How long to wait after each request
delay: 50,
// A stream to where internal logs are sent, optional
logs: logger,
// Re-visit visited URLs, false by default
allowDuplicates: false,
// If `true` all queued handlers will be try-catch'd, errors go to `error` callback
catchErrors: false,
// If `true` the spider will set the Referer header automatically on subsequent requests
addReferrer: false,
// If `true` adds the X-Requested-With:XMLHttpRequest header
xhr: false,
// If `true` adds the Connection:keep-alive header and forever option on request module
keepAlive: false,
// Called when there's an error, throw will be used if none is provided
error: function(err, url) {
console.log(`There was an error while fetching ${url}: ${err}`);
},
// Called when there are no more requests
done: function() {
console.log("We are done");
},
//- All options are passed to `request` module, for example:
headers: { 'user-agent': 'magpie' },
encoding: 'utf8'
});
class Crawler extends Transform {
/**
* Timeout to queue an url if the queue is full
* @type {number}
*/
private timeout = 20;
/**
* Logger
*/
private logger: LoggerInstance;
constructor(options, logger: LoggerInstance) {
super(options);
this.logger = logger;
}
/**
* @inheritDoc
* @param chunk
* @param encoding
* @param callback
* @private
*/
public _transform(chunk, encoding, callback) {
if (!spider.full()) {
const data = chunk.parsed;
spider.queue(chunk.parsed.url, (doc) => {
data.doc = doc;
callback(null, data);
});
} else {
setTimeout(() => {
this._transform(chunk, encoding, callback);
}, this.timeout);
}
}
}
export default Crawler;
import fmt = require('bunyan-format');
import {Core} from 'geolinker-common';
import ReporterCounter from 'geolinker-common/dist/stream/reporter-counter';
import StreamProducerPreparer from 'geolinker-common/dist/stream/stream-producer-preparer';
import Crawler from "./crawler";
import ProducerStream = require('node-rdkafka/lib/producer-stream');
import * as KafkaAvro from 'kafka-avro';
import {SitemapGenerator} from "./sitemap-generator";
import Extractor from "./Extractor";
/**
* load sourcemap and config
*/
const core = Core.init('magpie', __dirname + '/../config.json');
/**
* To trigger this you need to set the KAFKA_AVRO_LOG_LEVEL
* f.e. env KAFKA_AVRO_LOG_LEVEL=debug node dist/index.js
*/
const kafkaLog = KafkaAvro.getLogger();
kafkaLog.addStream({
type: 'stream',
stream: fmt({
outputMode: 'short',
levelInString: true,
}),
level: 'debug',
});
/**
* Init KafkaInstance
* @type {}
*/
const kafka = core.getKafkaAvro();
kafka.init().then(async () => {
try {
// Wait for the promises to resolve
const extractorStreamConsumer = await kafka.getConsumerStream();
const magpieProducer = await kafka.getProducer('magpie');
const geolinkerProducer = await kafka.getProducer('geolinker');
core.getLogger().info('Producer and consumer are ready to go');
/**
* Generate sitemap of all resources
*/
// new SitemapGenerator(magpieProducer);
/**
* On data (flow mode) extract data from magpie and send them to the geolinker
*/
extractorStreamConsumer
.pipe(new ReporterCounter({objectMode: true}, core.getReporter(10000), 'read'))
.pipe(new Extractor({objectMode: true}, core.getLogger()))
.pipe(new ReporterCounter({objectMode: true}, core.getReporter(), 'write'))
.pipe(new StreamProducerPreparer({objectMode: true, topic: core.getNconf().get('producer:geolinker:topics'), partition: (chunk) => chunk.provider}))
.pipe(new ProducerStream(geolinkerProducer, {objectMode: true, topic: core.getNconf().get('producer:geolinker:topics')}));
core.getLogger().info('Pipes are registered');
} catch (error) {
core.getLogger().error('Error in retrieving consumer or producer');
core.getLogger().error(error);
}
})
.catch((err) => {
core.getLogger().error('It is not possible to connect to Kafka');
throw err;
});
import Spider = require("node-spider");
const logger = {
write: (msg) => {
console.log(msg);
}
};
let producer;
export class SitemapGenerator {
private producer;
private spider;
private initial = true;
private startUrl = 'https://dodis.ch/search?q=&c=Place&f=Name&t=all&cb=doc';
constructor(producer1) {
producer = producer1;
this.init();
}
init() {
this.spider = new Spider({
// How many requests can be run in parallel
concurrent: 10,
// How long to wait after each request
delay: 50,
// A stream to where internal logs are sent, optional
logs: logger,
// Re-visit visited URLs, false by default
allowDuplicates: false,
// If `true` all queued handlers will be try-catch'd, errors go to `error` callback
catchErrors: false,
// If `true` the spider will set the Referer header automatically on subsequent requests
addReferrer: false,
// If `true` adds the X-Requested-With:XMLHttpRequest header
xhr: false,
// If `true` adds the Connection:keep-alive header and forever option on request module
keepAlive: false,
// Called when there's an error, throw will be used if none is provided
error: function(err, url) {
console.log(`There was an error while fetching ${url}: ${err}`);
},
// Called when there are no more requests
done: function() {
console.log("We are done");
},
//- All options are passed to `request` module, for example:
headers: { 'user-agent': 'magpie' , 'content-type': 'application/json'},
encoding: 'utf8'
});
// it's not possible to override the bind context
this.spider.queue(this.startUrl, this.handleRequest.bind(this));
}
handleRequest(doc) {
const data = JSON.parse(doc.res.body);
if (this.initial) {
for (let i = 1; i < data.totalPages; i++) {
this.spider.queue(`${this.startUrl}&p=${i}`, this.handleRequest.bind(this));
}
this.initial = false;
}
for (const entry of data.data) {
try {
producer.produce(
// Topic to send the message to
'magpie',
-1,
// Message to send.
{
url: `https://dodis.ch/G${entry.id}`,
provider: 'dodis',
queuetime: (new Date()).getTime(),
headers: {'content-type': 'application/json'},
},
null,
// you can send a timestamp here. If your broker version supports it,
// it will get added. Otherwise, we default to 0
Date.now(),
// you can send an opaque token here, which gets passed along
// to your delivery reports
);
} catch (err) {
console.error('A problem occurred when sending our message to magpie');
console.error(err);
}
}
}
}
\ No newline at end of file
{
"compilerOptions": {
"target": "es6",
"module": "commonjs",
"outDir": "dist",
"inlineSourceMap": true,
"declaration": false,
"moduleResolution": "node",
"typeRoots": [
"node_modules/@types"
]
},
"include": [
"src/**/*.ts"
],
"exclude": [
"node_modules"
]
}
{
"defaultSeverity": "error",
"extends": [
"tslint:recommended"
],
"jsRules": {},
"rules": {
"quotemark": [
true,