/* -*- Mode: c; c-basic-offset: 2 -*- * * rdf_storage_postgresql.c - RDF Storage in PostgreSQL DB interface definition. * * Based in part on rdf_storage_mysql. * * Copyright (C) 2003-2005 Shi Wenzhong - email to shiwenzhong@hz.cn * Copyright (C) 2000-2008, David Beckett http://www.dajobe.org/ * Copyright (C) 2000-2005, University of Bristol, UK http://www.bristol.ac.uk/ * * This package is Free Software and part of Redland http://librdf.org/ * * It is licensed under the following three licenses as alternatives: * 1. GNU Lesser General Public License (LGPL) V2.1 or any newer version * 2. GNU General Public License (GPL) V2 or any newer version * 3. Apache License, V2.0 or any newer version * * You may not use this file except in compliance with at least one of * the above three licenses. * * See LICENSE.html or LICENSE.txt at the top of this package for the * complete terms and further detail along with the license texts for * the licenses in COPYING.LIB, COPYING and LICENSE-2.0.txt respectively. 
* * */ #ifdef HAVE_CONFIG_H #include #endif #ifdef WIN32 #include #endif #include #include #ifdef HAVE_STDLIB_H #include /* for abort() as used in errors */ #endif #include #include #include #include typedef enum { /* Status of individual postgresql connections */ LIBRDF_STORAGE_POSTGRESQL_CONNECTION_CLOSED = 0, LIBRDF_STORAGE_POSTGRESQL_CONNECTION_OPEN = 1, LIBRDF_STORAGE_POSTGRESQL_CONNECTION_BUSY = 2 } librdf_storage_postgresql_connection_status; typedef struct { /* A postgresql connection */ librdf_storage_postgresql_connection_status status; PGconn *handle; } librdf_storage_postgresql_connection; typedef struct { /* postgresql connection parameters */ const char *host; char *port; char *dbname; char *user; const char *password; /* Array of virtual postgresql connections */ librdf_storage_postgresql_connection *connections; int connections_count; /* hash of model name in the database (table Models, column ID) */ u64 model; /* if inserts should be optimized by locking and index optimizations */ int bulk; /* if a table with merged models should be maintained */ int merge; /* digest object for node hashes */ librdf_digest *digest; PGconn* transaction_handle; } librdf_storage_postgresql_context; /* prototypes for local functions */ static int librdf_storage_postgresql_init(librdf_storage* storage, const char *name, librdf_hash* options); static int librdf_storage_postgresql_merge(librdf_storage* storage); static void librdf_storage_postgresql_terminate(librdf_storage* storage); static int librdf_storage_postgresql_open(librdf_storage* storage, librdf_model* model); static int librdf_storage_postgresql_close(librdf_storage* storage); static int librdf_storage_postgresql_sync(librdf_storage* storage); static int librdf_storage_postgresql_size(librdf_storage* storage); static int librdf_storage_postgresql_add_statement(librdf_storage* storage, librdf_statement* statement); static int librdf_storage_postgresql_add_statements(librdf_storage* storage, librdf_stream* 
statement_stream); static int librdf_storage_postgresql_remove_statement(librdf_storage* storage, librdf_statement* statement); static int librdf_storage_postgresql_contains_statement(librdf_storage* storage, librdf_statement* statement); librdf_stream* librdf_storage_postgresql_serialise(librdf_storage* storage); static librdf_stream* librdf_storage_postgresql_find_statements(librdf_storage* storage, librdf_statement* statement); static librdf_stream* librdf_storage_postgresql_find_statements_with_options(librdf_storage* storage, librdf_statement* statement, librdf_node* context_node, librdf_hash* options); /* context functions */ static int librdf_storage_postgresql_context_add_statement(librdf_storage* storage, librdf_node* context_node, librdf_statement* statement); static int librdf_storage_postgresql_context_add_statements(librdf_storage* storage, librdf_node* context_node, librdf_stream* statement_stream); static int librdf_storage_postgresql_context_remove_statement(librdf_storage* storage, librdf_node* context_node, librdf_statement* statement); static int librdf_storage_postgresql_context_remove_statements(librdf_storage* storage, librdf_node* context_node); static librdf_stream* librdf_storage_postgresql_context_serialise(librdf_storage* storage, librdf_node* context_node); static librdf_stream* librdf_storage_postgresql_find_statements_in_context(librdf_storage* storage, librdf_statement* statement, librdf_node* context_node); static librdf_iterator* librdf_storage_postgresql_get_contexts(librdf_storage* storage); static void librdf_storage_postgresql_register_factory(librdf_storage_factory *factory); /* "private" helper definitions */ typedef struct { librdf_storage *storage; librdf_statement *current_statement; librdf_node *current_context; librdf_statement *query_statement; librdf_node *query_context; PGconn *handle; PGresult *results; int current_rowno; char **row; int is_literal_match; } librdf_storage_postgresql_sos_context; typedef struct { 
librdf_storage *storage; librdf_node *current_context; PGconn *handle; PGresult *results; int current_rowno; char **row; } librdf_storage_postgresql_get_contexts_context; static u64 librdf_storage_postgresql_hash(librdf_storage* storage, const char *type, const char *string, int length); static u64 librdf_storage_postgresql_node_hash(librdf_storage* storage, librdf_node* node,int add); static int librdf_storage_postgresql_start_bulk(librdf_storage* storage); static int librdf_storage_postgresql_stop_bulk(librdf_storage* storage); static int librdf_storage_postgresql_context_add_statement_helper(librdf_storage* storage, u64 ctxt, librdf_statement* statement); static int librdf_storage_postgresql_find_statements_in_context_augment_query(char **query, const char *addition); /* methods for stream of statements */ static int librdf_storage_postgresql_find_statements_in_context_end_of_stream(void* context); static int librdf_storage_postgresql_find_statements_in_context_next_statement(void* context); static void* librdf_storage_postgresql_find_statements_in_context_get_statement(void* context, int flags); static void librdf_storage_postgresql_find_statements_in_context_finished(void* context); /* methods for iterator for contexts */ static int librdf_storage_postgresql_get_contexts_end_of_iterator(void* context); static int librdf_storage_postgresql_get_contexts_next_context(void* context); static void* librdf_storage_postgresql_get_contexts_get_context(void* context, int flags); static void librdf_storage_postgresql_get_contexts_finished(void* context); static int librdf_storage_postgresql_transaction_rollback(librdf_storage* storage); /* functions implementing storage api */ /* * librdf_storage_postgresql_hash - Find hash value of string. * @storage: the storage * @type: character type of node to hash ("R", "L" or "B") * @string: a string to get hash for * @length: length of string * * Find hash value of string. * * Return value: Non-zero on succes. 
**/ static u64 librdf_storage_postgresql_hash(librdf_storage* storage, const char *type, const char *string, int length) { librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context; u64 hash; byte* digest; int i; /* (Re)initialize digest object */ librdf_digest_init(context->digest); /* Update digest with data */ if(type) librdf_digest_update(context->digest, (unsigned char*)type, 1); librdf_digest_update(context->digest, (unsigned char*)string, length); librdf_digest_final(context->digest); /* Copy first 8 bytes of digest into unsigned 64bit hash * using a method portable across big/little endianness * * Fixes Issue#0000023 - http://bugs.librdf.org/mantis/view.php?id=23 */ digest = (byte*) librdf_digest_get_digest(context->digest); hash = 0; for(i=0; i<8; i++) hash += ((u64) digest[i]) << (i*8); return hash; } /* * librdf_storage_postgresql_init_connections - Initialize postgresql connection pool. * @storage: the storage * * Return value: Non-zero on success. **/ static int librdf_storage_postgresql_init_connections(librdf_storage* storage) { librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context; /* Reset connection pool */ context->connections=NULL; context->connections_count=0; return 0; } /* * librdf_storage_postgresql_finish_connections - Finish all connections in postgresql connection pool and free structures. * @storage: the storage * * Return value: None. 
**/ static void librdf_storage_postgresql_finish_connections(librdf_storage* storage) { librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context; int i; /* Loop through connections and close */ for(i=0; i < context->connections_count; i++) { if(LIBRDF_STORAGE_POSTGRESQL_CONNECTION_CLOSED != context->connections[i].status) PQfinish(context->connections[i].handle); } /* Free structure and reset */ if (context->connections_count) { LIBRDF_FREE(librdf_storage_postgresql_connection*, context->connections); context->connections=NULL; context->connections_count=0; } } /* * librdf_storage_postgresql_get_handle - get a connection handle to the postgresql server * @storage: the storage * * This attempts to reuses any existing available pooled connection * otherwise creates a new connection to the server. * * Return value: Non-zero on succes. **/ static PGconn* librdf_storage_postgresql_get_handle(librdf_storage* storage) { librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context; librdf_storage_postgresql_connection* connection= NULL; int i; char conninfo[256]; if(context->transaction_handle) return context->transaction_handle; /* Look for an open connection handle to return */ for(i=0; i < context->connections_count; i++) { if(LIBRDF_STORAGE_POSTGRESQL_CONNECTION_OPEN == context->connections[i].status) { context->connections[i].status=LIBRDF_STORAGE_POSTGRESQL_CONNECTION_BUSY; return context->connections[i].handle; } } /* Look for a closed connection */ for(i=0; i < context->connections_count && !connection; i++) { if(LIBRDF_STORAGE_POSTGRESQL_CONNECTION_CLOSED == context->connections[i].status) { connection=&context->connections[i]; break; } } /* Expand connection pool if no closed connection was found */ if (!connection) { /* Allocate new buffer with two extra slots */ librdf_storage_postgresql_connection* connections; if(!(connections=(librdf_storage_postgresql_connection*) 
LIBRDF_CALLOC(librdf_storage_postgresql_connection, context->connections_count+2, sizeof(librdf_storage_postgresql_connection)))) return NULL; if (context->connections_count) { /* Copy old buffer to new */ memcpy(connections, context->connections, sizeof(librdf_storage_postgresql_connection)*context->connections_count); /* Free old buffer */ LIBRDF_FREE(librdf_storage_postgresql_connection*, context->connections); } /* Update buffer size and reset new connections */ context->connections_count+=2; connection=&connections[context->connections_count-2]; connection->status=LIBRDF_STORAGE_POSTGRESQL_CONNECTION_CLOSED; connection->handle=NULL; connections[context->connections_count-1].status=LIBRDF_STORAGE_POSTGRESQL_CONNECTION_CLOSED; connections[context->connections_count-1].handle=NULL; context->connections=connections; } /* Initialize closed postgresql connection handle */ /* Create connection to database for handle */ sprintf(conninfo,"host=%s port=%s dbname=%s user=%s password=%s", context->host,context->port,context->dbname,context->user,context->password); connection->handle=PQconnectdb(conninfo); if( PQstatus(connection->handle) != CONNECTION_OK ) { fprintf(stdout,"Connection to postgresql database %s:%s name %s as user %s failed: %s", context->host, context->port, context->dbname, context->user, PQerrorMessage(connection->handle)); librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "Connection to postgresql database %s:%s name %s as user %s failed: %s", context->host, context->port, context->dbname, context->user, PQerrorMessage(connection->handle)); return NULL; } /* Update status and return */ connection->status=LIBRDF_STORAGE_POSTGRESQL_CONNECTION_BUSY; return connection->handle; } /* * librdf_storage_postgresql_release_handle - Release a connection handle to postgresql server back to the pool * @storage: the storage * @handle: the postgresql handle to release * * Return value: None. 
**/ static void librdf_storage_postgresql_release_handle(librdf_storage* storage, PGconn *handle) { librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context; int i; /* Look for busy connection handle to drop */ for(i=0; i < context->connections_count; i++) { if(LIBRDF_STORAGE_POSTGRESQL_CONNECTION_BUSY == context->connections[i].status && context->connections[i].handle == handle) { context->connections[i].status=LIBRDF_STORAGE_POSTGRESQL_CONNECTION_OPEN; return; } } librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "Unable to find busy connection (in pool of %i connections) to drop for postgresql server thread: %d", context->connections_count, PQbackendPID(handle)); } /** * librdf_storage_postgresql_init: * @storage: the storage * @name: model name * @options: host, port, database, user, password [, new] [, bulk] [, merge]. * * Create connection to database. Defaults to port 5432 if not given. * * The boolean bulk option can be set to true if optimized inserts (table * locks and temporary key disabling) is wanted. Note that this will block * all other access, and requires table locking and alter table privileges. * * The boolean merge option can be set to true if a merged "view" of all * models should be maintained. This "view" will be a table with TYPE=MERGE. * * Return value: Non-zero on failure. 
**/ static int librdf_storage_postgresql_init(librdf_storage* storage, const char *name, librdf_hash* options) { librdf_storage_postgresql_context *context=(librdf_storage_postgresql_context*)storage->context; const char create_table_statements[]="\ CREATE TABLE Statements" UINT64_T_FMT " (\ Subject numeric(20) NOT NULL,\ Predicate numeric(20) NOT NULL,\ Object numeric(20) NOT NULL,\ Context numeric(20) NOT NULL\ ) "; const char create_table_literals[]="\ CREATE TABLE Literals (\ ID numeric(20) NOT NULL,\ Value text NOT NULL,\ Language text NOT NULL,\ Datatype text NOT NULL,\ PRIMARY KEY (ID)\ ) "; const char create_table_resources[]="\ CREATE TABLE Resources (\ ID numeric(20) NOT NULL,\ URI text NOT NULL,\ PRIMARY KEY (ID)\ ) "; const char create_table_bnodes[]="\ CREATE TABLE Bnodes (\ ID numeric(20) NOT NULL,\ Name text NOT NULL,\ PRIMARY KEY (ID)\ ) "; const char create_table_models[]="\ CREATE TABLE Models (\ ID numeric(20) NOT NULL,\ Name text NOT NULL,\ PRIMARY KEY (ID)\ ) "; const char create_model[]="INSERT INTO Models (ID,Name) VALUES (" UINT64_T_FMT ",'%s')"; const char check_model[]="SELECT 1 FROM Models WHERE ID=" UINT64_T_FMT " AND Name='%s'"; int status=0; char *escaped_name=NULL; char *query=NULL; PGresult *res; PGconn *handle; /* Must have connection parameters passed as options */ if(!options) return 1; /* Create digest */ if(!(context->digest=librdf_new_digest(storage->world,"MD5"))) return 1; /* Save hash of model name */ context->model=librdf_storage_postgresql_hash(storage, NULL, name, strlen(name)); /* Save connection parameters */ context->host=librdf_hash_get(options, "host"); context->port=librdf_hash_get(options, "port"); if(context->port==NULL ) { context->port=(char*)LIBRDF_MALLOC(cstring, 10); strcpy(context->port,"5432"); /* default postgresql port */ } context->dbname=librdf_hash_get(options, "database"); if(!context->dbname) context->dbname=librdf_hash_get(options, "dbname"); context->user=librdf_hash_get(options, "user"); 
if(context->user && !context->dbname) { context->dbname=(char*)LIBRDF_MALLOC(cstring, strlen(context->user)+1); strcpy(context->dbname, context->user); /* default dbname=user */ } context->password=librdf_hash_get(options, "password"); if(!context->host || !context->dbname || !context->user || !context->port || !context->password) return 1; /* Maintain merge table? */ context->merge=(librdf_hash_get_as_boolean(options, "merge")>0); /* Initialize postgresql connections */ librdf_storage_postgresql_init_connections(storage); /* Get postgresql connection handle */ handle=librdf_storage_postgresql_get_handle(storage); if(!handle) return 1; /* Create tables, if new and not existing */ if(!status && (librdf_hash_get_as_boolean(options, "new")>0)) { query=(char*)LIBRDF_MALLOC(cstring, strlen(create_table_statements)+20); if(! query) status=1; else { sprintf(query, create_table_statements, context->model); if(! PQexec(handle, query) || ! PQexec(handle, create_table_literals) || ! PQexec(handle, create_table_resources) || ! PQexec(handle, create_table_bnodes) || ! 
PQexec(handle, create_table_models)) { librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "postgresql table creation failed: %s", PQerrorMessage(handle)); status=-1; } LIBRDF_FREE(cstring, query); } } /* Create model if new and not existing, or check for existence */ if(!status) { if(!(escaped_name=(char*)LIBRDF_MALLOC(cstring,strlen(name)*2+1))) status=1; PQescapeString(escaped_name,(const char*)name, strlen(name)); } if(!status && (librdf_hash_get_as_boolean(options, "new")>0)) { /* Create new model */ if(!(query=(char*)LIBRDF_MALLOC(cstring,strlen(create_model)+20+ strlen(escaped_name)+1))) status=1; sprintf(query, create_model, context->model, escaped_name); if(!status && !(res=(PQexec(handle, query))) && PQresultStatus(res) != PGRES_COMMAND_OK) { librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "postgresql insert into Models table failed: %s", PQresultErrorMessage(res)); status=-1; } /* Maintain merge table? */ if(!status && context->merge) status=librdf_storage_postgresql_merge(storage); } else if(!status) { /* Check for model existence */ if(!(query=(char*)LIBRDF_MALLOC(cstring,strlen(check_model)+20+ strlen(escaped_name)+1))) status=1; sprintf(query, check_model, context->model, name); res=NULL; if( !status && !(res=(PQexec(handle, query))) ) { librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "postgresql select from Models table failed: %s", PQresultErrorMessage(res)); status=-1; } if(!status && !(PQntuples(res))) { librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "Unknown model: %s",name); status=1; } if(res) PQclear(res); } if(query) LIBRDF_FREE(cstring, query); if(escaped_name) LIBRDF_FREE(cstring, escaped_name); /* Optimize loads? */ context->bulk=(librdf_hash_get_as_boolean(options, "bulk")>0); /* Truncate model? 
*/ if(!status && (librdf_hash_get_as_boolean(options, "new")>0)) status=librdf_storage_postgresql_context_remove_statements(storage, NULL); /* Unused options: write (always...) */ librdf_free_hash(options); librdf_storage_postgresql_release_handle(storage, handle); return status; } /* * librdf_storage_postgresql_merge - (re)create merged "view" of all models * @storage: the storage * * Return value: Non-zero on failure. */ static int librdf_storage_postgresql_merge(librdf_storage* storage) { const char get_models[]="SELECT ID FROM Models"; const char drop_table_statements[]="DROP TABLE Statements"; const char insert_statements[]="INSERT INTO statements SELECT * FROM "; const char create_table_statements[]="\ CREATE TABLE Statements (\ Subject numeric(20) NOT NULL,\ Predicate numeric(20) NOT NULL,\ Object numeric(20) NOT NULL,\ Context numeric(20) NOT NULL\ ) "; char *query=NULL; PGresult *res; int i; PGconn *handle; /* Get postgresql connection handle */ handle=librdf_storage_postgresql_get_handle(storage); if(!handle) return 1; /* Drop and create merge table. */ if(! PQexec(handle, drop_table_statements) || ! PQexec(handle, create_table_statements)) { librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "postgresql merge table creation failed: %s", PQerrorMessage(handle)); librdf_storage_postgresql_release_handle(storage, handle); return -1; } /* Query for list of models. */ if(!(res=PQexec(handle, get_models))) { librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "postgresql query for model list failed: %s", PQresultErrorMessage(res)); librdf_storage_postgresql_release_handle(storage, handle); return -1; } /* Allocate space for merge table generation query. */ if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(insert_statements)+ 50))) { librdf_storage_postgresql_release_handle(storage, handle); return 1; } /* Generate CSV list of models. 
*/ for(i=0;iworld, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "postgresql merge table insert failed: %s", PQerrorMessage(handle)); librdf_storage_postgresql_release_handle(storage, handle); return -1; } } LIBRDF_FREE(cstring, query); PQclear(res); librdf_storage_postgresql_release_handle(storage, handle); return 0; } /** * librdf_storage_postgresql_terminate: * @storage: the storage * * Close the storage and database connections. * * Return value: None. **/ static void librdf_storage_postgresql_terminate(librdf_storage* storage) { librdf_storage_postgresql_context *context=(librdf_storage_postgresql_context*)storage->context; librdf_storage_postgresql_finish_connections(storage); if(context->password) LIBRDF_FREE(cstring,(char *)context->password); if(context->user) LIBRDF_FREE(cstring,(char *)context->user); if(context->dbname) LIBRDF_FREE(cstring,(char *)context->dbname); if(context->port) LIBRDF_FREE(cstring,(char *)context->port); if(context->host) LIBRDF_FREE(cstring,(char *)context->host); if(context->digest) librdf_free_digest(context->digest); if(context->transaction_handle) librdf_storage_postgresql_transaction_rollback(storage); } /** * librdf_storage_postgresql_open: * @storage: the storage * @model: the model * * Create or open model in database (nop). * * Return value: Non-zero on failure. **/ static int librdf_storage_postgresql_open(librdf_storage* storage, librdf_model* model) { return 0; } /** * librdf_storage_postgresql_close: * @storage: the storage * * Close model (nop). * * Return value: Non-zero on failure. **/ static int librdf_storage_postgresql_close(librdf_storage* storage) { librdf_storage_postgresql_transaction_rollback(storage); return librdf_storage_postgresql_sync(storage); } /** * librdf_storage_postgresql_sync - Flush all tables, making sure they are saved on disk. * @storage: the storage * * Return value: Non-zero on failure. 
**/ static int librdf_storage_postgresql_sync(librdf_storage* storage) { librdf_storage_postgresql_context *context=(librdf_storage_postgresql_context*)storage->context; /* Make sure optimizing for bulk operations is stopped? */ if(context->bulk) librdf_storage_postgresql_stop_bulk(storage); return 0; } /** * librdf_storage_postgresql_size: * @storage: the storage * * Close model (nop). * * Return value: Negative on failure. **/ static int librdf_storage_postgresql_size(librdf_storage* storage) { librdf_storage_postgresql_context *context=(librdf_storage_postgresql_context*)storage->context; char model_size[]="SELECT COUNT(*) FROM Statements" UINT64_T_FMT; char *query; PGresult *res; int count; PGconn *handle; /* Get postgresql connection handle */ handle=librdf_storage_postgresql_get_handle(storage); if(!handle) return -1; /* Query for number of statements */ if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(model_size)+21))) { librdf_storage_postgresql_release_handle(storage, handle); return -1; } sprintf(query, model_size, context->model); if(!(res=PQexec(handle, query)) || !(PQntuples(res))) { librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "postgresql query for model size failed: %s", PQresultErrorMessage(res)); LIBRDF_FREE(cstring,query); librdf_storage_postgresql_release_handle(storage, handle); return -1; } count=atol(PQgetvalue(res,0,0)); PQclear(res); LIBRDF_FREE(cstring,query); librdf_storage_postgresql_release_handle(storage, handle); return count; } static int librdf_storage_postgresql_add_statement(librdf_storage* storage, librdf_statement* statement) { /* Do not add duplicate statements */ if(librdf_storage_postgresql_contains_statement(storage, statement)) return 0; return librdf_storage_postgresql_context_add_statement_helper(storage, 0, statement); } /** * librdf_storage_postgresql_add_statements: * @storage: the storage * @statement_stream: the stream of statements * * Add statements in stream to storage, without context. 
* * Return value: Non-zero on failure. **/ static int librdf_storage_postgresql_add_statements(librdf_storage* storage, librdf_stream* statement_stream) { return librdf_storage_postgresql_context_add_statements(storage, NULL, statement_stream); } /* * librdf_storage_postgresql_node_hash - Create hash value for node * @storage: the storage * @node: a node to get hash for (and possibly create in database) * @add: whether to add the node to the database * * Return value: Non-zero on succes. **/ static u64 librdf_storage_postgresql_node_hash(librdf_storage* storage, librdf_node* node, int add) { librdf_node_type type=librdf_node_get_type(node); u64 hash; size_t nodelen; char *query; PGconn *handle; PGresult *res; /* Get postgresql connection handle */ handle=librdf_storage_postgresql_get_handle(storage); if(!handle) return 0; if(type==LIBRDF_NODE_TYPE_RESOURCE) { /* Get hash */ unsigned char *uri=librdf_uri_as_counted_string(librdf_node_get_uri(node), &nodelen); hash=librdf_storage_postgresql_hash(storage, "R", (char*)uri, nodelen); if(add) { char create_resource[]="INSERT INTO Resources (ID,URI) VALUES (" UINT64_T_FMT ",'%s')"; /* Escape URI for db query */ char *escaped_uri; if(!(escaped_uri=(char*)LIBRDF_MALLOC(cstring, nodelen*2+1))) { librdf_storage_postgresql_release_handle(storage, handle); return 0; } PQescapeString(escaped_uri,(const char*)uri, nodelen); /* Create new resource, ignore if existing */ if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(create_resource)+ 20+nodelen+1))) { LIBRDF_FREE(cstring,escaped_uri); librdf_storage_postgresql_release_handle(storage, handle); return 0; } sprintf(query, create_resource, hash, escaped_uri); LIBRDF_FREE(cstring,escaped_uri); if(!(res=PQexec(handle, query)) && PQresultStatus(res) != PGRES_COMMAND_OK) { librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "postgresql insert into Resources failed with error %s", PQresultErrorMessage(res)); LIBRDF_FREE(cstring,query); 
librdf_storage_postgresql_release_handle(storage, handle); return 0; } LIBRDF_FREE(cstring,query); } } else if(type==LIBRDF_NODE_TYPE_LITERAL) { /* Get hash */ unsigned char *value, *datatype=0; char *lang, *nodestring; librdf_uri *dt; size_t valuelen, langlen=0, datatypelen=0; value=librdf_node_get_literal_value_as_counted_string(node,&valuelen); lang=librdf_node_get_literal_value_language(node); if(lang) langlen=strlen(lang); dt=librdf_node_get_literal_value_datatype_uri(node); if(dt) datatype=librdf_uri_as_counted_string(dt,&datatypelen); if(datatype) datatypelen=strlen((const char*)datatype); /* Create composite node string for hash generation */ if(!(nodestring=(char*)LIBRDF_MALLOC(cstring, valuelen+langlen+datatypelen+3))) { librdf_storage_postgresql_release_handle(storage, handle); return 0; } strcpy(nodestring, (const char*)value); strcat(nodestring, "<"); if(lang) strcat(nodestring, lang); strcat(nodestring, ">"); if(datatype) strcat(nodestring, (const char*)datatype); nodelen=valuelen+langlen+datatypelen+2; hash=librdf_storage_postgresql_hash(storage, "L", nodestring, nodelen); LIBRDF_FREE(cstring,nodestring); if(add) { char create_literal[]="INSERT INTO Literals (ID,Value,Language,Datatype) VALUES (" UINT64_T_FMT ",'%s','%s','%s')"; /* Escape value, lang and datatype for db query */ char *escaped_value, *escaped_lang, *escaped_datatype; if(!(escaped_value=(char*)LIBRDF_MALLOC(cstring, valuelen*2+1)) || !(escaped_lang=(char*)LIBRDF_MALLOC(cstring, langlen*2+1)) || !(escaped_datatype=(char*)LIBRDF_MALLOC(cstring, datatypelen*2+1))) { librdf_storage_postgresql_release_handle(storage, handle); return 0; } PQescapeString(escaped_value, (const char*)value, valuelen); if(lang) PQescapeString( escaped_lang, (const char*)lang, langlen); else strcpy(escaped_lang,""); if(datatype) PQescapeString( escaped_datatype, (const char*)datatype, datatypelen); else strcpy(escaped_datatype,""); /* Create new literal, ignore if existing */ 
if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(create_literal)+ strlen(escaped_value)+ strlen(escaped_lang)+ strlen(escaped_datatype)+21))) { LIBRDF_FREE(cstring,escaped_value); LIBRDF_FREE(cstring,escaped_lang); LIBRDF_FREE(cstring,escaped_datatype); librdf_storage_postgresql_release_handle(storage, handle); return 0; } sprintf(query, create_literal, hash, escaped_value, escaped_lang, escaped_datatype); LIBRDF_FREE(cstring,escaped_value); LIBRDF_FREE(cstring,escaped_lang); LIBRDF_FREE(cstring,escaped_datatype); if(!(res=PQexec(handle, query)) && PQresultStatus(res) != PGRES_COMMAND_OK) { librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "postgresql insert into Literals failed: %s", PQresultErrorMessage(res)); LIBRDF_FREE(cstring,query); librdf_storage_postgresql_release_handle(storage, handle); return 0; } LIBRDF_FREE(cstring,query); } } else if(type==LIBRDF_NODE_TYPE_BLANK) { /* Get hash */ unsigned char *name=librdf_node_get_blank_identifier(node); nodelen=strlen((const char*)name); hash=librdf_storage_postgresql_hash(storage, "B", (char*)name, nodelen); if(add) { char create_bnode[]="INSERT INTO Bnodes (ID,Name) VALUES (" UINT64_T_FMT ",'%s')"; /* Escape name for db query */ char *escaped_name; if(!(escaped_name=(char*)LIBRDF_MALLOC(cstring, nodelen*2+1))) { librdf_storage_postgresql_release_handle(storage, handle); return 0; } PQescapeString(escaped_name, (const char*)name, nodelen); /* Create new bnode, ignore if existing */ if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(create_bnode)+ strlen(escaped_name)+21))) { LIBRDF_FREE(cstring,escaped_name); librdf_storage_postgresql_release_handle(storage, handle); return 0; } sprintf(query, create_bnode, hash, escaped_name); LIBRDF_FREE(cstring,escaped_name); if(!(res=PQexec(handle, query)) && PQresultStatus(res) != PGRES_COMMAND_OK) { librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "postgresql insert into Bnodes failed: %s", PQresultErrorMessage(res)); 
LIBRDF_FREE(cstring,query); librdf_storage_postgresql_release_handle(storage, handle); return 0; } LIBRDF_FREE(cstring,query); } } else { librdf_storage_postgresql_release_handle(storage, handle); return 0; } librdf_storage_postgresql_release_handle(storage, handle); return hash; } /* * librdf_storage_postgresql_start_bulk - Prepare for bulk insert operation * @storage: the storage * * Return value: Non-zero on failure. */ static int librdf_storage_postgresql_start_bulk(librdf_storage* storage) { return 1; } /* * librdf_storage_postgresql_stop_bulk - End bulk insert operation * @storage: the storage * * Return value: Non-zero on failure. */ static int librdf_storage_postgresql_stop_bulk(librdf_storage* storage) { return 1; } /** * librdf_storage_postgresql_context_add_statements: * @storage: the storage * @context_node: #librdf_node object * @statement_stream: the stream of statements * * Add statements in stream to storage, with context. * * Return value: Non-zero on failure. **/ static int librdf_storage_postgresql_context_add_statements(librdf_storage* storage, librdf_node* context_node, librdf_stream* statement_stream) { librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context; u64 ctxt=0; int helper=0; /* Optimize for bulk loads? */ if(context->bulk) { if(librdf_storage_postgresql_start_bulk(storage)) return 1; } /* Find hash for context, creating if necessary */ if(context_node) { ctxt=librdf_storage_postgresql_node_hash(storage,context_node,1); if(!ctxt) return 1; } while(!helper && !librdf_stream_end(statement_stream)) { librdf_statement* statement=librdf_stream_get_object(statement_stream); if(!context->bulk) { /* Do not add duplicate statements * but do not check for this when in bulk mode. 
*/ if(librdf_storage_postgresql_contains_statement(storage, statement)) continue; } helper=librdf_storage_postgresql_context_add_statement_helper(storage, ctxt, statement); librdf_stream_next(statement_stream); } return helper; } /** * librdf_storage_postgresql_context_add_statement - Add a statement to a storage context * @storage: #librdf_storage object * @context_node: #librdf_node object * @statement: #librdf_statement statement to add * * Return value: non 0 on failure **/ static int librdf_storage_postgresql_context_add_statement(librdf_storage* storage, librdf_node* context_node, librdf_statement* statement) { u64 ctxt=0; /* Find hash for context, creating if necessary */ if(context_node) { ctxt=librdf_storage_postgresql_node_hash(storage,context_node,1); if(!ctxt) return 1; } return librdf_storage_postgresql_context_add_statement_helper(storage, ctxt, statement); } /* * librdf_storage_postgresql_context_add_statement_helper - Perform actual addition of a statement to a storage context * @storage: #librdf_storage object * @ctxt: u64 context hash * @statement: #librdf_statement statement to add * * Return value: non-zero on failure **/ static int librdf_storage_postgresql_context_add_statement_helper(librdf_storage* storage, u64 ctxt, librdf_statement* statement) { librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context; char insert_statement[]="INSERT INTO Statements" UINT64_T_FMT " (Subject,Predicate,Object,Context) VALUES (" UINT64_T_FMT "," UINT64_T_FMT "," UINT64_T_FMT "," UINT64_T_FMT ")"; u64 subject, predicate, object; char *query; PGconn *handle; PGresult *res; /* Get postgresql connection handle */ handle=librdf_storage_postgresql_get_handle(storage); if(!handle) return 1; /* Find hashes for nodes, creating if necessary */ subject=librdf_storage_postgresql_node_hash(storage, librdf_statement_get_subject(statement),1); predicate=librdf_storage_postgresql_node_hash(storage, 
librdf_statement_get_predicate(statement),1); object=librdf_storage_postgresql_node_hash(storage, librdf_statement_get_object(statement),1); if(!subject || !predicate || !object) { librdf_storage_postgresql_release_handle(storage, handle); return 1; } /* Add statement to storage */ if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(insert_statement)+101))) { librdf_storage_postgresql_release_handle(storage, handle); return 1; } sprintf(query, insert_statement, context->model, subject, predicate, object, ctxt); if(!(res=PQexec(handle, query))) { librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "postgresql insert into Statements failed: %s", PQresultErrorMessage(res)); LIBRDF_FREE(cstring,query); librdf_storage_postgresql_release_handle(storage, handle); return -1; } LIBRDF_FREE(cstring,query); librdf_storage_postgresql_release_handle(storage, handle); return 0; } /** * librdf_storage_postgresql_contains_statement - Test if a given complete statement is present in the model * @storage: the storage * @statement: a complete statement * * Return value: Non-zero if the model contains the statement. 
**/ static int librdf_storage_postgresql_contains_statement(librdf_storage* storage, librdf_statement* statement) { librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context; char find_statement[]="SELECT 1 FROM Statements" UINT64_T_FMT " WHERE Subject=" UINT64_T_FMT " AND Predicate=" UINT64_T_FMT " AND Object=" UINT64_T_FMT " limit 1"; u64 subject, predicate, object; char *query; PGresult *res; PGconn *handle; /* Get postgresql connection handle */ handle=librdf_storage_postgresql_get_handle(storage); if(!handle) return 0; /* Find hashes for nodes */ subject=librdf_storage_postgresql_node_hash(storage, librdf_statement_get_subject(statement),0); predicate=librdf_storage_postgresql_node_hash(storage, librdf_statement_get_predicate(statement),0); object=librdf_storage_postgresql_node_hash(storage, librdf_statement_get_object(statement),0); if(!subject || !predicate || !object) { librdf_storage_postgresql_release_handle(storage, handle); return 0; } /* Check for statement */ if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(find_statement)+81))) { librdf_storage_postgresql_release_handle(storage, handle); return 0; } sprintf(query, find_statement, context->model, subject, predicate, object); if(!(res=PQexec(handle, query)) ) { librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "postgresql query for statement failed: %s", PQerrorMessage(handle)); LIBRDF_FREE(cstring,query); librdf_storage_postgresql_release_handle(storage, handle); return 0; } LIBRDF_FREE(cstring, query); if(!(PQntuples(res))) { if(res) PQclear(res); librdf_storage_postgresql_release_handle(storage, handle); return 0; } if(res) PQclear(res); librdf_storage_postgresql_release_handle(storage, handle); return 1; } /** * librdf_storage_postgresql_remove_statement - Remove a statement from storage * @storage: #librdf_storage object * @statement: #librdf_statement statement to remove * * Return value: non-zero on failure **/ static int 
librdf_storage_postgresql_remove_statement(librdf_storage* storage, librdf_statement* statement) { return librdf_storage_postgresql_context_remove_statement(storage,NULL,statement); } /** * librdf_storage_postgresql_context_remove_statement - Remove a statement from a storage context * @storage: #librdf_storage object * @context_node: #librdf_node object * @statement: #librdf_statement statement to remove * * Return value: non-zero on failure **/ static int librdf_storage_postgresql_context_remove_statement(librdf_storage* storage, librdf_node* context_node, librdf_statement* statement) { librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context; char delete_statement[]="DELETE FROM Statements" UINT64_T_FMT " WHERE Subject=" UINT64_T_FMT " AND Predicate=" UINT64_T_FMT " AND Object=" UINT64_T_FMT; char delete_statement_with_context[]="DELETE FROM Statements" UINT64_T_FMT " WHERE Subject=" UINT64_T_FMT " AND Predicate=" UINT64_T_FMT " AND Object=" UINT64_T_FMT " AND Context=" UINT64_T_FMT; u64 subject, predicate, object, ctxt=0; char *query; PGconn *handle; /* Get postgresql connection handle */ handle=librdf_storage_postgresql_get_handle(storage); if(!handle) return 1; /* Find hashes for nodes */ subject=librdf_storage_postgresql_node_hash(storage, librdf_statement_get_subject(statement),0); predicate=librdf_storage_postgresql_node_hash(storage, librdf_statement_get_predicate(statement),0); object=librdf_storage_postgresql_node_hash(storage, librdf_statement_get_object(statement),0); if(context_node) { ctxt=librdf_storage_postgresql_node_hash(storage,context_node,0); if(!ctxt) { librdf_storage_postgresql_release_handle(storage, handle); return 1; } } if(!subject || !predicate || !object || (context_node && !ctxt)) { librdf_storage_postgresql_release_handle(storage, handle); return 1; } /* Remove statement(s) from storage */ if(context_node) { if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(delete_statement_with_context)+101))) { 
librdf_storage_postgresql_release_handle(storage, handle); return 1; } sprintf(query, delete_statement_with_context, context->model, subject, predicate, object, ctxt); } else { if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(delete_statement)+81))) { librdf_storage_postgresql_release_handle(storage, handle); return 1; } sprintf(query, delete_statement, context->model, subject, predicate, object); } if(! PQexec(handle, query)) { librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "postgresql delete from Statements failed: %s", PQerrorMessage(handle)); LIBRDF_FREE(cstring,query); librdf_storage_postgresql_release_handle(storage, handle); return -1; } LIBRDF_FREE(cstring,query); librdf_storage_postgresql_release_handle(storage, handle); return 0; } /** * librdf_storage_postgresql_context_remove_statements - Remove all statement from a storage context * @storage: #librdf_storage object * @context_node: #librdf_node object * * Return value: non-zero on failure **/ static int librdf_storage_postgresql_context_remove_statements(librdf_storage* storage, librdf_node* context_node) { librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context; char delete_context[]="DELETE FROM Statements" UINT64_T_FMT " WHERE Context=" UINT64_T_FMT; char delete_model[]="DELETE FROM Statements" UINT64_T_FMT; u64 ctxt=0; char *query; PGconn *handle; /* Get postgresql connection handle */ handle=librdf_storage_postgresql_get_handle(storage); if(!handle) return 1; /* Find hash for context */ if(context_node) { ctxt=librdf_storage_postgresql_node_hash(storage,context_node,0); if(!ctxt) { librdf_storage_postgresql_release_handle(storage, handle); return 1; } } /* Remove statement(s) from storage */ if(context_node) { if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(delete_context)+61))) { librdf_storage_postgresql_release_handle(storage, handle); return 1; } sprintf(query, delete_context, context->model, ctxt); } else { 
if(!(query=(char*)LIBRDF_MALLOC(cstring, strlen(delete_model)+21))) { librdf_storage_postgresql_release_handle(storage, handle); return 1; } sprintf(query, delete_model, context->model); } if(! PQexec(handle,query)) { librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "postgresql delete of context from Statements failed: %s", PQerrorMessage(handle)); LIBRDF_FREE(cstring,query); librdf_storage_postgresql_release_handle(storage, handle); return -1; } LIBRDF_FREE(cstring,query); librdf_storage_postgresql_release_handle(storage, handle); return 0; } /** * librdf_storage_postgresql_serialise - Return a stream of all statements in a storage * @storage: the storage * * Return a stream of all statements in a storage. * * Return value: a #librdf_stream or NULL on failure **/ librdf_stream* librdf_storage_postgresql_serialise(librdf_storage* storage) { return librdf_storage_postgresql_find_statements_in_context(storage,NULL,NULL); } /** * librdf_storage_postgresql_find_statements - Find a graph of statements in storage. * @storage: the storage * @statement: the statement to match * * Return a stream of statements matching the given statement (or * all statements if NULL). Parts (subject, predicate, object) of the * statement can be empty in which case any statement part will match that. 
* * Return value: a #librdf_stream or NULL on failure **/ static librdf_stream* librdf_storage_postgresql_find_statements(librdf_storage* storage, librdf_statement* statement) { return librdf_storage_postgresql_find_statements_in_context(storage,statement,NULL); } /** * librdf_storage_postgresql_context_serialise - List all statements in a storage context * @storage: #librdf_storage object * @context_node: #librdf_node object * * Return value: #librdf_stream of statements or NULL on failure or context is empty **/ static librdf_stream* librdf_storage_postgresql_context_serialise(librdf_storage* storage, librdf_node* context_node) { return librdf_storage_postgresql_find_statements_in_context(storage,NULL,context_node); } /** * librdf_storage_postgresql_find_statements_in_context - Find a graph of statements in a storage context. * @storage: the storage * @statement: the statement to match * @context_node: the context to search * * Return a stream of statements matching the given statement (or * all statements if NULL). Parts (subject, predicate, object) of the * statement can be empty in which case any statement part will match that. * * Return value: a #librdf_stream or NULL on failure **/ static librdf_stream* librdf_storage_postgresql_find_statements_in_context(librdf_storage* storage, librdf_statement* statement,librdf_node* context_node) { return librdf_storage_postgresql_find_statements_with_options(storage, statement, context_node, NULL); } /** * librdf_storage_postgresql_find_statements_with_options - Find a graph of statements in a storage context with options. * @storage: the storage * @statement: the statement to match * @context_node: the context to search * @options: #librdf_hash of match options or NULL * * Return a stream of statements matching the given statement (or * all statements if NULL). Parts (subject, predicate, object) of the * statement can be empty in which case any statement part will match that. 
* * Return value: a #librdf_stream or NULL on failure **/ static librdf_stream* librdf_storage_postgresql_find_statements_with_options(librdf_storage* storage, librdf_statement* statement, librdf_node* context_node, librdf_hash* options) { librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context; librdf_storage_postgresql_sos_context* sos; librdf_node *subject=NULL, *predicate=NULL, *object=NULL; char *query; char tmp[64]; char where[256]; char joins[640]; librdf_stream *stream; /* Initialize sos context */ if(!(sos=(librdf_storage_postgresql_sos_context*) LIBRDF_CALLOC(librdf_storage_postgresql_sos_context,1, sizeof(librdf_storage_postgresql_sos_context)))) return NULL; sos->storage=storage; librdf_storage_add_reference(sos->storage); if(statement) sos->query_statement=librdf_new_statement_from_statement(statement); if(context_node) sos->query_context=librdf_new_node_from_node(context_node); sos->current_statement=NULL; sos->current_context=NULL; sos->results=NULL; if(options) { sos->is_literal_match=librdf_hash_get_as_boolean(options, "match-substring"); } /* Get postgresql connection handle */ sos->handle=librdf_storage_postgresql_get_handle(storage); if(!sos->handle) { librdf_storage_postgresql_find_statements_in_context_finished((void*)sos); return NULL; } /* Construct query */ if(!(query=(char*)LIBRDF_MALLOC(cstring, 21))) { librdf_storage_postgresql_find_statements_in_context_finished((void*)sos); return NULL; } strcpy(query, "SELECT "); *where='\0'; if(sos->is_literal_match) sprintf(joins, " FROM Literals AS L LEFT JOIN Statements" UINT64_T_FMT " as S ON L.ID=S.Object", context->model); else sprintf(joins, " FROM Statements" UINT64_T_FMT " AS S", context->model); if(statement) { subject=librdf_statement_get_subject(statement); predicate=librdf_statement_get_predicate(statement); object=librdf_statement_get_object(statement); } /* Subject */ if(statement && subject) { sprintf(tmp, "S.Subject=" UINT64_T_FMT "", 
librdf_storage_postgresql_node_hash(storage,subject,0)); if(!strlen(where)) strcat(where, " WHERE "); else strcat(where, " AND "); strcat(where, tmp); } else { if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, " SubjectR.URI AS SuR, SubjectB.Name AS SuB")) { librdf_storage_postgresql_find_statements_in_context_finished((void*)sos); return NULL; } strcat(joins," LEFT JOIN Resources AS SubjectR ON S.Subject=SubjectR.ID"); strcat(joins," LEFT JOIN Bnodes AS SubjectB ON S.Subject=SubjectB.ID"); } /* Predicate */ if(statement && predicate) { sprintf(tmp, "S.Predicate=" UINT64_T_FMT "", librdf_storage_postgresql_node_hash(storage, predicate, 0)); if(!strlen(where)) strcat(where, " WHERE "); else strcat(where, " AND "); strcat(where, tmp); } else { if(!statement || !subject) { if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, ",")) { librdf_storage_postgresql_find_statements_in_context_finished((void*)sos); return NULL; } } if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, " PredicateR.URI AS PrR")) { librdf_storage_postgresql_find_statements_in_context_finished((void*)sos); return NULL; } strcat(joins," LEFT JOIN Resources AS PredicateR ON S.Predicate=PredicateR.ID"); } /* Object */ if(statement && object) { if(!sos->is_literal_match) { sprintf(tmp,"S.Object=" UINT64_T_FMT "", librdf_storage_postgresql_node_hash(storage, object, 0)); if(!strlen(where)) strcat(where, " WHERE "); else strcat(where, " AND "); strcat(where, tmp); } else { /* MATCH literal, not hash_id */ if(!statement || !subject || !predicate) { if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, ",")) { librdf_storage_postgresql_find_statements_in_context_finished((void*)sos); return NULL; } } if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, " ObjectR.URI AS ObR, ObjectB.Name AS ObB, ObjectL.Value AS ObV, ObjectL.Language AS ObL, ObjectL.Datatype AS ObD")) { 
librdf_storage_postgresql_find_statements_in_context_finished((void*)sos); return NULL; } strcat(joins," LEFT JOIN Resources AS ObjectR ON S.Object=ObjectR.ID"); strcat(joins," LEFT JOIN Bnodes AS ObjectB ON S.Object=ObjectB.ID"); strcat(joins," LEFT JOIN Literals AS ObjectL ON S.Object=ObjectL.ID"); sprintf(tmp, "MATCH(L.Value) AGAINST ('%s')", librdf_node_get_literal_value(object)); /* NOTE: This is NOT USED but could be if FULLTEXT wasn't enabled */ /* sprintf(tmp, " L.Value LIKE '%%%s%%'", librdf_node_get_literal_value(object)); */ if(!strlen(where)) strcat(where, " WHERE "); else strcat(where, " AND "); strcat(where, tmp); } } else { if(!statement || !subject || !predicate) { if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, ",")) { librdf_storage_postgresql_find_statements_in_context_finished((void*)sos); return NULL; } } if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, " ObjectR.URI AS ObR, ObjectB.Name AS ObB, ObjectL.Value AS ObV, ObjectL.Language AS ObL, ObjectL.Datatype AS ObD")) { librdf_storage_postgresql_find_statements_in_context_finished((void*)sos); return NULL; } strcat(joins," LEFT JOIN Resources AS ObjectR ON S.Object=ObjectR.ID"); strcat(joins," LEFT JOIN Bnodes AS ObjectB ON S.Object=ObjectB.ID"); strcat(joins," LEFT JOIN Literals AS ObjectL ON S.Object=ObjectL.ID"); } /* Context */ if(context_node) { sprintf(tmp,"S.Context=" UINT64_T_FMT "", librdf_storage_postgresql_node_hash(storage,context_node,0)); if(!strlen(where)) strcat(where, " WHERE "); else strcat(where, " AND "); strcat(where, tmp); } else { if(!statement || !subject || !predicate || !object) { if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, ",")) { librdf_storage_postgresql_find_statements_in_context_finished((void*)sos); return NULL; } } if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, " ContextR.URI AS CoR, ContextB.Name AS CoB, ContextL.Value AS CoV, 
ContextL.Language AS CoL, ContextL.Datatype AS CoD")) { librdf_storage_postgresql_find_statements_in_context_finished((void*)sos); return NULL; } strcat(joins," LEFT JOIN Resources AS ContextR ON S.Context=ContextR.ID"); strcat(joins," LEFT JOIN Bnodes AS ContextB ON S.Context=ContextB.ID"); strcat(joins," LEFT JOIN Literals AS ContextL ON S.Context=ContextL.ID"); } /* Query without variables? */ if(statement && subject && predicate && object && context_node) { if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, " 1")) { librdf_storage_postgresql_find_statements_in_context_finished((void*)sos); return NULL; } } /* Complete query string */ if(librdf_storage_postgresql_find_statements_in_context_augment_query(&query, joins) || librdf_storage_postgresql_find_statements_in_context_augment_query(&query, where)) { librdf_storage_postgresql_find_statements_in_context_finished((void*)sos); return NULL; } /* Start query... */ if(!(sos->results=PQexec(sos->handle, query) )) { librdf_log(sos->storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "postgresql query failed: %s", PQresultErrorMessage(sos->results)); librdf_storage_postgresql_find_statements_in_context_finished((void*)sos); return NULL; } LIBRDF_FREE(cstring, query); sos->current_rowno=0; if(!(sos->row=(char**)LIBRDF_CALLOC(cstring,sizeof(char *),PQnfields(sos->results)+1))) return NULL; /* Get first statement, if any, and initialize stream */ if(librdf_storage_postgresql_find_statements_in_context_next_statement(sos) ) { librdf_storage_postgresql_find_statements_in_context_finished((void*)sos); return librdf_new_empty_stream(storage->world); } stream=librdf_new_stream(storage->world,(void*)sos, &librdf_storage_postgresql_find_statements_in_context_end_of_stream, &librdf_storage_postgresql_find_statements_in_context_next_statement, &librdf_storage_postgresql_find_statements_in_context_get_statement, &librdf_storage_postgresql_find_statements_in_context_finished); if(!stream) { 
librdf_storage_postgresql_find_statements_in_context_finished((void*)sos); return NULL; } return stream; } static int librdf_storage_postgresql_find_statements_in_context_augment_query(char **query, const char *addition) { char *newquery; /* Augment existing query, returning 0 on success. */ if(!(newquery=(char*)LIBRDF_MALLOC(cstring, strlen(*query)+strlen(addition)+1))) return 1; strcpy(newquery,*query); strcat(newquery,addition); LIBRDF_FREE(cstring, *query); *query=newquery; return 0; } static int librdf_storage_postgresql_find_statements_in_context_end_of_stream(void* context) { librdf_storage_postgresql_sos_context* sos=(librdf_storage_postgresql_sos_context*)context; return sos->current_statement==NULL; } static int librdf_storage_postgresql_find_statements_in_context_next_statement(void* context) { librdf_storage_postgresql_sos_context* sos=(librdf_storage_postgresql_sos_context*)context; librdf_node *subject=NULL, *predicate=NULL, *object=NULL; librdf_node *node; char **row=sos->row; int i; if( sos->current_rowno < PQntuples(sos->results) ) { for(i=0;iresults);i++) { if(PQgetlength(sos->results,sos->current_rowno,i) > 0 ) { /* FIXME: why is this not copied? */ /* if(!(row[i]=(char*)LIBRDF_MALLOC(cstring, PQgetlength(sos->results,sos->current_rowno,i)+1))) return 1; strcpy(row[i],PQgetvalue(sos->results,sos->current_rowno,i)); */ row[i]=PQgetvalue(sos->results,sos->current_rowno,i); } else row[i]=NULL; } sos->current_rowno=sos->current_rowno+1; /* Get ready for context */ if(sos->current_context) librdf_free_node(sos->current_context); sos->current_context=NULL; /* Is this a query with statement parts? 
*/ if(sos->query_statement) { subject=librdf_statement_get_subject(sos->query_statement); predicate=librdf_statement_get_predicate(sos->query_statement); if(sos->is_literal_match) object=NULL; else object=librdf_statement_get_object(sos->query_statement); } /* Make sure we have a statement object to return */ if(!sos->current_statement) { if(!(sos->current_statement=librdf_new_statement(sos->storage->world))) return 1; } librdf_statement_clear(sos->current_statement); /* Query without variables? */ if(subject && predicate && object && sos->query_context) { librdf_statement_set_subject(sos->current_statement,librdf_new_node_from_node(subject)); librdf_statement_set_predicate(sos->current_statement,librdf_new_node_from_node(predicate)); librdf_statement_set_object(sos->current_statement,librdf_new_node_from_node(object)); sos->current_context=librdf_new_node_from_node(sos->query_context); } else { /* Turn row parts into statement and context */ int part=0; /* Subject - constant or from row? */ if(subject) { librdf_statement_set_subject(sos->current_statement,librdf_new_node_from_node(subject)); } else { /* Resource or Bnode? */ if(row[part]) { if(!(node=librdf_new_node_from_uri_string(sos->storage->world, (const unsigned char*)row[part]))) return 1; } else if(row[part+1]) { if(!(node=librdf_new_node_from_blank_identifier(sos->storage->world, (const unsigned char*)row[part+1]))) return 1; } else return 1; librdf_statement_set_subject(sos->current_statement,node); part+=2; } /* Predicate - constant or from row? */ if(predicate) { librdf_statement_set_predicate(sos->current_statement,librdf_new_node_from_node(predicate)); } else { /* Resource? */ if(row[part]) { if(!(node=librdf_new_node_from_uri_string(sos->storage->world, (const unsigned char*)row[part]))) return 1; } else return 1; librdf_statement_set_predicate(sos->current_statement,node); part+=1; } /* Object - constant or from row? 
*/ if(object) { librdf_statement_set_object(sos->current_statement,librdf_new_node_from_node(object)); } else { /* Resource, Bnode or Literal? */ if(row[part]) { if(!(node=librdf_new_node_from_uri_string(sos->storage->world, (const unsigned char*)row[part]))) return 1; } else if(row[part+1]) { if(!(node=librdf_new_node_from_blank_identifier(sos->storage->world, (const unsigned char*)row[part+1]))) return 1; } else if(row[part+2]) { /* Typed literal? */ librdf_uri *datatype=NULL; if(row[part+4] && strlen(row[part+4])) datatype=librdf_new_uri(sos->storage->world, (const unsigned char*)row[part+4]); if(!(node=librdf_new_node_from_typed_literal(sos->storage->world, (const unsigned char*)row[part+2], row[part+3], datatype))) return 1; } else return 1; librdf_statement_set_object(sos->current_statement,node); part+=5; } /* Context - constant or from row? */ if(sos->query_context) { sos->current_context=librdf_new_node_from_node(sos->query_context); } else { /* Resource, Bnode or Literal? */ if(row[part]) { if(!(node=librdf_new_node_from_uri_string(sos->storage->world, (const unsigned char*)row[part]))) return 1; } else if(row[part+1]) { if(!(node=librdf_new_node_from_blank_identifier(sos->storage->world, (const unsigned char*)row[part+1]))) return 1; } else if(row[part+2]) { /* Typed literal? 
*/ librdf_uri *datatype=NULL; if(row[part+4] && strlen(row[part+4])) datatype=librdf_new_uri(sos->storage->world, (const unsigned char*)row[part+4]); if(!(node=librdf_new_node_from_typed_literal(sos->storage->world, (const unsigned char*)row[part+2], row[part+3], datatype))) return 1; } else /* no context */ node=NULL; sos->current_context=node; } } } else { if(sos->current_statement) librdf_free_statement(sos->current_statement); sos->current_statement=NULL; if(sos->current_context) librdf_free_node(sos->current_context); sos->current_context=NULL; } return 0; } static void* librdf_storage_postgresql_find_statements_in_context_get_statement(void* context, int flags) { librdf_storage_postgresql_sos_context* sos=(librdf_storage_postgresql_sos_context*)context; switch(flags) { case LIBRDF_ITERATOR_GET_METHOD_GET_OBJECT: return sos->current_statement; case LIBRDF_ITERATOR_GET_METHOD_GET_CONTEXT: return sos->current_context; default: abort(); } } static void librdf_storage_postgresql_find_statements_in_context_finished(void* context) { librdf_storage_postgresql_sos_context* sos=(librdf_storage_postgresql_sos_context*)context; if( sos->row ) LIBRDF_FREE(cstring,sos->row); if(sos->results) PQclear(sos->results); if(sos->handle) librdf_storage_postgresql_release_handle(sos->storage, sos->handle); if(sos->current_statement) librdf_free_statement(sos->current_statement); if(sos->current_context) librdf_free_node(sos->current_context); if(sos->query_statement) librdf_free_statement(sos->query_statement); if(sos->query_context) librdf_free_node(sos->query_context); if(sos->storage) librdf_storage_remove_reference(sos->storage); LIBRDF_FREE(librdf_storage_postgresql_sos_context, sos); } /** * librdf_storage_postgresql_get_contexts: * @storage: the storage * * Return an iterator with the context nodes present in storage. 
* * Return value: a #librdf_iterator or NULL on failure **/ static librdf_iterator* librdf_storage_postgresql_get_contexts(librdf_storage* storage) { librdf_storage_postgresql_context* context=(librdf_storage_postgresql_context*)storage->context; librdf_storage_postgresql_get_contexts_context* gccontext; const char select_contexts[]="\ SELECT DISTINCT R.URI AS CoR, B.Name AS CoB, \ L.Value AS CoV, L.Language AS CoL, L.Datatype AS CoD \ FROM Statements" UINT64_T_FMT " as S \ LEFT JOIN Resources AS R ON S.Context=R.ID \ LEFT JOIN Bnodes AS B ON S.Context=B.ID \ LEFT JOIN Literals AS L ON S.Context=L.ID"; char *query; librdf_iterator *iterator; /* Initialize get_contexts context */ if(!(gccontext=(librdf_storage_postgresql_get_contexts_context*) LIBRDF_CALLOC(librdf_storage_postgresql_get_contexts_context,1, sizeof(librdf_storage_postgresql_get_contexts_context)))) return NULL; gccontext->storage=storage; librdf_storage_add_reference(gccontext->storage); gccontext->current_context=NULL; gccontext->results=NULL; /* Get postgresql connection handle */ gccontext->handle=librdf_storage_postgresql_get_handle(storage); if(!gccontext->handle) { librdf_storage_postgresql_get_contexts_finished((void*)gccontext); return NULL; } /* Construct query */ if(!(query=(char*)LIBRDF_MALLOC(cstring,strlen(select_contexts)+21))) { librdf_storage_postgresql_get_contexts_finished((void*)gccontext); return NULL; } sprintf(query, select_contexts, context->model); /* Start query... 
*/ if(!(gccontext->results=PQexec(gccontext->handle, query))) { librdf_log(gccontext->storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "postgresql query failed: %s", PQresultErrorMessage(gccontext->results)); librdf_storage_postgresql_get_contexts_finished((void*)gccontext); return NULL; } LIBRDF_FREE(cstring, query); gccontext->current_rowno=0; if(!(gccontext->row=(char**)LIBRDF_MALLOC(cstring, PQnfields(gccontext->results)+1))) return NULL; /* Get first context, if any, and initialize iterator */ if(librdf_storage_postgresql_get_contexts_next_context(gccontext) || !gccontext->current_context) { librdf_storage_postgresql_get_contexts_finished((void*)gccontext); return librdf_new_empty_iterator(storage->world); } iterator=librdf_new_iterator(storage->world,(void*)gccontext, &librdf_storage_postgresql_get_contexts_end_of_iterator, &librdf_storage_postgresql_get_contexts_next_context, &librdf_storage_postgresql_get_contexts_get_context, &librdf_storage_postgresql_get_contexts_finished); if(!iterator) librdf_storage_postgresql_get_contexts_finished(gccontext); return iterator; } static int librdf_storage_postgresql_get_contexts_end_of_iterator(void* context) { librdf_storage_postgresql_get_contexts_context* gccontext=(librdf_storage_postgresql_get_contexts_context*)context; return gccontext->current_context==NULL; } static int librdf_storage_postgresql_get_contexts_next_context(void* context) { librdf_storage_postgresql_get_contexts_context* gccontext=(librdf_storage_postgresql_get_contexts_context*)context; librdf_node *node; char **row=gccontext->row; int i; if( gccontext->current_rowno < PQntuples(gccontext->results) ) { for(i=0;iresults);i++) { if(PQgetlength(gccontext->results,gccontext->current_rowno,i) > 0 ) { /* FIXME: why is this not copied? 
*/ /* if(!(row[i]=(char*)LIBRDF_MALLOC(cstring, PQgetlength(gccontext->results,gccontext->current_rowno,i)+1))) return 1; strcpy(row[i],PQgetvalue(gccontext->results,gccontext->current_rowno,i)); */ row[i]=PQgetvalue(gccontext->results,gccontext->current_rowno,i); } else row[i]=NULL; } gccontext->current_rowno=gccontext->current_rowno+1; /* Free old context node, if allocated */ if(gccontext->current_context) librdf_free_node(gccontext->current_context); /* Resource, Bnode or Literal? */ if(row[0]) { if(!(node=librdf_new_node_from_uri_string(gccontext->storage->world, (const unsigned char*)row[0]))) return 1; } else if(row[1]) { if(!(node=librdf_new_node_from_blank_identifier(gccontext->storage->world, (const unsigned char*)row[1]))) return 1; } else if(row[2]) { /* Typed literal? */ librdf_uri *datatype=NULL; if(row[4] && strlen(row[4])) datatype=librdf_new_uri(gccontext->storage->world, (const unsigned char*)row[4]); if(!(node=librdf_new_node_from_typed_literal(gccontext->storage->world, (const unsigned char*)row[2], row[3], datatype))) return 1; } else return 1; gccontext->current_context=node; } else { if(gccontext->current_context) librdf_free_node(gccontext->current_context); gccontext->current_context=NULL; } return 0; } static void* librdf_storage_postgresql_get_contexts_get_context(void* context, int flags) { librdf_storage_postgresql_get_contexts_context* gccontext=(librdf_storage_postgresql_get_contexts_context*)context; return gccontext->current_context; } #if 0 /* FIXME: why is this not used ? 
*/ static void librdf_storage_postgresql_free_gccontext_row(void* context) { librdf_storage_postgresql_get_contexts_context* gccontext=(librdf_storage_postgresql_get_contexts_context*)context; /* for(i=0;iresults);i++) if( gccontext->row[i] ) LIBRDF_FREE(cstring, gccontext->row[i]); */ } #endif static void librdf_storage_postgresql_get_contexts_finished(void* context) { librdf_storage_postgresql_get_contexts_context* gccontext=(librdf_storage_postgresql_get_contexts_context*)context; if( gccontext->row ) LIBRDF_FREE(cstring, gccontext->row); if(gccontext->results) PQclear(gccontext->results); if(gccontext->handle) librdf_storage_postgresql_release_handle(gccontext->storage, gccontext->handle); if(gccontext->current_context) librdf_free_node(gccontext->current_context); if(gccontext->storage) librdf_storage_remove_reference(gccontext->storage); LIBRDF_FREE(librdf_storage_postgresql_get_contexts_context, gccontext); } /** * librdf_storage_postgresql_get_feature - get the value of a storage feature * @storage: #librdf_storage object * @feature: #librdf_uri feature property * * Return value: #librdf_node feature value or NULL if no such feature * exists or the value is empty. 
**/ static librdf_node* librdf_storage_postgresql_get_feature(librdf_storage* storage, librdf_uri* feature) { unsigned char *uri_string; if(!feature) return NULL; uri_string=librdf_uri_as_string(feature); if(!uri_string) return NULL; if(!strcmp((const char*)uri_string, (const char*)LIBRDF_MODEL_FEATURE_CONTEXTS)) { /* Always have contexts */ static const unsigned char value[2]="1"; return librdf_new_node_from_typed_literal(storage->world, value, NULL, NULL); } return NULL; } /** * librdf_storage_postgresql_transaction_start: * @storage: the storage object * * Start a transaction * * Return value: non-0 on failure **/ static int librdf_storage_postgresql_transaction_start(librdf_storage* storage) { librdf_storage_postgresql_context *context=(librdf_storage_postgresql_context *)storage->context; const char query[]="START TRANSACTION"; if(context->transaction_handle) { librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "Postgresql transaction already started"); return 1; } context->transaction_handle=librdf_storage_postgresql_get_handle(storage); if(!context->transaction_handle) return 1; if(!PQexec(context->transaction_handle, query)) { librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "Postgresql query failed: %s", PQerrorMessage(context->transaction_handle)); librdf_storage_postgresql_release_handle(storage, context->transaction_handle); context->transaction_handle=NULL; return 1; } return 0; } /** * librdf_storage_postgresql_transaction_start_with_handle: * @storage: the storage object * @handle: the transaction object * * Start a transaction using an existing external transaction object. * * Return value: non-0 on failure **/ static int librdf_storage_postgresql_transaction_start_with_handle(librdf_storage* storage, void* handle) { return librdf_storage_postgresql_transaction_start(storage); } /** * librdf_storage_postgresql_transaction_commit: * @storage: the storage object * * Commit a transaction. 
* * Return value: non-0 on failure **/ static int librdf_storage_postgresql_transaction_commit(librdf_storage* storage) { librdf_storage_postgresql_context *context=(librdf_storage_postgresql_context *)storage->context; const char query[]="COMMIT TRANSACTION"; PGconn* handle; int status; if(!context->transaction_handle) return 1; handle=context->transaction_handle; status=(PQexec(context->transaction_handle, query) == NULL); if(status) { librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "Postgresql query failed: %s", PQerrorMessage(context->transaction_handle)); } librdf_storage_postgresql_release_handle(storage, handle); context->transaction_handle=NULL; return (status != 0); } /** * librdf_storage_postgresql_transaction_rollback: * @storage: the storage object * * Rollback a transaction. * * Return value: non-0 on failure **/ static int librdf_storage_postgresql_transaction_rollback(librdf_storage* storage) { librdf_storage_postgresql_context *context=(librdf_storage_postgresql_context *)storage->context; const char query[]="ROLLBACK TRANSACTION"; PGconn* handle; int status; if(!context->transaction_handle) return 1; handle=context->transaction_handle; status=(PQexec(context->transaction_handle, query) == NULL); if(status) { librdf_log(storage->world, 0, LIBRDF_LOG_ERROR, LIBRDF_FROM_STORAGE, NULL, "Postgresql query failed: %s", PQerrorMessage(context->transaction_handle)); } librdf_storage_postgresql_release_handle(storage, handle); context->transaction_handle=NULL; return (status != 0); } /** * librdf_storage_postgresql_transaction_get_handle: * @storage: the storage object * * Get the current transaction handle. 
*
 * Return value: the transaction handle stored in the storage context
 * (reset to NULL by commit and rollback)
 **/
