neo4j-stream-writer.ts 1.62 KB
Newer Older
Tobias Steiner's avatar
Tobias Steiner committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/*
 * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
 *
 * Copyright (c) 2016 Blizzard Entertainment
 *
 * This software may be modified and distributed under the terms
 * of the MIT license.  See the LICENSE.txt file for details.
 */

import {Writable} from 'stream';
import Session from 'neo4j-driver/types/v1/session';
import Driver from 'neo4j-driver/types/v1/driver';

class Neo4jStreamWriter extends Writable {
  private neo4j;
  private timeout = 500;
  constructor(options = {}, neo4j: Driver) {
    super(options);
    this.neo4j = neo4j;
  }

  public async _write(chunk, encoding, callback) {

    const query = this.buildQuery(chunk);
    const neo4jSession: Session = this.neo4j.session();
    try {
      await neo4jSession.run(query);
      neo4jSession.close();
      setImmediate(callback);
    } catch (e) {
      neo4jSession.close();
      // Just delay this thing a bit and pass the params
      // backpressure will get exerted this way.
      // todo: check the error first
      // neo4j.error.SERVICE_UNAVAILABLE
      setTimeout(() => {
        this._write(chunk, encoding, callback);
      }, this.timeout);
    }
  }

  /**
   * Build the query
   * todo: we should move this into a transformer
   * @param connection
   */
  private buildQuery(connection) {
      let query = `MERGE (o:Place {uri:'${connection.from}'}) `;
      connection.to.forEach((current, i) => {
        query += ` MERGE (t${i}:Place {uri:'${current}'}) `;
        query += ` MERGE (o)-[:${connection.relation.type} {author:'${connection.relation.author}'}]->(t${i}) `;
      });
      return query;
    }
  }

export default Neo4jStreamWriter;