Commit 999ea0f5 authored by Tobinsk's avatar Tobinsk
Browse files

Merge branch 'master' into 'url-resolver-kafka'

# Conflicts:
#   .gitlab-ci.yml
#   config.json
#   src/app.ts
parents 45d90a57 cfa14e70
Pipeline #3574 passed with stage
in 1 minute and 18 seconds
# This file is a template, and might need editing before it works on your project.
# Official docker image.
image: docker:latest
services:
- docker:dind
variables:
DOCKER_DRIVER: overlay2
DOCKER_TLS_CERTDIR: ""
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
build-master:
stage: build
script:
- docker build --pull -t "$CI_REGISTRY_IMAGE" .
- docker push "$CI_REGISTRY_IMAGE"
only:
- master
build:
stage: build
script:
- docker build --pull -t "$CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG" .
- docker push "$CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG"
except:
- master
include:
- project: 'histhub/node-kafka-docker-base'
ref: 'master'
file: '.gitlab-ci.yml'
FROM node:10-slim
# add basic libs
RUN apt-get update && apt-get install -y \
bash \
g++ \
gcc \
ca-certificates \
make \
python \
bash \
git
# Create app directory
RUN mkdir -p /usr/local/app
# Move to the app directory
WORKDIR /usr/local/app
FROM source.dodis.ch:4577/histhub/node-kafka-docker-base:latest
# copy app to the container
COPY package.json package-lock.json config.json tsconfig.json /usr/local/app/
......
......@@ -36,5 +36,6 @@
"express": {
"port": 3000,
"kafka-timeout": 5000
}
},
"url-resolver": "http://url-resolver/v1/urltoprovider/"
}
......@@ -16,9 +16,7 @@
"license": "ISC",
"dependencies": {
"@types/express": "^4.16.0",
"@types/node": "^9.6.21",
"@types/superagent": "^3.8.0",
"@types/winston": "^2.3.9",
"boom": "^7.3.0",
"concurrently": "^3.5.1",
"cors": "^2.8.4",
......@@ -32,9 +30,7 @@
"node-cache": "^4.2.0",
"sha.js": "^2.4.11",
"source-map-support": "^0.5.6",
"supertest": "^3.1.0",
"typescript": "^2.8.3",
"winston": "^2.4.2"
"supertest": "^3.1.0"
},
"devDependencies": {
"@types/boom": "^7.2.1",
......
......@@ -5,12 +5,13 @@ import {ApiController, InterfaceRequestApi} from './controller/api'
import {Validator} from './middleware/validator';
import {HealthcheckController} from './controller/healthcheck';
import {Core} from 'geolinker-common';
import {NextFunction, Response} from "express";
import {NextFunction, Request, Response} from "express";
import kafkaEventEmitter from './utils/kafka-event-container';
import fmt = require('bunyan-format');
import KafkaAvro = require('kafka-avro');
import timer from './utils/timer';
import {KafkaCache} from './middleware/cache';
import {InterfaceCore} from 'geolinker-common/dist/core';
const kafkaLog = KafkaAvro.getLogger();
kafkaLog.addStream({
......
......@@ -156,6 +156,7 @@ export class ApiController {
// prepared data from the middleware
const message = req.message;
const id = message.id;
const language = req.validatedBody.language || 'de';
timer.start(`kafka-${id}`);
......@@ -164,7 +165,7 @@ export class ApiController {
// flag to check if we already answered
let answered = false;
const responseData = new ResponseCollector(req.config);
const responseData = new ResponseCollector(req.config, language);
// if kafka is not answering give a fuck
const kafkaTimeout = setTimeout(() => {
// check if we already answered
......@@ -178,9 +179,19 @@ export class ApiController {
// todo: we should use rxjs here
let unsubscribeTimer: Timer = null;
function waitForResponse(data: InterfaceStreamAppResponse) {
responseData.add(data);
async function waitForResponse(data: InterfaceStreamAppResponse) {
// if we fail to resolve the url to a provider we will send a 500
try {
console.log("Try to add data");
await responseData.add(data);
console.log("data added");
} catch (error) {
return next(Boom.internal('Unable to resolve urls to provider'));
}
console.log("tmp data");
console.log(responseData.getData());
if (responseData.finished()) {
console.log("finished");
// set the cache each time we got new data and have at least a minimum set
KafkaCache.set(id, responseData.getData());
if(!answered) {
......
......@@ -2,7 +2,7 @@ import { RequestHandler} from 'express-serve-static-core';
import { NextFunction, Request, Response} from 'express';
import { check, validationResult } from 'express-validator/check';
import { matchedData } from 'express-validator/filter';
import { InterfaceError, InterfaceRequestApi } from '../controller/api';
import { InterfaceRequestApi } from '../controller/api';
import * as boom from 'boom';
/**
......
import {InterfaceStreamAppResponse} from '../controller/api';
import UrlResolver from './url-resolver';
/**
......@@ -28,23 +29,49 @@ export class ResponseCollector {
*/
private config: string[];
/**
* Language oif the provider
*/
private language: string;
/**
* The config is a simple array with the names of the kafka streming apps
* @param config
* @param language
*/
constructor(config: string[]) {
constructor(config: string[], language = 'de') {
this.config = config;
this.language = language;
}
/**
* Add a data to the final result
* @param {InterfaceStreamAppResponse} data
*/
add(data: InterfaceStreamAppResponse): void {
async add(data: InterfaceStreamAppResponse): Promise<void> {
// should always be the same
this.finalData.id = data.id;
// find key and add data
console.log('config ' + this.config);
console.log('name' + data.name);
if(this.config.indexOf(data.name) > -1){
// check if array
if(Array.isArray(data.data.links)) {
console.log('links');
data.data.links = await Promise.all(data.data.links.map(async (d) => {
const a = {};
try {
const provider = await UrlResolver.getProvider(d, this.language);
a[provider] = d;
return a;
} catch(error) {
console.log(`error getting provider` + error);
a['unknown'] = d;
return a;
}
}));
console.log(data.data.links);
}
if(Array.isArray(this.finalData.data[data.name])) {
this.finalData.data[data.name].push(data.data);
} else {
......@@ -86,3 +113,6 @@ export class ResponseCollector {
return this.finalData;
}
}
export class InterfaceResponseConfig {
}
\ No newline at end of file
import axios from 'axios';
import {core} from '../app';
/**
* Small class to resolve an url over an external sevice
*/
class UrlResolver {
static baseUrl = 'http://url-resolver/v1/urltoprovider/';
constructor() {
UrlResolver.baseUrl = core.getNconf().get('url-resolver');
}
public static async getProvider(url: string, language: string = 'de') {
try {
const provider = await axios.get(UrlResolver.baseUrl + url);
return provider.data.name[language];
} catch(err) {
if(err.response.status === 404) {
console.log(`Could not resolve (404) url ${url}`);
return 'unknown';
}
console.log(`Could not resolve (${err.response.status}) url ${url}`);
return 'unknown';
}
}
}
export default UrlResolver;
......@@ -22,9 +22,7 @@ describe('Test response collector', () => {
});
it('Should fail on one of two required answers', (done) => {
const config: InterfaceResponseConfig = {
data: ['resolverneo4j', 'resolverelasticseatch',],
};
const config = ['resolverneo4j', 'resolverelasticseatch',];
const collector = new ResponseCollector(config);
collector.add({name: 'neo4jresolver', data: {test}, id: 'swe34rfdedw3s'});
......@@ -33,9 +31,7 @@ describe('Test response collector', () => {
});
it('Should pass on two of two required answers', (done) => {
const config: InterfaceResponseConfig = {
data: ['resolverneo4j', 'resolverelasticsearch',],
};
const config = ['resolverneo4j', 'resolverelasticseatch',];
const collector = new ResponseCollector(config);
collector.add({name: 'resolverneo4j', data: {test}, id: 'swe34rfdedw3s'});
......@@ -45,9 +41,7 @@ describe('Test response collector', () => {
});
it('Should pass on tree of minimum two required answers', (done) => {
const config: InterfaceResponseConfig = {
data: ['resolverneo4j', 'resolverelasticsearch',],
};
const config = ['resolverneo4j', 'resolverelasticseatch',];
const collector = new ResponseCollector(config);
collector.add({name: 'resolverneo4j', data: {test}, id: 'swe34rfdedw3s'});
......@@ -57,10 +51,7 @@ describe('Test response collector', () => {
});
it('Get data from collector', (done) => {
const config: InterfaceResponseConfig = {
data: ['resolverneo4j', 'resolverelasticsearch',],
metadata: 'resolvermetadata',
};
const config = ['resolverneo4j', 'resolverelasticsearch', 'resolvermetadata'];
const mockAnswer = {
config: {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment