Commit 32f1f9b4 authored by Tobinsk

Merge branch '2-use-magpie-as-a-library' into 'master'

Resolve "Use magpie as a library"

Closes #2

See merge request !3
parents 71fef1e6 f3ef5692
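In short: instead of publishing URLs to a dedicated magpie Kafka topic and consuming the fetched pages back, the service now installs magpie as a git dependency and runs its MagpieTransformer in-process, between a new SitemapReader and the dodis normalizer. A condensed sketch of the new wiring (the StreamProducerPreparer/ProducerStream stages and error handling are left out; the complete pipeline is in the index diff further down):

import {Core} from 'geolinker-common';
import MagpieTransformer from 'magpie/dist/magpie-transformer';
import DodisNormalizerTransformer from './dodis-normalizer-transformer';
import SitemapReader from './sitemap-reader';

// sitemap URLs -> magpie fetcher -> dodis normalizer (Kafka producer stages omitted)
const core = Core.init('magpie', __dirname + '/../config.json');
const headers = {'user-agent': 'magpie', 'content-type': 'application/json'};
new SitemapReader({logger: core.getLogger(), objectMode: true})
    .pipe(new MagpieTransformer({
        objectMode: true,
        delay: 500,
        concurrent: 1,
        logger: core.getLogger(),
        requestOptions: {headers},
    }))
    .pipe(new DodisNormalizerTransformer({objectMode: true, logger: core.getLogger()}));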
@@ -5,17 +5,6 @@
"fetchAllVersions": true
},
"producer": {
"magpie" : {
"config": {
"batch.num.messages": 1000,
"message.send.max.retries": 5,
"retry.backoff.ms": 5000,
"message.max.bytes": 10485760
},
"topics": "magpie",
"partition": -1,
"author": "bot#1"
},
"geolinker" : {
"config": {
"batch.num.messages": 1000,
@@ -29,23 +18,9 @@
}
},
"consumer": {
"config": {
"group.id": "dodis",
"socket.keepalive.enable": true,
"enable.auto.commit": true,
"queued.max.messages.kbytes": 100
},
"topics": {
"topics": "extract"
},
"stream": {
"request.required.acks": 1
}
},
"log-dir": "/tmp",
"buildSitemap": false,
"buildSitemap": true,
"reporter": {
"url": "http://dashboard-api"
}
}
\ No newline at end of file
}
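With the magpie producer block and the consumer section removed, the only Kafka settings this service still owns are the geolinker producer ones; they are read through the shared nconf instance with colon-separated key paths. A minimal sketch (the concrete topic value sits in the unchanged part of config.json):

import {Core} from 'geolinker-common';

// nconf addresses nested JSON keys with ':' separators
const core = Core.init('magpie', __dirname + '/../config.json');
const geolinkerTopic = core.getNconf().get('producer:geolinker:topics');
core.getLogger().info(`producing to ${geolinkerTopic}`);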
{
"name": "magpie",
"name": "dodis",
"version": "1.0.0",
"description": "Magpie get urls from kafka and fetchs them in the wild of the internet",
"main": "dist/index.js",
@@ -15,17 +15,19 @@
"author": "",
"license": "ISC",
"dependencies": {
"@types/node": "^9.6.18",
"bunyan-format": "^0.2.1",
"@types/node": "^10.14",
"concurrently": "^3.5.1",
"geolinker-common": "git+https://gitlab+deploy-token-1:vnsdCm_t84QGVA2U4kw4@source.dodis.ch/histhub/geolinker-common.git",
"node-cleanup": "^2.1.2",
"node-spider": "^1.4.1",
"source-map": "^0.7.3",
"typescript": "^2.8.3"
"magpie": "git+https://gitlab+deploy-token-10:j2fMMZ6XHFDmTJ9yGD6U@source.dodis.ch/histhub/magpie.git",
"typescript": "^3.6"
},
"devDependencies": {
"ts-node": "^6.1.0",
"tslint": "^5.9.1"
"@types/chai": "^4.2",
"@types/mocha": "^5.2",
"chai": "^4.2",
"mocha": "^6",
"sinon": "^6.3.5",
"ts-node": "^8.4",
"tslint": "^5.20.0"
}
}
/*
/// <reference types="node" />
declare module 'node-spider' {
export interface Logger {
write: (msg) => void;
}
export interface SpiderOpts {
concurrent: number,
// How long to wait after each request
delay: number,
// A stream to where internal logs are sent, optional
logs: Logger,
// Re-visit visited URLs, false by default
allowDuplicates: boolean,
// If `true` all queued handlers will be try-catch'd, errors go to `error` callback
catchErrors: boolean,
// If `true` the spider will set the Referer header automatically on subsequent requests
addReferrer: boolean,
// If `true` adds the X-Requested-With:XMLHttpRequest header
xhr: boolean,
// If `true` adds the Connection:keep-alive header and forever option on request module
keepAlive: boolean,
// Called when there's an error, throw will be used if none is provided
error: (err, url) => void;
// Called when there are no more requests
done: () => void,
//- All options are passed to `request` module, for example:
headers: any,
encoding: string
}
export class Spider {
queue(url: string, done);
full(): boolean;
log(status: string, url: string);
load(url, done, referrer);
constructor(opts: SpiderOpts);
}
}*/
import {Transform} from 'stream';
import {LoggerInstance} from 'winston';
import {Spider} from 'node-spider';
/**
* Verbosity
* @type {boolean}
*/
const debug = false;
/**
* Small logger for the spider
* @type {{write: function(*=)}}
*/
const logger = {
write: (msg) => {
if(debug) {
console.log(msg);
}
}
};
/**
* A small writer to put result to stdout
* @type {{put: function(*=)}}
*/
const writer = {
put: (line) => {
console.log(line);
}
};
const spider = new Spider({
// How many requests can be run in parallel
concurrent: 10,
// How long to wait after each request
delay: 50,
// A stream to where internal logs are sent, optional
logs: logger,
// Re-visit visited URLs, false by default
allowDuplicates: false,
// If `true` all queued handlers will be try-catch'd, errors go to `error` callback
catchErrors: false,
// If `true` the spider will set the Referer header automatically on subsequent requests
addReferrer: false,
// If `true` adds the X-Requested-With:XMLHttpRequest header
xhr: false,
// If `true` adds the Connection:keep-alive header and forever option on request module
keepAlive: false,
// Called when there's an error, throw will be used if none is provided
error: function(err, url) {
console.log(`There was an error while fetching ${url}` + err);
},
// Called when there are no more requests
done: function() {
console.log("We are done");
},
//- All options are passed to `request` module, for example:
headers: { 'user-agent': 'magpie' },
encoding: 'utf8'
});
class Crawler extends Transform {
/**
* Timeout in ms before retrying to queue a URL when the spider queue is full
* @type {number}
*/
private timeout = 20;
/**
* Logger
*/
private logger: LoggerInstance;
constructor(options, logger: LoggerInstance) {
super(options);
this.logger = logger;
}
/**
* @inheritDoc
* @param chunk
* @param encoding
* @param callback
* @private
*/
public _transform(chunk, encoding, callback) {
if(!spider.full()) {
const data = chunk.parsed;
spider.queue(chunk.parsed.url, (doc) => {
data.doc = doc;
callback(null, data);
});
} else {
setTimeout(() => {
this._transform(chunk, encoding, callback);
}, this.timeout);
}
}
}
export default Crawler;
@@ -4,16 +4,16 @@ import {LoggerInstance} from "winston";
/**
* Prepare a value for kafka ProducerStream
*/
class Extractor extends Transform {
class DodisNormalizerTransformer extends Transform {
/**
* Logger
*/
private logger: LoggerInstance;
constructor(opts, logger: LoggerInstance) {
constructor(opts) {
super(opts);
this.logger = logger;
this.logger = opts.logger;
}
/**
@@ -24,37 +24,37 @@ class Extractor extends Transform {
* @private
*/
public _transform(chunk, encoding, callback) {
const data = JSON.parse(chunk.parsed.doc.res.body);
const data = JSON.parse(chunk.res.body);
try {
const res = {
id: data.data.id.toString(),
provider: 'dodis',
url: chunk.parsed.url,
url: chunk.url,
name: data.data.name,
alternative_name: data.data.comment,
country: data.data.countryCode,
location: this.buildLocation(data.data),
modification_date:this.getDate()
location: DodisNormalizerTransformer.buildLocation(data.data),
modification_date: DodisNormalizerTransformer.getDate()
};
return callback(null, res);
} catch (error) {
this.logger.error(error);
callback(error);
return callback(error);
}
}
private buildLocation(obj){
private static buildLocation(obj){
if((obj.longitude == null)|| (obj.latitude == null)) {
return null;
}
return `${obj.latitude},${obj.longitude}`;
}
private getDate() {
private static getDate() {
const date = new Date();
return `${date.getFullYear()}-${date.getMonth()}-${date.getDate()}`;
}
}
export default Extractor;
export default DodisNormalizerTransformer;
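Since the normalizer is now fed directly by MagpieTransformer, it reads chunk.res.body and chunk.url instead of the old chunk.parsed.doc wrapper. A stand-alone sketch of one record passing through, mirroring the unit test at the end of this MR (the env values and the inlined payload are illustrative; the real test uses the single.json fixture):

import {Core} from 'geolinker-common';
import DodisNormalizerTransformer from './dodis-normalizer-transformer';

process.env.NODE_ENV = 'test';
process.env['log-dir'] = '/tmp';
process.env.reporter_url = 'localhost:9999';

const core = new Core('test', 'none');
const normalizer = new DodisNormalizerTransformer({objectMode: true, logger: core.getLogger()});
normalizer.on('data', (record) => console.log(record));
// shape emitted by MagpieTransformer: the fetched body plus the requested url
normalizer.write({
    url: 'https://dodis.ch/G8',
    res: {
        body: JSON.stringify({
            data: {id: 8, name: 'Budapest', comment: null, countryCode: null,
                   latitude: 47.497913, longitude: 19.040236},
        }),
    },
});
normalizer.end();
// -> {id: '8', provider: 'dodis', url: 'https://dodis.ch/G8', name: 'Budapest',
//     alternative_name: null, country: null, location: '47.497913,19.040236', modification_date: ...}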
import fmt = require('bunyan-format');
import {Core} from 'geolinker-common';
import ReporterCounter from 'geolinker-common/dist/stream/reporter-counter';
import StreamProducerPreparer from 'geolinker-common/dist/stream/stream-producer-preparer';
import Crawler from "./crawler";
import ProducerStream = require('node-rdkafka/lib/producer-stream');
import * as KafkaAvro from 'kafka-avro';
import {SitemapGenerator} from "./sitemap-generator";
import Extractor from "./Extractor";
import DodisNormalizerTransformer from "./dodis-normalizer-transformer";
import SitemapReader from './sitemap-reader';
import MagpieTransformer from 'magpie/dist/magpie-transformer';
/**
* load sourcemap and config
*/
const core = Core.init('magpie', __dirname + '/../config.json');
/**
* To trigger this you need to set the KAFKA_AVRO_LOG_LEVEL
* e.g. env KAFKA_AVRO_LOG_LEVEL=debug node dist/index.js
* Init core
*/
const kafkaLog = KafkaAvro.getLogger();
kafkaLog.addStream({
type: 'stream',
stream: fmt({
outputMode: 'short',
levelInString: true,
}),
level: 'debug',
});
const core = Core.init('magpie', __dirname + '/../config.json');
/**
* Init KafkaInstance
@@ -37,38 +18,53 @@ const kafka = core.getKafkaAvro();
kafka.init().then(async () => {
try {
// Wait for the promises to resolve
const extractorStreamConsumer = await kafka.getConsumerStream();
const magpieProducer = await kafka.getProducer('magpie');
const geolinkerProducer = await kafka.getProducer('geolinker');
core.getLogger().info('Producer is ready to go');
core.getLogger().info('Producer and consumer are ready to go');
/**
* Init sitemap readable stream
*/
const headers = {'user-agent': 'magpie', 'content-type': 'application/json'};
const siteMapReader = new SitemapReader({logger: core.getLogger(), objectMode: true});
/**
* Generate sitemap of all resources
* Get URLs from the sitemap, request the data with magpie and extract them into the geolinker topic
*/
if(core.getNconf().get('buildSitemap')) {
new SitemapGenerator(magpieProducer);
}
siteMapReader
.pipe(new MagpieTransformer({
objectMode: true,
delay: 500,
concurrent: 1,
logger: core.getLogger(),
requestOptions: {headers}
}))
.pipe(new DodisNormalizerTransformer({objectMode: true, logger: core.getLogger()}))
.pipe(new StreamProducerPreparer({
objectMode: true,
topic: core.getNconf().get('producer:geolinker:topics'),
key: (d) => `${d.provider}${d.id}`
}))
.pipe(new ProducerStream(geolinkerProducer, {
objectMode: true,
topic: core.getNconf().get('producer:geolinker:topics')
}));
/**
* On data (flow mode) extract data from magpie and send them to the geolinker
* todo: use the right partition or a different topic
* Handle error
*/
extractorStreamConsumer
.pipe(new ReporterCounter({objectMode: true}, core.getReporter(10000, ), 'read'))
.pipe(new Extractor({objectMode: true}, core.getLogger()))
.pipe(new ReporterCounter({objectMode: true}, core.getReporter(), 'write'))
.pipe(new StreamProducerPreparer({objectMode: true, topic: core.getNconf().get('producer:geolinker:topics'), key: (d) => `${d.provider}${d.id}`}))
.pipe(new ProducerStream(geolinkerProducer, {objectMode: true, topic: core.getNconf().get('producer:geolinker:topics')}));
siteMapReader.on('error', (err) => {
core.getLogger().error('Error in sitemap stream');
core.getLogger().error(err);
});
core.getLogger().info('Pipes are registered');
} catch (error) {
core.getLogger().error('Error in retrieving consumer or producer');
core.getLogger().error(error);
}
})
.catch((err) => {
core.getLogger().error('It is not possible to connect to kafka');
throw err;
});
.catch((err) => {
core.getLogger().error('It is not possible to connect to kafka');
throw err;
});
import Spider = require("node-spider");
const logger = {
write: (msg) => {
console.log(msg);
}
};
let producer;
export class SitemapGenerator {
private producer;
private spider;
private initial = true;
private startUrl = 'https://dodis.ch/search?q=&c=Place&f=Name&t=all&cb=doc';
constructor(producer1) {
producer = producer1;
this.init();
}
init() {
this.spider = new Spider({
// How many requests can be run in parallel
concurrent: 10,
// How long to wait after each request
delay: 50,
// A stream to where internal logs are sent, optional
logs: logger,
// Re-visit visited URLs, false by default
allowDuplicates: false,
// If `true` all queued handlers will be try-catch'd, errors go to `error` callback
catchErrors: false,
// If `true` the spider will set the Referer header automatically on subsequent requests
addReferrer: false,
// If `true` adds the X-Requested-With:XMLHttpRequest header
xhr: false,
// If `true` adds the Connection:keep-alive header and forever option on request module
keepAlive: false,
// Called when there's an error, throw will be used if none is provided
error: function(err, url) {
console.log(`There was an error while fetching ${url}` + err);
},
// Called when there are no more requests
done: function() {
console.log("We are done");
},
//- All options are passed to `request` module, for example:
headers: { 'user-agent': 'magpie' , 'content-type': 'application/json'},
encoding: 'utf8'
});
// it's not possible to override the bind context
this.spider.queue(this.startUrl, this.handleRequest.bind(this));
}
handleRequest(doc) {
const data = JSON.parse(doc.res.body);
if (this.initial) {
for (let i = 1; i < data.totalPages; i++) {
this.spider.queue(`${this.startUrl}&p=${i}`, this.handleRequest);
}
this.initial = false;
}
for (let doc of data.data){
try {
producer.produce(
// Topic to send the message to
'magpie',
-1,
// Message to send.
{
url: `https://dodis.ch/G${doc.id}`,
provider: 'dodis',
queuetime: (new Date()).getTime(),
headers: {'content-type': 'application/json'},
},
null,
// you can send a timestamp here. If your broker version supports it,
// it will get added. Otherwise, we default to 0
Date.now(),
// you can send an opaque token here, which gets passed along
// to your delivery reports
);
} catch (err) {
console.error('A problem occurred when sending our message to magpie');
console.error(err);
}
}
}
}
\ No newline at end of file
import {Readable} from 'stream';
import {LoggerInstance} from 'winston';
import * as request from 'request';
/**
* SitemapReader
*/
class SitemapReader extends Readable {
/**
* Logger
*/
private logger: LoggerInstance;
/**
* Simple flag to decide if we just started or not
*/
private initial = true;
private static headers = { 'user-agent': 'magpie' , 'content-type': 'application/json'};
/**
* Start url for dodis
*/
private startUrl = 'https://dodis.ch/search?q=&c=Place&f=Name&t=all&cb=doc';
constructor(options) {
super(options);
this.logger = options.logger;
}
/**
* Read stuff from the sitemap
* @param size
* @private
*/
public _read(size) {
SitemapReader._request({url: this.startUrl}, (err, res, _) => {
this.handleRequest(err, res);
});
}
/**
* Request the data
* @param opts
* @param done
* @private
*/
public static _request(opts, done) {
// All options forwarded to request()
opts = {...{headers: SitemapReader.headers}, ...opts};
request(opts, done);
}
/**
* Handle the request and push data
* @param res
*/
private handleRequest(err, res) {
if(err) {
this.logger.error(err);
return this.push(null);
}
if(res) {
const data = JSON.parse(res.body);
if (this.initial) {
for (let i = 1; i < data.totalPages; i++) {
setTimeout(() => {
SitemapReader._request({url: `${this.startUrl}&p=${i}`}, this.handleRequest.bind(this));
}, 300 * i);
}
this.initial = false;
}
for (let doc of data.data) {
this.push(JSON.stringify({url: `https://dodis.ch/G${doc.id}`}));
}
} else {
this.push(null);
}
}
}
export default SitemapReader;
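Unlike the removed SitemapGenerator, which pushed URL messages to the magpie topic via a Kafka producer, SitemapReader is a plain Readable that walks the dodis place search and pushes one JSON-encoded {url} string per record. A minimal consumption sketch (normally it is piped into MagpieTransformer as in the index diff above):

import {Core} from 'geolinker-common';
import SitemapReader from './sitemap-reader';

const core = Core.init('magpie', __dirname + '/../config.json');
const reader = new SitemapReader({logger: core.getLogger(), objectMode: true});
reader.on('data', (chunk) => {
    // each chunk is a JSON string such as {"url":"https://dodis.ch/G123"}
    const {url} = JSON.parse(chunk.toString());
    core.getLogger().info(url);
});
reader.on('error', (err) => core.getLogger().error(err));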
import chai = require('chai');
import {describe, it, before} from 'mocha';
import DodisNormalizerTransformer from '../src/dodis-normalizer-transformer';
import {InterfaceCore} from 'geolinker-common/dist/core';
import {Core} from 'geolinker-common';
import * as fs from 'fs';
// prepare environment
process.env.NODE_ENV = 'test';
process.env['log-dir'] = '/tmp';
process.env.reporter_url = 'localhost:9999';
// mock core and server
const coreMock: InterfaceCore = new Core('test', 'none');
describe('Test dodis-normalizer-transformer', () => {
before((done) => {
chai.should();
done();
});
it('Try to normalize a dodis json', (done) => {
const mock = fs.readFileSync(__dirname + '/fixtures/single.json', 'utf-8');
const options = {
objectMode: true,
logger: coreMock.getLogger(),
};
const dodisNormalizerTransformer = new DodisNormalizerTransformer(options);
// listener to check output
dodisNormalizerTransformer.on('data', (data: any) => {
const expect = {
id: "8",
provider: 'dodis',
url: 'https://dodis.ch/G8',
name: 'Budapest',
alternative_name: null,
country: null,
location: `47.497913,19.040236`,
};
// we can't predict modification_date
delete data.modification_date;
data.should.deep.equal(expect);
});
dodisNormalizerTransformer.on('end', () => {
done();
});
dodisNormalizerTransformer.write({res: {body: mock}, url: "https://dodis.ch/G8"});
dodisNormalizerTransformer.end();
});
});