Commit 528b0123 authored by Tobinsk's avatar Tobinsk
Browse files

Merge branch '9-add-url-resolver' into 'master'

Resolve "Add url resolver"

Closes #9

See merge request !23
parents 3a6975d0 45d22b7e
Pipeline #3370 passed with stage
in 2 minutes and 48 seconds
......@@ -2,8 +2,12 @@
# Official docker image.
image: docker:latest
variables:
DOCKER_DRIVER: overlay2
DOCKER_TLS_CERTDIR: ""
services:
- docker:dind
- docker:19.03.1-dind
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
......
......@@ -6,7 +6,7 @@
},
"consumer": {
"config": {
"group.id": "response-geoconcordance",
"group.id": "response-geoconcordance-1",
"socket.keepalive.enable": true,
"enable.auto.commit": true,
"queued.max.messages.kbytes": 100
......@@ -36,5 +36,6 @@
"express": {
"port": 3000,
"kafka-timeout": 5000
}
},
"url-resolver": "http://url-resolver/v1/urltoprovider/"
}
......@@ -5,12 +5,13 @@ import {ApiController, InterfaceRequestApi} from './controller/api'
import {Validator} from './middleware/validator';
import {HealthcheckController} from './controller/healthcheck';
import {Core} from 'geolinker-common';
import {NextFunction, Response} from "express";
import {NextFunction, Request, Response} from "express";
import kafkaEventEmitter from './utils/kafka-event-container';
import fmt = require('bunyan-format');
import KafkaAvro = require('kafka-avro');
import timer from './utils/timer';
import {KafkaCache} from './middleware/cache';
import {InterfaceCore} from 'geolinker-common/dist/core';
const kafkaLog = KafkaAvro.getLogger();
kafkaLog.addStream({
......@@ -26,7 +27,7 @@ kafkaLog.addStream({
* Init the core
* @type {Core}
*/
export const core = Core.init('Api', './config.json');
export const core: InterfaceCore = Core.init('Api', './config.json');
// Create Express server
const app = express();
......@@ -43,7 +44,7 @@ app.use(KafkaCache.middleware);
* K8 needs to get this status. Otherwise loadbalancing is not possible
*/
app.get('/', HealthcheckController.healthy);
app.get('/v1/', HealthcheckController.healthy);
app.get('/v1-1/', HealthcheckController.healthy);
/**
* first connect to kafka
......@@ -108,33 +109,30 @@ kafka.init().then(async () => {
/**
* API to match a given node with all nodes with uri in Elastic search
*/
app.get('/v1/similarto/:node(*)', [...Validator.apiMatch(), ApiController.checkEncoding, kafkaMiddleware, ApiController.match, ApiController.queryKafka]);
app.get('/v1-1/similarto/:node(*)', [...Validator.apiMatch(), ApiController.checkEncoding, kafkaMiddleware, ApiController.match, ApiController.queryKafka]);
/**
* API to fetch all edges and nodes interconnected with a given node from neo4j
*/
app.get('/v1/sameas/:node(*)', [...Validator.apiConcordance(), ApiController.checkEncoding, kafkaMiddleware, ApiController.concordance, ApiController.queryKafka]);
app.get('/v1-1/sameas/:node(*)', [...Validator.apiConcordance(), ApiController.checkEncoding, kafkaMiddleware, ApiController.concordance, ApiController.queryKafka]);
/**
* API to get metadata about a given node
*/
app.get('/v1/metadata/:node(*)', [...Validator.apiMetadata(), ApiController.checkEncoding, kafkaMiddleware, ApiController.metadata, ApiController.queryKafka]);
app.get('/v1-1/metadata/:node(*)', [...Validator.apiMetadata(), ApiController.checkEncoding, kafkaMiddleware, ApiController.metadata, ApiController.queryKafka]);
/**
* API to get all information about a given node
*/
app.get('/v1/all/:node(*)', [...Validator.apiAll(), ApiController.checkEncoding, kafkaMiddleware, ApiController.all, ApiController.queryKafka]);
app.get('/v1-1/all/:node(*)', [...Validator.apiAll(), ApiController.checkEncoding, kafkaMiddleware, ApiController.all, ApiController.queryKafka]);
/**
* General error handling
* This has to be done in this callback
*/
app.use((err, req, res, next) => {
if (err.isServer) {
console.error(err.output);
}
return res.status(err.output.statusCode).json(err.output.payload);
});
/**
 * Final Express error handler.
 *
 * Errors that carry a Boom-style `output` (for example the
 * `Boom.internal(...)` passed to `next` while waiting for kafka) are answered
 * with their own status code and JSON payload, so a client error is not
 * reported as a server failure. Anything else is answered with a generic 500.
 *
 * @param err   the error handed to `next(err)` or thrown by a middleware
 * @param req   the current request (unused)
 * @param res   the response to write the error to
 * @param next  used to delegate to the default handler once a response has started
 */
app.use((err: any, req: Request, res: Response, next: NextFunction) => {
  // Once headers are out we cannot send another response; let Express close the connection.
  if (res.headersSent) {
    return next(err);
  }

  const output = err ? err.output : undefined;
  if (output && typeof output.statusCode === 'number') {
    // Only server-side (5xx) Boom errors are worth a log entry.
    if (err.isServer) {
      console.error(err.stack || output);
    }
    return res.status(output.statusCode).json(output.payload);
  }

  // Not every thrown value is an Error with a stack; log whatever we got.
  console.error(err && err.stack ? err.stack : err);
  res.status(500).send('Something broke!');
});
core.getLogger().info("We are ready to interact with kafka")
}).catch((error) => {
......
......@@ -87,7 +87,7 @@ export class ApiController {
public static concordance(req: InterfaceRequestKafka, res: Response, next: NextFunction) {
const message: any = { };
message.resolverneo4j = {
depth: parseInt(req.validatedBody.depth) || 1
depth: parseInt(req.validatedBody.depth) || 1,
};
message.verb = 'same_as';
// add values for next middleware
......@@ -156,6 +156,7 @@ export class ApiController {
// prepared data from the middleware
const message = req.message;
const id = message.id;
const language = req.validatedBody.language || 'de';
timer.start(`kafka-${id}`);
......@@ -164,7 +165,7 @@ export class ApiController {
// flag to check if we already answered
let answered = false;
const responseData = new ResponseCollector(req.config);
const responseData = new ResponseCollector(req.config, language);
// fallback timer in case kafka does not answer in time
const kafkaTimeout = setTimeout(() => {
// check if we already answered
......@@ -178,9 +179,19 @@ export class ApiController {
// todo: we should use rxjs here
let unsubscribeTimer: Timer = null;
function waitForResponse(data: InterfaceStreamAppResponse) {
responseData.add(data);
async function waitForResponse(data: InterfaceStreamAppResponse) {
// if we fail to resolve the url to a provider we will send a 500
try {
console.log("Try to add data");
await responseData.add(data);
console.log("data added");
} catch (error) {
return next(Boom.internal('Unable to resolve urls to provider'));
}
console.log("tmp data");
console.log(responseData.getData());
if (responseData.finished()) {
console.log("finished");
// set the cache each time we got new data and have at least a minimum set
KafkaCache.set(id, responseData.getData());
if(!answered) {
......
......@@ -2,7 +2,7 @@ import { RequestHandler} from 'express-serve-static-core';
import { NextFunction, Request, Response} from 'express';
import { check, validationResult } from 'express-validator/check';
import { matchedData } from 'express-validator/filter';
import { InterfaceError, InterfaceRequestApi } from '../controller/api';
import { InterfaceRequestApi } from '../controller/api';
import * as boom from 'boom';
/**
......@@ -20,6 +20,7 @@ export class Validator {
check('trust', 'The level of trust for a given provider (Not yet implemented)').optional().isArray(),
check('depth', 'Depth to traverse the graph. It defines how many hops the query take to fetch connected nodes').optional().isNumeric(),
check('timeout', 'A timeout in ms to wait for the streaming app to send data').optional().isNumeric(),
check('language', 'The language of the provider name').optional().isString(),
Validator.extract,
];
}
......@@ -33,6 +34,7 @@ export class Validator {
check('distance', 'The radius to search in for a given resource in meter f.e. 5000. Default: 10000').optional().isNumeric(),
check('fuzziness', 'The fuzziness to apply to the name of a resource. This will use levenshtein distance. Default: 2').optional().isNumeric(),
check('timeout', 'A timeout in ms to wait for the streaming app to send data').optional().isNumeric(),
check('language', 'The language of the provider name').optional().isString(),
Validator.extract,
];
}
......
import {InterfaceStreamAppResponse} from '../controller/api';
import UrlResolver from './url-resolver';
/**
......@@ -28,23 +29,47 @@ export class ResponseCollector {
*/
private config: string[];
/**
* Language of the provider name
*/
private language: string;
/**
* The config is a simple array with the names of the kafka streaming apps
* @param config
* @param language
*/
constructor(config: string[]) {
constructor(config: string[], language = 'de') {
this.config = config;
this.language = language;
}
/**
* Add a data to the final result
* @param {InterfaceStreamAppResponse} data
*/
add(data: InterfaceStreamAppResponse): void {
async add(data: InterfaceStreamAppResponse): Promise<void> {
// should always be the same
this.finalData.id = data.id;
// find key and add data
if(this.config.indexOf(data.name) > -1){
// check if array
if(Array.isArray(data.data.links)) {
console.log('links');
data.data.links = await Promise.all(data.data.links.map(async (d) => {
const a = {};
try {
const provider = await UrlResolver.getProvider(d, this.language);
a[provider] = d;
return a;
} catch(error) {
console.log(`error getting provider` + error);
a['unknown'] = d;
return a;
}
}));
console.log(data.data.links);
}
if(Array.isArray(this.finalData.data[data.name])) {
this.finalData.data[data.name].push(data.data);
} else {
......@@ -86,3 +111,6 @@ export class ResponseCollector {
return this.finalData;
}
}
/**
 * Empty placeholder class; it declares no members of its own.
 */
export class InterfaceResponseConfig {}
\ No newline at end of file
import axios from 'axios';
import {core} from '../app';
/**
 * Small class to resolve a url to the name of its provider via an external service.
 */
class UrlResolver {
  /**
   * Base url of the resolver service; the url to resolve is appended to it.
   * The literal is only the fallback used when no `url-resolver` value is configured.
   */
  static baseUrl = 'http://url-resolver/v1/urltoprovider/';

  /** True once the configured base url has been looked up, so the lookup runs only once per process. */
  private static configLoaded = false;

  /**
   * Re-reads the `url-resolver` setting. Kept for callers that instantiate the
   * class; the static API loads the setting on first use by itself.
   */
  constructor() {
    UrlResolver.applyConfiguredBaseUrl();
  }

  /**
   * Resolve a url to the provider name in the requested language.
   *
   * Never rejects: every failure (HTTP error, no response at all, missing
   * translation) is logged and reported as the provider name `'unknown'`.
   *
   * NOTE(review): the url is appended to the base url as-is, without encoding —
   * confirm that the resolver service expects it that way.
   *
   * @param url       the url to resolve
   * @param language  key of the provider name translation, default `'de'`
   * @returns the provider name, or `'unknown'` when it cannot be resolved
   */
  public static async getProvider(url: string, language: string = 'de'): Promise<string> {
    // The class is used statically, so the constructor cannot be relied on to load the config.
    if (!UrlResolver.configLoaded) {
      UrlResolver.applyConfiguredBaseUrl();
    }

    try {
      const provider = await axios.get(UrlResolver.baseUrl + url);
      const names = provider.data ? provider.data.name : undefined;
      const name = names ? names[language] : undefined;
      if (typeof name === 'string' && name.length > 0) {
        return name;
      }
      // Without this guard a missing translation would leak out as `undefined`.
      console.log(`Could not resolve (no name for language ${language}) url ${url}`);
      return 'unknown';
    } catch (err) {
      console.log(`Could not resolve (${UrlResolver.describeFailure(err)}) url ${url}`);
      return 'unknown';
    }
  }

  /** Copies the configured `url-resolver` value into `baseUrl`, keeping the current value when none is usable. */
  private static applyConfiguredBaseUrl(): void {
    UrlResolver.configLoaded = true;
    try {
      const configured: unknown = core.getNconf().get('url-resolver');
      if (typeof configured === 'string' && configured.length > 0) {
        UrlResolver.baseUrl = configured;
      }
    } catch (configError) {
      console.log(`Could not read url-resolver config, using ${UrlResolver.baseUrl}`);
    }
  }

  /** Returns the HTTP status of a failed request, or `'no response'` when the error carries none. */
  private static describeFailure(err: unknown): string {
    if (typeof err === 'object' && err !== null) {
      const response = (err as { response?: { status?: unknown } }).response;
      if (response && response.status !== undefined) {
        return String(response.status);
      }
    }
    return 'no response';
  }
}
export default UrlResolver;
......@@ -22,9 +22,7 @@ describe('Test response collector', () => {
});
it('Should fail on one of two required answers', (done) => {
const config: InterfaceResponseConfig = {
data: ['resolverneo4j', 'resolverelasticseatch',],
};
const config = ['resolverneo4j', 'resolverelasticseatch',];
const collector = new ResponseCollector(config);
collector.add({name: 'neo4jresolver', data: {test}, id: 'swe34rfdedw3s'});
......@@ -33,9 +31,7 @@ describe('Test response collector', () => {
});
it('Should pass on two of two required answers', (done) => {
const config: InterfaceResponseConfig = {
data: ['resolverneo4j', 'resolverelasticsearch',],
};
const config = ['resolverneo4j', 'resolverelasticseatch',];
const collector = new ResponseCollector(config);
collector.add({name: 'resolverneo4j', data: {test}, id: 'swe34rfdedw3s'});
......@@ -45,9 +41,7 @@ describe('Test response collector', () => {
});
it('Should pass on tree of minimum two required answers', (done) => {
const config: InterfaceResponseConfig = {
data: ['resolverneo4j', 'resolverelasticsearch',],
};
const config = ['resolverneo4j', 'resolverelasticseatch',];
const collector = new ResponseCollector(config);
collector.add({name: 'resolverneo4j', data: {test}, id: 'swe34rfdedw3s'});
......@@ -57,10 +51,7 @@ describe('Test response collector', () => {
});
it('Get data from collector', (done) => {
const config: InterfaceResponseConfig = {
data: ['resolverneo4j', 'resolverelasticsearch',],
metadata: 'resolvermetadata',
};
const config = ['resolverneo4j', 'resolverelasticsearch', 'resolvermetadata'];
const mockAnswer = {
config: {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment