Commit 34f640ad authored by Tobias Steiner's avatar Tobias Steiner
Browse files

Add the callback for the partition

parent 3f46c853
/// <reference types="node" />
import { Transform } from 'stream';
/**
* A type to for the PartitionCallback function
*/
export declare type PartitionCallback = (chunk: any) => number;
/**
* Prepare a value for kafka ProducerStream
*/
......
......@@ -26,10 +26,10 @@ class StreamProducerPreparer extends stream_1.Transform {
const value = {
topic: this.topic,
value: chunk,
partition: this.partition,
partition: typeof this.partition === 'function' ? this.partition(chunk) : this.partition,
};
callback(null, value);
}
}
exports.default = StreamProducerPreparer;
//# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoic3RyZWFtLXByb2R1Y2VyLXByZXBhcmVyLmpzIiwic291cmNlUm9vdCI6IiIsInNvdXJjZXMiOlsiLi4vLi4vc3JjL3N0cmVhbS9zdHJlYW0tcHJvZHVjZXItcHJlcGFyZXIudHMiXSwibmFtZXMiOltdLCJtYXBwaW5ncyI6Ijs7QUFBQSxtQ0FBbUM7QUFFbkM7O0dBRUc7QUFDSCw0QkFBNkIsU0FBUSxrQkFBUztJQVk1QyxZQUFZLElBQUk7UUFDZCxLQUFLLENBQUMsSUFBSSxDQUFDLENBQUM7UUFOZDs7V0FFRztRQUNjLGNBQVMsR0FBVyxDQUFDLENBQUMsQ0FBQztRQUl0QyxJQUFJLENBQUMsS0FBSyxHQUFHLElBQUksQ0FBQyxLQUFLLENBQUM7UUFDeEIsSUFBSSxDQUFDLFNBQVMsR0FBRyxJQUFJLENBQUMsU0FBUyxJQUFJLENBQUMsQ0FBQyxDQUFDO0lBQ3hDLENBQUM7SUFDRDs7Ozs7O09BTUc7SUFDSSxVQUFVLENBQUMsS0FBSyxFQUFFLFFBQVEsRUFBRSxRQUFRO1FBQ3pDLGlDQUFpQztRQUNqQyxNQUFNLEtBQUssR0FBRztZQUNaLEtBQUssRUFBRSxJQUFJLENBQUMsS0FBSztZQUNqQixLQUFLLEVBQUUsS0FBSztZQUNaLFNBQVMsRUFBRSxJQUFJLENBQUMsU0FBUztTQUMxQixDQUFDO1FBQ0YsUUFBUSxDQUFDLElBQUksRUFBRSxLQUFLLENBQUMsQ0FBQztJQUN4QixDQUFDO0NBQ0Y7QUFFRCxrQkFBZSxzQkFBc0IsQ0FBQyJ9
\ No newline at end of file
//# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoic3RyZWFtLXByb2R1Y2VyLXByZXBhcmVyLmpzIiwic291cmNlUm9vdCI6IiIsInNvdXJjZXMiOlsiLi4vLi4vc3JjL3N0cmVhbS9zdHJlYW0tcHJvZHVjZXItcHJlcGFyZXIudHMiXSwibmFtZXMiOltdLCJtYXBwaW5ncyI6Ijs7QUFBQSxtQ0FBbUM7QUFPbkM7O0dBRUc7QUFDSCw0QkFBNkIsU0FBUSxrQkFBUztJQVk1QyxZQUFZLElBQUk7UUFDZCxLQUFLLENBQUMsSUFBSSxDQUFDLENBQUM7UUFOZDs7V0FFRztRQUNjLGNBQVMsR0FBZ0MsQ0FBQyxDQUFDLENBQUM7UUFJM0QsSUFBSSxDQUFDLEtBQUssR0FBRyxJQUFJLENBQUMsS0FBSyxDQUFDO1FBQ3hCLElBQUksQ0FBQyxTQUFTLEdBQUcsSUFBSSxDQUFDLFNBQVMsSUFBSSxDQUFDLENBQUMsQ0FBQztJQUN4QyxDQUFDO0lBQ0Q7Ozs7OztPQU1HO0lBQ0ksVUFBVSxDQUFDLEtBQUssRUFBRSxRQUFRLEVBQUUsUUFBUTtRQUN6QyxpQ0FBaUM7UUFDakMsTUFBTSxLQUFLLEdBQUc7WUFDWixLQUFLLEVBQUUsSUFBSSxDQUFDLEtBQUs7WUFDakIsS0FBSyxFQUFFLEtBQUs7WUFDWixTQUFTLEVBQUUsT0FBTyxJQUFJLENBQUMsU0FBUyxLQUFLLFVBQVUsQ0FBQyxDQUFDLENBQUMsSUFBSSxDQUFDLFNBQVMsQ0FBQyxLQUFLLENBQUMsQ0FBQyxDQUFDLENBQUMsSUFBSSxDQUFDLFNBQVM7U0FDekYsQ0FBQztRQUNGLFFBQVEsQ0FBQyxJQUFJLEVBQUUsS0FBSyxDQUFDLENBQUM7SUFDeEIsQ0FBQztDQUNGO0FBRUQsa0JBQWUsc0JBQXNCLENBQUMifQ==
\ No newline at end of file
import { Transform } from 'stream';
/**
* A type to for the PartitionCallback function
*/
export type PartitionCallback = (chunk: any) => number;
/**
* Prepare a value for kafka ProducerStream
*/
......@@ -13,7 +18,7 @@ class StreamProducerPreparer extends Transform {
/**
* Partition
*/
private readonly partition: number = -1;
private readonly partition: number | PartitionCallback = -1;
constructor(opts) {
super(opts);
......@@ -32,7 +37,7 @@ class StreamProducerPreparer extends Transform {
const value = {
topic: this.topic,
value: chunk,
partition: this.partition,
partition: typeof this.partition === 'function' ? this.partition(chunk) : this.partition,
};
callback(null, value);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment