Commit 0655a02c authored by Tobinsk's avatar Tobinsk
Browse files

Merge branch '8-key-callback' into 'master'

Add the possiblity to define a callback to set a key

Closes #8

See merge request !10
parents ba026ae5 0d87105d
import { Transform } from 'stream';
import {Key} from 'readline';
/**
* A type to for the PartitionCallback function
*/
export type PartitionCallback = (chunk: any) => number;
export type KeyCallback = (chunk: any) => number;
/**
* Prepare a value for kafka ProducerStream
*/
......@@ -20,10 +22,16 @@ class StreamProducerPreparer extends Transform {
*/
private readonly partition: number | PartitionCallback = -1;
/**
* Should we use a key or not
*/
private readonly key: boolean | KeyCallback = false;
constructor(opts) {
super(opts);
this.topic = opts.topic;
this.partition = opts.partition || -1;
this.key = opts.key || false;
}
/**
* @inheritDoc
......@@ -34,11 +42,14 @@ class StreamProducerPreparer extends Transform {
*/
public _transform(chunk, encoding, callback) {
// see: producer-stream.js L: 176
const value = {
const value: any = {
topic: this.topic,
value: chunk,
partition: typeof this.partition === 'function' ? this.partition(chunk) : this.partition,
};
if (typeof this.key === 'function') {
value.key = this.key(chunk);
}
callback(null, value);
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment