neo4j-stream-writer.ts 2.85 KB
Newer Older
Tobias Steiner's avatar
Tobias Steiner committed
1
2
3
4
5
6
7
8
9
10
11
12
/*
 * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
 *
 * Copyright (c) 2016 Blizzard Entertainment
 *
 * This software may be modified and distributed under the terms
 * of the MIT license.  See the LICENSE.txt file for details.
 */

import {Writable} from 'stream';
import Session from 'neo4j-driver/types/v1/session';
import Driver from 'neo4j-driver/types/v1/driver';
Tobias Steiner's avatar
Tobias Steiner committed
13
import {InterfaceCore} from 'geolinker-common/dist/core';
Tobias Steiner's avatar
Tobias Steiner committed
14
15
16
17

/**
 * Shape of the link record this writer reads from `chunk.parsed`:
 * one origin URI, the URIs it links to, and the relation joining them.
 */
interface Connection {
  from: string;
  to: string[];
  relation: {type: string; author: string};
}

/**
 * Writable stream that persists link records to Neo4j.
 *
 * Every chunk becomes one Cypher statement that MERGEs the origin `Place`,
 * each target `Place`, and a relationship from the origin to each target.
 * The stream callback is only invoked once the statement has succeeded, so a
 * failing database holds the stream back instead of dropping records.
 */
class Neo4jStreamWriter extends Writable {
  private neo4j: Driver;
  /** Delay in milliseconds before a failed write is attempted again. */
  private timeout = 500;
  // Left untyped as in the original; only getReporter() and getLogger() are used here.
  private core;

  /**
   * Return `value` unchanged when it is a string, otherwise throw.
   * @param value candidate value taken from the connection record
   * @param field name used in the error message
   * @throws TypeError when `value` is not a string
   */
  private static requireString(value: unknown, field: string): string {
    if (typeof value !== 'string') {
      throw new TypeError(`connection.${field} must be a string`);
    }
    return value;
  }

  /**
   * Quote a name for use as a Cypher identifier.
   * The name is wrapped in backticks and embedded backticks are doubled, so
   * the value cannot terminate the identifier and inject further Cypher.
   * @param name raw identifier, e.g. a relationship type
   * @returns the backtick-quoted identifier
   */
  private static quoteName(name: string): string {
    return '`' + name.replace(/`/g, '``') + '`';
  }

  /**
   * @param options options forwarded unchanged to the `Writable` constructor
   * @param neo4j driver used to open one session per written chunk
   * @param core provides the reporter and the logger used by this writer
   */
  constructor(options = {}, neo4j: Driver, core: InterfaceCore) {
    super(options);
    this.neo4j = neo4j;
    this.core = core;
  }

  /**
   * Write one chunk to Neo4j.
   *
   * A chunk that cannot be turned into a statement (missing or mistyped
   * fields) is reported through `callback(error)`, because retrying can never
   * fix it. A failure while talking to the database is logged and the same
   * chunk is attempted again after `timeout` milliseconds; `callback` is held
   * back until a write succeeds, which is what exerts back-pressure upstream.
   *
   * @param chunk object whose `parsed` property holds the connection to store
   * @param encoding passed through unchanged to the retry
   * @param callback stream callback; called with an error only for bad chunks
   */
  public async _write(
    chunk: {parsed: Connection},
    encoding: string,
    callback: (error?: Error | null) => void,
  ): Promise<void> {
    let text: string;
    let parameters: Record<string, string>;
    try {
      text = this.buildQuery(chunk.parsed);
      parameters = this.buildParameters(chunk.parsed);
    } catch (e) {
      // The promise returned by _write is ignored by the stream machinery, so
      // the error has to travel through the callback or the stream would stall.
      callback(e instanceof Error ? e : new Error(String(e)));
      return;
    }

    let neo4jSession: Session | undefined;
    try {
      neo4jSession = this.neo4j.session();
      await neo4jSession.run(text, parameters);
    } catch (e) {
      // Just delay this thing a bit and pass the params
      // backpressure will get exerted this way.
      // todo: check the error first
      // neo4j.error.SERVICE_UNAVAILABLE
      this.core.getLogger().error(e);
      this.core.getLogger().error('from' + chunk.parsed.from);
      setTimeout(() => {
        void this._write(chunk, encoding, callback);
      }, this.timeout);
      return;
    } finally {
      // Runs on both paths, so the session is closed exactly once per attempt.
      if (neo4jSession) {
        neo4jSession.close();
      }
    }

    this.core.getReporter().setDataOut(1);
    setImmediate(callback);
  }

  /**
   * Build the Cypher text for one connection.
   *
   * Values are referenced as parameters (`$from`, `$to0`, `$to1`, ...,
   * `$author`) whose values come from `buildParameters`, so they are never
   * spliced into the statement. The relationship type cannot be a parameter
   * in Cypher and is therefore embedded as a backtick-quoted identifier.
   *
   * todo: we should move this into a transformer
   * @param connection record with `from`, `to` and `relation`
   * @returns the statement text
   */
  private buildQuery(connection: Connection): string {
    const perTarget = connection.to.map((_target, i) => {
      const relationType = Neo4jStreamWriter.quoteName(String(connection.relation.type));
      return ` MERGE (t${i}:Place {uri:$to${i}}) `
        + ` MERGE (o)-[:${relationType} {author:$author}]->(t${i}) `;
    });
    return 'MERGE (o:Place {uri:$from}) ' + perTarget.join('');
  }

  /**
   * Build the parameter map matching the text produced by `buildQuery`.
   * @param connection record with `from`, `to` and `relation`
   * @returns map with `from`, one `to<i>` per target and, when there is at
   *          least one target, `author`
   * @throws TypeError when `from` or an entry of `to` is not a string
   */
  private buildParameters(connection: Connection): Record<string, string> {
    const parameters: Record<string, string> = {
      from: Neo4jStreamWriter.requireString(connection.from, 'from'),
    };
    connection.to.forEach((target, i) => {
      parameters[`to${i}`] = Neo4jStreamWriter.requireString(target, `to[${i}]`);
    });
    if (connection.to.length > 0) {
      // String() keeps the value the former string-built query would have stored.
      parameters.author = String(connection.relation.author);
    }
    return parameters;
  }
}
Tobias Steiner's avatar
Tobias Steiner committed
94
export default Neo4jStreamWriter;