Commit ca8a17e7 authored by Tobinsk's avatar Tobinsk
Browse files

Readme

parent 787d5acc
# linker
The linker is a [kafka sink](https://docs.confluent.io/current/connect/kafka-connect-elasticsearch/index.html) for the [neo4j](https://neo4j.com/) graph database. It consumes messages from the `linker` topic and writes them to the database.
## Neo4jStreamWriter
The Neo4jStreamWriter consumes a stream of kafka messages and writes the content of the message to the database. It is very simple and can handle backpressure.
## Docker
To build the image use the following command. The container is based on the [histub/ node-kafka-docker-base](https://source.dodis.ch/histhub/node-kafka-docker-base).
```bash
docker build -t source.dodis.ch:4577/histhub/linker .
# Upload to the registry
docker push source.dodis.ch:4577/histhub/linker
```
## CD
We have a build pipeline in gitlab. So manual building of the image is no longer necessary.
## Deploy to k8
This streaming app is part of the [deploy geolinker helm chart](https://source.dodis.ch/histhub/deploy-geolinker/wikis/deployment-manual#linker).
import {v1 as neo4j} from 'neo4j-driver';
import * as jsesc from 'jsesc';
export class Linker {
private nconf;
private logger;
private driver;
private uriBuilder;
constructor(nconf, logger, uriBuilder) {
this.nconf = nconf;
this.logger = logger;
this.uriBuilder = uriBuilder;
this.driver = neo4j.driver(
nconf.get('neo4j:config:uri'),
neo4j.auth.basic(
nconf.get('neo4j:config:user'),
nconf.get('neo4j:config:password'),
),
);
}
/**
* Create a new node in the db if they do not exists
* @param node
* @returns {Result}
*/
public newNode(node) {
// todo: from topic!
const provider = 'geonames';
const uri = this.uriBuilder.geoname(node.id);
return this.driver.session().run(`MERGE (n:Place { uri: '${uri}'})
ON CREATE SET n.id: '${node.id}', n.provider: ${provider}
ON MATCH SET n.id: '${node.id}', n.provider: ${provider}`);
}
/**
* Write connection int the db. Create nodes if they do not exists
* @param connection
*/
public connect(connection) {
return new Promise( (resolve, reject) => {
let query = `MERGE (o:Place {uri:'${connection.from}'}) `;
connection.to.forEach((current, i) => {
query += ` MERGE (t${i}:Place {uri:'${current}'}) `;
query += ` MERGE (o)-[:${connection.relation.type} {author:'${connection.relation.author}'}]->(t${i}) `;
});
// get a new session
this.logger.info('Try to get a session');
this.logger.info(query);
const session = this.driver.session();
this.logger.info('Try to build network');
session.run(query).then(() => {
session.close();
resolve();
}).catch((err) => {
setTimeout(() => {
session.close();
reject(`Error while saving a connection to neo4j ${JSON.stringify(err)}`);
}, 1000);
});
});
}
/**
* There is no build in function to escape
* https://github.com/neo4j/neo4j-javascript-driver/issues/398
* @param string
*/
public escape(data: string) {
return jsesc(data);
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment