Commit 2f79e129 authored by Tobias Steiner

Initial commit
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
# Runtime data
pids
*.pid
*.seed
*.pid.lock
# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov
# Coverage directory used by tools like istanbul
coverage
# nyc test coverage
.nyc_output
# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files)
.grunt
# Bower dependency directory (https://bower.io/)
bower_components
# node-waf configuration
.lock-wscript
# Compiled binary addons (https://nodejs.org/api/addons.html)
build/Release
# Dependency directories
node_modules/
jspm_packages/
# TypeScript v1 declaration files
typings/
# Optional npm cache directory
.npm
# Optional eslint cache
.eslintcache
# Optional REPL history
.node_repl_history
# Output of 'npm pack'
*.tgz
# Yarn Integrity file
.yarn-integrity
# dotenv environment variables file
.env
# next.js build output
.next
dist/
.idea/
services:
  - docker:dind

variables:
  IMAGE_TAG: $CI_REGISTRY/$CI_IMAGE:$CI_COMMIT_REF_NAME
  LATEST_TAG: $CI_REGISTRY/$CI_IMAGE:latest

cache:
  paths:
    - node_modules/

build:
  only:
    - master
  image: docker:latest
  services:
    - docker:dind
  stage: build
  script:
    - docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $CI_REGISTRY
    - docker build -t $IMAGE_TAG .
    - docker tag $IMAGE_TAG $LATEST_TAG
    - docker push $IMAGE_TAG
    - docker push $LATEST_TAG

test:
  stage: test
  only:
    - sowhereinthefuture
  image: $LATEST_TAG
  before_script:
    - export NODE_ENV=dev && npm install
  script:
    - npm run test
FROM node:8-alpine

# Add basic libs
RUN apk --no-cache add \
    bash \
    g++ \
    ca-certificates \
    lz4-dev \
    musl-dev \
    cyrus-sasl-dev \
    openssl-dev \
    make \
    python \
    git

RUN apk add --no-cache --virtual .build-deps gcc zlib-dev libc-dev bsd-compat-headers py-setuptools libexecinfo libexecinfo-dev

# Create app directory
RUN mkdir -p /usr/local/app

# Move to the app directory
WORKDIR /usr/local/app

# Copy app to the container
COPY package.json package-lock.json config.json tsconfig.json /usr/local/app/
COPY src /usr/local/app/src
COPY test/ /usr/local/app/test

# Install dependencies
RUN npm install

# Build the TypeScript sources
RUN npm run build

# Run the app
CMD node dist/index.js
# Magpie
Magpie reads a stream from Kafka and fetches the given URLs. It writes the fetched data back to Kafka.
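The core of the service is a Node stream pipeline. Below is a minimal sketch of that flow; the real wiring lives in `src/index.ts`, and the stream names here are hypothetical stand-ins for what geolinker-common provides, not its exact API.
```typescript
import {Transform} from 'stream';

// Stand-ins for the Kafka consumer/producer streams that geolinker-common
// builds in src/index.ts (hypothetical placeholders, not the real API).
declare const consumerStream: NodeJS.ReadableStream;
declare const producerStream: NodeJS.WritableStream;

// Each Kafka message carries a url; the crawler fetches it and pushes the
// document downstream, where it is produced to the output topic.
const fetchStep = new Transform({
    objectMode: true,
    transform(chunk: any, _encoding, callback) {
        // In the real service this is Crawler (src/crawler.ts), which queues
        // chunk.parsed.url on a node-spider instance.
        callback(null, chunk);
    },
});

consumerStream.pipe(fetchStep).pipe(producerStream);
```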
## Docker
To build the image, use the following commands. The image fetches data from a wikidata topic and streams the results back into Kafka. The container is based on Alpine Linux.
```bash
docker build -t source.dodis.ch:4577/histhub/wikidata-normalizer .
# Upload to the registry
docker push source.dodis.ch:4577/histhub/wikidata-normalizer
```
## CD
We have a build pipeline in GitLab, so building the image manually is no longer necessary.
## Deploy to k8s
We execute a job on k8s to stream the dump into Kafka.
```bash
kubectl create -f wikidata-normalizer-deployment.yaml
```
{
"kafka": {
"broker": "localhost:29092",
"schema-registry": "http://localhost:8081",
"fetchAllVersions": true
},
"producer": {
"config": {
"batch.num.messages": 1000,
"message.send.max.retries": 5,
"retry.backoff.ms": 5000,
"message.max.bytes": 10485760
},
"topics": "extract",
"author": "bot#1"
},
"consumer": {
"config": {
"group.id": "magpie",
"socket.keepalive.enable": true,
"enable.auto.commit": true,
"queued.max.messages.kbytes": 100
},
"topics": {
"topics": "magpie"
},
"stream": {
"request.required.acks": 1
}
},
"log-dir": "/tmp",
"reporter": {
"url": "http://dashboard-api"
}
}
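The keys above are read at runtime through the nconf instance that geolinker-common exposes. A small sketch of the lookup style used in `src/index.ts` (the colon-separated paths are nconf's nested-key syntax; the `kafka:broker` lookup is an illustrative assumption):
```typescript
import {Core} from 'geolinker-common';

// Core.init loads config.json into nconf (this mirrors src/index.ts).
const core = Core.init('magpie', __dirname + '/../config.json');

// nconf resolves nested keys with colon-separated paths:
const producerTopic = core.getNconf().get('producer:topics'); // "extract"
const broker = core.getNconf().get('kafka:broker');           // "localhost:29092"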
{
"name": "magpie",
"version": "1.0.0",
"description": "Magpie get urls from kafka and fetchs them in the wild of the internet",
"main": "dist/index.js",
"types": "dist/index.d.js",
"scripts": {
"test": "node_modules/.bin/mocha -r ts-node/register test/**/*.spec.ts",
"lint": "node_modules/.bin/tslint -c tslint.json 'src/**/*.ts'",
"build": "node_modules/.bin/tsc",
"watch": "concurrently -k -p \"[{name}]\" -n \"TypeScript,Node\" -c \"yellow.bold,cyan.bold,green.bold\" \"npm run watch-ts\" \"npm run watch-node\"",
"watch-node": "nodemon dist/index.js",
"watch-ts": "tsc -w"
},
"author": "",
"license": "ISC",
"dependencies": {
"@types/node": "^9.6.18",
"concurrently": "^3.5.1",
"geolinker-common": "git+https://gitlab+deploy-token-1:vnsdCm_t84QGVA2U4kw4@source.dodis.ch/histhub/geolinker-common.git",
"node-cleanup": "^2.1.2",
"node-spider": "git+https://github.com/tobinski/node-spider",
"source-map": "^0.7.3",
"typescript": "^2.8.3"
},
"devDependencies": {
"ts-node": "^6.1.0",
"tslint": "^5.9.1"
}
}
import {Transform} from 'stream';
import {LoggerInstance} from 'winston';
import Spider = require('node-spider');
/**
 * Verbosity
 * @type {boolean}
 */
const debug = false;
/**
 * Small logger for the spider
 * @type {{write: function(*=)}}
 */
const logger = {
    write: (msg) => {
        if (debug) {
            console.log(msg);
        }
    },
};
/**
 * A small writer to put results to stdout (defined here but not wired into the spider below)
 * @type {{put: function(*=)}}
 */
const writer = {
    put: (line) => {
        console.log(line);
    },
};
const spider = new Spider({
    // How many requests can be run in parallel
    concurrent: 20,
    // How long to wait after each request
    delay: 50,
    // A stream to where internal logs are sent, optional
    logs: logger,
    // Re-visit visited URLs, false by default
    allowDuplicates: false,
    // If `true` all queued handlers will be try-catch'd, errors go to `error` callback
    catchErrors: false,
    // If `true` the spider will set the Referer header automatically on subsequent requests
    addReferrer: false,
    // If `true` adds the X-Requested-With:XMLHttpRequest header
    xhr: false,
    // If `true` adds the Connection:keep-alive header and forever option on request module
    keepAlive: false,
    // Called when there's an error, throw will be used if none is provided
    error: (err, url) => {
        console.log(`There was an error while fetching ${url}: ${err}`);
    },
    // Called when there are no more requests
    done: () => {
        console.log('We are done');
    },
    headers: {'user-agent': 'magpie'},
    encoding: 'utf8',
});
class Crawler extends Transform {
    /**
     * Timeout (ms) before retrying when the spider queue is full
     * @type {number}
     */
    private timeout = 20;
    /**
     * Logger
     */
    private logger: LoggerInstance;

    constructor(options, logger: LoggerInstance) {
        super(options);
        this.logger = logger;
    }

    /**
     * @inheritDoc
     * @param chunk
     * @param encoding
     * @param callback
     * @private
     */
    public _transform(chunk, encoding, callback) {
        this.logger.debug('loaded');
        if (!spider.full()) {
            const data = chunk.parsed;
            spider.queue(data.url, (doc) => {
                this.logger.debug('Successfully got data ' + data.url);
                data.doc = doc;
                callback(null, data);
            }, data.headers);
        } else {
            // The spider queue is full: retry this chunk after a short delay
            setTimeout(() => {
                this._transform(chunk, encoding, callback);
            }, this.timeout);
        }
    }
}

export default Crawler;
import {Core} from 'geolinker-common';
import ReporterCounter from 'geolinker-common/dist/stream/reporter-counter';
import StreamProducerPreparer from 'geolinker-common/dist/stream/stream-producer-preparer';
import Crawler from './crawler';
import ProducerStream = require('node-rdkafka/lib/producer-stream');
import Debug from 'geolinker-common/dist/stream/debug';

/**
 * Load sourcemap and config
 */
const core = Core.init('magpie', __dirname + '/../config.json');

/**
 * Init KafkaInstance
 */
const kafka = core.getKafkaAvro();
kafka.init().then(async () => {
    // Wait for the promises to resolve
    try {
        const extractorProducer = await kafka.getProducer();
        const consumerStream = await kafka.getConsumerStream();
        core.getLogger().info('Producer and consumer are ready to go');
        /**
         * On data (flow mode) transform data and send it to the data topic
         */
        consumerStream
            .pipe(new ReporterCounter({objectMode: true}, core.getReporter(10000), 'read'))
            .pipe(new Crawler({objectMode: true}, core.getLogger()))
            .pipe(new ReporterCounter({objectMode: true}, core.getReporter(), 'write'))
            .pipe(new StreamProducerPreparer({objectMode: true, topic: core.getNconf().get('producer:topics'), partition: (chunk) => chunk.provider}))
            .pipe(new Debug({objectMode: true}, ''))
            .pipe(new ProducerStream(extractorProducer, {objectMode: true, topic: core.getNconf().get('producer:topics')}));
        core.getLogger().info('Pipes are registered');
    } catch (error) {
        core.getLogger().error('Error in retrieving consumer or producer');
        core.getLogger().error(error);
    }
})
.catch((err) => {
    core.getLogger().error('It is not possible to connect to Kafka');
    throw err;
});
import {Writable} from 'stream';
import {CODES, Producer} from 'node-rdkafka';
import {InterfaceReporter} from 'geolinker-common/dist/reporter';

class KafkaProducerWritable extends Writable {
    private producer;
    private nconf;
    private timeout = 500;
    private reporter: InterfaceReporter;

    constructor(options = {}, producer: Producer, nconf, reporter: InterfaceReporter) {
        super(options);
        this.producer = producer;
        this.nconf = nconf;
        this.reporter = reporter;
    }

    public async _write(chunk, encoding, callback) {
        try {
            // produce to a dynamic topic
            const data = {
                topic: 'extractor',
                value: chunk,
                partition: chunk.provider,
            };
            await this.producer.produce(this.nconf.get('topic:config:topic'), this.nconf.get('topic:config:partion'), data);
            this.reporter.setDataOut(1);
            callback();
        } catch (err) {
            // Handle backpressure if the queue is full:
            // if (CODES.ERRORS.ERR__QUEUE_FULL === err.code) {
            //     setTimeout(() => {
            //         this._write(chunk, encoding, callback);
            //     }, this.timeout);
            // } else { ... }
            setTimeout(() => {
                callback(err);
            }, this.timeout);
        }
    }
}

export default KafkaProducerWritable;
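KafkaProducerWritable is not wired into src/index.ts in this commit; the pipeline there ends in ProducerStream instead. A sketch of how it could terminate the pipeline, assuming the surrounding objects are created as in src/index.ts (the `declare` placeholders are assumptions for illustration):
```typescript
import Crawler from './crawler';
import KafkaProducerWritable from './kafka-producer-writable';

// Assumed to exist as in src/index.ts (hypothetical placeholders).
declare const consumerStream: NodeJS.ReadableStream;
declare const core: any;
declare const producer: any;

// End the pipeline in a Writable instead of ProducerStream:
consumerStream
    .pipe(new Crawler({objectMode: true}, core.getLogger()))
    .pipe(new KafkaProducerWritable({objectMode: true}, producer, core.getNconf(), core.getReporter()));
```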
{
"compilerOptions": {
"target": "es6",
"module": "commonjs",
"outDir": "dist",
"inlineSourceMap": true,
"declaration": false,
"moduleResolution": "node",
"typeRoots": [
"node_modules/@types"
]
},
"include": [
"src/**/*.ts"
],
"exclude": [
"node_modules"
]
}
{
"defaultSeverity": "error",
"extends": [
"tslint:recommended"
],
"jsRules": {},
"rules": {
"quotemark": [
true,
"single"
],
"object-literal-sort-keys": false
},
"rulesDirectory": []
}