Commit 34943f46 authored by Tobinsk's avatar Tobinsk
Browse files

Merge branch '2-missing-metadata' into 'master'

Resolve "change toi a writable stream"

Closes #3

See merge request !3
parents 116d236f c9e6abae
import {createClient} from './connection';
import {Client, SearchResponse} from 'elasticsearch';
import {InterfaceCore} from 'geolinker-common/dist/core';
import {Transform} from 'stream';
import {Writable} from 'stream';
export class ElasticsearchTransformer extends Transform {
export class ElasticsearchWriter extends Writable {
/**
* Batch size
......@@ -45,23 +45,27 @@ export class ElasticsearchTransformer extends Transform {
}
public async _transform(chunk, encoding, callback) {
public async _write(chunk, encoding, callback) {
this.batch.push(this.prepareMeta(chunk.parsed));
this.batch.push(this.prepareData(chunk.parsed));
if (this.batch.length > this.batchSize) {
try {
await this.client.bulk({body: this.batch});
return setImmediate(callback);
} catch (err) {
console.error(err);
setTimeout(() => this._transform(chunk, encoding, callback), this.timeout);
}
await this.indexElasticsearch(this.batch, callback);
this.batch = [];
} else {
return setImmediate(callback);
}
}
private async indexElasticsearch(batch, callback) {
try {
await this.client.bulk({body: batch});
return setImmediate(callback);
} catch (err) {
this.core.getLogger().error('Error communicating with ES', err);
setTimeout(() => this.indexElasticsearch(batch, callback), this.timeout);
}
}
private prepareMeta(row) {
return {
index: {
......
import fmt = require('bunyan-format');
import {Core} from 'geolinker-common';
import {ElasticsearchTransformer} from './elasticsearch-transformer';
import {ElasticsearchWriter} from './elasticsearch-writer';
import KafkaAvro = require('kafka-avro');
/**
......@@ -40,11 +40,16 @@ kafka.init().then(() => {
* pipe the stream from kafka fo neo4j
*/
consumer
.pipe(new ElasticsearchTransformer({objectMode: true}, core));
.pipe(new ElasticsearchWriter({objectMode: true, batchSize: 500}, core));
consumer.on('data', () => {
counter++;
});
consumer.on('event.error', (err) => {
core.getLogger().error(err);
consumer.consume();
});
setInterval(() => {
console.log('Count : ' + counter);
}, 30000);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment