neo4j-stream-writer.ts 1.88 KB
Newer Older
Tobias Steiner's avatar
Tobias Steiner committed
1
2
3
4
5
6
7
8
9
10
11
12
/*
 * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
 *
 * Copyright (c) 2016 Blizzard Entertainment
 *
 * This software may be modified and distributed under the terms
 * of the MIT license.  See the LICENSE.txt file for details.
 */

import {Writable} from 'stream';
import Session from 'neo4j-driver/types/v1/session';
import Driver from 'neo4j-driver/types/v1/driver';
Tobias Steiner's avatar
Tobias Steiner committed
13
import {InterfaceCore} from 'geolinker-common/dist/core';
Tobias Steiner's avatar
Tobias Steiner committed
14
15
16
17

class Neo4jStreamWriter extends Writable {
  private neo4j;
  private timeout = 500;
Tobias Steiner's avatar
Tobias Steiner committed
18
19
  private core;
  constructor(options = {}, neo4j: Driver, core: InterfaceCore) {
Tobias Steiner's avatar
Tobias Steiner committed
20
21
    super(options);
    this.neo4j = neo4j;
Tobias Steiner's avatar
Tobias Steiner committed
22
    this.core = core;
Tobias Steiner's avatar
Tobias Steiner committed
23
24
25
  }

  public async _write(chunk, encoding, callback) {
Tobias Steiner's avatar
Tobias Steiner committed
26
    const query = this.buildQuery(chunk.parsed);
Tobias Steiner's avatar
Tobias Steiner committed
27
28
29
30
    const neo4jSession: Session = this.neo4j.session();
    try {
      await neo4jSession.run(query);
      neo4jSession.close();
Tobias Steiner's avatar
Tobias Steiner committed
31
      this.core.getReporter().setDataOut(1);
Tobias Steiner's avatar
Tobias Steiner committed
32
33
34
35
36
37
38
      setImmediate(callback);
    } catch (e) {
      neo4jSession.close();
      // Just delay this thing a bit and pass the params
      // backpressure will get exerted this way.
      // todo: check the error first
      // neo4j.error.SERVICE_UNAVAILABLE
Tobias Steiner's avatar
Tobias Steiner committed
39
40
      this.core.getLogger().error(e);
      this.core.getLogger().error('from' + chunk.parsed.from);
Tobias Steiner's avatar
Tobias Steiner committed
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
      setTimeout(() => {
        this._write(chunk, encoding, callback);
      }, this.timeout);
    }
  }

  /**
   * Build the query
   * todo: we should move this into a transformer
   * @param connection
   */
  private buildQuery(connection) {
      let query = `MERGE (o:Place {uri:'${connection.from}'}) `;
      connection.to.forEach((current, i) => {
        query += ` MERGE (t${i}:Place {uri:'${current}'}) `;
        query += ` MERGE (o)-[:${connection.relation.type} {author:'${connection.relation.author}'}]->(t${i}) `;
      });
      return query;
    }
  }

export default Neo4jStreamWriter;