Commit d19c3a5b authored by tobinski's avatar tobinski
Browse files

Change the streaming app into a node-transformer

parent b37e5f77
FROM node:10-slim
# add basic libs required to build native npm modules
# (duplicate "bash" entry removed; apt lists cleaned up in the same layer
# to keep the image small)
RUN apt-get update && apt-get install -y \
    bash \
    g++ \
    gcc \
    ca-certificates \
    make \
    python \
    git \
    && rm -rf /var/lib/apt/lists/*
# Create app directory
RUN mkdir -p /usr/local/app
# Move to the app directory
WORKDIR /usr/local/app
# copy app to the container
COPY package.json package-lock.json config.json tsconfig.json /usr/local/app/
COPY src /usr/local/app/src
COPY test/ /usr/local/app/test
# Install dependencies
RUN npm install
# build stuff
RUN npm run build
# run app (exec form so node is PID 1's direct child and receives signals)
CMD ["node", "dist/index.js"]
# Magpie
Magpie is a stream transformer that reads a stream of urls from Kafka, fetches each url from the internet and puts the result back into Kafka. It can handle delays between requests and concurrency.
## Docker
To build the image use the following command. The image fetches data from a wikidata topic and streams the result back into Kafka. The container is based on node:10-slim.
```bash
docker build -t source.dodis.ch:4577/histhub/wikidata-normalizer .
# Upload to the registry
docker push source.dodis.ch:4577/histhub/wikidata-normalizer
```
## CD
We have a build pipeline in GitLab, so manually building the image is no longer necessary.
## Deploy to k8
We execute a job on k8 to stream the dump into kafka
```bash
kubectl create -f wikidata-normalizer-deployment.yaml
```
\ No newline at end of file
## Usage
## Concurrency and delay
If we configure a concurrency of 10 and a delay of 200ms, then up to 10 requests are sent in parallel; after each response finishes, that slot waits 200ms before sending its next request. The first 10 requests hit the server at the same time, but after a while the delays cause the requests to become more staggered.
{
"kafka": {
"broker": "localhost:29092",
"schema-registry": "http://localhost:8081",
"fetchAllVersions": true
},
"producer": {
"config": {
"batch.num.messages": 1000,
"message.send.max.retries": 5,
"retry.backoff.ms": 5000,
"message.max.bytes": 10485760
},
"topics": "extract",
"author": "bot#1"
},
"consumer": {
"config": {
"group.id": "magpie",
"socket.keepalive.enable": true,
"enable.auto.commit": true,
"queued.max.messages.kbytes": 100
},
"topics": {
"topics": "magpie"
},
"stream": {
"request.required.acks": 1
}
},
"log-dir": "/tmp",
"reporter": {
"url": "http://dashboard-api"
}
}
\ No newline at end of file
This diff is collapsed.
......@@ -15,16 +15,20 @@
"author": "",
"license": "ISC",
"dependencies": {
"@types/node": "^9.6.18",
"@types/node": "^9.6.52",
"concurrently": "^3.5.1",
"geolinker-common": "git+https://gitlab+deploy-token-1:vnsdCm_t84QGVA2U4kw4@source.dodis.ch/histhub/geolinker-common.git",
"node-cleanup": "^2.1.2",
"node-spider": "git+https://github.com/tobinski/node-spider",
"source-map": "^0.7.3",
"typescript": "^2.8.3"
"typescript": "^2.8.3",
"cheerio": "^0.22"
},
"devDependencies": {
"ts-node": "^6.1.0",
"tslint": "^5.9.1"
"@types/chai": "^4.2.3",
"@types/mocha": "^5.2.7",
"chai": "^4.2.0",
"mocha": "^6",
"sinon": "^6.3.5",
"ts-node": "^8.4",
"tslint": "^5.20.0"
}
}
import {Transform} from 'stream';
import {LoggerInstance} from 'winston';
import Spider = require('node-spider');
/**
 * Toggle verbose spider logging
 * @type {boolean}
 */
const debug = false;
/**
 * Minimal log sink handed to the spider; swallows messages unless debug is on
 * @type {{write: function(*=)}}
 */
const logger = {
    write: (msg) => {
        if (!debug) {
            return;
        }
        console.log(msg);
    },
};
/**
 * Minimal writer that dumps a result line to stdout
 * @type {{put: function(*=)}}
 */
const writer = {
    put: (line) => console.log(line),
};
// Module-level spider instance shared by every Crawler transform in this file.
// NOTE(review): `error` and `done` report to stdout instead of the winston
// logger used elsewhere — confirm that is intended.
const spider = new Spider({
// How many requests can be run in parallel
concurrent: 10,
// How long to wait after each request
delay: 200,
// A stream to where internal logs are sent, optional
logs: logger,
// Re-visit visited URLs, false by default
allowDuplicates: false,
// If `true` all queued handlers will be try-catch'd, errors go to `error` callback
catchErrors: false,
// If `true` the spider will set the Referer header automatically on subsequent requests
addReferrer: false,
// If `true` adds the X-Requested-With:XMLHttpRequest header
xhr: false,
// If `true` adds the Connection:keep-alive header and forever option on request module
keepAlive: false,
// Called when there's an error, throw will be used if none is provided
error: function(err, url) {
console.log(`There was an error while fetching ${url}` + err);
},
// Called when there are no more requests
done: function() {
console.log("We are done");
},
headers: { 'user-agent': 'magpie' },
encoding: 'utf8'
});
/**
 * Transform stream that takes chunks whose `parsed` payload contains a url,
 * fetches the resource through the module-level spider and forwards the
 * payload enriched with the fetched document (`data.doc`).
 */
class Crawler extends Transform {
    /**
     * Delay (ms) before retrying when the spider queue is full
     * @type {number}
     */
    private timeout = 20;
    /**
     * Time (ms) to wait for a response before giving up on a chunk
     */
    private serverTimeout = 30 * 1000;
    /**
     * Logger
     */
    private logger: LoggerInstance;
    constructor(options, logger: LoggerInstance) {
        super(options);
        this.logger = logger;
    }
    /**
     * @inheritDoc
     * Fetch chunk.parsed.url; on success forward chunk.parsed with `doc` set,
     * on server timeout drop the chunk. If the spider queue is full, retry
     * after `this.timeout` ms.
     * @param chunk
     * @param encoding
     * @param callback
     * @private
     */
    public _transform(chunk, encoding, callback) {
        console.log('loaded');
        if (!spider.full()) {
            const data = chunk.parsed;
            // Whichever of the two paths below completes first wins. The
            // original code could invoke `callback` twice (timeout fired,
            // then the spider responded later), which crashes the pipeline.
            let finished = false;
            // if the server is not reacting wait for a timeout and then call the callback
            const timer = setTimeout(() => {
                if (!finished) {
                    finished = true;
                    this.logger.warn('Too fast. ServerTimeout triggered');
                    callback(null);
                }
            }, this.serverTimeout);
            spider.queue(data.url, (doc) => {
                if (finished) {
                    // the timeout already gave up on this chunk
                    return;
                }
                finished = true;
                clearTimeout(timer);
                console.log("Successfully got data " + data.url);
                data.doc = doc;
                callback(null, data);
            }, data.headers);
        } else {
            // queue is full: try again shortly
            setTimeout(() => {
                this._transform(chunk, encoding, callback);
            }, this.timeout);
        }
    }
}
export default Crawler;
import {Core} from 'geolinker-common';
import ReporterCounter from 'geolinker-common/dist/stream/reporter-counter';
import StreamProducerPreparer from 'geolinker-common/dist/stream/stream-producer-preparer';
import Crawler from "./crawler";
import ProducerStream = require('node-rdkafka/lib/producer-stream');
import Debug from "geolinker-common/dist/stream/debug";
/**
 * load sourcemap and config
 */
const core = Core.init('magpie', __dirname + '/../config.json');
/**
 * Init KafkaInstance
 * @type {}
 */
const kafka = core.getKafkaAvro();
// Wire the full pipeline once kafka (producer + consumer) is ready
kafka.init().then(async () => {
// Wait for the promises to resolve
try {
const extractorProducer = await kafka.getProducer();
const consumerStream = await kafka.getConsumerStream();
core.getLogger().info('Producer and consumer are ready to go');
/**
 * On data (flow mode) transform data and send them to the data topic:
 * consumer -> count reads -> crawl urls -> count writes
 *          -> prepare kafka messages -> debug tap -> producer
 */
// NOTE(review): getReporter(10000, ) has a dangling argument slot after the
// comma — confirm whether a second argument was intended here
consumerStream
.pipe(new ReporterCounter({objectMode: true}, core.getReporter(10000, ), 'read'))
.pipe(new Crawler({objectMode: true}, core.getLogger()))
.pipe(new ReporterCounter({objectMode: true}, core.getReporter(), 'write'))
.pipe(new StreamProducerPreparer({objectMode: true, topic: core.getNconf().get('producer:topics'), partition: (chunk) => chunk.provider}))
.pipe(new Debug({objectMode: true}, ''))
.pipe(new ProducerStream(extractorProducer, {objectMode: true, topic: core.getNconf().get('producer:topics')}));
core.getLogger().info('Pipes are registered');
} catch (error) {
core.getLogger().error('Error in retrieving consumer or producer');
core.getLogger().error(error);
}
})
.catch((err) => {
core.getLogger().error('Its not possible to connect to kafka');
throw err;
});
import {Transform} from 'stream';
import {LoggerInstance} from 'winston';
import * as request from 'request';
import cheerio from 'cheerio';
import {resolve} from 'url';
/**
 * Wrapper around a fetched resource: the requested url plus the raw
 * response object returned by request().
 * @param url the url that was requested
 * @param res the response returned by request()
 * @constructor
 */
function Document(url, res) {
    this.res = res;
    this.url = url;
}
/**
 * Properties of the document
 */
Document.prototype = {
constructor: Document,
// Lazy parse: the response body is only run through cheerio the first
// time `$` is read; the parsed handle is cached on `_$`
get $() {
return this._$ || (this._$ = cheerio.load(this.res.body));
},
// Resolve a (possibly relative) uri against the document's own url
resolve: function(uri) {
return resolve(this.url, uri);
}
};
/**
 * MagpieTransformer
 *
 * Takes JSON chunks of the form {url: string, ...}, fetches each url with
 * request() — honouring a per-request delay and a concurrency limit — and
 * pushes a Document wrapping the response downstream.
 */
class MagpieTransformer extends Transform {
    /**
     * Time (ms) to wait for a response before giving the slot up
     */
    private serverTimeout = 30 * 1000;
    /**
     * Logger
     */
    private logger: LoggerInstance;
    /**
     * delay (ms) before each request is fired
     */
    private readonly delay: number = 200;
    /**
     * Options forwarded to request()
     */
    private readonly requestOptions = {};
    /**
     * Number of requests currently in flight
     */
    private pending: number = 0;
    /**
     * Maximum number of parallel requests
     */
    private readonly concurrent: number = 10;
    constructor(options) {
        super(options);
        this.logger = options.logger;
        this.delay = options.delay || 200;
        this.concurrent = options.concurrent || 10;
        this.requestOptions = options.requestOptions || {};
    }
    /**
     * @inheritDoc
     * @param chunk Buffer or string holding a JSON document with a `url`
     * @param encoding
     * @param callback
     * @private
     */
    public _transform(chunk, encoding, callback) {
        let data;
        if (Buffer.isBuffer(chunk)) {
            const buffer = Buffer.from(chunk).toString('utf8');
            data = JSON.parse(buffer);
        } else {
            data = JSON.parse(chunk);
        }
        // Back-pressure: wait for a slot before accepting the chunk
        if (this.pending >= this.concurrent) {
            return this.once('free', () => {
                this._transform(chunk, encoding, callback);
            });
        }
        this.pending++;
        setTimeout(() => {
            // `finished` makes the timeout and the response callback mutually
            // exclusive: the original code decremented `pending` in BOTH
            // paths on a successful request, driving the counter negative and
            // breaking the concurrency limit.
            let finished = false;
            // if the server is not reacting, release the slot after a timeout
            const timer = setTimeout(() => {
                if (!finished) {
                    finished = true;
                    this.pending--;
                    this.logger.warn('Too fast. ServerTimeout triggered');
                    this.emit('free');
                }
            }, this.serverTimeout);
            const options = {...this.requestOptions, ...{url: data.url}};
            MagpieTransformer._request(options, (err, res, _) => {
                if (finished) {
                    // the timeout already gave this request up
                    return;
                }
                finished = true;
                clearTimeout(timer);
                this.pending--;
                if (err) {
                    // Log and drop this url. The original called
                    // this.push(null) here, which would have ended the whole
                    // stream on the first failed request, and then fell
                    // through and pushed the doc anyway.
                    this.logger.error(err, data.url);
                    this.emit('free');
                    return;
                }
                const doc = new Document(data.url, res);
                this.logger.info('Success', data.url);
                this.push(doc);
                this.emit('free');
            });
        }, this.delay);
        callback();
    }
    // Wrap request() so tests can stub the network call
    private static _request(opts, done) {
        // All options forwarded to request()
        request(opts, done);
    }
    // @ts-ignore
    public end();
    public end(chunk: any);
    public end(chunk: any, enc: string);
    public end(chunk: any, enc: string, callback: any);
    public end(chunk: any, enc: string, callback: any) {
        // Defer the real end() until every in-flight request has settled,
        // otherwise pending responses would be pushed after EOF
        if (this.pending) {
            return this.once('free', () => {
                this.end(chunk, enc, callback);
            });
        }
        if (typeof chunk === 'function') {
            callback = chunk;
            chunk = null;
        }
        if (typeof enc === 'function') {
            callback = enc;
            enc = null;
        }
        if (chunk) {
            this.write(chunk, enc);
            return this.once('free', () => {
                this.end(callback);
            });
        }
        if (callback) this.on('finish', callback);
        super.end();
    }
}
export default MagpieTransformer;
import {Writable} from 'stream';
import {CODES, Producer} from 'node-rdkafka';
import {InterfaceReporter} from 'geolinker-common/dist/reporter';
// Writable sink that forwards each chunk to a kafka producer and reports
// the written message count to the reporter.
class KafkaProducerWritable extends Writable {
// kafka producer used to publish messages
private producer ;
// nconf-style config accessor (topic/partition come from here)
private nconf;
// back-off (ms) before surfacing a produce error to the stream
private timeout = 500;
// reporter that counts outgoing messages
private reporter: InterfaceReporter;
constructor(options = {}, producer: Producer, nconf, reporter: InterfaceReporter) {
super(options);
this.producer = producer;
this.nconf = nconf;
this.reporter = reporter;
}
// Publish one chunk; on failure wait `timeout` ms and fail the write.
public async _write(chunk, encoding, callback) {
try {
// produce to a dynamic topic
// NOTE(review): data.topic is hard-coded to 'extractor' while produce()
// is called with the topic from nconf — confirm which one wins
const data = {
topic: 'extractor',
value: chunk,
partition: chunk.provider,
}
// NOTE(review): 'topic:config:partion' looks like a typo for 'partition'
// — verify against the actual config keys before "fixing" it
await this.producer.produce(this.nconf.get('topic:config:topic'), this.nconf.get('topic:config:partion'), data);
this.reporter.setDataOut(1);
callback();
} catch (err) {
// handle backpressure if the queue is full
// if (CODES.ERRORS.ERR__QUEUE_FULL === err.code) {
// setTimeout(() => {
// console.
// this._write(chunk, encoding, callback);
//
// }, this.timeout);
// } else {
// surface the error after a short back-off
setTimeout(() => {
callback(err);
}, this.timeout);
// }
}
}
}
export default KafkaProducerWritable;
import chai = require('chai');
import {describe, it, before} from 'mocha';
import MagpieTransformer from '../src/magpie-transformer';
import {InterfaceCore} from 'geolinker-common/dist/core';
import {Core} from 'geolinker-common';
import * as http from 'http';
// prepare environment
process.env.NODE_ENV = 'test';
process.env['log-dir'] = '/tmp';
process.env.reporter_url = 'localhost:9999';
// mock core and server
const coreMock: InterfaceCore = new Core('test', 'none');
let server;
describe('Test magpie-transformer', () => {
// Spin up a tiny local http server the transformer can fetch from
before((done) => {
chai.should();
// start a test server to fetch data from; it answers every request with 'ok'
const requestHandler = (request, response) => {
response.end('ok')
};
server = http.createServer(requestHandler);
server.listen(3000, (err) => {
if (err) {
return console.log('something bad happened', err)
}
done();
});
});
// Tear the test server down again after the suite
after((done) => {
server.close();
done();
});
// Happy path: one url in, one document whose response body is 'ok' out
it('Try to fetch an url', (done) => {
const mock = JSON.stringify({url: 'http://localhost:3000'});
const options = {
objectMode: true,
logger: coreMock.getLogger(),
delay: 200,
};
const filter = new MagpieTransformer(options);
// listener to check output
filter.on('data', (data: any) => {
data.res.body.should.contains('ok');
done();
});
filter.write(mock);
filter.end()
}).timeout(30000);
// With concurrency 1 every request runs serially, so consecutive results
// must be at least `delay` (2000ms) apart.
it('Test delay between requests if there is no concurrency', (done) => {
const options = {
objectMode: true,
logger: coreMock.getLogger(),
delay: 2000,
concurrent: 1
};
const filter = new MagpieTransformer(options);
// listener to check output
let counter = 0;
let timer;
// NOTE(review): `Timer` is never imported or defined in this file —
// confirm where it is meant to come from
filter.on('data', (data: any) => {
data.res.body.should.contains('ok');
if(counter === 0) {
timer = new Timer();
} else {
const time = timer.end();
timer.start();
time.should.be.greaterThan(2000);
}
if(counter === 2) {
done();
}
counter++;
});
// send three mocks and measure time between results
for(let i = 0; i < 3; i++) {
const mock = JSON.stringify({url: 'http://localhost:3000/' + i});
filter.write(mock);
}
filter.end()
}).timeout(7000);
it('Test delay between requests if there is concurrency', (done) => {
const options = {
objectMode: true,
logger: coreMock.getLogger(),
delay: 200,
concurrent: 10
};
const filter = new MagpieTransformer(options);
// listener to check output
let counter = 0;
let timer;
filter.on('data', (data: any) => {
data.res.body.should.contains('ok');