Source: graphdb.js

import SparqlClient from 'sparql-http-client'
import libGraphDB from 'graphdb'
import isStream from 'is-stream'
import { chunkBetween } from './utils.js'
import { Store } from './store.js'

const { GraphDBServerClient, ServerClientConfig } = libGraphDB.server
const { RDFMimeType, QueryContentType } = libGraphDB.http
const { RepositoryConfig, RepositoryType, RepositoryClientConfig } = libGraphDB.repository
const { GetQueryPayload, UpdateQueryPayload, QueryType } = libGraphDB.query

/**
 * Class to interact with a GraphDB store
 *
 * @extends Store
 */
export class GraphDB extends Store {
  /**
   * @param {Object} [options={}] Connection params
   * @param {string} [options.user] Username
   * @param {string} [options.password] Password
   * @param {string} [options.endpoint=http://localhost:7200] Endpoint
   */
  constructor (options = {}) {
    super()
    this.endpointParams = Object.assign({ endpoint: 'http://localhost:7200' }, options)

    const serverConfig = new ServerClientConfig(this.endpointParams.endpoint)
      .setTimeout(5000)
      .setHeaders({
        Accept: RDFMimeType.SPARQL_RESULTS_JSON
      })
      .setKeepAlive(true)

    if (this.endpointParams.user && this.endpointParams.password) {
      serverConfig.useBasicAuthentication(this.endpointParams.user, this.endpointParams.password)
    }

    this._conn = new GraphDBServerClient(serverConfig)
    this._repos = {}
  }

  /**
   * Creates a database
   *
   * @async
   * @param {string} dbname
   */
  async createDb (dbname) {
    const config = new RepositoryConfig(dbname, '', new Map(), '', dbname, RepositoryType.FREE)
    await this.conn.createRepository(config)
    return true
  }

  /**
   * Deletes a database
   *
   * @async
   * @param {string} dbname
   */
  async dropDb (dbname) {
    await this.conn.deleteRepository(dbname)
    return true
  }

  /**
   * Empties a database
   *
   * @async
   * @param {string} dbname
   */
  async clearDb (dbname, options = {}) {
    // return this.update(dbname, 'DELETE {?s ?p ?o} WHERE {?s ?p ?o}', options)

    const repo = await this._getRepo(dbname, options)
    const res = await repo.deleteAllStatements()
    return res || true
  }

  /**
   * Issues an ASK query
   *
   * @async
   * @param {string} dbname
   * @param {string} sparql query
   * @param {Object} [options]
   * @param {string} [options.responseType=application/sparql-results+json]
   */
  async ask (dbname, sparql, options = {}) {
    options = Object.assign({ responseType: RDFMimeType.SPARQL_RESULTS_JSON }, options)
    return this._readQuery(dbname, sparql, QueryType.ASK, options)
  }

  /**
   * Issues a CONSTRUCT query
   *
   * @async
   * @param {string} dbname
   * @param {string} sparql query
   * @param {Object} [options]
   * @param {string} [options.responseType=application/ld+json]
   */
  async construct (dbname, sparql, options = {}) {
    options = Object.assign({ responseType: RDFMimeType.JSON_LD }, options)
    return this._readQuery(dbname, sparql, QueryType.CONSTRUCT, options)
  }

  /**
   * Issues a DESCRIBE query
   *
   * @async
   * @param {string} dbname
   * @param {string} sparql query
   * @param {Object} [options]
   * @param {string} [options.responseType=application/ld+json]
   */
  async describe (dbname, sparql, options = {}) {
    options = Object.assign({ responseType: RDFMimeType.JSON_LD }, options)
    return this._readQuery(dbname, sparql, QueryType.DESCRIBE, options)
  }

  /**
   * Issues a SELECT query
   *
   * @async
   * @param {string} dbname
   * @param {string} sparql query
   * @param {Object} [options]
   * @param {string} [options.responseType=application/sparql-results+json]
   */
  async select (dbname, sparql, options = {}) {
    options = Object.assign({ responseType: RDFMimeType.SPARQL_RESULTS_JSON }, options)
    return this._readQuery(dbname, sparql, QueryType.SELECT, options)
  }

