Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
histhub
geolinker-common
Commits
0d87105d
Commit
0d87105d
authored
May 16, 2019
by
Tobias Steiner
Browse files
Add the possiblity to define a callback to set a key
parent
ba026ae5
Changes
1
Hide whitespace changes
Inline
Side-by-side
src/stream/stream-producer-preparer.ts
View file @
0d87105d
import
{
Transform
}
from
'
stream
'
;
import
{
Key
}
from
'
readline
'
;
/**
* A type to for the PartitionCallback function
*/
export
type
PartitionCallback
=
(
chunk
:
any
)
=>
number
;
export
type
KeyCallback
=
(
chunk
:
any
)
=>
number
;
/**
* Prepare a value for kafka ProducerStream
*/
...
...
@@ -20,10 +22,16 @@ class StreamProducerPreparer extends Transform {
*/
private
readonly
partition
:
number
|
PartitionCallback
=
-
1
;
/**
* Should we use a key or not
*/
private
readonly
key
:
boolean
|
KeyCallback
=
false
;
constructor
(
opts
)
{
super
(
opts
);
this
.
topic
=
opts
.
topic
;
this
.
partition
=
opts
.
partition
||
-
1
;
this
.
key
=
opts
.
key
||
false
;
}
/**
* @inheritDoc
...
...
@@ -34,11 +42,14 @@ class StreamProducerPreparer extends Transform {
*/
public
_transform
(
chunk
,
encoding
,
callback
)
{
// see: producer-stream.js L: 176
const
value
=
{
const
value
:
any
=
{
topic
:
this
.
topic
,
value
:
chunk
,
partition
:
typeof
this
.
partition
===
'
function
'
?
this
.
partition
(
chunk
)
:
this
.
partition
,
};
if
(
typeof
this
.
key
===
'
function
'
)
{
value
.
key
=
this
.
key
(
chunk
);
}
callback
(
null
,
value
);
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment