

Linked Data Event Streams and TimescaleDB for Real-time Timeseries Data Management

Last Updated on March 1, 2023 by Editorial Team

Author(s): Samuel Van Ackere

Originally published on Towards AI.

How to consume a Linked Data Event Stream and store it in a TimescaleDB database

Photo by Scott Graham on Unsplash

Linked data event stream

Linked Data Event Streams represent and share fast- and slow-moving data on the Web using the Resource Description Framework (RDF). This allows the data to be linked to other data sources through unique identifiers (URIs).

A Linked Data Event Stream (LDES) is a collection of immutable objects (such as sensor observations, address registers, or financial data), each described in machine-readable RDF.


An LDES provides a flexible, interoperable way to describe and exchange events as Linked Data, so that different systems and applications can consume and act on data streams in a consistent, standardized way.

This article shows how to consume sensor data published as an LDES and insert it into a TimescaleDB database.

Managing large amounts of time-series data

TimescaleDB is an open-source database for storing and querying large amounts of time-series data. It is built as an extension of PostgreSQL and adds time-series features such as fast ingestion and querying of large data volumes, flexible data granularity, and long-term data storage.

Insert performance comparison between TimescaleDB 2.7.2 and PostgreSQL 14.4

This makes TimescaleDB well-suited to storing and querying time-series data at scale; it is used in applications such as IoT, finance, and telemetry.
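As a minimal sketch of how TimescaleDB builds on PostgreSQL, the snippet below generates the SQL to create an ordinary table and then turn it into a hypertable with TimescaleDB's `create_hypertable` function, which partitions the rows into time-based chunks. The table name `water_quality` and its long-format columns (one row per measurement) are assumptions chosen to fit the sensor data used later in this article; they are not the schema of the article's repository.

```python
def hypertable_ddl(table: str = "water_quality") -> list[str]:
    """Return SQL that creates a hypothetical observation table and
    converts it into a TimescaleDB hypertable partitioned on `time`."""
    return [
        # Enable the extension (no-op if it is already installed).
        "CREATE EXTENSION IF NOT EXISTS timescaledb;",
        # A plain PostgreSQL table in long format: one row per measurement.
        f"""CREATE TABLE IF NOT EXISTS {table} (
    time       TIMESTAMPTZ      NOT NULL,
    object_id  TEXT             NOT NULL,
    sensor     TEXT             NOT NULL,
    parameter  TEXT             NOT NULL,
    value      DOUBLE PRECISION
);""",
        # Turn the table into a hypertable chunked by the `time` column.
        f"SELECT create_hypertable('{table}', 'time', if_not_exists => TRUE);",
    ]


if __name__ == "__main__":
    for statement in hypertable_ddl():
        print(statement)
```

The printed statements can be executed with any PostgreSQL client, such as psql, against a database where the TimescaleDB extension is available. Apart from the `create_hypertable` call, everything is plain PostgreSQL, which is what lets existing PostgreSQL tooling (including Apache NiFi's database processors) write to TimescaleDB.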

To ingest a Linked Data Event Stream into PostgreSQL (and therefore TimescaleDB), a data flow must first be configured. This can be done in Apache NiFi.

LDES to TimescaleDB

To persist an LDES in a database, we use Apache NiFi, an open-source data processing and integration platform designed to automate the flow of data between systems. It provides a web-based user interface for creating, managing, and monitoring data flows, along with a range of pre-built connectors and processors for data processing tasks.

Data pipeline in Apache NiFi (image by author)

To consume an LDES, the Apache NiFi flow needs an LDES client processor. An LDES client is a component that consumes the stream, fetching its members (the events) so that they can be processed and analyzed downstream.
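As a rough illustration of what an LDES client does, the sketch below walks the pages (fragments) of a stream breadth-first and yields every member exactly once. It is not the code of the NiFi LDES client processor: the `fetch_page` callback is a hypothetical stand-in for downloading a fragment and reading its `tree:member` and `tree:relation`/`tree:node` links, and a real client would also keep polling fragments that can still change.

```python
from collections import deque
from typing import Callable, Iterator

# A fragment is modeled as (member IDs on the page, URLs of linked pages).
Page = tuple[list[str], list[str]]


def ldes_members(start_url: str, fetch_page: Callable[[str], Page]) -> Iterator[str]:
    """Breadth-first traversal of LDES fragments, yielding each member once."""
    queue = deque([start_url])
    visited_pages: set[str] = set()
    seen_members: set[str] = set()
    while queue:
        url = queue.popleft()
        if url in visited_pages:
            continue  # fragments may link back to each other
        visited_pages.add(url)
        members, linked_pages = fetch_page(url)
        for member in members:
            if member not in seen_members:  # a member can appear on several pages
                seen_members.add(member)
                yield member
        queue.extend(linked_pages)


# Usage with an in-memory stand-in for the HTTP fetch:
pages = {
    "page-1": (["obs/a", "obs/b"], ["page-2"]),
    "page-2": (["obs/b", "obs/c"], ["page-1"]),
}
print(list(ldes_members("page-1", pages.__getitem__)))  # ['obs/a', 'obs/b', 'obs/c']
```

Tracking visited pages and seen members is what keeps the traversal finite and duplicate-free even when fragments link in both directions.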

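The output of the LDES client for one member of the water quality observations stream (https://iow.smartdataspace.beta-vlaanderen.be/water-quality-observations), in N-Triples format: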

_:B5edf92f59913b8d14f45818d5bde1d51 <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://def.isotc211.org/iso19156/2011/Measurement#OM_Measurement> .
_:B5edf92f59913b8d14f45818d5bde1d51 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.observedProperty> <https://data.vmm.be/concept/waterkwaliteitparameter/conductiviteit> .
_:B5edf92f59913b8d14f45818d5bde1d51 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.phenomenonTime> "2022-11-09T20:30:00.000Z"^^<http://www.w3.org/2001/XMLSchema#datetime> .
_:B5edf92f59913b8d14f45818d5bde1d51 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.result> _:B727a9b2b6a6311cc83b677d439f4cf68 .
_:B5edf92f59913b8d14f45818d5bde1d51 <http://www.w3.org/ns/sosa/madeBySensor> <urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j> .
_:B6003104aae33209ee0e6a26e14ba38cb <https://schema.org/value> "1.152E1"^^<http://www.w3.org/2001/XMLSchema#double> .
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://def.isotc211.org/iso19156/2011/Measurement#OM_Measurement> .
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.observedProperty> <https://data.vmm.be/concept/waterkwaliteitparameter/temperatuur> .
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.phenomenonTime> "2022-11-09T20:30:00.000Z"^^<http://www.w3.org/2001/XMLSchema#datetime> .
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.result> _:Bdd20913615f9b4c0169c7770bdc770d4 .
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://www.w3.org/ns/sosa/madeBySensor> <urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j> .
_:Bdd20913615f9b4c0169c7770bdc770d4 <http://def.isotc211.org/iso19103/2005/UnitsOfMeasure#Measure.value> _:B6003104aae33209ee0e6a26e14ba38cb .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.w3.org/TR/vocab-ssn-ext/#sosa:ObservationCollection> .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://def.isotc211.org/iso19156/2011/SamplingFeature#SF_SamplingFeatureCollection.member> _:B5edf92f59913b8d14f45818d5bde1d51 .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://def.isotc211.org/iso19156/2011/SamplingFeature#SF_SamplingFeatureCollection.member> _:B3105a27f0059867877fb28653f5a6abc .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://def.isotc211.org/iso19156/2011/SamplingFeature#SF_SamplingFeatureCollection.member> _:Bec20a7e293639ee5a985d621d7b2d6d5 .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://purl.org/dc/terms/isVersionOf> <urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn> .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://www.w3.org/ns/prov#generatedAtTime> "2022-11-09T20:30:00.000Z"^^<http://www.w3.org/2001/XMLSchema#dateTime> .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://www.w3.org/ns/sosa/hasFeatureOfInterest> "spt-00029-97" .
_:B3105a27f0059867877fb28653f5a6abc <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://def.isotc211.org/iso19156/2011/Measurement#OM_Measurement> .
_:B3105a27f0059867877fb28653f5a6abc <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.observedProperty> <https://data.vmm.be/concept/observatieparameter/hydrostatische-druk> .
_:B3105a27f0059867877fb28653f5a6abc <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.phenomenonTime> "2022-11-09T20:30:00.000Z"^^<http://www.w3.org/2001/XMLSchema#datetime> .
_:B3105a27f0059867877fb28653f5a6abc <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.result> _:B2598b988faa2635d5dd520f59f376b8e .
_:B3105a27f0059867877fb28653f5a6abc <http://www.w3.org/ns/sosa/madeBySensor> <urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j> .
<https://iow.smartdataspace.beta-vlaanderen.be/water-quality-observations> <https://w3id.org/tree#member> <urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> .
_:B727a9b2b6a6311cc83b677d439f4cf68 <http://def.isotc211.org/iso19103/2005/UnitsOfMeasure#Measure.value> _:B9776c69148d2ef600aed62aa2c85bd40 .
_:B2598b988faa2635d5dd520f59f376b8e <http://def.isotc211.org/iso19103/2005/UnitsOfMeasure#Measure.value> _:B436b5950c7725d6d9a38aaf6ca88b00f .
_:B9776c69148d2ef600aed62aa2c85bd40 <https://schema.org/value> "1120"^^<http://www.w3.org/2001/XMLSchema#integer> .
_:B436b5950c7725d6d9a38aaf6ca88b00f <https://schema.org/value> "673"^^<http://www.w3.org/2001/XMLSchema#integer> .
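To make the structure of this member explicit, here is a small sketch that parses N-Triples lines like the ones above and follows each measurement from `OM_Observation.result` through `Measure.value` to its `schema:value` literal. The regex-based parser is a simplification (it does not unescape literals and is not a complete N-Triples implementation); in practice, a library such as rdflib would handle the parsing.

```python
import re

# One N-Triples term: an IRI, a blank node, or a literal with optional datatype/language tag.
TERM = r'(<[^>]*>|_:\S+|"(?:[^"\\]|\\.)*"(?:\^\^<[^>]*>|@[A-Za-z-]+)?)'
TRIPLE = re.compile(rf"^\s*{TERM}\s+{TERM}\s+{TERM}\s*\.\s*$")

OM = "http://def.isotc211.org/iso19156/2011/Observation#OM_Observation."
OBSERVED_PROPERTY = f"<{OM}observedProperty>"
PHENOMENON_TIME = f"<{OM}phenomenonTime>"
RESULT = f"<{OM}result>"
MEASURE_VALUE = "<http://def.isotc211.org/iso19103/2005/UnitsOfMeasure#Measure.value>"
SCHEMA_VALUE = "<https://schema.org/value>"


def literal_text(term: str) -> str:
    """Return the lexical form of a literal, e.g. '"673"^^<...#integer>' -> '673'."""
    return term[1:term.rindex('"')]


def measurements(ntriples: str) -> list[dict]:
    # subject -> {predicate: object}; keeps one object per predicate, which is
    # enough for the single-valued properties read below.
    graph: dict[str, dict[str, str]] = {}
    for line in ntriples.splitlines():
        match = TRIPLE.match(line)
        if match:
            subject, predicate, obj = match.groups()
            graph.setdefault(subject, {})[predicate] = obj

    rows = []
    for props in graph.values():
        if OBSERVED_PROPERTY not in props:
            continue  # not a measurement node
        # result -> Measure.value -> schema:value, as in the member above
        measure = graph[props[RESULT]][MEASURE_VALUE]
        value = literal_text(graph[measure][SCHEMA_VALUE])
        rows.append({
            "property": props[OBSERVED_PROPERTY].strip("<>").rsplit("/", 1)[-1],
            "time": literal_text(props[PHENOMENON_TIME]),
            "value": float(value),
        })
    return rows
```

Run on the full member above, it returns one row each for conductiviteit (1120.0), temperatuur (11.52), and hydrostatische-druk (673.0), all at 2022-11-09T20:30:00.000Z.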

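Next in the Apache NiFi flow is a version materialization component. “Version materialization” is the process of removing the version information from an LDES member to reconstruct a “state” object, which reflects only the current state of the member, without historical information about changes. In the member above, for example, the version IRI ending in /2022-11-09T20:30:00.000Z is replaced by the IRI it is a version of (the object of dct:isVersionOf), urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn.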

Version materialization is needed because, in this use case, the consumer does not want to store these versions in the database.
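The idea can be sketched in a few lines of Python operating on (subject, predicate, object) tuples in N-Triples notation. This is a simplified assumption of how materialization works, not the code of the NiFi component used in the flow: it replaces each version IRI with the object of its `dct:isVersionOf` triple and drops that triple, while a full implementation may also need to handle cases such as keeping only the latest version of each object. The shortened `urn:example:obs` IRIs in the usage example are hypothetical.

```python
# A triple as (subject, predicate, object) strings in N-Triples notation.
Triple = tuple[str, str, str]

IS_VERSION_OF = "<http://purl.org/dc/terms/isVersionOf>"


def materialize(triples: list[Triple]) -> list[Triple]:
    """Turn a versioned LDES member into a state object (simplified sketch)."""
    # Version IRI -> IRI of the object it is a version of.
    version_of = {s: o for s, p, o in triples if p == IS_VERSION_OF}
    state = []
    for s, p, o in triples:
        if p == IS_VERSION_OF:
            continue  # the version link itself is not part of the state
        # Rewrite version IRIs in both subject and object position.
        state.append((version_of.get(s, s), p, version_of.get(o, o)))
    return state


member = [
    ("<urn:example:obs/2022-11-09T20:30:00.000Z>", IS_VERSION_OF, "<urn:example:obs>"),
    ("<urn:example:obs/2022-11-09T20:30:00.000Z>",
     "<http://www.w3.org/ns/sosa/hasFeatureOfInterest>", '"spt-00029-97"'),
]
print(materialize(member))
# [('<urn:example:obs>', '<http://www.w3.org/ns/sosa/hasFeatureOfInterest>', '"spt-00029-97"')]
```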

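The materialized data is then converted to JSON-LD, which makes it easier to convert into a tabular structure. For another member of the same stream (an observation collection measured at 19:30), the JSON-LD looks like this: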

{
"@id" : "urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-g9kEXxUeP28TNc6wJvFsau",
"@type" : "Observatieverzameling",
"Bemonsteringsobjectverzameling.lid" : [ {
"@id" : "_:b8",
"@type" : "Meting",
"Observatie.geobserveerdKenmerk" : "https://data.vmm.be/concept/waterkwaliteitparameter/temperatuur",
"Observatie:.phenomenonTime" : {
"@type" : "http://www.w3.org/2001/XMLSchema#datetime",
"@value" : "2022-11-09T19:30:00.000Z"
},
"Observatie.resultaat" : {
"@id" : "_:b6",
"Maat.maat" : {
"@id" : "_:b1",
"https://schema.org/value" : {
"@type" : "http://www.w3.org/2001/XMLSchema#double",
"@value" : "1.153E1"
}
}
},
"Observatie.uitgevoerdMetSensor" : "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-8JgF3vxoYKoXzwWGUiX3Yc"
}, {
"@id" : "_:b3",
"@type" : "Meting",
"Observatie.geobserveerdKenmerk" : "https://data.vmm.be/concept/waterkwaliteitparameter/conductiviteit",
"Observatie:.phenomenonTime" : {
"@type" : "http://www.w3.org/2001/XMLSchema#datetime",
"@value" : "2022-11-09T19:30:00.000Z"
},
"Observatie.resultaat" : {
"@id" : "_:b4",
"Maat.maat" : {
"@id" : "_:b5",
"https://schema.org/value" : {
"@type" : "http://www.w3.org/2001/XMLSchema#integer",
"@value" : "920"
}
}
},
"Observatie.uitgevoerdMetSensor" : "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-8JgF3vxoYKoXzwWGUiX3Yc"
}, {
"@id" : "_:b7",
"@type" : "Meting",
"Observatie.geobserveerdKenmerk" : "https://data.vmm.be/concept/observatieparameter/hydrostatische-druk",
"Observatie:.phenomenonTime" : {
"@type" : "http://www.w3.org/2001/XMLSchema#datetime",
"@value" : "2022-11-09T19:30:00.000Z"
},
"Observatie.resultaat" : {
"@id" : "_:b2",
"Maat.maat" : {
"@id" : "_:b0",
"https://schema.org/value" : {
"@type" : "http://www.w3.org/2001/XMLSchema#integer",
"@value" : "11133"
}
}
},
"Observatie.uitgevoerdMetSensor" : "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-8JgF3vxoYKoXzwWGUiX3Yc"
} ],
"http://www.w3.org/ns/prov#generatedAtTime" : {
"@type" : "http://www.w3.org/2001/XMLSchema#dateTime",
"@value" : "2022-11-09T19:30:00.000Z"
},
"http://www.w3.org/ns/sosa/hasFeatureOfInterest" : "spt-00027-06"
}

Next, the data has to be transformed into a tabular structure, which can be done with a SPARQL query or a JOLT transformation. Here, I use a JOLT transformation, which filters out the relevant parameters and writes them to a flat JSON structure (see the output below).

{
"id": "urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn",
"temperature_value": "1.152E1",
"temperature_date": "2022-11-09T20:30:00.000Z",
"temperature_sensor": "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j",
"conductivity_value": "1120",
"conductivity_date": "2022-11-09T20:30:00.000Z",
"conductivity_sensor": "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j",
"hydro_pressure_value": "673",
"hydro_pressure_date": "2022-11-09T20:30:00.000Z",
"hydro_pressure_sensor": "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j"
}
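The JOLT specification itself is part of the NiFi flow in the repository. As a hedged stand-in for readers who want to test the mapping outside NiFi, the Python sketch below performs an equivalent reshaping: it reads the compacted JSON-LD keys shown earlier and emits `<prefix>_value`, `<prefix>_date`, and `<prefix>_sensor` fields. It is not the author's JOLT spec, and the property-to-prefix table is inferred from the example output.

```python
# Column prefix per observed property, matching the keys in the flat output above.
PREFIXES = {
    "https://data.vmm.be/concept/waterkwaliteitparameter/temperatuur": "temperature",
    "https://data.vmm.be/concept/waterkwaliteitparameter/conductiviteit": "conductivity",
    "https://data.vmm.be/concept/observatieparameter/hydrostatische-druk": "hydro_pressure",
}


def flatten(doc: dict) -> dict:
    """Reshape one JSON-LD observation collection into a flat record."""
    record = {"id": doc["@id"]}
    for measurement in doc["Bemonsteringsobjectverzameling.lid"]:
        prefix = PREFIXES.get(measurement["Observatie.geobserveerdKenmerk"])
        if prefix is None:
            continue  # skip parameters without a target column
        value = measurement["Observatie.resultaat"]["Maat.maat"]["https://schema.org/value"]
        record[f"{prefix}_value"] = value["@value"]  # kept as a string, as in the JOLT output
        # Key copied verbatim from the JSON-LD above, including its "Observatie:." prefix.
        record[f"{prefix}_date"] = measurement["Observatie:.phenomenonTime"]["@value"]
        record[f"{prefix}_sensor"] = measurement["Observatie.uitgevoerdMetSensor"]
    return record
```

Applied to the JSON-LD document shown earlier, it returns the id urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-g9kEXxUeP28TNc6wJvFsau with temperature_value "1.153E1", conductivity_value "920", and hydro_pressure_value "11133", each dated 2022-11-09T19:30:00.000Z.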

After this transformation, the data can be written to a PostgreSQL or TimescaleDB database. Once stored in TimescaleDB, the data can be used for time-series visualization, anomaly detection, forecasting, and aggregation over various time intervals, among other analyses.

(Image by author)
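As a sketch of the writing and aggregation steps described above, the code below unpivots the flat JOLT record into one row per measurement, ready for a parameterized INSERT, and computes hourly averages in memory. The averages mirror what a TimescaleDB query such as `SELECT time_bucket('1 hour', time) AS hour, parameter, avg(value) FROM water_quality GROUP BY hour, parameter;` would return. The `water_quality` table and its columns are hypothetical, and the `%s` placeholders assume a driver such as psycopg2 (for example, `cursor.executemany(INSERT_SQL, rows)`).

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

# Hypothetical target table in long format: one row per measurement.
INSERT_SQL = (
    "INSERT INTO water_quality (time, object_id, sensor, parameter, value) "
    "VALUES (%s, %s, %s, %s, %s)"
)
PARAMETERS = ("temperature", "conductivity", "hydro_pressure")


def to_rows(record: dict) -> list[tuple]:
    """Unpivot the flat JOLT output into (time, object_id, sensor, parameter, value) rows."""
    rows = []
    for name in PARAMETERS:
        if f"{name}_value" not in record:
            continue  # parameter missing from this observation collection
        # Python < 3.11 does not accept a trailing "Z" in fromisoformat.
        time = datetime.fromisoformat(record[f"{name}_date"].replace("Z", "+00:00"))
        rows.append((time, record["id"], record[f"{name}_sensor"], name,
                     float(record[f"{name}_value"])))  # "1.152E1" -> 11.52
    return rows


def bucket_average(rows: list[tuple], width: timedelta = timedelta(hours=1)) -> dict:
    """Average values per (bucket start, parameter), with buckets aligned to the Unix epoch."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    sums = defaultdict(lambda: [0.0, 0])  # (bucket, parameter) -> [total, count]
    for time, _object_id, _sensor, parameter, value in rows:
        bucket = epoch + ((time - epoch) // width) * width
        sums[(bucket, parameter)][0] += value
        sums[(bucket, parameter)][1] += 1
    return {key: total / count for key, (total, count) in sums.items()}
```

The long format keeps the table schema stable when new parameters appear in the stream, at the cost of one row per measurement instead of one row per observation collection.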

Full code

To replicate the data flow in this article, see the LDES2TimescaleDB GitHub repository. It describes how to set up TimescaleDB and Apache NiFi with Docker, after which the data flow can be started using the supplied Apache NiFi setup file.

GitHub – samuvack/LDES2TimescaleDB: manage timeseries LDES members in TimescaleDB

Wrapping Up

A Linked Data Event Stream is a core API for publishing fast- and slow-moving data. It is a collection of immutable objects described in machine-readable RDF (such as sensor observations, address registers, or financial data).

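To write an LDES to a PostgreSQL or TimescaleDB database, a data conversion flow is configured in Apache NiFi: an LDES client, version materialization, conversion to JSON-LD, a JOLT transformation, and finally a write to the database.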

With this Medium article as a guideline, you should be able to write Linked Data Event Streams to a TimescaleDB database.


Contributors to this article are Dwight Van Lancker (ddvlanck on GitHub) and Sander Van Dooren (sandervd on GitHub) at Smart Data Space (Digital Flanders, Belgium). In a rapidly changing society, governments need to be more agile and resilient than ever. Digital Flanders realizes and supervises digital transformation projects for Flemish and local governments.


Linked Data Event Streams and TimescaleDB for Real-time Timeseries Data Management was originally published in Towards AI on Medium, where people are continuing the conversation by highlighting and responding to this story.


Published via Towards AI