static void*
librdf_storage_postgresql_transaction_get_handle(librdf_storage* storage)
{
  librdf_storage_postgresql_context *pg_context;

  pg_context=(librdf_storage_postgresql_context *)storage->context;

  return pg_context->transaction_handle;
}


/*
 * Local function to register the postgresql storage functions.
 *
 * Fills in the factory with the size of the per-storage context and the
 * postgresql implementation of each storage method.
 */
static void
librdf_storage_postgresql_register_factory(librdf_storage_factory *factory)
{
  factory->context_length = sizeof(librdf_storage_postgresql_context);

  /* Lifecycle */
  factory->init      = librdf_storage_postgresql_init;
  factory->terminate = librdf_storage_postgresql_terminate;
  factory->open      = librdf_storage_postgresql_open;
  factory->close     = librdf_storage_postgresql_close;
  factory->sync      = librdf_storage_postgresql_sync;
  factory->size      = librdf_storage_postgresql_size;

  /* Statements without a context */
  factory->add_statement      = librdf_storage_postgresql_add_statement;
  factory->add_statements     = librdf_storage_postgresql_add_statements;
  factory->remove_statement   = librdf_storage_postgresql_remove_statement;
  factory->contains_statement = librdf_storage_postgresql_contains_statement;
  factory->serialise          = librdf_storage_postgresql_serialise;
  factory->find_statements    = librdf_storage_postgresql_find_statements;
  factory->find_statements_with_options =
    librdf_storage_postgresql_find_statements_with_options;

  /* Statements in a context */
  factory->context_add_statement =
    librdf_storage_postgresql_context_add_statement;
  factory->context_add_statements =
    librdf_storage_postgresql_context_add_statements;
  factory->context_remove_statement =
    librdf_storage_postgresql_context_remove_statement;
  factory->context_remove_statements =
    librdf_storage_postgresql_context_remove_statements;
  factory->context_serialise =
    librdf_storage_postgresql_context_serialise;
  factory->find_statements_in_context =
    librdf_storage_postgresql_find_statements_in_context;
  factory->get_contexts = librdf_storage_postgresql_get_contexts;

  /* Features */
  factory->get_feature = librdf_storage_postgresql_get_feature;

  /* Transactions */
  factory->transaction_start =
    librdf_storage_postgresql_transaction_start;
  factory->transaction_start_with_handle =
    librdf_storage_postgresql_transaction_start_with_handle;
  factory->transaction_commit =
    librdf_storage_postgresql_transaction_commit;
  factory->transaction_rollback =
    librdf_storage_postgresql_transaction_rollback;
  factory->transaction_get_handle =
    librdf_storage_postgresql_transaction_get_handle;
}


/**
 * librdf_init_storage_postgresql:
 * @world: world object
 *
 * INTERNAL - initialise the storage_postgresql module by registering
 * its factory under the name "postgresql".
 **/
void
librdf_init_storage_postgresql(librdf_world *world)
{
  librdf_storage_register_factory(world,
                                  "postgresql",
                                  "postgresql database store",
                                  &librdf_storage_postgresql_register_factory);
}