neo4j-stream-writer.ts 1.8 KB
Newer Older
Tobias Steiner's avatar
Tobias Steiner committed
1
2
3
4
5
6
7
8
9
10
11
12
/*
 * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
 *
 * Copyright (c) 2016 Blizzard Entertainment
 *
 * This software may be modified and distributed under the terms
 * of the MIT license.  See the LICENSE.txt file for details.
 */

import {Writable} from 'stream';
import Session from 'neo4j-driver/types/v1/session';
import Driver from 'neo4j-driver/types/v1/driver';
Tobias Steiner's avatar
Tobias Steiner committed
13
import {InterfaceCore} from 'geolinker-common/dist/core';
Tobias Steiner's avatar
Tobias Steiner committed
14
15
16
17

/** Shape of the chunks this writer consumes, as read by `buildQuery`. */
interface PlaceConnection {
  from: unknown;
  to: unknown[];
  relation: {type: unknown; author: unknown};
}

/** A Cypher statement together with the parameter values it refers to. */
interface CypherStatement {
  text: string;
  parameters: Record<string, string>;
}

/**
 * Writable stream that persists every incoming connection chunk into Neo4j.
 *
 * For each chunk one session is opened, a single MERGE statement is run and
 * the session is closed again. When running the statement fails the same
 * chunk is retried after `timeout` milliseconds; because the write callback
 * is held back during that time, upstream producers experience backpressure.
 */
class Neo4jStreamWriter extends Writable {
  private neo4j;
  /** Delay in milliseconds before a failed write is attempted again. */
  private timeout = 500;
  private core;

  /**
   * @param options options forwarded unchanged to the `Writable` constructor
   * @param neo4j driver used to open one session per written chunk
   * @param core application core; its reporter is notified after each successful write
   */
  constructor(options = {}, neo4j: Driver, core: InterfaceCore) {
    super(options);
    this.neo4j = neo4j;
    this.core = core;
  }

  /**
   * Write one connection chunk to Neo4j.
   *
   * A chunk the query cannot be built from is reported through `callback`
   * as an error, since retrying it could never succeed. Failures while
   * talking to the database are retried after `timeout` milliseconds.
   *
   * @param chunk connection to store
   * @param encoding unused, handed on unchanged when the write is retried
   * @param callback invoked without arguments once the chunk is stored, or with an error for a malformed chunk
   */
  public async _write(
    chunk: PlaceConnection,
    encoding: string,
    callback: (error?: Error | null) => void,
  ): Promise<void> {
    let statement: CypherStatement;
    try {
      statement = this.buildQuery(chunk);
    } catch (e) {
      // The chunk itself is broken: fail the write instead of leaving the
      // stream waiting for a callback that would never come.
      callback(e instanceof Error ? e : new Error(String(e)));
      return;
    }

    let neo4jSession: Session | undefined;
    try {
      neo4jSession = this.neo4j.session();
      await neo4jSession.run(statement.text, statement.parameters);
      this.core.getReporter().setDataOut(1);
      setImmediate(callback);
    } catch (e) {
      // Just delay this thing a bit and pass the params
      // backpressure will get exerted this way.
      // todo: check the error first
      // neo4j.error.SERVICE_UNAVAILABLE
      setTimeout(() => {
        void this._write(chunk, encoding, callback);
      }, this.timeout);
    } finally {
      // Release the session on the success path and on the retry path alike.
      if (neo4jSession) {
        neo4jSession.close();
      }
    }
  }

  /**
   * Build the parameterized MERGE statement for one connection.
   *
   * All values are sent as query parameters rather than spliced into the
   * statement text, so quotes or backslashes inside a URI or author name can
   * neither break nor alter the query. Values are converted to strings, which
   * matches what the previously quoted literals stored.
   *
   * todo: we should move this into a transformer
   * @param connection source URI, target URIs and the relation linking them
   * @returns the statement text and the parameters it references
   */
  private buildQuery(connection: PlaceConnection): CypherStatement {
    // Relationship types cannot be passed as parameters, so the type is
    // backtick-quoted with embedded backticks doubled.
    const relationType = '`' + `${connection.relation.type}`.replace(/`/g, '``') + '`';
    const parameters: Record<string, string> = {
      author: `${connection.relation.author}`,
      from: `${connection.from}`,
    };
    let text = 'MERGE (o:Place {uri: $from}) ';
    connection.to.forEach((current, i) => {
      parameters[`to${i}`] = `${current}`;
      text += ` MERGE (t${i}:Place {uri: $to${i}}) `;
      text += ` MERGE (o)-[:${relationType} {author: $author}]->(t${i}) `;
    });
    return {text, parameters};
  }
}

export default Neo4jStreamWriter;