Linked Data Event Streams and TimescaleDB for Real-time Timeseries Data Management
Last Updated on March 1, 2023 by Editorial Team
Author(s): Samuel Van Ackere
Originally published on Towards AI.
How to consume a Linked Data Event Stream and store it in a TimescaleDB database
Photo by Scott Graham on Unsplash
Linked Data Event Stream
Linked Data Event Streams represent and share fast and slow-moving data on the Web using the Resource Description Framework (RDF). This allows data to be linked and connected to other data sources using unique identifiers (URIs).
A Linked Data Event Stream (LDES) is a stream of immutable objects (such as sensor observations, address registers, or financial data), each described as machine-readable RDF.
Linked Data Event Streams explained in 8 minutes
An LDES provides a flexible and interoperable way of describing and exchanging events as Linked Data, so that different systems and applications can consume and act on data streams in a consistent, standardized way.
This article shows how to insert sensor data published as an LDES into a TimescaleDB database.
Managing large amounts of time-series data
TimescaleDB is an open-source database for storing and querying large amounts of time-series data. It extends PostgreSQL with time-series support, offering features such as fast ingestion and querying of large datasets, flexible data granularity, and long-term data storage.
Insert performance comparison between TimescaleDB 2.7.2 and PostgreSQL 14.4
TimescaleDB is a powerful and efficient database system well-suited for storing and querying time-series data at scale. It is widely used in various applications, including IoT, finance, and telemetry.
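Because TimescaleDB is an extension of PostgreSQL, a time-series table starts out as an ordinary table and is then converted into a hypertable with TimescaleDB's `create_hypertable()` function. The sketch below only generates the two SQL statements. The table name `water_quality` and its columns are hypothetical and chosen for illustration; they are not taken from the article's repository.

```python
def hypertable_ddl(table="water_quality", time_column="observed_at"):
    """Return SQL statements that create a table and convert it to a hypertable.

    The table and column names are hypothetical, chosen for this example only.
    """
    # A regular PostgreSQL table with a mandatory timestamp column.
    create_table = (
        f"CREATE TABLE IF NOT EXISTS {table} ("
        f"{time_column} TIMESTAMPTZ NOT NULL, "
        "member_id TEXT NOT NULL, "
        "temperature_value DOUBLE PRECISION, "
        "conductivity_value DOUBLE PRECISION, "
        "hydro_pressure_value DOUBLE PRECISION)"
    )
    # create_hypertable() partitions the table by the given time column.
    make_hypertable = (
        f"SELECT create_hypertable('{table}', '{time_column}', if_not_exists => TRUE)"
    )
    return [create_table, make_hypertable]


if __name__ == "__main__":
    for statement in hypertable_ddl():
        print(statement + ";")
```

The statements can be run with any PostgreSQL client against a database in which the TimescaleDB extension is enabled.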
First, a data flow must be configured to ingest a Linked Data Event Stream into PostgreSQL. This can be done in the Apache NiFi environment.
LDES to TimescaleDB
To persist an LDES in a database, we use Apache NiFi, an open-source data processing and integration platform designed to automate data flows between systems. It provides a web-based user interface for creating, managing, and monitoring data flows, along with a range of pre-built connectors and processors for data processing tasks.
Data pipeline in Apache NiFi (image by author)
To consume an LDES, an LDES client processor is needed in the Apache NiFi flow. An LDES client is a component capable of consuming, processing, and analyzing the events in the stream.
The output of the LDES Client (working with the following LDES):
_:B5edf92f59913b8d14f45818d5bde1d51 <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://def.isotc211.org/iso19156/2011/Measurement#OM_Measurement> .
_:B5edf92f59913b8d14f45818d5bde1d51 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.observedProperty> <https://data.vmm.be/concept/waterkwaliteitparameter/conductiviteit> .
_:B5edf92f59913b8d14f45818d5bde1d51 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.phenomenonTime> "2022-11-09T20:30:00.000Z"^^<http://www.w3.org/2001/XMLSchema#datetime> .
_:B5edf92f59913b8d14f45818d5bde1d51 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.result> _:B727a9b2b6a6311cc83b677d439f4cf68 .
_:B5edf92f59913b8d14f45818d5bde1d51 <http://www.w3.org/ns/sosa/madeBySensor> <urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j> .
_:B6003104aae33209ee0e6a26e14ba38cb <https://schema.org/value> "1.152E1"^^<http://www.w3.org/2001/XMLSchema#double> .
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://def.isotc211.org/iso19156/2011/Measurement#OM_Measurement> .
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.observedProperty> <https://data.vmm.be/concept/waterkwaliteitparameter/temperatuur> .
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.phenomenonTime> "2022-11-09T20:30:00.000Z"^^<http://www.w3.org/2001/XMLSchema#datetime> .
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.result> _:Bdd20913615f9b4c0169c7770bdc770d4 .
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://www.w3.org/ns/sosa/madeBySensor> <urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j> .
_:Bdd20913615f9b4c0169c7770bdc770d4 <http://def.isotc211.org/iso19103/2005/UnitsOfMeasure#Measure.value> _:B6003104aae33209ee0e6a26e14ba38cb .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.w3.org/TR/vocab-ssn-ext/#sosa:ObservationCollection> .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://def.isotc211.org/iso19156/2011/SamplingFeature#SF_SamplingFeatureCollection.member> _:B5edf92f59913b8d14f45818d5bde1d51 .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://def.isotc211.org/iso19156/2011/SamplingFeature#SF_SamplingFeatureCollection.member> _:B3105a27f0059867877fb28653f5a6abc .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://def.isotc211.org/iso19156/2011/SamplingFeature#SF_SamplingFeatureCollection.member> _:Bec20a7e293639ee5a985d621d7b2d6d5 .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://purl.org/dc/terms/isVersionOf> <urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn> .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://www.w3.org/ns/prov#generatedAtTime> "2022-11-09T20:30:00.000Z"^^<http://www.w3.org/2001/XMLSchema#dateTime> .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://www.w3.org/ns/sosa/hasFeatureOfInterest> "spt-00029-97" .
_:B3105a27f0059867877fb28653f5a6abc <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://def.isotc211.org/iso19156/2011/Measurement#OM_Measurement> .
_:B3105a27f0059867877fb28653f5a6abc <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.observedProperty> <https://data.vmm.be/concept/observatieparameter/hydrostatische-druk> .
_:B3105a27f0059867877fb28653f5a6abc <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.phenomenonTime> "2022-11-09T20:30:00.000Z"^^<http://www.w3.org/2001/XMLSchema#datetime> .
_:B3105a27f0059867877fb28653f5a6abc <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.result> _:B2598b988faa2635d5dd520f59f376b8e .
_:B3105a27f0059867877fb28653f5a6abc <http://www.w3.org/ns/sosa/madeBySensor> <urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j> .
<https://iow.smartdataspace.beta-vlaanderen.be/water-quality-observations> <https://w3id.org/tree#member> <urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> .
_:B727a9b2b6a6311cc83b677d439f4cf68 <http://def.isotc211.org/iso19103/2005/UnitsOfMeasure#Measure.value> _:B9776c69148d2ef600aed62aa2c85bd40 .
_:B2598b988faa2635d5dd520f59f376b8e <http://def.isotc211.org/iso19103/2005/UnitsOfMeasure#Measure.value> _:B436b5950c7725d6d9a38aaf6ca88b00f .
_:B9776c69148d2ef600aed62aa2c85bd40 <https://schema.org/value> "1120"^^<http://www.w3.org/2001/XMLSchema#integer> .
_:B436b5950c7725d6d9a38aaf6ca88b00f <https://schema.org/value> "673"^^<http://www.w3.org/2001/XMLSchema#integer> .
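In this output, each numeric value sits three hops away from its observation: observation, result node, measure node, and finally the `schema.org/value` literal. A minimal Python sketch of following that chain is given here, assuming a simplified line-based N-Triples reader that is adequate for this output but not a full parser. The blank node labels in `SAMPLE` are shortened for readability, and the function names are hypothetical.

```python
import re

OBSERVED_PROPERTY = "<http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.observedProperty>"
RESULT = "<http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.result>"
MEASURE_VALUE = "<http://def.isotc211.org/iso19103/2005/UnitsOfMeasure#Measure.value>"
SCHEMA_VALUE = "<https://schema.org/value>"

# subject, predicate, object, terminated by " ."
TRIPLE = re.compile(r"^(\S+)\s+(\S+)\s+(.+?)\s*\.$")
# a quoted literal with an optional datatype
LITERAL = re.compile(r'^"(.*)"(?:\^\^<[^>]*>)?$')


def parse_ntriples(text):
    """Index triples as {subject: {predicate: object}}.

    Only the last object per (subject, predicate) is kept, which is
    sufficient for the single-valued properties read below.
    """
    graph = {}
    for line in text.splitlines():
        match = TRIPLE.match(line.strip())
        if match:
            subject, predicate, obj = match.groups()
            graph.setdefault(subject, {})[predicate] = obj
    return graph


def extract_measurements(text):
    """Map the last path segment of each observed property to its value."""
    graph = parse_ntriples(text)
    measurements = {}
    for properties in graph.values():
        if OBSERVED_PROPERTY not in properties:
            continue  # not an observation node
        name = properties[OBSERVED_PROPERTY].strip("<>").rsplit("/", 1)[-1]
        measure_node = graph[properties[RESULT]][MEASURE_VALUE]
        literal = LITERAL.match(graph[measure_node][SCHEMA_VALUE])
        measurements[name] = float(literal.group(1))
    return measurements


# The temperature observation from the output above, with shortened node labels.
SAMPLE = "\n".join([
    f"_:obs {OBSERVED_PROPERTY} <https://data.vmm.be/concept/waterkwaliteitparameter/temperatuur> .",
    f"_:obs {RESULT} _:res .",
    f"_:res {MEASURE_VALUE} _:val .",
    f'_:val {SCHEMA_VALUE} "1.152E1"^^<http://www.w3.org/2001/XMLSchema#double> .',
])

if __name__ == "__main__":
    print(extract_measurements(SAMPLE))
```

In the NiFi flow this traversal is not written by hand; the sketch only illustrates how the triples relate to each other.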
Next in the Apache NiFi flow is a version materialization component. "Version materialization" refers to removing the version information from an LDES member and reconstructing a "state" object, which reflects only the current state of the member without the history of its changes.
Version materialization is needed because a consumer typically doesn't want to store these versions in a database.
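The idea can be sketched in a few lines of Python. This is an illustration of the concept, not the implementation of the NiFi processor: every version IRI is replaced by the IRI it points to through `dcterms:isVersionOf`, and the `isVersionOf` triple itself is dropped. Triples are modelled here as plain `(subject, predicate, object)` string tuples, which is a simplification, and the `materialize` function name is hypothetical.

```python
IS_VERSION_OF = "http://purl.org/dc/terms/isVersionOf"


def materialize(triples):
    """Rewrite version IRIs to the IRI of the entity they are a version of."""
    # Map each version IRI to its entity IRI.
    version_to_entity = {s: o for s, p, o in triples if p == IS_VERSION_OF}
    state = []
    for s, p, o in triples:
        if p == IS_VERSION_OF:
            continue  # the version link is not part of the state object
        state.append((version_to_entity.get(s, s), p, version_to_entity.get(o, o)))
    return state


ENTITY = "urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn"
VERSION = ENTITY + "/2022-11-09T20:30:00.000Z"
FEATURE = "http://www.w3.org/ns/sosa/hasFeatureOfInterest"

member = [
    (VERSION, IS_VERSION_OF, ENTITY),
    (VERSION, FEATURE, "spt-00029-97"),
]

if __name__ == "__main__":
    for triple in materialize(member):
        print(triple)
```

After materialization, the member is identified by the entity IRI, so a later version of the same entity maps onto the same identifier.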
The data is first converted to JSON-LD, which makes the subsequent conversion to a tabular structure easier. The JSON-LD file looks like this:
{
"@id" : "urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-g9kEXxUeP28TNc6wJvFsau",
"@type" : "Observatieverzameling",
"Bemonsteringsobjectverzameling.lid" : [ {
"@id" : "_:b8",
"@type" : "Meting",
"Observatie.geobserveerdKenmerk" : "https://data.vmm.be/concept/waterkwaliteitparameter/temperatuur",
"Observatie:.phenomenonTime" : {
"@type" : "http://www.w3.org/2001/XMLSchema#datetime",
"@value" : "2022-11-09T19:30:00.000Z"
},
"Observatie.resultaat" : {
"@id" : "_:b6",
"Maat.maat" : {
"@id" : "_:b1",
"https://schema.org/value" : {
"@type" : "http://www.w3.org/2001/XMLSchema#double",
"@value" : "1.153E1"
}
}
},
"Observatie.uitgevoerdMetSensor" : "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-8JgF3vxoYKoXzwWGUiX3Yc"
}, {
"@id" : "_:b3",
"@type" : "Meting",
"Observatie.geobserveerdKenmerk" : "https://data.vmm.be/concept/waterkwaliteitparameter/conductiviteit",
"Observatie:.phenomenonTime" : {
"@type" : "http://www.w3.org/2001/XMLSchema#datetime",
"@value" : "2022-11-09T19:30:00.000Z"
},
"Observatie.resultaat" : {
"@id" : "_:b4",
"Maat.maat" : {
"@id" : "_:b5",
"https://schema.org/value" : {
"@type" : "http://www.w3.org/2001/XMLSchema#integer",
"@value" : "920"
}
}
},
"Observatie.uitgevoerdMetSensor" : "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-8JgF3vxoYKoXzwWGUiX3Yc"
}, {
"@id" : "_:b7",
"@type" : "Meting",
"Observatie.geobserveerdKenmerk" : "https://data.vmm.be/concept/observatieparameter/hydrostatische-druk",
"Observatie:.phenomenonTime" : {
"@type" : "http://www.w3.org/2001/XMLSchema#datetime",
"@value" : "2022-11-09T19:30:00.000Z"
},
"Observatie.resultaat" : {
"@id" : "_:b2",
"Maat.maat" : {
"@id" : "_:b0",
"https://schema.org/value" : {
"@type" : "http://www.w3.org/2001/XMLSchema#integer",
"@value" : "11133"
}
}
},
"Observatie.uitgevoerdMetSensor" : "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-8JgF3vxoYKoXzwWGUiX3Yc"
} ],
"http://www.w3.org/ns/prov#generatedAtTime" : {
"@type" : "http://www.w3.org/2001/XMLSchema#dateTime",
"@value" : "2022-11-09T19:30:00.000Z"
},
"http://www.w3.org/ns/sosa/hasFeatureOfInterest" : "spt-00027-06"
}
Next, the data must be transformed into a tabular structure, which can be done with a SPARQL query or a JOLT transformation. In this case, I use a JOLT transformation, which filters out the relevant parameters and puts them in a flat JSON file (see the output below).
{
"id": "urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn",
"temperature_value": "1.152E1",
"temperature_date": "2022-11-09T20:30:00.000Z",
"temperature_sensor": "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j",
"conductivity_value": "1120",
"conductivity_date": "2022-11-09T20:30:00.000Z",
"conductivity_sensor": "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j",
"hydro_pressure_value": "673",
"hydro_pressure_date": "2022-11-09T20:30:00.000Z",
"hydro_pressure_sensor": "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j"
}
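For readers who find a JOLT specification hard to picture, the same flattening can be sketched in Python. This is not the JOLT specification used in the flow; it is a hypothetical equivalent that reads the JSON-LD keys shown earlier and produces the flat field names shown above. The `NAMES` mapping and the `flatten_member` function are assumptions made for this example.

```python
# Hypothetical mapping from the last segment of the observed-property IRI
# to the column prefix used in the flat output.
NAMES = {
    "temperatuur": "temperature",
    "conductiviteit": "conductivity",
    "hydrostatische-druk": "hydro_pressure",
}


def flatten_member(doc):
    """Flatten one materialized JSON-LD member into a single flat record."""
    row = {"id": doc["@id"]}
    for obs in doc["Bemonsteringsobjectverzameling.lid"]:
        concept = obs["Observatie.geobserveerdKenmerk"].rsplit("/", 1)[-1]
        prefix = NAMES.get(concept)
        if prefix is None:
            continue  # skip properties that are not mapped to a column
        value = obs["Observatie.resultaat"]["Maat.maat"]["https://schema.org/value"]
        row[f"{prefix}_value"] = value["@value"]
        row[f"{prefix}_date"] = obs["Observatie:.phenomenonTime"]["@value"]
        row[f"{prefix}_sensor"] = obs["Observatie.uitgevoerdMetSensor"]
    return row


# One observation taken from the JSON-LD example earlier in the article.
example = {
    "@id": "urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-g9kEXxUeP28TNc6wJvFsau",
    "Bemonsteringsobjectverzameling.lid": [{
        "Observatie.geobserveerdKenmerk": "https://data.vmm.be/concept/waterkwaliteitparameter/temperatuur",
        "Observatie:.phenomenonTime": {"@value": "2022-11-09T19:30:00.000Z"},
        "Observatie.resultaat": {
            "Maat.maat": {"https://schema.org/value": {"@value": "1.153E1"}}
        },
        "Observatie.uitgevoerdMetSensor": "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-8JgF3vxoYKoXzwWGUiX3Yc",
    }],
}

if __name__ == "__main__":
    print(flatten_member(example))
```

Values stay strings at this stage, as in the JOLT output; converting them to numbers and timestamps is left to the step that writes to the database.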
After this transformation, the data can be written to a PostgreSQL or TimescaleDB database. Analysis that can be done with data in the TimescaleDB database includes but is not limited to time-series visualization, anomaly detection, forecasting, and aggregation of time-series data over various time intervals.
(image by author)
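In the article's flow, Apache NiFi performs the write. As a rough illustration of what the write and a later aggregation amount to, the sketch below converts one flat record into a typed row and defines two SQL strings: a parameterized insert and an hourly average using TimescaleDB's `time_bucket()` function. The table `water_quality` and its columns are hypothetical, and no database connection is opened.

```python
from datetime import datetime

# Hypothetical table and column names, used for illustration only.
INSERT_SQL = (
    "INSERT INTO water_quality "
    "(observed_at, member_id, temperature_value, conductivity_value, hydro_pressure_value) "
    "VALUES (%s, %s, %s, %s, %s)"
)

# time_bucket() groups rows into fixed-width time intervals.
HOURLY_AVERAGE_SQL = (
    "SELECT time_bucket('1 hour', observed_at) AS bucket, "
    "avg(temperature_value) AS avg_temperature "
    "FROM water_quality GROUP BY bucket ORDER BY bucket"
)


def to_row(record):
    """Convert the string fields of a flat record into typed column values."""
    # The timestamp format matches the values in the flat JSON, e.g. 2022-11-09T20:30:00.000Z.
    observed_at = datetime.strptime(record["temperature_date"], "%Y-%m-%dT%H:%M:%S.%f%z")
    return (
        observed_at,
        record["id"],
        float(record["temperature_value"]),
        float(record["conductivity_value"]),
        float(record["hydro_pressure_value"]),
    )


record = {
    "id": "urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn",
    "temperature_value": "1.152E1",
    "temperature_date": "2022-11-09T20:30:00.000Z",
    "conductivity_value": "1120",
    "hydro_pressure_value": "673",
}

if __name__ == "__main__":
    print(to_row(record))
```

With a PostgreSQL driver that uses `%s` placeholders, such as psycopg2, the row would be passed as `cursor.execute(INSERT_SQL, to_row(record))`.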
Full code
To replicate the data flow in this article, please go to the LDES2TimescaleDB GitHub repo. It describes how to set up the dockerized TimescaleDB and Apache NiFi, after which the data flow can be started using the supplied Apache NiFi setup file.
GitHub – samuvack/LDES2TimescaleDB: manage timeseries LDES members in TimescaleDB
Wrapping Up
A Linked Data Event Stream is the core API for both fast and slow-moving data. It is a stream of immutable objects described as machine-readable RDF (such as sensor observations, address registers, or financial data).
To write an LDES to a PostgreSQL or TimescaleDB database, a data conversion flow is configured in Apache NiFi.
With this Medium article as a guideline, you should be able to write Linked Data Event Streams to a TimescaleDB database.
If you like what you read, be sure to ❤️ it; as a writer, it means the world. Stay in touch by following me as an author.
Contributors to this article are ddvlanck (Dwight Van Lancker) (github.com) and sandervd (Sander Van Dooren) (github.com) at Smart Data Space (Digital Flanders, Belgium). In a rapidly changing society, governments need to be more agile and resilient than ever. Digital Flanders realizes and supervises digital transformation projects for Flemish and local governments.