Commit c9e6abae authored by Tobias Steiner

Change to a writable

parent ecf0771e
import {createClient} from './connection';
import {Client, SearchResponse} from 'elasticsearch';
import {InterfaceCore} from 'geolinker-common/dist/core';
-import {Transform} from 'stream';
+import {Writable} from 'stream';
-export class ElasticsearchTransformer extends Transform {
+export class ElasticsearchWriter extends Writable {
/**
* Batch size
@@ -45,7 +45,7 @@ export class ElasticsearchTransformer extends Transform {
}
-public async _transform(chunk, encoding, callback) {
+public async _write(chunk, encoding, callback) {
-this.batch.push(this.prepareMeta(chunk.parsed));
+this.batch.push(this.prepareData(chunk.parsed));
if (this.batch.length > this.batchSize) {
...
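For orientation: the class now ends the pipeline as a Writable sink instead of passing data on as a Transform, so documents are collected in _write and flushed in batches. Below is a minimal sketch of that pattern, assuming a flushBatch() helper and an Elasticsearch bulk call; only prepareData, batchSize, and the _write signature come from the diff above, everything else is illustrative.

import {Client} from 'elasticsearch';
import {Writable} from 'stream';

// Minimal sketch of a batching Writable sink (not the project's actual class).
class BatchingWriter extends Writable {
  private batch: any[] = [];
  private readonly batchSize: number;

  constructor(options: {objectMode: boolean, batchSize: number}, private client: Client) {
    super({objectMode: options.objectMode});
    this.batchSize = options.batchSize;
  }

  public async _write(chunk: any, encoding: string, callback: (err?: Error) => void) {
    // Collect a prepared document; flush to Elasticsearch once the batch is full.
    this.batch.push(this.prepareData(chunk.parsed));
    if (this.batch.length > this.batchSize) {
      try {
        await this.flushBatch();
      } catch (err) {
        return callback(err);
      }
    }
    callback();
  }

  private prepareData(parsed: any): any {
    // Placeholder: shape a record into action/source lines for the bulk API.
    return parsed;
  }

  private async flushBatch(): Promise<void> {
    const body = this.batch;
    this.batch = [];
    await this.client.bulk({body});
  }
}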
import fmt = require('bunyan-format');
import {Core} from 'geolinker-common';
-import {ElasticsearchTransformer} from './elasticsearch-transformer';
+import {ElasticsearchWriter} from './elasticsearch-writer';
import KafkaAvro = require('kafka-avro');
/**
@@ -40,7 +40,7 @@ kafka.init().then(() => {
* pipe the stream from kafka to neo4j
*/
consumer
-.pipe(new ElasticsearchTransformer({objectMode: true, batchSize: 500}, core));
+.pipe(new ElasticsearchWriter({objectMode: true, batchSize: 500}, core));
consumer.on('data', () => {
counter++;
...
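With a Writable at the end of the pipeline, completion and failures can be observed on the writer itself rather than on a downstream stage. A hedged usage sketch is below; the consumer and core objects stand in for the instances created earlier in this file, and the logging calls are assumptions.

const writer = new ElasticsearchWriter({objectMode: true, batchSize: 500}, core);

consumer.pipe(writer);

// Hypothetical handlers; the real project may log through its own logger.
writer.on('error', (err) => console.error('indexing failed', err));
writer.on('finish', () => console.log('all records written to Elasticsearch'));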