Commit 59905b9f authored by Tobias Steiner's avatar Tobias Steiner
Browse files

Update maxEvenListeneres and try to fix the memory leak

parent 44865349
Pipeline #2307 passed with stage
in 2 minutes and 31 seconds
import express = require('express');
import bodyParser = require('body-parser');
import cors = require('cors')
import cors = require('cors');
import {ApiController, InterfaceRequestApi} from './controller/api'
import {Validator} from './middleware/validator';
import {HealthcheckController} from './controller/healthcheck';
import {Core} from 'geolinker-common';
import {NextFunction, Response} from "express";
import {kafkaEventEmitter} from './utils/kafka-event-container';
import kafkaEventEmitter from './utils/kafka-event-container';
import fmt = require('bunyan-format');
import KafkaAvro = require('kafka-avro');
import timer from './utils/timer';
......
import {NextFunction, Request, Response} from 'express';
import {kafkaEventEmitter} from '../utils/kafka-event-container';
import kafkaEventEmitter from '../utils/kafka-event-container';
import {core} from '../app';
import timer from "../utils/timer";
import { ResponseCollector} from '../utils/response-collector';
......@@ -52,6 +52,8 @@ export interface InterfaceStreamAppResponse {
}
export class ApiController {
static unsubscribeTimeout = 2000;
/**
* If a node contains an url with special characters like ? or # it needs to be urlencoded. We check if we find more than one of those characters to be sure the encoding will work properly
* @param req
......
......@@ -4,12 +4,19 @@ const NodeCache = require( "node-cache" );
const shajs = require('sha.js');
import {InterfaceFinalAnswer} from '../utils/response-collector';
// prepare cache storage. We will cash answers for 5 minutes
const cacheStore: InterfaceCacheStore = new NodeCache( { stdTTL: 3600, checkperiod: 120 } );
/**
* Extend response interface and add sendResponse property
*/
export interface InterfaceCachedResponse extends Response{
sendResponse: (body: any) => Response;
}
/**
* CacheStore interface
*/
export interface InterfaceCacheStore {
set(key: string | number, value: any, ttl?: number): void;
get(key: string | number): any;
......@@ -17,10 +24,16 @@ export interface InterfaceCacheStore {
}
/**
* Small wrapper to cache an get kafka answers
* Small wrapper to cache and get kafka answers
*/
export class KafkaCache {
/**
* Check if we have an answer of kafka in the cash and if possible send it
* @param req
* @param res
* @param next
*/
static middleware(req: Request, res: InterfaceCachedResponse, next: NextFunction) {
const url = req.url || req.originalUrl;
const id = shajs('sha256').update(url).digest('hex');
......
const EventEmitter = require('events');
class KafkaEventEmitter extends EventEmitter {}
export const kafkaEventEmitter = new KafkaEventEmitter();
// event emitter for kafka events
const kafkaEventEmitter = new KafkaEventEmitter();
// allow more than 10 parallel event listeners
kafkaEventEmitter.setMaxListeners(100);
export default kafkaEventEmitter;
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment