Commit ecf0771e authored by Tobias Steiner's avatar Tobias Steiner
Browse files

Lower batchsize

parent eee49c27
......@@ -49,19 +49,23 @@ export class ElasticsearchTransformer extends Transform {
this.batch.push(this.prepareMeta(chunk.parsed));
this.batch.push(this.prepareData(chunk.parsed));
if (this.batch.length > this.batchSize) {
try {
await this.client.bulk({body: this.batch});
return setImmediate(callback);
} catch (err) {
console.error(err);
setTimeout(() => this._transform(chunk, encoding, callback), this.timeout);
}
await this.indexElasticsearch(this.batch, callback);
this.batch = [];
} else {
return setImmediate(callback);
}
}
private async indexElasticsearch(batch, callback) {
try {
await this.client.bulk({body: batch});
return setImmediate(callback);
} catch (err) {
this.core.getLogger().error('Error communicating with ES', err);
setTimeout(() => this.indexElasticsearch(batch, callback), this.timeout);
}
}
private prepareMeta(row) {
return {
index: {
......
......@@ -40,11 +40,16 @@ kafka.init().then(() => {
* pipe the stream from kafka fo neo4j
*/
consumer
.pipe(new ElasticsearchTransformer({objectMode: true}, core));
.pipe(new ElasticsearchTransformer({objectMode: true, batchSize: 500}, core));
consumer.on('data', () => {
counter++;
});
consumer.on('event.error', (err) => {
core.getLogger().error(err);
consumer.consume();
});
setInterval(() => {
console.log('Count : ' + counter);
}, 30000);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment