diff --git a/src/magpie-transformer.ts b/src/magpie-transformer.ts index f7b8609a4dbc86789fc0f39f0a12e3a994caff38..a7f0accb779a5d89d1493ac7d1bc27872efd6559 100644 --- a/src/magpie-transformer.ts +++ b/src/magpie-transformer.ts @@ -39,7 +39,8 @@ class MagpieTransformer extends Transform { /** * Timeout for a server */ - private serverTimeout = 30 * 1000; + private readonly serverTimeout = 30 * 1000; + /** * Logger */ @@ -71,6 +72,7 @@ class MagpieTransformer extends Transform { this.delay = options.delay || 200; this.concurrent = options.concurrent || 10; this.requestOptions = options.requestOptions || {}; + this.serverTimeout = options.serverTimeout || 30 * 1000; } /** @@ -97,30 +99,19 @@ class MagpieTransformer extends Transform { this.pending++; setTimeout(() => { - let foundServer = false; - // if the server is not reacting wait for a timeout and then call the callback - setTimeout(() => { - this.pending--; - if (!foundServer) { - this.logger.warn('Too fast. ServerTimeout triggered'); - this.emit('free'); - } - }, this.serverTimeout); - - const options = {...this.requestOptions, ...{url: data.url}}; + const options = {...this.requestOptions, ...{url: data.url, timeout: this.serverTimeout}}; MagpieTransformer._request(options, (err, res, _) => { this.pending--; - - foundServer = true; if (err) { - this.logger.error(err, data.url); - this.push(null); + this.logger.warn(err, data.url); + this.emit('error', err); + return this.emit('free') } const doc = new Document(data.url, res); this.logger.info('Success', data.url); this.push(doc); - this.emit('free') + return this.emit('free') }); }, this.delay); callback(); @@ -129,7 +120,7 @@ class MagpieTransformer extends Transform { // Wrap it for easier mocking private static _request(opts, done) { // All options forwarded to request() - request(opts, done); + return request(opts, done); } // @ts-ignore diff --git a/test/magpie-transformer.spec.ts b/test/magpie-transformer.spec.ts index 8539062f642bcc6261a4b674e0f7190426164e0b..2ca8604aba22e6c2ec991e3336f345f570f91990 100644 --- a/test/magpie-transformer.spec.ts +++ b/test/magpie-transformer.spec.ts @@ -20,7 +20,11 @@ describe('Test magpie-transformer', () => { chai.should(); // start a test server to fetch data from const requestHandler = (request, response) => { - response.end('ok') + if(request.url === '/timeout') { + setTimeout(() => response.end('ok'), 1000) + } else { + response.end('ok') + } }; server = http.createServer(requestHandler); server.listen(3000, (err) => { @@ -58,7 +62,7 @@ describe('Test magpie-transformer', () => { const options = { objectMode: true, logger: coreMock.getLogger(), - delay: 2000, + delay: 200, concurrent: 1 }; const magpie = new MagpieTransformer(options); @@ -73,7 +77,7 @@ describe('Test magpie-transformer', () => { } else { const time = timer.end(); timer.start(); - time.should.be.greaterThan(2000); + time.should.be.greaterThan(200); } if(counter === 2) { done(); @@ -159,6 +163,72 @@ describe('Test magpie-transformer', () => { magpie.end() }).timeout(5000); + + it('Test delay with slow server', (done) => { + const options = { + objectMode: true, + logger: coreMock.getLogger(), + delay: 200, + concurrent: 1, + highWaterMark: 10000, + }; + const magpie = new MagpieTransformer(options); + + // listener to check output + let counter = 0; + let timer; + magpie.on('data', (data: any) => { + data.res.body.should.contains('ok'); + if(counter === 0) { + timer = new Timer(); + } else { + const time = timer.end(); + timer.start(); + time.should.be.greaterThan(1200).lessThan(1300); + } + if(counter === 2) { + done(); + } + counter++; + }); + + // send three mocks and measure time between results + for(let i = 0; i < 3; i++) { + const mock = JSON.stringify({url: 'http://localhost:3000/timeout'}); + magpie.write(mock); + } + magpie.end() + }).timeout(50000); + + + it('Test server partial not available', (done) => { + const options = { + objectMode: true, + logger: coreMock.getLogger(), + delay: 200, + concurrent: 1, + serverTimeout: 100 + }; + const magpie = new MagpieTransformer(options); + + // listener to check output + let data = 0; + let timer; + magpie.on('data', (err: any) => { + data++; + }); + magpie.on('error', (err: any) => { + if(err.code === 'ESOCKETTIMEDOUT') { + data.should.be.eq(1); + done() + } + }); + + // send three mocks and measure time between results + magpie.write(JSON.stringify({url: 'http://localhost:3000/'})); + magpie.write(JSON.stringify({url: 'http://localhost:3000/timeout'})); + magpie.end() + }).timeout(50000); }); /**