/*
 * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
 *
 * Copyright (c) 2016 Blizzard Entertainment
 *
 * This software may be modified and distributed under the terms
 * of the MIT license. See the LICENSE.txt file for details.
 */

import {Writable} from 'stream';
import Session from 'neo4j-driver/types/v1/session';
import Driver from 'neo4j-driver/types/v1/driver';
import {InterfaceCore} from 'geolinker-common/dist/core';

/**
 * Writable stream that turns every incoming chunk into a set of Cypher
 * `MERGE` statements and runs them in a fresh Neo4j session.
 *
 * Each chunk is expected to carry a `parsed` object of the form
 * `{from: string, to: string[], relation: {type: string, author: any}}`.
 * For every chunk one `Place` node is merged for `from`, one for every entry
 * of `to`, and a relationship of type `relation.type` (with an `author`
 * property) is merged from the origin to each target.
 */
class Neo4jStreamWriter extends Writable {
  private neo4j;
  /** Delay in milliseconds before a failed write is attempted again. */
  private timeout = 500;
  private core;

  /**
   * Quote a relationship type so it can be embedded into the query text.
   *
   * Relationship types cannot be passed as Cypher parameters, so this is the
   * only user supplied value that still ends up in the statement itself.
   * Names containing a back-tick or a backslash are rejected instead of being
   * escaped: a back-tick would terminate the quoted identifier and a
   * backslash could introduce a unicode escape that some server versions
   * expand before parsing.
   *
   * @param name relationship type as delivered by the chunk
   * @returns the name wrapped in back-ticks
   * @throws TypeError if the name is not a non-empty string or contains a
   * back-tick or backslash
   */
  private static quoteRelationType(name: string): string {
    if (typeof name !== 'string' || name.length === 0) {
      throw new TypeError('relation type must be a non-empty string');
    }
    if (/[`\\]/.test(name)) {
      throw new TypeError('relation type must not contain back-ticks or backslashes');
    }
    return '`' + name + '`';
  }

  /**
   * @param options options forwarded unchanged to the `Writable` constructor
   * @param neo4j driver used to open one session per written chunk
   * @param core provides the logger and the reporter used by `_write`
   */
  constructor(options = {}, neo4j: Driver, core: InterfaceCore) {
    super(options);
    this.neo4j = neo4j;
    this.core = core;
  }

  /**
   * Write a single chunk to Neo4j.
   *
   * A chunk whose `parsed` payload cannot be turned into a query is reported
   * through `callback(error)`; retrying it could never succeed and would
   * block the stream forever. Failures while talking to the database are
   * logged and the same chunk is written again after `timeout` milliseconds.
   * `callback` is withheld until the write succeeds, which is what exerts
   * backpressure on the upstream.
   *
   * @param chunk object carrying the connection in `chunk.parsed`
   * @param encoding passed through untouched to the retry
   * @param callback invoked without arguments on success, with the error
   * when the chunk is malformed
   */
  public async _write(chunk, encoding, callback) {
    let statement;
    try {
      statement = this.buildQuery(chunk.parsed);
    } catch (e) {
      this.core.getLogger().error(e);
      callback(e);
      return;
    }

    let neo4jSession: Session | undefined;
    try {
      neo4jSession = this.neo4j.session();
      await neo4jSession.run(statement.text, statement.parameters);
      this.core.getReporter().setDataOut(1);
      setImmediate(callback);
    } catch (e) {
      // Just delay this thing a bit and pass the params
      // backpressure will get exerted this way.
      // todo: check the error first
      // neo4j.error.SERVICE_UNAVAILABLE
      this.core.getLogger().error(e);
      this.core.getLogger().error('from' + chunk.parsed.from);
      setTimeout(() => {
        this._write(chunk, encoding, callback);
      }, this.timeout);
    } finally {
      // Release the session on both the success and the failure path.
      if (neo4jSession) {
        neo4jSession.close();
      }
    }
  }

  /**
   * Build the parameterised statement for one connection.
   *
   * All values (`from`, every `to` entry and the author) travel as query
   * parameters, so they need no escaping and cannot alter the statement.
   * Only the relationship type is embedded in the text, after validation by
   * `quoteRelationType`.
   *
   * todo: we should move this into a transformer
   *
   * @param connection object with `from`, `to` and `relation`
   * @returns the statement text and the parameters to run it with
   * @throws TypeError if the connection does not have the expected shape
   */
  private buildQuery(connection): {text: string, parameters: {[key: string]: string}} {
    if (!connection || typeof connection.from !== 'string') {
      throw new TypeError('connection.from must be a string');
    }
    if (!Array.isArray(connection.to)) {
      throw new TypeError('connection.to must be an array');
    }
    if (!connection.relation) {
      throw new TypeError('connection.relation is missing');
    }

    const relationType = Neo4jStreamWriter.quoteRelationType(connection.relation.type);
    const parameters: {[key: string]: string} = {
      // String() keeps the result of the former string interpolation.
      author: String(connection.relation.author),
      from: connection.from,
    };

    let text = 'MERGE (o:Place {uri:$from}) ';
    connection.to.forEach((current, i) => {
      if (typeof current !== 'string') {
        throw new TypeError('connection.to[' + i + '] must be a string');
      }
      parameters['to' + i] = current;
      text += ` MERGE (t${i}:Place {uri:$to${i}}) `;
      text += ` MERGE (o)-[:${relationType} {author:$author}]->(t${i}) `;
    });

    return {text, parameters};
  }
}

export default Neo4jStreamWriter;