Commit 1b0a35af authored by Tobinsk's avatar Tobinsk

Merge branch '1-wrong-keys' into 'master'

Resolve "Wrong keys"

Closes #1

See merge request !1
parents d6626aef e7b85e3e
FROM node:8-alpine
FROM node:10-slim
# add basic libs
RUN apk --no-cache add \
RUN apt-get update && apt-get install -y \
bash \
g++ \
gcc \
ca-certificates \
lz4-dev \
musl-dev \
cyrus-sasl-dev \
openssl-dev \
make \
python \
bash \
git
RUN apk add --no-cache --virtual .build-deps gcc zlib-dev libc-dev bsd-compat-headers py-setuptools libexecinfo libexecinfo-dev
# Create app directory
RUN mkdir -p /usr/local/app
......@@ -34,4 +29,4 @@ RUN npm install
RUN npm run build
# run app
CMD node dist/index.js
\ No newline at end of file
CMD node dist/index.js
......@@ -31,7 +31,7 @@
},
"consumer": {
"config": {
"group.id": "magpie",
"group.id": "dodis",
"socket.keepalive.enable": true,
"enable.auto.commit": true,
"queued.max.messages.kbytes": 100
......
This diff is collapsed.
......@@ -24,10 +24,7 @@ class Extractor extends Transform {
* @private
*/
public _transform(chunk, encoding, callback) {
console.log("Messages");
const data = JSON.parse(chunk.parsed.doc.res.body);
console.log(data);
try {
const res = {
id: data.data.id.toString(),
......@@ -39,7 +36,6 @@ class Extractor extends Transform {
location: this.buildLocation(data.data),
modification_date:this.getDate()
};
console.log(res);
return callback(null, res);
} catch (error) {
......@@ -52,7 +48,7 @@ class Extractor extends Transform {
if((obj.longitude == null)|| (obj.latitude == null)) {
return null;
}
return `${obj.latitude}, ${obj.longitude}`;
return `${obj.latitude},${obj.longitude}`;
}
private getDate() {
......
......@@ -52,12 +52,13 @@ kafka.init().then(async () => {
/**
* On data (flow mode) extract data from magpie and send them to the geolinker
* todo: use the right partition or a different topic
*/
extractorStreamConsumer
.pipe(new ReporterCounter({objectMode: true}, core.getReporter(10000, ), 'read'))
.pipe(new Extractor({objectMode: true}, core.getLogger()))
.pipe(new ReporterCounter({objectMode: true}, core.getReporter(), 'write'))
.pipe(new StreamProducerPreparer({objectMode: true, topic: core.getNconf().get('producer:geolinker:topics'), partition: (chunk) => chunk.provider}))
.pipe(new StreamProducerPreparer({objectMode: true, topic: core.getNconf().get('producer:geolinker:topics'), key: (d) => `${d.provider}${d.id}`}))
.pipe(new ProducerStream(geolinkerProducer, {objectMode: true, topic: core.getNconf().get('producer:geolinker:topics')}));
core.getLogger().info('Pipes are registered');
} catch (error) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment