GH-2701: Fuseki Mod to list and abort running executions.
GitHub issue resolved #2701
Pull request description: ARQ plugin + Fuseki plugin to track and abort ongoing SPARQL executions. I tried to design the changes to Jena's core in such a way that execution tracking can be enabled without requiring any changes to existing code.
Summary of changes.
Interception of SPARQL Requests and Execution Tracking
Changes in jena-arq:
- Added `SparqlDispatcherRegistry` + infrastructure which allows intercepting SPARQL update and query statements (as objects or strings) against DatasetGraphs. Update-/QueryExecDataset now first delegates to the dispatcher chain.
- The usual Update-/QueryEngineFactory handling is now the last handler in this dispatcher chain.
- Execution tracking is the first element in the chain, initialized with the plugin `InitExecTracking`. If there is a `TaskListener` in the context, then the Update-/QueryExec instances are wrapped with a tracking wrapper such that the listener is notified.
- The default `TaskListener` implementation is `TaskEventBroker`, which supports registering and deregistering further listeners.
- For keeping track of running tasks and a history of the last `n` executions, there is `TaskEventHistory`, which is a subclass of `TaskEventBroker`.
- Added a `parseCheck` dataset context attribute. If false, then update/query requests are forwarded via the dispatcher without parsing.
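The dispatcher-chain idea can be sketched in plain Java. All names below (`Exec`, `Dispatcher`, `dispatch`) are simplified stand-ins for illustration only, not the actual jena-arq classes; the point is that the tracking element wraps whatever the rest of the chain produces, with the terminal handler standing in for the usual engine-factory machinery:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

public class DispatchSketch {
    /** Stand-in for QueryExec: something that can run and yield a result. */
    interface Exec { String run(); }

    /** A dispatcher sees the request plus a supplier for the rest of the chain. */
    interface Dispatcher { Exec create(String query, Supplier<Exec> rest); }

    /** Walk the chain; if no dispatcher handles the request, fall through
     *  to the terminal handler (the usual engine-factory handling). */
    static Exec dispatch(String query, Deque<Dispatcher> chain, Exec terminal) {
        Dispatcher head = chain.poll();
        if (head == null) return terminal;
        return head.create(query, () -> dispatch(query, chain, terminal));
    }

    static String demo() {
        StringBuilder log = new StringBuilder();
        // Tracking dispatcher: wraps the Exec produced by the rest of the
        // chain so a listener could be notified on start and termination.
        Dispatcher tracking = (query, rest) -> {
            Exec inner = rest.get();
            return () -> {
                log.append("start;");
                String result = inner.run();
                log.append("end;");
                return result;
            };
        };
        Deque<Dispatcher> chain = new ArrayDeque<>();
        chain.add(tracking);
        Exec exec = dispatch("SELECT * { ?s ?p ?o }", chain, () -> "result");
        return exec.run() + "|" + log;
    }
}
```

Because wrapping happens at build time, code that only sees the `Exec` (resp. `QueryExec`) interface is unaffected by whether tracking is enabled.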
RDFLink-based Execution Tracking
This adds infrastructure to jena-rdfconnection in order to track executions against RDFLinks via the newly introduced class DatasetGraphOverRDFLink.
- Replaced the unfinished class `GraphSPARQLService` with `DatasetGraphOverSparql`. This is a base class in `jena-arq` that implements all methods by means of SPARQL requests. It is wired up with the tests in `TS_SparqlCore`. Caveat: as each update is a separate request and bulk updates may be split into multiple requests, blank nodes may not work as expected.
- Added `DatasetGraphOverRDFLink` as a subclass of `DatasetGraphOverSparql` which provides a `newRDFLink()` method and implements all DatasetGraph methods based on the RDFLink.
- Added dispatchers that intercept requests against `DatasetGraphOverRDFLink` and delegate them to the RDFLink.
- Added a `DatasetAssemblerHTTP` which configures a `DatasetGraphOverRDFLink` instance; this allows use of this system with Fuseki.
- Added `ExampleDBpediaViaRemoteDataset`, which demonstrates a query against such a dataset making use of Virtuoso-specific features.
```turtle
PREFIX fuseki: <http://jena.apache.org/fuseki#>
PREFIX rdf:    <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX ja:     <http://jena.hpl.hp.com/2005/11/Assembler#>
PREFIX remote: <http://jena.apache.org/2025/http#>

<#service> rdf:type fuseki:Service ;
    fuseki:name "dbpedia-remote" ;
    fuseki:endpoint [
        fuseki:operation fuseki:query ;
    ] ;
    fuseki:endpoint [
        fuseki:operation fuseki:tracker ;
        fuseki:name "tracker" ;
    ] ;
    fuseki:dataset <#baseDS>
    .

<#baseDS> rdf:type remote:DatasetHTTP ;
    remote:queryEndpoint "http://dbpedia.org/sparql" ;
    .
```
Fuseki Mod: Execution Tracker
Added a simple web page that will show a live view of ongoing and completed queries.
- The Fuseki plugin registers a `TaskEventHistory` in the endpoint context and connects it to the dataset context's `TaskEventBroker` (the broker is created if absent).
- The web page will disconnect from the event stream if there is too much activity and prompt for a manual reload; this prevents the browser tab from freezing.
- The abort action can be disabled via a context attribute:
```turtle
<#service> rdf:type fuseki:Service ;
    fuseki:endpoint [
        fuseki:operation fuseki:tracker ;
        fuseki:name "tracker" ;
        ja:context [ ja:cxtName "allowAbort" ; ja:cxtValue false ] ; # Disable abort buttons in the Web page and the REST API
    ] ;
    fuseki:dataset <#baseDS> .
```
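The broker/history relationship described above can be sketched in plain Java. All names and shapes here are illustrative stand-ins, not the PR's actual `TaskEventBroker`/`TaskEventHistory` API; the sketch only shows the wiring: the broker fans events out to registered listeners, and the history is one such listener with a bounded buffer of the most recent events:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

public class TrackingSketch {
    record TaskEvent(String taskId, String phase) {}

    /** Fans out task events to registered listeners. */
    static class Broker {
        private final List<Consumer<TaskEvent>> listeners = new ArrayList<>();
        void register(Consumer<TaskEvent> l) { listeners.add(l); }
        void emit(TaskEvent ev) { listeners.forEach(l -> l.accept(ev)); }
    }

    /** A listener that keeps a bounded history of the most recent events. */
    static class History implements Consumer<TaskEvent> {
        private final int capacity;
        final Deque<TaskEvent> events = new ArrayDeque<>();
        History(int capacity) { this.capacity = capacity; }
        public void accept(TaskEvent ev) {
            if (events.size() == capacity) events.removeFirst();
            events.addLast(ev);
        }
    }

    static int demo() {
        Broker datasetBroker = new Broker();   // lives in the dataset context
        History history = new History(2);      // registered by the Fuseki plugin
        datasetBroker.register(history);
        datasetBroker.emit(new TaskEvent("q1", "start"));
        datasetBroker.emit(new TaskEvent("q1", "end"));
        datasetBroker.emit(new TaskEvent("q2", "start"));
        return history.events.size();          // history stays bounded at 2
    }
}
```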
Misc changes
- `QueryExecHTTP`: improved support for cancelling HTTP requests. So far, cancel would hang until the InputStream of the HTTP response could be obtained. Now the HTTP request can be cancelled immediately.
- `HttpLib`: updated the methods to work on the async variant in order to make immediate abort on `QueryExecHTTP` work.
- Consolidated the task system of `jena-geosparql` with that of the execution tracking.
- The endpoint context is now never null, so that Fuseki mods can always associate information with endpoints.
- [x] Tests are included.
- [ ] Documentation change and updates are provided for the Apache Jena website
- [x] Commits have been squashed to remove intermediate development commit messages.
- [x] Key commit messages start with the issue number (GH-xxxx)
By submitting this pull request, I acknowledge that I am making a contribution to the Apache Software Foundation under the terms and conditions of the Contributor's Agreement.
See the Apache Jena "Contributing" guide.
Really interesting piece of work; I once did something much cruder (at least UI-wise) at a previous $dayjob.
Perhaps this could also serve as a base for discussion about further improvements and any necessary core changes for Jena 6. Ideally, the ExecTracker mechanism would not require wrapping with DatasetGraphWithExecTracker; instead this would be handled in the core machinery already. In cases where a specific DatasetGraph implementation is expected, the need for a wrapper to track executions may make things complex.
Yes I think this would be much cleaner if the tracking mechanism was integrated directly into the execution machinery without requiring extra wrapping as you do in this PR.
It would be nice if there were programmatic APIs for interacting with tracked queries/updates (there's some pieces towards that here but appears mostly focused on exposing stuff to the UI from my skim-reading of the code) so that applications that embed Jena could access and manage tracked queries/updates as desired.
Fuseki already has the concept of Tasks that's used for things like backups and compactions - would it make sense to integrate query/update tracking into that rather than creating a separate tracking mechanism? That might need generalising that mechanism, or pulling it more into Jena's core rather than Fuseki machinery, so might not be worth the effort, wdyt?
> It would be nice if there were programmatic APIs for interacting with tracked queries/updates (there's some pieces towards that here but appears mostly focused on exposing stuff to the UI from my skim-reading of the code) so that applications that embed Jena could access and manage tracked queries/updates as desired.
The ARQ Plugin adds ExecTracker which so far would be the programmatic API. (Still a bit crude but intended as a starting point towards this goal.)
The DatasetGraphWithExecTracker wrapping is needed such that the query/update engines process queries by adding the tracking.
So by integrating the tracking closer to the core, the DatasetGraphWithExecTracker would only be needed for adding tracking to alternative query engines, but the main query/update engines could interact with the tracker directly.
An important question is, whether tracking executions within the DatasetGraph's context is the way to move forward.
> Fuseki already has the concept of Tasks [...] pulling it more into Jena's core [...]
Yes, I still need to look into how much effort it would be to disentangle Fuseki's task tracker from the rest of Fuseki - but adding such a mechanism to core (and updating Fuseki for it) would most likely be the way to go.
The original code integrated exec tracking into the Update-/QueryEngineFactory system. However, this was sub-par because execution could only be intercepted on the QueryIterator level.
My updated proposal is to introduce a new layer Update-/QueryDispatcher on top of the Update-/QueryEngineFactory machinery. Now it is possible to intercept any query / update request to a dataset - even without having to parse the query.
```
Old design: QueryExecBuilderDataset -build-> QueryExecDataset -exec-> QueryEngineRegistry
New design: QueryExecBuilderDataset -build-> QueryDispatcherRegistry -customized build-> QueryExec
```
The last element of the dispatcher chain forwards the request to the usual Update-/QueryEngineFactory system. The returned QueryExec may be of any type, such as a tracking wrapper - it is no longer necessarily a QueryExecDataset.
Consequences:
- Code that relies on QueryExecBuilderDataset to always return a QueryExecDataset may break when adding custom interceptors. This should be a low price to pay because conventional code relies on the QueryExec interface.
- Execution Tracking can be added without any change to the existing code.
Related Ongoing Work
As a demo for related work based on this infrastructure, we are using it to integrate third party triple stores - such as Qlever - into Fuseki. This way we can use one server framework to manage several triple stores. Technically, we created a DatasetGraph wrapper implementation that is backed by a factory of HTTP RDFConnection instances (such as to a remote Qlever database). The query dispatcher just forwards query/update requests to an RDFConnection obtained from such a DatasetGraph implementation. Any opinions on this design?
As a final note, we also already created an assembler that starts qlever from a docker image as part of Fuseki (via the Java TestContainer framework), so the configuration looks like this:
```turtle
<#baseDS> rdf:type qlever:Dataset ;
    qlever:location "/run/databases/qlever/mydb/" ;
    qlever:indexName "myindex" ;
    qlever:accessToken "abcde" ;
    qlever:memoryMaxSize "4G" ;
    qlever:defaultQueryTimeout "600s" ;
```
I added an assembler to build datasets backed by remote (HTTP) SPARQL endpoints based on a new DatasetGraphOverRDFLink class in combination with the new dispatcher system, and this works out of the box with the execution tracking:
The PR should be review ready in the next few days.
The PR should be fairly complete and cleaned up now such that it is ready for review. I updated the first post with a summary of the changes.
This is "quite big".
Is there a best place to start reviewing to break up the feedback?
The fundamental changes in jena-arq are:
- The indirection between update/query statements and DatasetGraphs is in `org.apache.jena.sparql.engine.dispatch`.
- The execution tracking then comes from `org.apache.jena.sparql.exec.tracker`. It registers a dispatcher such that, iff there is a listener in the dataset context, `UpdateExec` and `QueryExec` instances are wrapped with status reporting, so that the listener is notified when query execution starts and terminates.
- The class `DatasetGraphOverSparql` is meant as a base class that can be used with the dispatcher system to externalize query execution to e.g. remote endpoints.
The changes in jena-rdfconnection build upon DatasetGraphOverSparql and the dispatcher system.
The main change is the introduction of DatasetGraphOverRDFLink + corresponding infrastructure. A dispatcher is provided that - for such a dataset graph - delegates each SPARQL statement to a fresh underlying RDF link. This way a remote store can be abstracted as a DatasetGraph for which queries and updates are forwarded to the backend.
The purpose is to test/showcase that the dispatcher system can be used to uniformly track remote executions.
An assembler is provided to allow configuration of such a dataset such as for use with Fuseki (note that not all configuration options of RDFLinkHTTPBuilder are yet wired up).
- In Fuseki, a plugin is provided that exposes the tracked executions via a JSON-returning REST method and a corresponding basic HTML view.
The pivotal question is, whether abstraction of remote endpoints should be done via DatasetGraph in the first place. But since most of Jena's architecture is built around it, it seemed reasonable to me to try to build upon this abstraction.
"git rebase main" works with one update for some Junit4 code.
The Fuseki module contains an HTML interface that includes a control to stop an update or query execution.
- It is per dataset, so there isn't a single page of all requests in progress, which would be useful for finding out about the current state of the server.
- It is uncontrolled - anyone can stop a request.
- Administration functions go under `/$/` so that the pattern can have security applied.
I'm not sure what the best thing to do is, but one possibility is:
- The endpoint is `/$/exectracker` and it returns the state of all requests.
- `operation:tracker` still exists to cause registration to happen for the dataset.
> - It is uncontrolled - anyone can stop a request.
It's possible to combine this with Fuseki's security, and there is a context flag to disable the stop action (both in the HTML and in the API):
```turtle
<#service> rdf:type fuseki:Service ;
    fuseki:name "coypu" ;
    # ...
    fuseki:endpoint [
        fuseki:operation fuseki:tracker ;
        fuseki:name "tracker" ;
        ja:context [ ja:cxtName "allowAbort" ; ja:cxtValue false ] ; # Disable stop action
    ] ;
    fuseki:endpoint [
        fuseki:operation fuseki:tracker ;
        fuseki:name "admin-tracker" ;
        fuseki:allowedUsers "admin" ;
    ] ;
    .
```
Our deployment:
- Public exec tracker without stop action (stop button is greyed out and API will raise an exception): https://data.aksw.org/coypu/tracker
- Visiting this URL will trigger an execution: https://data.coypu.org/country/DEU (disable the `Calculate inferences` checkbox for now, I still need to deploy an update to greatly improve performance).
- Protected admin tracker (should not be accessible): https://data.aksw.org/coypu/admin-tracker
Pending improvements:
- I think that enabling the stop action should be opt-in rather than opt-out.
- The HTML table should not exceed 100% width.
- Administration functions go under `/$/` so that the pattern can have security applied.
Hm, if there was a context on the server level (not sure right now if there already is one), then the exec tracker fmod could wire up the listeners such that all events from the datasets are delegated to the server-wide listener.
Unrelated but DatasetGraphOverSparql, DatasetGraphOverRDFLink etc. look like the wrong design to me. They wrap the transport implementations inside what should be RDF spec implementations (Model and Dataset). That is a dead-end IMO because the class hierarchy becomes open-ended - every new transport (let's imagine, I don't know, some binary-based protocol) will now require a new Dataset subclass. Using inheritance this way feels wrong to me.
IMO the transport details should be handled by clients external to Dataset, e.g. the same `StreamRDFToUpdateRequest.sendGraphTriplesToStream(graph, graphName, sink)` that is used in this PR (although I'm not crazy about that class either 😅).
The number of Model/Dataset implementations should stay finite - implement the specs and then subclass for some transactional/TDB mechanisms but that's it.
The goal of DatasetGraphOverSparql is to act as a bridge between ARQ and external SPARQL-capable stores.
The design of the sparql dispatcher system proposed by this PR allows both QueryExec.dataset(dsgOverSparql) and RDFLink.connect(dsgOverSparql) to efficiently proxy queries and updates to the backend.
However, for graph-store-protocol (GSP) operations, ARQ relies on the DatasetGraph API, which is indeed far from ideal for use with a SPARQL backend.
It is consistent however, because the class name (on ARQ level) is DatasetGraphOverSparql and not DatasetGraphOverSparqlAndGSP - so the protocol is fixed to SPARQL.
Sidenote: Following my line of reasoning, DatasetGraphOverSparqlViaRDFLink would be more accurate than DatasetGraphOverRDFLink.
For SPARQL, the current design already allows configuration of protocol matters on the RDFLink level.
The snippet below is a variation of this PR's ExampleDBpediaViaRemoteDataset.java:
```java
Creator<RDFLink> linkCreator = () -> RDFLinkHTTP.newBuilder()
    .destination("http://dbpedia.org/sparql")
    // Request results using thrift instead of the default application/sparql-results+json.
    .acceptHeaderSelectQuery(WebContent.contentTypeResultsThrift)
    .build();

DatasetGraph dsg = new DatasetGraphOverRDFLink(linkCreator);
QueryExec.dataset(dsg).query("...")...; // Queries will be dispatched to the link and
                                        // execution won't use the DatasetGraph API.
```
The fundamental issue is that DatasetGraph is central to most parts of Jena (up to Fuseki).
At some point in the future - in the appropriate places - it might be worth superseding DatasetGraph with a more general RDFLinkSource (a factory of RDFLinks - similar to JDBC's DataSource).
This way, Fuseki could forward GSP requests to vendor-specific driver implementations - but I feel that these changes are outside the scope of this PR.
The idea of a JDBC-like DataSource was briefly mentioned in https://github.com/apache/jena/pull/1390#issuecomment-1165821801
> Transforming update requests via `StreamRDFToUpdateRequest.sendGraphTriplesToStream`
I think in principle the abstraction with a configurable update sink is ok, but I agree that a custom graph-to-update-requests mapper should not require subclassing!
Also, the default strategy should put all inserts into a single request instead of performing some magic splitting that will break blank nodes.
[Edit] A thought: If both RDFLink and DatasetGraph had an addAll(Iterator<Quad>) method then at least this kind of insert could be delegated relatively efficiently to a graph implementation compared to dsg::add(individualQuad). Wrapping an iterator as a DatasetGraph and passing it to DatasetGraph.addAll(DatasetGraph) would already be possible but it feels hacky. So far I still feel that improving the situation for GSP is better handled as a separate issue.
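The `addAll(Iterator<Quad>)` idea could look roughly like the sketch below. `Quad` and the dataset interface are stand-ins for illustration, not Jena's real classes; the default method is the per-quad fallback, which a remote-backed implementation could override to batch all inserts into a single update request:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class AddAllSketch {
    record Quad(String g, String s, String p, String o) {}

    interface DatasetGraphLike {
        void add(Quad q);
        // Proposed bulk method: the default falls back to per-quad add,
        // but a SPARQL-backed implementation could override this to send
        // one update request for the whole iterator (keeping blank nodes intact).
        default void addAll(Iterator<Quad> quads) {
            while (quads.hasNext()) add(quads.next());
        }
    }

    static class MemDsg implements DatasetGraphLike {
        final List<Quad> quads = new ArrayList<>();
        public void add(Quad q) { quads.add(q); }
    }

    static int demo() {
        MemDsg dsg = new MemDsg();
        List<Quad> data = List.of(
            new Quad("g", "s1", "p", "o"),
            new Quad("g", "s2", "p", "o"));
        dsg.addAll(data.iterator());
        return dsg.quads.size();
    }
}
```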
@Aklakan I understand the need for a proxy but I'm not convinced it should live in a Dataset implementation. It might feel that way because many of Jena's core classes unfortunately conflate the domain model implementation with the transport details - which are orthogonal concerns IMO. I know I've said this before, but this is much more cleanly separated in JAX-RS.
Conceptually the proxy is not of a dataset - it is of an endpoint. A dataset should not care where it came from - it's just a bag of quads.
To illustrate, this is a SPARQL endpoint implementation in JAX-RS. The separation of concerns is much cleaner IMO.
https://github.com/AtomGraph/Core/blob/master/src/main/java/com/atomgraph/core/model/impl/SPARQLEndpointImpl.java
It can use different EndpointAccessor implementations - one based on a Dataset and one for remote endpoints.
DatasetGraph can play a dual role:
- Domain API
- Endpoint (something that can be connected to)
In jena-rdfconnection, with RDFLink.connect(dsg) there already exists an API that could be extended to handle accessor indirection: connect() could use a registry of `Function<DatasetGraph, RDFLink>` linkProviders to establish an appropriate link to the dataset graph.
The same goes for QueryExec.dataset() and UpdateExec.dataset() - so these methods could return a Query/UpdateExecBuilder based on a registry of providers. But instead of having 2 additional ARQ-level provider registries, I think they should rely on the RDFLink-level one.
I am not too fond of the proposed dispatcher/interception system because it only intercepts query/update execution but not on the connection process itself. So I am indeed thinking about moving the interception to a higher level.
The main aspects I am pondering over are:
- RDFLink is in `jena-rdfconnection` - links so far don't exist on the ARQ level.
- For the sake of tracking update/query executions, a simple "Query/ExecPostProcessor" registry with conceptually a list of `Function<QueryExec, QueryExec>` would be sufficient. `{Query, Update}ExecBuilder{Dataset, HTTP}` could natively support a post-processor registry - no interceptors needed.
- I realized that for transaction-capable datasets, Fuseki processes updates in a streaming way via an `UpdateSink`. This is pretty cool, but it currently goes directly to the DatasetGraph API and thus bypasses `UpdateExecBuilder`, does not even create an `UpdateExec`, and thus cannot yet be exec-tracked. I think this streaming API needs to be added to the `UpdateExecBuilder`.
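The "Query/ExecPostProcessor" registry idea above - conceptually a list of `Function<QueryExec, QueryExec>` - can be sketched as follows. `QueryExec` here is a stand-in functional interface, not Jena's real one; a builder would apply the registered processors when building the exec:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class PostProcessorSketch {
    interface QueryExec { String exec(); }

    /** Registry of post-processors applied, in order, to each freshly built exec. */
    static class ExecPostProcessorRegistry {
        private final List<UnaryOperator<QueryExec>> processors = new ArrayList<>();
        void register(UnaryOperator<QueryExec> p) { processors.add(p); }
        QueryExec apply(QueryExec base) {
            QueryExec result = base;
            for (UnaryOperator<QueryExec> p : processors)
                result = p.apply(result);
            return result;
        }
    }

    static String demo() {
        ExecPostProcessorRegistry registry = new ExecPostProcessorRegistry();
        StringBuilder log = new StringBuilder();
        // A tracking wrapper registered once; no dispatcher chain needed.
        registry.register(inner -> () -> {
            log.append("start;");
            String result = inner.exec();
            log.append("end;");
            return result;
        });
        QueryExec exec = registry.apply(() -> "rows");
        exec.exec();
        return log.toString();
    }
}
```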
It can use an different EndpointAccessor implementations - one based on a Dataset and one for remote endpoints.
This is indeed conceptually similar to RDFLink.
Agree to disagree :)
connect() to me smells of JDBC or some protocol like that - Linked Data and SPARQL are based on REST that does not require any "connection".
This is why I'm only using Jena's in-memory Model/Dataset and readers/parsers and not the HTTP layer - I've replaced it all with JAX-RS/Jersey.
DatasetGraph is local, fine grained for code usage and Jena-specific. Something somewhere has to implement storage.
The world could be built on this but it will be unsatisfactory because it is too fine-grained for web use.
RDFLink is remote-first, coarse grained and uses a standardized protocol.
It is connection oriented. One RDFLink is a connection.
RDFLink can present the same API to local and remote; if local it goes to DatasetGraph.
Sometimes, you have code that works on datasets but you want to apply it to something that only supports SPARQL protocols. You need an adapter to invert the stack. Adapters that map low-level APIs to higher-level APIs are rarely great for performance or for using the details of the high-level API, but if the difference is access/no-access, an adapter gets the job done.
@Aklakan --
In order to make progress, can this be split into multiple PRs? The first would be the changes in ARQ for async.
This would give us some focus to complete subtasks, and also help review, because this single PR is causing the GitHub review UI to struggle!
A question I'm trying to answer for myself is how much of this is fundamental machinery and how much is driven by your usage of Jena.
Yes, it makes sense to split the (mostly) finished parts out from this PR for isolated review. I factored the async HTTP part out in #3464. My suggestion is to use this PR as the "full stack" branch from which smaller PRs can be derived once the overall system works. I plan on making an updated proposal on the query/update execution tracking part around next week.
Just a quick heads up: In a separate branch I have completed my first draft of a revision but I am still looking into whether it could be simplified. I hope I find time in the coming week.