  /**
   * Issues a SPARQL UPDATE query
   *
   * @async
   * @param {string} dbname
   * @param {string} sparql query
   * @param {Object} [options]
   * @param {Object} [options.inference=false]
   * @param {string} [options.timeout=30_000]
   * @param {string} [options.responseType=application/sparql-results+json]
   */
  async update (dbname, sparql, options = {}) {
    options = Object.assign({ responseType: RDFMimeType.SPARQL_RESULTS_JSON }, options)
    const repo = await this._getRepo(dbname, options)

    const payload = new UpdateQueryPayload()
      .setQuery(sparql)
      .setContentType(QueryContentType.X_WWW_FORM_URLENCODED)
      .setInference(Boolean(options.inference))
      .setTimeout(options.timeout || 30_000)

    return repo.update(payload).then((res) => res || true)
  }

  /**
   * Loads triples or quads into a database
   *
   * @async
   * @param {string} dbname
   * @param {string|Buffer} ntriples triples or quads
   * @param {string=} graph named graph to insert to, defaults to default graph
   */
  async import (dbname, ntriples, graph) {
    const mapper = graph
      ? (chunk) => `INSERT DATA { GRAPH <${graph}> { ${chunk} } }`
      : (chunk) => `INSERT DATA { ${chunk} }`

    const queries = chunkBetween(ntriples, mapper, 1_000)
    const results = await Promise.all(
      queries.map((query) => this.update(dbname, query))
    )
    return results.every(Boolean)
  }

  /**
   * Creates a [SPARQL HTTP Client](https://zazuko.github.io/sparql-http-client/) for a database
   *
   * @param {string} dbname
   * @param {Object} options
   * @param {HeadersInit} [options.headers] HTTP headers to send with every endpoint request
   * @param {string} [options.user=username set when instantiating the store] user used for basic authentication
   * @param {string} [options.password=password set when instantiating the store] password used for basic authentication
   * @param {string} [options.endpointUrl=URL generated based on the store endpoint] SPARQL Query endpoint URL
   * @param {string} [options.updateUrl=URL generated based on the store endpoint] SPARQL Update endpoint URL
   * @param {string} [options.storeUrl] Graph Store URL
   * @param {fetch} [options.fetch=nodeify-fetch] fetch implementation
   * @param {factory} [options.factory=uses @rdfjs/data-model by default] RDF/JS DataFactory
   * @returns {SparqlHttpClient}
   */
  sparqlClientFor (dbname, options) {
    options = Object.assign({
      user: this.endpointParams.user,
      password: this.endpointParams.password,
      endpointUrl: `${this.endpointParams.endpoint}/repositories/${dbname}`,
      updateUrl: `${this.endpointParams.endpoint}/repositories/${dbname}/statements`,
      storeUrl: `${this.endpointParams.endpoint}/repositories/${dbname}/rdf-graphs/service`
    }, options)

    return new SparqlClient(options)
  }

  async _readQuery (dbname, sparql, type, options = {}) {
    const repo = await this._getRepo(dbname, options)
    const payload = new GetQueryPayload()
      .setQuery(sparql)
      .setQueryType(type)
      .setResponseType(options.responseType)

    return this._handleResults(await repo.query(payload))
  }

  async _getRepo (dbname, options) {
    options = Object.assign({ readTimeout: 30_000, writeTimeout: 30_000 }, options)

    const repositoryClientConfig = new RepositoryClientConfig(this.endpointParams.endpoint)
      .setEndpoints([`${this.endpointParams.endpoint}/repositories/${dbname}`])
      .setReadTimeout(options.readTimeout)
      .setWriteTimeout(options.writeTimeout)

    const repo = await this.conn.getRepository(dbname, repositoryClientConfig)

    ;['N3Parser', 'NQuadsParser', 'NTriplesParser', 'TriGParser', 'TurtleParser', 'JsonLDParser', 'RDFXmlParser', 'SparqlJsonResultParser', 'SparqlXmlResultParser'].forEach((parser) => {
      repo.registerParser(new libGraphDB.parser[parser]())
    })

    return repo
  }

  async _handleResults (result) {
    if (!isStream(result)) {
      return result
    }

    const results = []

    result.on('data', (bindings) => {
      results.push(bindings)
    })

    return new Promise((resolve) => {
      result.on('end', () => {
        resolve(results)
      })
    })
  }
}