1
0
mirror of https://github.com/cookiengineer/audacity synced 2025-05-02 08:39:46 +02:00
audacity/lib-src/redland/librdf/rdf_storage_postgresql.c
2010-01-24 09:19:39 +00:00

2380 lines
82 KiB
C

/* -*- Mode: c; c-basic-offset: 2 -*-
*
* rdf_storage_postgresql.c - RDF Storage in PostgreSQL DB interface definition.
*
* Based in part on rdf_storage_mysql.
*
* Copyright (C) 2003-2005 Shi Wenzhong - email to shiwenzhong@hz.cn
* Copyright (C) 2000-2008, David Beckett http://www.dajobe.org/
* Copyright (C) 2000-2005, University of Bristol, UK http://www.bristol.ac.uk/
*
* This package is Free Software and part of Redland http://librdf.org/
*
* It is licensed under the following three licenses as alternatives:
* 1. GNU Lesser General Public License (LGPL) V2.1 or any newer version
* 2. GNU General Public License (GPL) V2 or any newer version
* 3. Apache License, V2.0 or any newer version
*
* You may not use this file except in compliance with at least one of
* the above three licenses.
*
* See LICENSE.html or LICENSE.txt at the top of this package for the
* complete terms and further detail along with the license texts for
* the licenses in COPYING.LIB, COPYING and LICENSE-2.0.txt respectively.
*
*
*/
#ifdef HAVE_CONFIG_H
#include <rdf_config.h>
#endif
#ifdef WIN32
#include <win32_rdf_config.h>
#endif
#include <stdio.h>
#include <string.h>
#ifdef HAVE_STDLIB_H
#include <stdlib.h> /* for abort() as used in errors */
#endif
#include <sys/types.h>
#include <redland.h>
#include <rdf_types.h>
#include <libpq-fe.h>
typedef enum {
/* Status of individual postgresql connections */
LIBRDF_STORAGE_POSTGRESQL_CONNECTION_CLOSED = 0,
LIBRDF_STORAGE_POSTGRESQL_CONNECTION_OPEN = 1,
LIBRDF_STORAGE_POSTGRESQL_CONNECTION_BUSY = 2
} librdf_storage_postgresql_connection_status;
typedef struct {
/* A postgresql connection */
librdf_storage_postgresql_connection_status status;
PGconn *handle;
} librdf_storage_postgresql_connection;
typedef struct {
/* postgresql connection parameters */
const char *host;
char *port;
char *dbname;
char *user;
const char *password;
/* Array of virtual postgresql connections */
librdf_storage_postgresql_connection *connections;
int connections_count;
/* hash of model name in the database (table Models, column ID) */
u64 model;
/* if inserts should be optimized by locking and index optimizations */
int bulk;
/* if a table with merged models should be maintained */
int merge;
/* digest object for node hashes */
librdf_digest *digest;
PGconn* transaction_handle;
} librdf_storage_postgresql_context;
/* prototypes for local functions */
static int librdf_storage_postgresql_init(librdf_storage* storage, const char *name,
librdf_hash* options);
static int librdf_storage_postgresql_merge(librdf_storage* storage);
static void librdf_storage_postgresql_terminate(librdf_storage* storage);
static int librdf_storage_postgresql_open(librdf_storage* storage,
librdf_model* model);
static int librdf_storage_postgresql_close(librdf_storage* storage);
static int librdf_storage_postgresql_sync(librdf_storage* storage);
static int librdf_storage_postgresql_size(librdf_storage* storage);
static int librdf_storage_postgresql_add_statement(librdf_storage* storage,
librdf_statement* statement);
static int librdf_storage_postgresql_add_statements(librdf_storage* storage,
librdf_stream* statement_stream);
static int librdf_storage_postgresql_remove_statement(librdf_storage* storage,
librdf_statement* statement);
static int librdf_storage_postgresql_contains_statement(librdf_storage* storage,
librdf_statement* statement);
librdf_stream* librdf_storage_postgresql_serialise(librdf_storage* storage);
static librdf_stream* librdf_storage_postgresql_find_statements(librdf_storage* storage,
librdf_statement* statement);
static librdf_stream* librdf_storage_postgresql_find_statements_with_options(librdf_storage* storage,
librdf_statement* statement,
librdf_node* context_node,
librdf_hash* options);
/* context functions */
static int librdf_storage_postgresql_context_add_statement(librdf_storage* storage,
librdf_node* context_node,
librdf_statement* statement);
static int librdf_storage_postgresql_context_add_statements(librdf_storage* storage,
librdf_node* context_node,
librdf_stream* statement_stream);
static int librdf_storage_postgresql_context_remove_statement(librdf_storage* storage,
librdf_node* context_node,
librdf_statement* statement);
static int librdf_storage_postgresql_context_remove_statements(librdf_storage* storage,
librdf_node* context_node);
static librdf_stream*
librdf_storage_postgresql_context_serialise(librdf_storage* storage,
librdf_node* context_node);
static librdf_stream* librdf_storage_postgresql_find_statements_in_context(librdf_storage* storage,
librdf_statement* statement,
librdf_node* context_node);
static librdf_iterator* librdf_storage_postgresql_get_contexts(librdf_storage* storage);
static void librdf_storage_postgresql_register_factory(librdf_storage_factory *factory);
/* "private" helper definitions */
typedef struct {
librdf_storage *storage;
librdf_statement *current_statement;
librdf_node *current_context;
librdf_statement *query_statement;
librdf_node *query_context;
PGconn *handle;
PGresult *results;
int current_rowno;
char **row;
int is_literal_match;
} librdf_storage_postgresql_sos_context;
typedef struct {
librdf_storage *storage;
librdf_node *current_context;
PGconn *handle;
PGresult *results;
int current_rowno;
char **row;
} librdf_storage_postgresql_get_contexts_context;
static u64 librdf_storage_postgresql_hash(librdf_storage* storage,
const char *type,
const char *string, int length);
static u64 librdf_storage_postgresql_node_hash(librdf_storage* storage,
librdf_node* node,int add);
static int librdf_storage_postgresql_start_bulk(librdf_storage* storage);
static int librdf_storage_postgresql_stop_bulk(librdf_storage* storage);
static int librdf_storage_postgresql_context_add_statement_helper(librdf_storage* storage,
u64 ctxt,
librdf_statement* statement);
static int librdf_storage_postgresql_find_statements_in_context_augment_query(char **query, const char *addition);
/* methods for stream of statements */
static int librdf_storage_postgresql_find_statements_in_context_end_of_stream(void* context);
static int librdf_storage_postgresql_find_statements_in_context_next_statement(void* context);
static void* librdf_storage_postgresql_find_statements_in_context_get_statement(void* context, int flags);
static void librdf_storage_postgresql_find_statements_in_context_finished(void* context);
/* methods for iterator for contexts */
static int librdf_storage_postgresql_get_contexts_end_of_iterator(void* context);
static int librdf_storage_postgresql_get_contexts_next_context(void* context);
static void* librdf_storage_postgresql_get_contexts_get_context(void* context, int flags);
static void librdf_storage_postgresql_get_contexts_finished(void* context);
static int librdf_storage_postgresql_transaction_rollback(librdf_storage* storage);
/* functions implementing storage api */
/*
* librdf_storage_postgresql_hash - Find hash value of string.
* @storage: the storage
* @type: character type of node to hash ("R", "L" or "B")
* @string: a string to get hash for
* @length: length of string
*
* Find hash value of string.
*
* Return value: Non-zero on succes.
**/
static u64
librdf_storage_postgresql_hash(librdf_storage* storage, const char *type,
const char *string, int length)
{
librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context;
u64 hash;
byte* digest;
int i;
/* (Re)initialize digest object */
librdf_digest_init(context->digest);
/* Update digest with data */
if(type)
librdf_digest_update(context->digest, (unsigned char*)type, 1);
librdf_digest_update(context->digest, (unsigned char*)string, length);
librdf_digest_final(context->digest);
/* Copy first 8 bytes of digest into unsigned 64bit hash
* using a method portable across big/little endianness
*
* Fixes Issue#0000023 - http://bugs.librdf.org/mantis/view.php?id=23
*/
digest = (byte*) librdf_digest_get_digest(context->digest);
hash = 0;
for(i=0; i<8; i++)
hash += ((u64) digest[i]) << (i*8);
return hash;
}
/*
* librdf_storage_postgresql_init_connections - Initialize postgresql connection pool.
* @storage: the storage
*
* Return value: Non-zero on success.
**/
static int
librdf_storage_postgresql_init_connections(librdf_storage* storage)
{
librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context;
/* Reset connection pool */
context->connections=NULL;
context->connections_count=0;
return 0;
}
/*
* librdf_storage_postgresql_finish_connections - Finish all connections in postgresql connection pool and free structures.
* @storage: the storage
*
* Return value: None.
**/
static void
librdf_storage_postgresql_finish_connections(librdf_storage* storage)
{
librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context;
int i;
/* Loop through connections and close */
for(i=0; i < context->connections_count; i++) {
if(LIBRDF_STORAGE_POSTGRESQL_CONNECTION_CLOSED != context->connections[i].status)
PQfinish(context->connections[i].handle);
}
/* Free structure and reset */
if (context->connections_count) {
LIBRDF_FREE(librdf_storage_postgresql_connection*, context->connections);
context->connections=NULL;
context->connections_count=0;
}
}
/*
* librdf_storage_postgresql_get_handle - get a connection handle to the postgresql server
* @storage: the storage
*
* This attempts to reuses any existing available pooled connection
* otherwise creates a new connection to the server.
*
* Return value: Non-zero on succes.
**/
static PGconn*
librdf_storage_postgresql_get_handle(librdf_storage* storage)
{
librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context;
librdf_storage_postgresql_connection* connection= NULL;
int i;
char conninfo[256];
if(context->transaction_handle)
return context->transaction_handle;
/* Look for an open connection handle to return */
for(i=0; i < context->connections_count; i++) {
if(LIBRDF_STORAGE_POSTGRESQL_CONNECTION_OPEN == context->connections[i].status) {
context->connections[i].status=LIBRDF_STORAGE_POSTGRESQL_CONNECTION_BUSY;
return context->connections[i].handle;
}
}
/* Look for a closed connection */
for(i=0; i < context->connections_count && !connection; i++) {
if(LIBRDF_STORAGE_POSTGRESQL_CONNECTION_CLOSED == context->connections[i].status) {
connection=&context->connections[i];
break;
}
}
/* Expand connection pool if no closed connection was found */
if (!connection) {
/* Allocate new buffer with two extra slots */
librdf_storage_postgresql_connection* connections;
if(!(connections=(librdf_storage_postgresql_connection*)
LIBRDF_CALLOC(librdf_storage_postgresql_connection,
context->connections_count+2,
sizeof(librdf_storage_postgresql_connection))))
return NULL;
if (context->connections_count) {
/* Copy old buffer to new */
memcpy(connections, context->connections, sizeof(librdf_storage_postgresql_connection)*context->connections_count);
/* Free old buffer */
LIBRDF_FREE(librdf_storage_postgresql_connection*, context->connections);
}
/* Update buffer size and reset new connections */
context->connections_count+=2;
connection=&connections[context->connections_count-2];
connection->status=LIBRDF_STORAGE_POSTGRESQL_CONNECTION_CLOSED;
connection->handle=NULL;
connections[context->connections_count-1].status=LIBRDF_STORAGE_POSTGRESQL_CONNECTION_CLOSED;
connections[context->connections_count-1].handle=NULL;
context->connections=connections;
}
/* Initialize closed postgresql connection handle */
/* Create connection to database for handle */
sprintf(conninfo,"host=%s port=%s dbname=%s user=%s password=%s",
context->host,context->port,context->dbname,context->user,context->password);
connection->handle=PQconnectdb(conninfo);
if( PQstatus(connection->handle) != CONNECTION_OK ) {
fprintf(stdout,"Connection to postgresql database %s:%s name %s as user %s failed: %s",
context->host, context->port, context->dbname,
context->user, PQerrorMessage(connection->handle));
librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"Connection to postgresql database %s:%s name %s as user %s failed: %s",
context->host, context->port, context->dbname,
context->user, PQerrorMessage(connection->handle));
return NULL;
}
/* Update status and return */
connection->status=LIBRDF_STORAGE_POSTGRESQL_CONNECTION_BUSY;
return connection->handle;
}
/*
* librdf_storage_postgresql_release_handle - Release a connection handle to postgresql server back to the pool
* @storage: the storage
* @handle: the postgresql handle to release
*
* Return value: None.
**/
static void
librdf_storage_postgresql_release_handle(librdf_storage* storage, PGconn *handle)
{
librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context;
int i;
/* Look for busy connection handle to drop */
for(i=0; i < context->connections_count; i++) {
if(LIBRDF_STORAGE_POSTGRESQL_CONNECTION_BUSY == context->connections[i].status &&
context->connections[i].handle == handle) {
context->connections[i].status=LIBRDF_STORAGE_POSTGRESQL_CONNECTION_OPEN;
return;
}
}
librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"Unable to find busy connection (in pool of %i connections) to drop for postgresql server thread: %d",
context->connections_count, PQbackendPID(handle));
}
/**
* librdf_storage_postgresql_init:
* @storage: the storage
* @name: model name
* @options: host, port, database, user, password [, new] [, bulk] [, merge].
*
* Create connection to database. Defaults to port 5432 if not given.
*
* The boolean bulk option can be set to true if optimized inserts (table
* locks and temporary key disabling) is wanted. Note that this will block
* all other access, and requires table locking and alter table privileges.
*
* The boolean merge option can be set to true if a merged "view" of all
* models should be maintained. This "view" will be a table with TYPE=MERGE.
*
* Return value: Non-zero on failure.
**/
static int
librdf_storage_postgresql_init(librdf_storage* storage, const char *name,
librdf_hash* options)
{
librdf_storage_postgresql_context *context=(librdf_storage_postgresql_context*)storage->context;
const char create_table_statements[]="\
CREATE TABLE Statements" UINT64_T_FMT " (\
Subject numeric(20) NOT NULL,\
Predicate numeric(20) NOT NULL,\
Object numeric(20) NOT NULL,\
Context numeric(20) NOT NULL\
) ";
const char create_table_literals[]="\
CREATE TABLE Literals (\
ID numeric(20) NOT NULL,\
Value text NOT NULL,\
Language text NOT NULL,\
Datatype text NOT NULL,\
PRIMARY KEY (ID)\
) ";
const char create_table_resources[]="\
CREATE TABLE Resources (\
ID numeric(20) NOT NULL,\
URI text NOT NULL,\
PRIMARY KEY (ID)\
) ";
const char create_table_bnodes[]="\
CREATE TABLE Bnodes (\
ID numeric(20) NOT NULL,\
Name text NOT NULL,\
PRIMARY KEY (ID)\
) ";
const char create_table_models[]="\
CREATE TABLE Models (\
ID numeric(20) NOT NULL,\
Name text NOT NULL,\
PRIMARY KEY (ID)\
) ";
const char create_model[]="INSERT INTO Models (ID,Name) VALUES (" UINT64_T_FMT ",'%s')";
const char check_model[]="SELECT 1 FROM Models WHERE ID=" UINT64_T_FMT " AND Name='%s'";
int status=0;
char *escaped_name=NULL;
char *query=NULL;
PGresult *res;
PGconn *handle;
/* Must have connection parameters passed as options */
if(!options)
return 1;
/* Create digest */
if(!(context->digest=librdf_new_digest(storage->world,"MD5")))
return 1;
/* Save hash of model name */
context->model=librdf_storage_postgresql_hash(storage, NULL, name, strlen(name));
/* Save connection parameters */
context->host=librdf_hash_get(options, "host");
context->port=librdf_hash_get(options, "port");
if(context->port==NULL )
{
context->port=(char*)LIBRDF_MALLOC(cstring, 10);
strcpy(context->port,"5432"); /* default postgresql port */
}
context->dbname=librdf_hash_get(options, "database");
if(!context->dbname)
context->dbname=librdf_hash_get(options, "dbname");
context->user=librdf_hash_get(options, "user");
if(context->user && !context->dbname) {
context->dbname=(char*)LIBRDF_MALLOC(cstring, strlen(context->user)+1);
strcpy(context->dbname, context->user); /* default dbname=user */
}
context->password=librdf_hash_get(options, "password");
if(!context->host || !context->dbname || !context->user || !context->port
|| !context->password)
return 1;
/* Maintain merge table? */
context->merge=(librdf_hash_get_as_boolean(options, "merge")>0);
/* Initialize postgresql connections */
librdf_storage_postgresql_init_connections(storage);
/* Get postgresql connection handle */
handle=librdf_storage_postgresql_get_handle(storage);
if(!handle)
return 1;
/* Create tables, if new and not existing */
if(!status && (librdf_hash_get_as_boolean(options, "new")>0))
{
query=(char*)LIBRDF_MALLOC(cstring, strlen(create_table_statements)+20);
if(! query)
status=1;
else
{
sprintf(query, create_table_statements, context->model);
if(! PQexec(handle, query) ||
! PQexec(handle, create_table_literals) ||
! PQexec(handle, create_table_resources) ||
! PQexec(handle, create_table_bnodes) ||
! PQexec(handle, create_table_models))
{
librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"postgresql table creation failed: %s",
PQerrorMessage(handle));
status=-1;
}
LIBRDF_FREE(cstring, query);
}
}
/* Create model if new and not existing, or check for existence */
if(!status) {
if(!(escaped_name=(char*)LIBRDF_MALLOC(cstring,strlen(name)*2+1)))
status=1;
PQescapeString(escaped_name,(const char*)name, strlen(name));
}
if(!status && (librdf_hash_get_as_boolean(options, "new")>0)) {
/* Create new model */
if(!(query=(char*)LIBRDF_MALLOC(cstring,strlen(create_model)+20+
strlen(escaped_name)+1)))
status=1;
sprintf(query, create_model, context->model, escaped_name);
if(!status && !(res=(PQexec(handle, query)))
&& PQresultStatus(res) != PGRES_COMMAND_OK) {
librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"postgresql insert into Models table failed: %s",
PQresultErrorMessage(res));
status=-1;
}
/* Maintain merge table? */
if(!status && context->merge)
status=librdf_storage_postgresql_merge(storage);
} else if(!status) {
/* Check for model existence */
if(!(query=(char*)LIBRDF_MALLOC(cstring,strlen(check_model)+20+
strlen(escaped_name)+1)))
status=1;
sprintf(query, check_model, context->model, name);
res=NULL;
if( !status && !(res=(PQexec(handle, query))) ) {
librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"postgresql select from Models table failed: %s",
PQresultErrorMessage(res));
status=-1;
}
if(!status && !(PQntuples(res))) {
librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"Unknown model: %s",name);
status=1;
}
if(res)
PQclear(res);
}
if(query)
LIBRDF_FREE(cstring, query);
if(escaped_name)
LIBRDF_FREE(cstring, escaped_name);
/* Optimize loads? */
context->bulk=(librdf_hash_get_as_boolean(options, "bulk")>0);
/* Truncate model? */
if(!status && (librdf_hash_get_as_boolean(options, "new")>0))
status=librdf_storage_postgresql_context_remove_statements(storage, NULL);
/* Unused options: write (always...) */
librdf_free_hash(options);
librdf_storage_postgresql_release_handle(storage, handle);
return status;
}
/*
* librdf_storage_postgresql_merge - (re)create merged "view" of all models
* @storage: the storage
*
* Return value: Non-zero on failure.
*/
static int
librdf_storage_postgresql_merge(librdf_storage* storage)
{
const char get_models[]="SELECT ID FROM Models";
const char drop_table_statements[]="DROP TABLE Statements";
const char insert_statements[]="INSERT INTO statements SELECT * FROM ";
const char create_table_statements[]="\
CREATE TABLE Statements (\
Subject numeric(20) NOT NULL,\
Predicate numeric(20) NOT NULL,\
Object numeric(20) NOT NULL,\
Context numeric(20) NOT NULL\
) ";
char *query=NULL;
PGresult *res;
int i;
PGconn *handle;
/* Get postgresql connection handle */
handle=librdf_storage_postgresql_get_handle(storage);
if(!handle)
return 1;
/* Drop and create merge table. */
if(! PQexec(handle, drop_table_statements) ||
! PQexec(handle, create_table_statements)) {
librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"postgresql merge table creation failed: %s",
PQerrorMessage(handle));
librdf_storage_postgresql_release_handle(storage, handle);
return -1;
}
/* Query for list of models. */
if(!(res=PQexec(handle, get_models))) {
librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"postgresql query for model list failed: %s",
PQresultErrorMessage(res));
librdf_storage_postgresql_release_handle(storage, handle);
return -1;
}
/* Allocate space for merge table generation query. */
if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(insert_statements)+
50))) {
librdf_storage_postgresql_release_handle(storage, handle);
return 1;
}
/* Generate CSV list of models. */
for(i=0;i<PQntuples(res);i++) {
strcpy(query,insert_statements);
strcat(query,"Statements");
strcat(query,PQgetvalue(res,i,0));
if(! PQexec(handle, query)) {
librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"postgresql merge table insert failed: %s",
PQerrorMessage(handle));
librdf_storage_postgresql_release_handle(storage, handle);
return -1;
}
}
LIBRDF_FREE(cstring, query);
PQclear(res);
librdf_storage_postgresql_release_handle(storage, handle);
return 0;
}
/**
* librdf_storage_postgresql_terminate:
* @storage: the storage
*
* Close the storage and database connections.
*
* Return value: None.
**/
static void
librdf_storage_postgresql_terminate(librdf_storage* storage)
{
librdf_storage_postgresql_context *context=(librdf_storage_postgresql_context*)storage->context;
librdf_storage_postgresql_finish_connections(storage);
if(context->password)
LIBRDF_FREE(cstring,(char *)context->password);
if(context->user)
LIBRDF_FREE(cstring,(char *)context->user);
if(context->dbname)
LIBRDF_FREE(cstring,(char *)context->dbname);
if(context->port)
LIBRDF_FREE(cstring,(char *)context->port);
if(context->host)
LIBRDF_FREE(cstring,(char *)context->host);
if(context->digest)
librdf_free_digest(context->digest);
if(context->transaction_handle)
librdf_storage_postgresql_transaction_rollback(storage);
}
/**
* librdf_storage_postgresql_open:
* @storage: the storage
* @model: the model
*
* Create or open model in database (nop).
*
* Return value: Non-zero on failure.
**/
static int
librdf_storage_postgresql_open(librdf_storage* storage, librdf_model* model)
{
return 0;
}
/**
* librdf_storage_postgresql_close:
* @storage: the storage
*
* Close model (nop).
*
* Return value: Non-zero on failure.
**/
static int
librdf_storage_postgresql_close(librdf_storage* storage)
{
librdf_storage_postgresql_transaction_rollback(storage);
return librdf_storage_postgresql_sync(storage);
}
/**
* librdf_storage_postgresql_sync - Flush all tables, making sure they are saved on disk.
* @storage: the storage
*
* Return value: Non-zero on failure.
**/
static int
librdf_storage_postgresql_sync(librdf_storage* storage)
{
librdf_storage_postgresql_context *context=(librdf_storage_postgresql_context*)storage->context;
/* Make sure optimizing for bulk operations is stopped? */
if(context->bulk)
librdf_storage_postgresql_stop_bulk(storage);
return 0;
}
/**
* librdf_storage_postgresql_size:
* @storage: the storage
*
* Close model (nop).
*
* Return value: Negative on failure.
**/
static int
librdf_storage_postgresql_size(librdf_storage* storage)
{
librdf_storage_postgresql_context *context=(librdf_storage_postgresql_context*)storage->context;
char model_size[]="SELECT COUNT(*) FROM Statements" UINT64_T_FMT;
char *query;
PGresult *res;
int count;
PGconn *handle;
/* Get postgresql connection handle */
handle=librdf_storage_postgresql_get_handle(storage);
if(!handle)
return -1;
/* Query for number of statements */
if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(model_size)+21))) {
librdf_storage_postgresql_release_handle(storage, handle);
return -1;
}
sprintf(query, model_size, context->model);
if(!(res=PQexec(handle, query)) || !(PQntuples(res))) {
librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"postgresql query for model size failed: %s",
PQresultErrorMessage(res));
LIBRDF_FREE(cstring,query);
librdf_storage_postgresql_release_handle(storage, handle);
return -1;
}
count=atol(PQgetvalue(res,0,0));
PQclear(res);
LIBRDF_FREE(cstring,query);
librdf_storage_postgresql_release_handle(storage, handle);
return count;
}
static int
librdf_storage_postgresql_add_statement(librdf_storage* storage,
librdf_statement* statement)
{
/* Do not add duplicate statements */
if(librdf_storage_postgresql_contains_statement(storage, statement))
return 0;
return librdf_storage_postgresql_context_add_statement_helper(storage, 0,
statement);
}
/**
* librdf_storage_postgresql_add_statements:
* @storage: the storage
* @statement_stream: the stream of statements
*
* Add statements in stream to storage, without context.
*
* Return value: Non-zero on failure.
**/
static int
librdf_storage_postgresql_add_statements(librdf_storage* storage,
librdf_stream* statement_stream)
{
return librdf_storage_postgresql_context_add_statements(storage, NULL,
statement_stream);
}
/*
* librdf_storage_postgresql_node_hash - Create hash value for node
* @storage: the storage
* @node: a node to get hash for (and possibly create in database)
* @add: whether to add the node to the database
*
* Return value: Non-zero on succes.
**/
static u64
librdf_storage_postgresql_node_hash(librdf_storage* storage,
librdf_node* node,
int add)
{
librdf_node_type type=librdf_node_get_type(node);
u64 hash;
size_t nodelen;
char *query;
PGconn *handle;
PGresult *res;
/* Get postgresql connection handle */
handle=librdf_storage_postgresql_get_handle(storage);
if(!handle)
return 0;
if(type==LIBRDF_NODE_TYPE_RESOURCE) {
/* Get hash */
unsigned char *uri=librdf_uri_as_counted_string(librdf_node_get_uri(node), &nodelen);
hash=librdf_storage_postgresql_hash(storage, "R", (char*)uri, nodelen);
if(add) {
char create_resource[]="INSERT INTO Resources (ID,URI) VALUES (" UINT64_T_FMT ",'%s')";
/* Escape URI for db query */
char *escaped_uri;
if(!(escaped_uri=(char*)LIBRDF_MALLOC(cstring, nodelen*2+1))) {
librdf_storage_postgresql_release_handle(storage, handle);
return 0;
}
PQescapeString(escaped_uri,(const char*)uri, nodelen);
/* Create new resource, ignore if existing */
if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(create_resource)+
20+nodelen+1))) {
LIBRDF_FREE(cstring,escaped_uri);
librdf_storage_postgresql_release_handle(storage, handle);
return 0;
}
sprintf(query, create_resource, hash, escaped_uri);
LIBRDF_FREE(cstring,escaped_uri);
if(!(res=PQexec(handle, query))
&& PQresultStatus(res) != PGRES_COMMAND_OK) {
librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"postgresql insert into Resources failed with error %s", PQresultErrorMessage(res));
LIBRDF_FREE(cstring,query);
librdf_storage_postgresql_release_handle(storage, handle);
return 0;
}
LIBRDF_FREE(cstring,query);
}
} else if(type==LIBRDF_NODE_TYPE_LITERAL) {
/* Get hash */
unsigned char *value, *datatype=0;
char *lang, *nodestring;
librdf_uri *dt;
size_t valuelen, langlen=0, datatypelen=0;
value=librdf_node_get_literal_value_as_counted_string(node,&valuelen);
lang=librdf_node_get_literal_value_language(node);
if(lang)
langlen=strlen(lang);
dt=librdf_node_get_literal_value_datatype_uri(node);
if(dt)
datatype=librdf_uri_as_counted_string(dt,&datatypelen);
if(datatype)
datatypelen=strlen((const char*)datatype);
/* Create composite node string for hash generation */
if(!(nodestring=(char*)LIBRDF_MALLOC(cstring, valuelen+langlen+datatypelen+3))) {
librdf_storage_postgresql_release_handle(storage, handle);
return 0;
}
strcpy(nodestring, (const char*)value);
strcat(nodestring, "<");
if(lang)
strcat(nodestring, lang);
strcat(nodestring, ">");
if(datatype)
strcat(nodestring, (const char*)datatype);
nodelen=valuelen+langlen+datatypelen+2;
hash=librdf_storage_postgresql_hash(storage, "L", nodestring, nodelen);
LIBRDF_FREE(cstring,nodestring);
if(add) {
char create_literal[]="INSERT INTO Literals (ID,Value,Language,Datatype) VALUES (" UINT64_T_FMT ",'%s','%s','%s')";
/* Escape value, lang and datatype for db query */
char *escaped_value, *escaped_lang, *escaped_datatype;
if(!(escaped_value=(char*)LIBRDF_MALLOC(cstring, valuelen*2+1)) ||
!(escaped_lang=(char*)LIBRDF_MALLOC(cstring, langlen*2+1)) ||
!(escaped_datatype=(char*)LIBRDF_MALLOC(cstring, datatypelen*2+1))) {
librdf_storage_postgresql_release_handle(storage, handle);
return 0;
}
PQescapeString(escaped_value, (const char*)value, valuelen);
if(lang)
PQescapeString( escaped_lang, (const char*)lang, langlen);
else
strcpy(escaped_lang,"");
if(datatype)
PQescapeString( escaped_datatype, (const char*)datatype, datatypelen);
else
strcpy(escaped_datatype,"");
/* Create new literal, ignore if existing */
if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(create_literal)+
strlen(escaped_value)+
strlen(escaped_lang)+
strlen(escaped_datatype)+21))) {
LIBRDF_FREE(cstring,escaped_value);
LIBRDF_FREE(cstring,escaped_lang);
LIBRDF_FREE(cstring,escaped_datatype);
librdf_storage_postgresql_release_handle(storage, handle);
return 0;
}
sprintf(query, create_literal, hash, escaped_value, escaped_lang, escaped_datatype);
LIBRDF_FREE(cstring,escaped_value);
LIBRDF_FREE(cstring,escaped_lang);
LIBRDF_FREE(cstring,escaped_datatype);
if(!(res=PQexec(handle, query)) &&
PQresultStatus(res) != PGRES_COMMAND_OK) {
librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"postgresql insert into Literals failed: %s",
PQresultErrorMessage(res));
LIBRDF_FREE(cstring,query);
librdf_storage_postgresql_release_handle(storage, handle);
return 0;
}
LIBRDF_FREE(cstring,query);
}
} else if(type==LIBRDF_NODE_TYPE_BLANK) {
/* Get hash */
unsigned char *name=librdf_node_get_blank_identifier(node);
nodelen=strlen((const char*)name);
hash=librdf_storage_postgresql_hash(storage, "B", (char*)name, nodelen);
if(add) {
char create_bnode[]="INSERT INTO Bnodes (ID,Name) VALUES (" UINT64_T_FMT ",'%s')";
/* Escape name for db query */
char *escaped_name;
if(!(escaped_name=(char*)LIBRDF_MALLOC(cstring, nodelen*2+1))) {
librdf_storage_postgresql_release_handle(storage, handle);
return 0;
}
PQescapeString(escaped_name, (const char*)name, nodelen);
/* Create new bnode, ignore if existing */
if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(create_bnode)+
strlen(escaped_name)+21))) {
LIBRDF_FREE(cstring,escaped_name);
librdf_storage_postgresql_release_handle(storage, handle);
return 0;
}
sprintf(query, create_bnode, hash, escaped_name);
LIBRDF_FREE(cstring,escaped_name);
if(!(res=PQexec(handle, query)) &&
PQresultStatus(res) != PGRES_COMMAND_OK) {
librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"postgresql insert into Bnodes failed: %s",
PQresultErrorMessage(res));
LIBRDF_FREE(cstring,query);
librdf_storage_postgresql_release_handle(storage, handle);
return 0;
}
LIBRDF_FREE(cstring,query);
}
} else {
librdf_storage_postgresql_release_handle(storage, handle);
return 0;
}
librdf_storage_postgresql_release_handle(storage, handle);
return hash;
}
/*
* librdf_storage_postgresql_start_bulk - Prepare for bulk insert operation
* @storage: the storage
*
* Return value: Non-zero on failure.
*/
static int
librdf_storage_postgresql_start_bulk(librdf_storage* storage)
{
return 1;
}
/*
* librdf_storage_postgresql_stop_bulk - End bulk insert operation
* @storage: the storage
*
* Return value: Non-zero on failure.
*/
static int
librdf_storage_postgresql_stop_bulk(librdf_storage* storage)
{
return 1;
}
/**
* librdf_storage_postgresql_context_add_statements:
* @storage: the storage
* @context_node: #librdf_node object
* @statement_stream: the stream of statements
*
* Add statements in stream to storage, with context.
*
* Return value: Non-zero on failure.
**/
static int
librdf_storage_postgresql_context_add_statements(librdf_storage* storage,
librdf_node* context_node,
librdf_stream* statement_stream)
{
librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context;
u64 ctxt=0;
int helper=0;
/* Optimize for bulk loads? */
if(context->bulk) {
if(librdf_storage_postgresql_start_bulk(storage))
return 1;
}
/* Find hash for context, creating if necessary */
if(context_node) {
ctxt=librdf_storage_postgresql_node_hash(storage,context_node,1);
if(!ctxt)
return 1;
}
while(!helper && !librdf_stream_end(statement_stream)) {
librdf_statement* statement=librdf_stream_get_object(statement_stream);
if(!context->bulk) {
/* Do not add duplicate statements
* but do not check for this when in bulk mode.
*/
if(librdf_storage_postgresql_contains_statement(storage, statement))
continue;
}
helper=librdf_storage_postgresql_context_add_statement_helper(storage, ctxt,
statement);
librdf_stream_next(statement_stream);
}
return helper;
}
/**
* librdf_storage_postgresql_context_add_statement - Add a statement to a storage context
* @storage: #librdf_storage object
* @context_node: #librdf_node object
* @statement: #librdf_statement statement to add
*
* Return value: non 0 on failure
**/
static int
librdf_storage_postgresql_context_add_statement(librdf_storage* storage,
librdf_node* context_node,
librdf_statement* statement)
{
u64 ctxt=0;
/* Find hash for context, creating if necessary */
if(context_node) {
ctxt=librdf_storage_postgresql_node_hash(storage,context_node,1);
if(!ctxt)
return 1;
}
return librdf_storage_postgresql_context_add_statement_helper(storage, ctxt,
statement);
}
/*
* librdf_storage_postgresql_context_add_statement_helper - Perform actual addition of a statement to a storage context
* @storage: #librdf_storage object
* @ctxt: u64 context hash
* @statement: #librdf_statement statement to add
*
* Return value: non-zero on failure
**/
static int
librdf_storage_postgresql_context_add_statement_helper(librdf_storage* storage,
u64 ctxt, librdf_statement* statement)
{
librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context;
char insert_statement[]="INSERT INTO Statements" UINT64_T_FMT " (Subject,Predicate,Object,Context) VALUES (" UINT64_T_FMT "," UINT64_T_FMT "," UINT64_T_FMT "," UINT64_T_FMT ")";
u64 subject, predicate, object;
char *query;
PGconn *handle;
PGresult *res;
/* Get postgresql connection handle */
handle=librdf_storage_postgresql_get_handle(storage);
if(!handle)
return 1;
/* Find hashes for nodes, creating if necessary */
subject=librdf_storage_postgresql_node_hash(storage,
librdf_statement_get_subject(statement),1);
predicate=librdf_storage_postgresql_node_hash(storage,
librdf_statement_get_predicate(statement),1);
object=librdf_storage_postgresql_node_hash(storage,
librdf_statement_get_object(statement),1);
if(!subject || !predicate || !object) {
librdf_storage_postgresql_release_handle(storage, handle);
return 1;
}
/* Add statement to storage */
if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(insert_statement)+101))) {
librdf_storage_postgresql_release_handle(storage, handle);
return 1;
}
sprintf(query, insert_statement, context->model, subject, predicate, object, ctxt);
if(!(res=PQexec(handle, query))) {
librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"postgresql insert into Statements failed: %s",
PQresultErrorMessage(res));
LIBRDF_FREE(cstring,query);
librdf_storage_postgresql_release_handle(storage, handle);
return -1;
}
LIBRDF_FREE(cstring,query);
librdf_storage_postgresql_release_handle(storage, handle);
return 0;
}
/**
* librdf_storage_postgresql_contains_statement - Test if a given complete statement is present in the model
* @storage: the storage
* @statement: a complete statement
*
* Return value: Non-zero if the model contains the statement.
**/
static int
librdf_storage_postgresql_contains_statement(librdf_storage* storage,
librdf_statement* statement)
{
librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context;
char find_statement[]="SELECT 1 FROM Statements" UINT64_T_FMT " WHERE Subject=" UINT64_T_FMT " AND Predicate=" UINT64_T_FMT " AND Object=" UINT64_T_FMT " limit 1";
u64 subject, predicate, object;
char *query;
PGresult *res;
PGconn *handle;
/* Get postgresql connection handle */
handle=librdf_storage_postgresql_get_handle(storage);
if(!handle)
return 0;
/* Find hashes for nodes */
subject=librdf_storage_postgresql_node_hash(storage,
librdf_statement_get_subject(statement),0);
predicate=librdf_storage_postgresql_node_hash(storage,
librdf_statement_get_predicate(statement),0);
object=librdf_storage_postgresql_node_hash(storage,
librdf_statement_get_object(statement),0);
if(!subject || !predicate || !object) {
librdf_storage_postgresql_release_handle(storage, handle);
return 0;
}
/* Check for statement */
if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(find_statement)+81))) {
librdf_storage_postgresql_release_handle(storage, handle);
return 0;
}
sprintf(query, find_statement, context->model, subject, predicate, object);
if(!(res=PQexec(handle, query)) ) {
librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"postgresql query for statement failed: %s",
PQerrorMessage(handle));
LIBRDF_FREE(cstring,query);
librdf_storage_postgresql_release_handle(storage, handle);
return 0;
}
LIBRDF_FREE(cstring, query);
if(!(PQntuples(res))) {
if(res)
PQclear(res);
librdf_storage_postgresql_release_handle(storage, handle);
return 0;
}
if(res)
PQclear(res);
librdf_storage_postgresql_release_handle(storage, handle);
return 1;
}
/**
* librdf_storage_postgresql_remove_statement - Remove a statement from storage
* @storage: #librdf_storage object
* @statement: #librdf_statement statement to remove
*
* Return value: non-zero on failure
**/
static int
librdf_storage_postgresql_remove_statement(librdf_storage* storage, librdf_statement* statement)
{
return librdf_storage_postgresql_context_remove_statement(storage,NULL,statement);
}
/**
* librdf_storage_postgresql_context_remove_statement - Remove a statement from a storage context
* @storage: #librdf_storage object
* @context_node: #librdf_node object
* @statement: #librdf_statement statement to remove
*
* Return value: non-zero on failure
**/
static int
librdf_storage_postgresql_context_remove_statement(librdf_storage* storage,
librdf_node* context_node,
librdf_statement* statement)
{
librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context;
char delete_statement[]="DELETE FROM Statements" UINT64_T_FMT " WHERE Subject=" UINT64_T_FMT " AND Predicate=" UINT64_T_FMT " AND Object=" UINT64_T_FMT;
char delete_statement_with_context[]="DELETE FROM Statements" UINT64_T_FMT " WHERE Subject=" UINT64_T_FMT " AND Predicate=" UINT64_T_FMT " AND Object=" UINT64_T_FMT " AND Context=" UINT64_T_FMT;
u64 subject, predicate, object, ctxt=0;
char *query;
PGconn *handle;
/* Get postgresql connection handle */
handle=librdf_storage_postgresql_get_handle(storage);
if(!handle)
return 1;
/* Find hashes for nodes */
subject=librdf_storage_postgresql_node_hash(storage,
librdf_statement_get_subject(statement),0);
predicate=librdf_storage_postgresql_node_hash(storage,
librdf_statement_get_predicate(statement),0);
object=librdf_storage_postgresql_node_hash(storage,
librdf_statement_get_object(statement),0);
if(context_node) {
ctxt=librdf_storage_postgresql_node_hash(storage,context_node,0);
if(!ctxt) {
librdf_storage_postgresql_release_handle(storage, handle);
return 1;
}
}
if(!subject || !predicate || !object || (context_node && !ctxt)) {
librdf_storage_postgresql_release_handle(storage, handle);
return 1;
}
/* Remove statement(s) from storage */
if(context_node) {
if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(delete_statement_with_context)+101))) {
librdf_storage_postgresql_release_handle(storage, handle);
return 1;
}
sprintf(query, delete_statement_with_context, context->model, subject,
predicate, object, ctxt);
} else {
if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(delete_statement)+81))) {
librdf_storage_postgresql_release_handle(storage, handle);
return 1;
}
sprintf(query, delete_statement, context->model, subject, predicate,
object);
}
if(! PQexec(handle, query)) {
librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"postgresql delete from Statements failed: %s",
PQerrorMessage(handle));
LIBRDF_FREE(cstring,query);
librdf_storage_postgresql_release_handle(storage, handle);
return -1;
}
LIBRDF_FREE(cstring,query);
librdf_storage_postgresql_release_handle(storage, handle);
return 0;
}
/**
* librdf_storage_postgresql_context_remove_statements - Remove all statement from a storage context
* @storage: #librdf_storage object
* @context_node: #librdf_node object
*
* Return value: non-zero on failure
**/
static int
librdf_storage_postgresql_context_remove_statements(librdf_storage* storage,
librdf_node* context_node)
{
librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context;
char delete_context[]="DELETE FROM Statements" UINT64_T_FMT " WHERE Context=" UINT64_T_FMT;
char delete_model[]="DELETE FROM Statements" UINT64_T_FMT;
u64 ctxt=0;
char *query;
PGconn *handle;
/* Get postgresql connection handle */
handle=librdf_storage_postgresql_get_handle(storage);
if(!handle)
return 1;
/* Find hash for context */
if(context_node) {
ctxt=librdf_storage_postgresql_node_hash(storage,context_node,0);
if(!ctxt) {
librdf_storage_postgresql_release_handle(storage, handle);
return 1;
}
}
/* Remove statement(s) from storage */
if(context_node) {
if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(delete_context)+61))) {
librdf_storage_postgresql_release_handle(storage, handle);
return 1;
}
sprintf(query, delete_context, context->model, ctxt);
} else {
if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(delete_model)+21))) {
librdf_storage_postgresql_release_handle(storage, handle);
return 1;
}
sprintf(query, delete_model, context->model);
}
if(! PQexec(handle,query)) {
librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"postgresql delete of context from Statements failed: %s",
PQerrorMessage(handle));
LIBRDF_FREE(cstring,query);
librdf_storage_postgresql_release_handle(storage, handle);
return -1;
}
LIBRDF_FREE(cstring,query);
librdf_storage_postgresql_release_handle(storage, handle);
return 0;
}
/**
* librdf_storage_postgresql_serialise - Return a stream of all statements in a storage
* @storage: the storage
*
* Return a stream of all statements in a storage.
*
* Return value: a #librdf_stream or NULL on failure
**/
librdf_stream* librdf_storage_postgresql_serialise(librdf_storage* storage)
{
return librdf_storage_postgresql_find_statements_in_context(storage,NULL,NULL);
}
/**
* librdf_storage_postgresql_find_statements - Find a graph of statements in storage.
* @storage: the storage
* @statement: the statement to match
*
* Return a stream of statements matching the given statement (or
* all statements if NULL). Parts (subject, predicate, object) of the
* statement can be empty in which case any statement part will match that.
*
* Return value: a #librdf_stream or NULL on failure
**/
static librdf_stream*
librdf_storage_postgresql_find_statements(librdf_storage* storage,
librdf_statement* statement)
{
return librdf_storage_postgresql_find_statements_in_context(storage,statement,NULL);
}
/**
* librdf_storage_postgresql_context_serialise - List all statements in a storage context
* @storage: #librdf_storage object
* @context_node: #librdf_node object
*
* Return value: #librdf_stream of statements or NULL on failure or context is empty
**/
static librdf_stream*
librdf_storage_postgresql_context_serialise(librdf_storage* storage,
librdf_node* context_node)
{
return librdf_storage_postgresql_find_statements_in_context(storage,NULL,context_node);
}
/**
* librdf_storage_postgresql_find_statements_in_context - Find a graph of statements in a storage context.
* @storage: the storage
* @statement: the statement to match
* @context_node: the context to search
*
* Return a stream of statements matching the given statement (or
* all statements if NULL). Parts (subject, predicate, object) of the
* statement can be empty in which case any statement part will match that.
*
* Return value: a #librdf_stream or NULL on failure
**/
static librdf_stream*
librdf_storage_postgresql_find_statements_in_context(librdf_storage* storage, librdf_statement* statement,librdf_node* context_node)
{
return librdf_storage_postgresql_find_statements_with_options(storage, statement, context_node, NULL);
}
/**
* librdf_storage_postgresql_find_statements_with_options - Find a graph of statements in a storage context with options.
* @storage: the storage
* @statement: the statement to match
* @context_node: the context to search
* @options: #librdf_hash of match options or NULL
*
* Return a stream of statements matching the given statement (or
* all statements if NULL). Parts (subject, predicate, object) of the
* statement can be empty in which case any statement part will match that.
*
* Return value: a #librdf_stream or NULL on failure
**/
static librdf_stream*
librdf_storage_postgresql_find_statements_with_options(librdf_storage* storage,
librdf_statement* statement,
librdf_node* context_node,
librdf_hash* options)
{
librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context;
librdf_storage_postgresql_sos_context* sos;
librdf_node *subject=NULL, *predicate=NULL, *object=NULL;
char *query;
char tmp[64];
char where[256];
char joins[640];
librdf_stream *stream;
/* Initialize sos context */
if(!(sos=(librdf_storage_postgresql_sos_context*)
LIBRDF_CALLOC(librdf_storage_postgresql_sos_context,1,
sizeof(librdf_storage_postgresql_sos_context))))
return NULL;
sos->storage=storage;
librdf_storage_add_reference(sos->storage);
if(statement)
sos->query_statement=librdf_new_statement_from_statement(statement);
if(context_node)
sos->query_context=librdf_new_node_from_node(context_node);
sos->current_statement=NULL;
sos->current_context=NULL;
sos->results=NULL;
if(options) {
sos->is_literal_match=librdf_hash_get_as_boolean(options, "match-substring");
}
/* Get postgresql connection handle */
sos->handle=librdf_storage_postgresql_get_handle(storage);
if(!sos->handle) {
librdf_storage_postgresql_find_statements_in_context_finished((void*)sos);
return NULL;
}
/* Construct query */
if(!(query=(char*)LIBRDF_MALLOC(cstring, 21))) {
librdf_storage_postgresql_find_statements_in_context_finished((void*)sos);
return NULL;
}
strcpy(query, "SELECT ");
*where='\0';
if(sos->is_literal_match)
sprintf(joins, " FROM Literals AS L LEFT JOIN Statements" UINT64_T_FMT " as S ON L.ID=S.Object",
context->model);
else
sprintf(joins, " FROM Statements" UINT64_T_FMT " AS S", context->model);
if(statement) {
subject=librdf_statement_get_subject(statement);
predicate=librdf_statement_get_predicate(statement);
object=librdf_statement_get_object(statement);
}
/* Subject */
if(statement && subject) {
sprintf(tmp, "S.Subject=" UINT64_T_FMT "",
librdf_storage_postgresql_node_hash(storage,subject,0));
if(!strlen(where))
strcat(where, " WHERE ");
else
strcat(where, " AND ");
strcat(where, tmp);
} else {
if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, " SubjectR.URI AS SuR, SubjectB.Name AS SuB")) {
librdf_storage_postgresql_find_statements_in_context_finished((void*)sos);
return NULL;
}
strcat(joins," LEFT JOIN Resources AS SubjectR ON S.Subject=SubjectR.ID");
strcat(joins," LEFT JOIN Bnodes AS SubjectB ON S.Subject=SubjectB.ID");
}
/* Predicate */
if(statement && predicate) {
sprintf(tmp, "S.Predicate=" UINT64_T_FMT "",
librdf_storage_postgresql_node_hash(storage, predicate, 0));
if(!strlen(where))
strcat(where, " WHERE ");
else
strcat(where, " AND ");
strcat(where, tmp);
} else {
if(!statement || !subject) {
if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, ",")) {
librdf_storage_postgresql_find_statements_in_context_finished((void*)sos);
return NULL;
}
}
if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, " PredicateR.URI AS PrR")) {
librdf_storage_postgresql_find_statements_in_context_finished((void*)sos);
return NULL;
}
strcat(joins," LEFT JOIN Resources AS PredicateR ON S.Predicate=PredicateR.ID");
}
/* Object */
if(statement && object) {
if(!sos->is_literal_match) {
sprintf(tmp,"S.Object=" UINT64_T_FMT "",
librdf_storage_postgresql_node_hash(storage, object, 0));
if(!strlen(where))
strcat(where, " WHERE ");
else
strcat(where, " AND ");
strcat(where, tmp);
} else {
/* MATCH literal, not hash_id */
if(!statement || !subject || !predicate) {
if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, ",")) {
librdf_storage_postgresql_find_statements_in_context_finished((void*)sos);
return NULL;
}
}
if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, " ObjectR.URI AS ObR, ObjectB.Name AS ObB, ObjectL.Value AS ObV, ObjectL.Language AS ObL, ObjectL.Datatype AS ObD")) {
librdf_storage_postgresql_find_statements_in_context_finished((void*)sos);
return NULL;
}
strcat(joins," LEFT JOIN Resources AS ObjectR ON S.Object=ObjectR.ID");
strcat(joins," LEFT JOIN Bnodes AS ObjectB ON S.Object=ObjectB.ID");
strcat(joins," LEFT JOIN Literals AS ObjectL ON S.Object=ObjectL.ID");
sprintf(tmp, "MATCH(L.Value) AGAINST ('%s')",
librdf_node_get_literal_value(object));
/* NOTE: This is NOT USED but could be if FULLTEXT wasn't enabled */
/*
sprintf(tmp, " L.Value LIKE '%%%s%%'",
librdf_node_get_literal_value(object));
*/
if(!strlen(where))
strcat(where, " WHERE ");
else
strcat(where, " AND ");
strcat(where, tmp);
}
} else {
if(!statement || !subject || !predicate) {
if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, ",")) {
librdf_storage_postgresql_find_statements_in_context_finished((void*)sos);
return NULL;
}
}
if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, " ObjectR.URI AS ObR, ObjectB.Name AS ObB, ObjectL.Value AS ObV, ObjectL.Language AS ObL, ObjectL.Datatype AS ObD")) {
librdf_storage_postgresql_find_statements_in_context_finished((void*)sos);
return NULL;
}
strcat(joins," LEFT JOIN Resources AS ObjectR ON S.Object=ObjectR.ID");
strcat(joins," LEFT JOIN Bnodes AS ObjectB ON S.Object=ObjectB.ID");
strcat(joins," LEFT JOIN Literals AS ObjectL ON S.Object=ObjectL.ID");
}
/* Context */
if(context_node) {
sprintf(tmp,"S.Context=" UINT64_T_FMT "",
librdf_storage_postgresql_node_hash(storage,context_node,0));
if(!strlen(where))
strcat(where, " WHERE ");
else
strcat(where, " AND ");
strcat(where, tmp);
} else {
if(!statement || !subject || !predicate || !object) {
if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, ",")) {
librdf_storage_postgresql_find_statements_in_context_finished((void*)sos);
return NULL;
}
}
if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, " ContextR.URI AS CoR, ContextB.Name AS CoB, ContextL.Value AS CoV, ContextL.Language AS CoL, ContextL.Datatype AS CoD")) {
librdf_storage_postgresql_find_statements_in_context_finished((void*)sos);
return NULL;
}
strcat(joins," LEFT JOIN Resources AS ContextR ON S.Context=ContextR.ID");
strcat(joins," LEFT JOIN Bnodes AS ContextB ON S.Context=ContextB.ID");
strcat(joins," LEFT JOIN Literals AS ContextL ON S.Context=ContextL.ID");
}
/* Query without variables? */
if(statement && subject && predicate && object && context_node) {
if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, " 1")) {
librdf_storage_postgresql_find_statements_in_context_finished((void*)sos);
return NULL;
}
}
/* Complete query string */
if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, joins) ||
librdf_storage_postgresql_find_statements_in_context_augment_query(&query, where)) {
librdf_storage_postgresql_find_statements_in_context_finished((void*)sos);
return NULL;
}
/* Start query... */
if(!(sos->results=PQexec(sos->handle, query) )) {
librdf_log(sos->storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"postgresql query failed: %s",
PQresultErrorMessage(sos->results));
librdf_storage_postgresql_find_statements_in_context_finished((void*)sos);
return NULL;
}
LIBRDF_FREE(cstring, query);
sos->current_rowno=0;
if(!(sos->row=(char**)LIBRDF_CALLOC(cstring,sizeof(char *),PQnfields(sos->results)+1)))
return NULL;
/* Get first statement, if any, and initialize stream */
if(librdf_storage_postgresql_find_statements_in_context_next_statement(sos) ) {
librdf_storage_postgresql_find_statements_in_context_finished((void*)sos);
return librdf_new_empty_stream(storage->world);
}
stream=librdf_new_stream(storage->world,(void*)sos,
&librdf_storage_postgresql_find_statements_in_context_end_of_stream,
&librdf_storage_postgresql_find_statements_in_context_next_statement,
&librdf_storage_postgresql_find_statements_in_context_get_statement,
&librdf_storage_postgresql_find_statements_in_context_finished);
if(!stream) {
librdf_storage_postgresql_find_statements_in_context_finished((void*)sos);
return NULL;
}
return stream;
}
static int
librdf_storage_postgresql_find_statements_in_context_augment_query(char **query, const char *addition)
{
char *newquery;
/* Augment existing query, returning 0 on success. */
if(!(newquery=(char*)LIBRDF_MALLOC(cstring, strlen(*query)+strlen(addition)+1)))
return 1;
strcpy(newquery,*query);
strcat(newquery,addition);
LIBRDF_FREE(cstring, *query);
*query=newquery;
return 0;
}
static int
librdf_storage_postgresql_find_statements_in_context_end_of_stream(void* context)
{
librdf_storage_postgresql_sos_context* sos=(librdf_storage_postgresql_sos_context*)context;
return sos->current_statement==NULL;
}
static int
librdf_storage_postgresql_find_statements_in_context_next_statement(void* context)
{
librdf_storage_postgresql_sos_context* sos=(librdf_storage_postgresql_sos_context*)context;
librdf_node *subject=NULL, *predicate=NULL, *object=NULL;
librdf_node *node;
char **row=sos->row;
int i;
if( sos->current_rowno < PQntuples(sos->results) ) {
for(i=0;i<PQnfields(sos->results);i++) {
if(PQgetlength(sos->results,sos->current_rowno,i) > 0 ) {
/* FIXME: why is this not copied? */
/*
if(!(row[i]=(char*)LIBRDF_MALLOC(cstring,
PQgetlength(sos->results,sos->current_rowno,i)+1)))
return 1;
strcpy(row[i],PQgetvalue(sos->results,sos->current_rowno,i));
*/
row[i]=PQgetvalue(sos->results,sos->current_rowno,i);
}
else
row[i]=NULL;
}
sos->current_rowno=sos->current_rowno+1;
/* Get ready for context */
if(sos->current_context)
librdf_free_node(sos->current_context);
sos->current_context=NULL;
/* Is this a query with statement parts? */
if(sos->query_statement) {
subject=librdf_statement_get_subject(sos->query_statement);
predicate=librdf_statement_get_predicate(sos->query_statement);
if(sos->is_literal_match)
object=NULL;
else
object=librdf_statement_get_object(sos->query_statement);
}
/* Make sure we have a statement object to return */
if(!sos->current_statement) {
if(!(sos->current_statement=librdf_new_statement(sos->storage->world)))
return 1;
}
librdf_statement_clear(sos->current_statement);
/* Query without variables? */
if(subject && predicate && object && sos->query_context) {
librdf_statement_set_subject(sos->current_statement,librdf_new_node_from_node(subject));
librdf_statement_set_predicate(sos->current_statement,librdf_new_node_from_node(predicate));
librdf_statement_set_object(sos->current_statement,librdf_new_node_from_node(object));
sos->current_context=librdf_new_node_from_node(sos->query_context);
} else {
/* Turn row parts into statement and context */
int part=0;
/* Subject - constant or from row? */
if(subject) {
librdf_statement_set_subject(sos->current_statement,librdf_new_node_from_node(subject));
} else {
/* Resource or Bnode? */
if(row[part]) {
if(!(node=librdf_new_node_from_uri_string(sos->storage->world,
(const unsigned char*)row[part])))
return 1;
} else if(row[part+1]) {
if(!(node=librdf_new_node_from_blank_identifier(sos->storage->world,
(const unsigned char*)row[part+1])))
return 1;
} else
return 1;
librdf_statement_set_subject(sos->current_statement,node);
part+=2;
}
/* Predicate - constant or from row? */
if(predicate) {
librdf_statement_set_predicate(sos->current_statement,librdf_new_node_from_node(predicate));
} else {
/* Resource? */
if(row[part]) {
if(!(node=librdf_new_node_from_uri_string(sos->storage->world,
(const unsigned char*)row[part])))
return 1;
} else
return 1;
librdf_statement_set_predicate(sos->current_statement,node);
part+=1;
}
/* Object - constant or from row? */
if(object) {
librdf_statement_set_object(sos->current_statement,librdf_new_node_from_node(object));
} else {
/* Resource, Bnode or Literal? */
if(row[part]) {
if(!(node=librdf_new_node_from_uri_string(sos->storage->world,
(const unsigned char*)row[part])))
return 1;
} else if(row[part+1]) {
if(!(node=librdf_new_node_from_blank_identifier(sos->storage->world,
(const unsigned char*)row[part+1])))
return 1;
} else if(row[part+2]) {
/* Typed literal? */
librdf_uri *datatype=NULL;
if(row[part+4] && strlen(row[part+4]))
datatype=librdf_new_uri(sos->storage->world,
(const unsigned char*)row[part+4]);
if(!(node=librdf_new_node_from_typed_literal(sos->storage->world,
(const unsigned char*)row[part+2],
row[part+3],
datatype)))
return 1;
} else
return 1;
librdf_statement_set_object(sos->current_statement,node);
part+=5;
}
/* Context - constant or from row? */
if(sos->query_context) {
sos->current_context=librdf_new_node_from_node(sos->query_context);
} else {
/* Resource, Bnode or Literal? */
if(row[part]) {
if(!(node=librdf_new_node_from_uri_string(sos->storage->world,
(const unsigned char*)row[part])))
return 1;
} else if(row[part+1]) {
if(!(node=librdf_new_node_from_blank_identifier(sos->storage->world,
(const unsigned char*)row[part+1])))
return 1;
} else if(row[part+2]) {
/* Typed literal? */
librdf_uri *datatype=NULL;
if(row[part+4] && strlen(row[part+4]))
datatype=librdf_new_uri(sos->storage->world,
(const unsigned char*)row[part+4]);
if(!(node=librdf_new_node_from_typed_literal(sos->storage->world,
(const unsigned char*)row[part+2],
row[part+3],
datatype)))
return 1;
} else
/* no context */
node=NULL;
sos->current_context=node;
}
}
} else {
if(sos->current_statement)
librdf_free_statement(sos->current_statement);
sos->current_statement=NULL;
if(sos->current_context)
librdf_free_node(sos->current_context);
sos->current_context=NULL;
}
return 0;
}
static void*
librdf_storage_postgresql_find_statements_in_context_get_statement(void* context, int flags)
{
librdf_storage_postgresql_sos_context* sos=(librdf_storage_postgresql_sos_context*)context;
switch(flags) {
case LIBRDF_ITERATOR_GET_METHOD_GET_OBJECT:
return sos->current_statement;
case LIBRDF_ITERATOR_GET_METHOD_GET_CONTEXT:
return sos->current_context;
default:
abort();
}
}
static void
librdf_storage_postgresql_find_statements_in_context_finished(void* context)
{
librdf_storage_postgresql_sos_context* sos=(librdf_storage_postgresql_sos_context*)context;
if( sos->row )
LIBRDF_FREE(cstring,sos->row);
if(sos->results)
PQclear(sos->results);
if(sos->handle)
librdf_storage_postgresql_release_handle(sos->storage, sos->handle);
if(sos->current_statement)
librdf_free_statement(sos->current_statement);
if(sos->current_context)
librdf_free_node(sos->current_context);
if(sos->query_statement)
librdf_free_statement(sos->query_statement);
if(sos->query_context)
librdf_free_node(sos->query_context);
if(sos->storage)
librdf_storage_remove_reference(sos->storage);
LIBRDF_FREE(librdf_storage_postgresql_sos_context, sos);
}
/**
* librdf_storage_postgresql_get_contexts:
* @storage: the storage
*
* Return an iterator with the context nodes present in storage.
*
* Return value: a #librdf_iterator or NULL on failure
**/
static librdf_iterator*
librdf_storage_postgresql_get_contexts(librdf_storage* storage)
{
librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context;
librdf_storage_postgresql_get_contexts_context* gccontext;
const char select_contexts[]="\
SELECT DISTINCT R.URI AS CoR, B.Name AS CoB, \
L.Value AS CoV, L.Language AS CoL, L.Datatype AS CoD \
FROM Statements" UINT64_T_FMT " as S \
LEFT JOIN Resources AS R ON S.Context=R.ID \
LEFT JOIN Bnodes AS B ON S.Context=B.ID \
LEFT JOIN Literals AS L ON S.Context=L.ID";
char *query;
librdf_iterator *iterator;
/* Initialize get_contexts context */
if(!(gccontext=(librdf_storage_postgresql_get_contexts_context*)
LIBRDF_CALLOC(librdf_storage_postgresql_get_contexts_context,1,
sizeof(librdf_storage_postgresql_get_contexts_context))))
return NULL;
gccontext->storage=storage;
librdf_storage_add_reference(gccontext->storage);
gccontext->current_context=NULL;
gccontext->results=NULL;
/* Get postgresql connection handle */
gccontext->handle=librdf_storage_postgresql_get_handle(storage);
if(!gccontext->handle) {
librdf_storage_postgresql_get_contexts_finished((void*)gccontext);
return NULL;
}
/* Construct query */
if(!(query=(char*)LIBRDF_MALLOC(cstring,strlen(select_contexts)+21))) {
librdf_storage_postgresql_get_contexts_finished((void*)gccontext);
return NULL;
}
sprintf(query, select_contexts, context->model);
/* Start query... */
if(!(gccontext->results=PQexec(gccontext->handle, query))) {
librdf_log(gccontext->storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"postgresql query failed: %s",
PQresultErrorMessage(gccontext->results));
librdf_storage_postgresql_get_contexts_finished((void*)gccontext);
return NULL;
}
LIBRDF_FREE(cstring, query);
gccontext->current_rowno=0;
if(!(gccontext->row=(char**)LIBRDF_MALLOC(cstring, PQnfields(gccontext->results)+1)))
return NULL;
/* Get first context, if any, and initialize iterator */
if(librdf_storage_postgresql_get_contexts_next_context(gccontext) ||
!gccontext->current_context) {
librdf_storage_postgresql_get_contexts_finished((void*)gccontext);
return librdf_new_empty_iterator(storage->world);
}
iterator=librdf_new_iterator(storage->world,(void*)gccontext,
&librdf_storage_postgresql_get_contexts_end_of_iterator,
&librdf_storage_postgresql_get_contexts_next_context,
&librdf_storage_postgresql_get_contexts_get_context,
&librdf_storage_postgresql_get_contexts_finished);
if(!iterator)
librdf_storage_postgresql_get_contexts_finished(gccontext);
return iterator;
}
static int
librdf_storage_postgresql_get_contexts_end_of_iterator(void* context)
{
librdf_storage_postgresql_get_contexts_context* gccontext=(librdf_storage_postgresql_get_contexts_context*)context;
return gccontext->current_context==NULL;
}
static int
librdf_storage_postgresql_get_contexts_next_context(void* context)
{
librdf_storage_postgresql_get_contexts_context* gccontext=(librdf_storage_postgresql_get_contexts_context*)context;
librdf_node *node;
char **row=gccontext->row;
int i;
if( gccontext->current_rowno < PQntuples(gccontext->results) ) {
for(i=0;i<PQnfields(gccontext->results);i++) {
if(PQgetlength(gccontext->results,gccontext->current_rowno,i) > 0 ) {
/* FIXME: why is this not copied? */
/*
if(!(row[i]=(char*)LIBRDF_MALLOC(cstring,
PQgetlength(gccontext->results,gccontext->current_rowno,i)+1)))
return 1;
strcpy(row[i],PQgetvalue(gccontext->results,gccontext->current_rowno,i));
*/
row[i]=PQgetvalue(gccontext->results,gccontext->current_rowno,i);
}
else
row[i]=NULL;
}
gccontext->current_rowno=gccontext->current_rowno+1;
/* Free old context node, if allocated */
if(gccontext->current_context)
librdf_free_node(gccontext->current_context);
/* Resource, Bnode or Literal? */
if(row[0]) {
if(!(node=librdf_new_node_from_uri_string(gccontext->storage->world,
(const unsigned char*)row[0])))
return 1;
} else if(row[1]) {
if(!(node=librdf_new_node_from_blank_identifier(gccontext->storage->world,
(const unsigned char*)row[1])))
return 1;
} else if(row[2]) {
/* Typed literal? */
librdf_uri *datatype=NULL;
if(row[4] && strlen(row[4]))
datatype=librdf_new_uri(gccontext->storage->world,
(const unsigned char*)row[4]);
if(!(node=librdf_new_node_from_typed_literal(gccontext->storage->world,
(const unsigned char*)row[2],
row[3],
datatype)))
return 1;
} else
return 1;
gccontext->current_context=node;
} else {
if(gccontext->current_context)
librdf_free_node(gccontext->current_context);
gccontext->current_context=NULL;
}
return 0;
}
static void*
librdf_storage_postgresql_get_contexts_get_context(void* context, int flags)
{
librdf_storage_postgresql_get_contexts_context* gccontext=(librdf_storage_postgresql_get_contexts_context*)context;
return gccontext->current_context;
}
#if 0
/* FIXME: why is this not used ? */
static void
librdf_storage_postgresql_free_gccontext_row(void* context)
{
librdf_storage_postgresql_get_contexts_context* gccontext=(librdf_storage_postgresql_get_contexts_context*)context;
/*
for(i=0;i<PQnfields(gccontext->results);i++)
if( gccontext->row[i] )
LIBRDF_FREE(cstring, gccontext->row[i]);
*/
}
#endif
static void
librdf_storage_postgresql_get_contexts_finished(void* context)
{
librdf_storage_postgresql_get_contexts_context* gccontext=(librdf_storage_postgresql_get_contexts_context*)context;
if( gccontext->row )
LIBRDF_FREE(cstring, gccontext->row);
if(gccontext->results)
PQclear(gccontext->results);
if(gccontext->handle)
librdf_storage_postgresql_release_handle(gccontext->storage, gccontext->handle);
if(gccontext->current_context)
librdf_free_node(gccontext->current_context);
if(gccontext->storage)
librdf_storage_remove_reference(gccontext->storage);
LIBRDF_FREE(librdf_storage_postgresql_get_contexts_context, gccontext);
}
/**
* librdf_storage_postgresql_get_feature - get the value of a storage feature
* @storage: #librdf_storage object
* @feature: #librdf_uri feature property
*
* Return value: #librdf_node feature value or NULL if no such feature
* exists or the value is empty.
**/
static librdf_node*
librdf_storage_postgresql_get_feature(librdf_storage* storage, librdf_uri* feature)
{
unsigned char *uri_string;
if(!feature)
return NULL;
uri_string=librdf_uri_as_string(feature);
if(!uri_string)
return NULL;
if(!strcmp((const char*)uri_string, (const char*)LIBRDF_MODEL_FEATURE_CONTEXTS)) {
/* Always have contexts */
static const unsigned char value[2]="1";
return librdf_new_node_from_typed_literal(storage->world,
value,
NULL, NULL);
}
return NULL;
}
/**
* librdf_storage_postgresql_transaction_start:
* @storage: the storage object
*
* Start a transaction
*
* Return value: non-0 on failure
**/
static int
librdf_storage_postgresql_transaction_start(librdf_storage* storage)
{
librdf_storage_postgresql_context *context=(librdf_storage_postgresql_context *)storage->context;
const char query[]="START TRANSACTION";
if(context->transaction_handle) {
librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"Postgresql transaction already started");
return 1;
}
context->transaction_handle=librdf_storage_postgresql_get_handle(storage);
if(!context->transaction_handle)
return 1;
if(!PQexec(context->transaction_handle, query)) {
librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"Postgresql query failed: %s",
PQerrorMessage(context->transaction_handle));
librdf_storage_postgresql_release_handle(storage, context->transaction_handle);
context->transaction_handle=NULL;
return 1;
}
return 0;
}
/**
* librdf_storage_postgresql_transaction_start_with_handle:
* @storage: the storage object
* @handle: the transaction object
*
* Start a transaction using an existing external transaction object.
*
* Return value: non-0 on failure
**/
static int
librdf_storage_postgresql_transaction_start_with_handle(librdf_storage* storage,
void* handle)
{
return librdf_storage_postgresql_transaction_start(storage);
}
/**
* librdf_storage_postgresql_transaction_commit:
* @storage: the storage object
*
* Commit a transaction.
*
* Return value: non-0 on failure
**/
static int
librdf_storage_postgresql_transaction_commit(librdf_storage* storage)
{
librdf_storage_postgresql_context *context=(librdf_storage_postgresql_context *)storage->context;
const char query[]="COMMIT TRANSACTION";
PGconn* handle;
int status;
if(!context->transaction_handle)
return 1;
handle=context->transaction_handle;
status=(PQexec(context->transaction_handle, query) == NULL);
if(status) {
librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"Postgresql query failed: %s",
PQerrorMessage(context->transaction_handle));
}
librdf_storage_postgresql_release_handle(storage, handle);
context->transaction_handle=NULL;
return (status != 0);
}
/**
* librdf_storage_postgresql_transaction_rollback:
* @storage: the storage object
*
* Rollback a transaction.
*
* Return value: non-0 on failure
**/
static int
librdf_storage_postgresql_transaction_rollback(librdf_storage* storage)
{
librdf_storage_postgresql_context *context=(librdf_storage_postgresql_context *)storage->context;
const char query[]="ROLLBACK TRANSACTION";
PGconn* handle;
int status;
if(!context->transaction_handle)
return 1;
handle=context->transaction_handle;
status=(PQexec(context->transaction_handle, query) == NULL);
if(status) {
librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL,
"Postgresql query failed: %s",
PQerrorMessage(context->transaction_handle));
}
librdf_storage_postgresql_release_handle(storage, handle);
context->transaction_handle=NULL;
return (status != 0);
}
/**
* librdf_storage_postgresql_transaction_get_handle:
* @storage: the storage object
*
* Get the current transaction handle.
*
* Return value: non-0 on failure
**/
static void*
librdf_storage_postgresql_transaction_get_handle(librdf_storage* storage)
{
librdf_storage_postgresql_context *context=(librdf_storage_postgresql_context *)storage->context;
return context->transaction_handle;
}
/* local function to register postgresql storage functions */
static void
librdf_storage_postgresql_register_factory(librdf_storage_factory *factory)
{
factory->context_length = sizeof(librdf_storage_postgresql_context);
factory->init = librdf_storage_postgresql_init;
factory->terminate = librdf_storage_postgresql_terminate;
factory->open = librdf_storage_postgresql_open;
factory->close = librdf_storage_postgresql_close;
factory->sync = librdf_storage_postgresql_sync;
factory->size = librdf_storage_postgresql_size;
factory->add_statement = librdf_storage_postgresql_add_statement;
factory->add_statements = librdf_storage_postgresql_add_statements;
factory->remove_statement = librdf_storage_postgresql_remove_statement;
factory->contains_statement = librdf_storage_postgresql_contains_statement;
factory->serialise = librdf_storage_postgresql_serialise;
factory->find_statements = librdf_storage_postgresql_find_statements;
factory->find_statements_with_options = librdf_storage_postgresql_find_statements_with_options;
factory->context_add_statement = librdf_storage_postgresql_context_add_statement;
factory->context_add_statements = librdf_storage_postgresql_context_add_statements;
factory->context_remove_statement = librdf_storage_postgresql_context_remove_statement;
factory->context_remove_statements = librdf_storage_postgresql_context_remove_statements;
factory->context_serialise = librdf_storage_postgresql_context_serialise;
factory->find_statements_in_context = librdf_storage_postgresql_find_statements_in_context;
factory->get_contexts = librdf_storage_postgresql_get_contexts;
factory->get_feature = librdf_storage_postgresql_get_feature;
factory->transaction_start = librdf_storage_postgresql_transaction_start;
factory->transaction_start_with_handle = librdf_storage_postgresql_transaction_start_with_handle;
factory->transaction_commit = librdf_storage_postgresql_transaction_commit;
factory->transaction_rollback = librdf_storage_postgresql_transaction_rollback;
factory->transaction_get_handle = librdf_storage_postgresql_transaction_get_handle;
}
/**
* librdf_init_storage_postgresql:
* @world: world object
*
* INTERNAL - initialise the storage_postgresql module.
**/
void
librdf_init_storage_postgresql(librdf_world *world)
{
librdf_storage_register_factory(world, "postgresql", "postgresql database store",
&librdf_storage_postgresql_register_factory);
}