Linked Data Event Streams and TimescaleDB for Real-time Timeseries Data Management

Last Updated on March 1, 2023 by Editorial Team

Author(s): Samuel Van Ackere

Originally published on Towards AI.

How to consume a Linked Data Event Stream and store it in a TimescaleDB database

Photo by Scott Graham on Unsplash

Linked data event stream

Linked Data Event Streams represent and share fast and slow-moving data on the Web using the Resource Description Framework (RDF). This allows data to be linked and connected to other data sources using unique identifiers (URIs).

A Linked Data Event Stream is a data event stream of a group of immutable objects described as machine-readable RDF (such as sensor observations, address registers, or financial data).

Linked Data Event Streams explained in 8 minutes

Linked Data Event Streams (LDES) provide a flexible and interoperable way of describing and exchanging events as Linked Data, enabling different systems and applications to consume and act on data streams in a consistent, standardized way.

This article shows how to effortlessly insert sensor data in the form of an LDES into a TimescaleDB database.

Managing large amounts of time-series data

TimescaleDB is an open-source database for storing and querying large amounts of time-series data. It extends PostgreSQL with time-series support, offering features like fast ingestion and querying of large datasets, flexible data granularity, and long-term data storage.

Insert performance comparison between TimescaleDB 2.7.2 and PostgreSQL 14.4

TimescaleDB is a powerful and efficient database system well-suited for storing and querying time-series data at scale. It is widely used in various applications, including IoT, finance, and telemetry.
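As a rough illustration of how TimescaleDB builds on PostgreSQL, the Python sketch below only assembles the SQL statements that would turn an ordinary table into a hypertable. The table name `water_quality` and its columns are assumptions made for this example, not the schema used in the article's repository, and no database connection is opened.

```python
# Hypothetical table and column names, chosen only for this illustration.
TABLE = "water_quality"


def schema_statements(table=TABLE):
    """Return the SQL statements that set up a hypertable for sensor readings."""
    return [
        # Enable the TimescaleDB extension on a PostgreSQL database.
        "CREATE EXTENSION IF NOT EXISTS timescaledb;",
        # An ordinary PostgreSQL table with a timestamp column.
        f"CREATE TABLE IF NOT EXISTS {table} ("
        "time TIMESTAMPTZ NOT NULL, "
        "sensor TEXT NOT NULL, "
        "property TEXT NOT NULL, "
        "value DOUBLE PRECISION);",
        # Convert the table into a hypertable partitioned on the time column.
        f"SELECT create_hypertable('{table}', 'time', if_not_exists => TRUE);",
    ]


statements = schema_statements()
for statement in statements:
    print(statement)
```

The statements could be executed with any PostgreSQL client; after that, the table is queried with regular SQL while TimescaleDB handles the time-based partitioning.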

First, a data flow must be configured to ingest a Linked Data Event Stream into PostgreSQL. This can be done in the Apache NiFi environment.

LDES to TimescaleDB

To persist an LDES in a database, we use the Apache NiFi platform. Apache NiFi is an open-source data processing and integration platform designed to automate data flows between systems. It provides a web-based user interface for creating, managing, and monitoring data flows, along with a range of pre-built connectors and processors for data processing tasks.

Data pipeline in Apache NiFi (image by author)

To consume an LDES, an LDES client processor is needed in the Apache NiFi flow. An LDES client is a component capable of consuming, processing, and analyzing the events in the stream.

The output of the LDES Client (working with the following LDES):

_:B5edf92f59913b8d14f45818d5bde1d51 <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://def.isotc211.org/iso19156/2011/Measurement#OM_Measurement> .
_:B5edf92f59913b8d14f45818d5bde1d51 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.observedProperty> <https://data.vmm.be/concept/waterkwaliteitparameter/conductiviteit> .
_:B5edf92f59913b8d14f45818d5bde1d51 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.phenomenonTime> "2022-11-09T20:30:00.000Z"^^<http://www.w3.org/2001/XMLSchema#datetime> .
_:B5edf92f59913b8d14f45818d5bde1d51 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.result> _:B727a9b2b6a6311cc83b677d439f4cf68 .
_:B5edf92f59913b8d14f45818d5bde1d51 <http://www.w3.org/ns/sosa/madeBySensor> <urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j> .
_:B6003104aae33209ee0e6a26e14ba38cb <https://schema.org/value> "1.152E1"^^<http://www.w3.org/2001/XMLSchema#double> .
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://def.isotc211.org/iso19156/2011/Measurement#OM_Measurement> .
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.observedProperty> <https://data.vmm.be/concept/waterkwaliteitparameter/temperatuur> .
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.phenomenonTime> "2022-11-09T20:30:00.000Z"^^<http://www.w3.org/2001/XMLSchema#datetime> .
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.result> _:Bdd20913615f9b4c0169c7770bdc770d4 .
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://www.w3.org/ns/sosa/madeBySensor> <urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j> .
_:Bdd20913615f9b4c0169c7770bdc770d4 <http://def.isotc211.org/iso19103/2005/UnitsOfMeasure#Measure.value> _:B6003104aae33209ee0e6a26e14ba38cb .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.w3.org/TR/vocab-ssn-ext/#sosa:ObservationCollection> .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://def.isotc211.org/iso19156/2011/SamplingFeature#SF_SamplingFeatureCollection.member> _:B5edf92f59913b8d14f45818d5bde1d51 .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://def.isotc211.org/iso19156/2011/SamplingFeature#SF_SamplingFeatureCollection.member> _:B3105a27f0059867877fb28653f5a6abc .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://def.isotc211.org/iso19156/2011/SamplingFeature#SF_SamplingFeatureCollection.member> _:Bec20a7e293639ee5a985d621d7b2d6d5 .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://purl.org/dc/terms/isVersionOf> <urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn> .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://www.w3.org/ns/prov#generatedAtTime> "2022-11-09T20:30:00.000Z"^^<http://www.w3.org/2001/XMLSchema#dateTime> .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://www.w3.org/ns/sosa/hasFeatureOfInterest> "spt-00029-97" .
_:B3105a27f0059867877fb28653f5a6abc <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://def.isotc211.org/iso19156/2011/Measurement#OM_Measurement> .
_:B3105a27f0059867877fb28653f5a6abc <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.observedProperty> <https://data.vmm.be/concept/observatieparameter/hydrostatische-druk> .
_:B3105a27f0059867877fb28653f5a6abc <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.phenomenonTime> "2022-11-09T20:30:00.000Z"^^<http://www.w3.org/2001/XMLSchema#datetime> .
_:B3105a27f0059867877fb28653f5a6abc <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.result> _:B2598b988faa2635d5dd520f59f376b8e .
_:B3105a27f0059867877fb28653f5a6abc <http://www.w3.org/ns/sosa/madeBySensor> <urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j> .
<https://iow.smartdataspace.beta-vlaanderen.be/water-quality-observations> <https://w3id.org/tree#member> <urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> .
_:B727a9b2b6a6311cc83b677d439f4cf68 <http://def.isotc211.org/iso19103/2005/UnitsOfMeasure#Measure.value> _:B9776c69148d2ef600aed62aa2c85bd40 .
_:B2598b988faa2635d5dd520f59f376b8e <http://def.isotc211.org/iso19103/2005/UnitsOfMeasure#Measure.value> _:B436b5950c7725d6d9a38aaf6ca88b00f .
_:B9776c69148d2ef600aed62aa2c85bd40 <https://schema.org/value> "1120"^^<http://www.w3.org/2001/XMLSchema#integer> .
_:B436b5950c7725d6d9a38aaf6ca88b00f <https://schema.org/value> "673"^^<http://www.w3.org/2001/XMLSchema#integer> .
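To make the shape of this output easier to follow, here is a minimal Python sketch (standard library only) that parses a few N-Triples lines and walks from an observation through its blank nodes to the measured value. The regular expression and the helper names `parse_ntriples` and `observation_value` are assumptions for illustration and are not part of the LDES client. The blank node labels are shortened versions of those above. A real pipeline would use a proper RDF library.

```python
import re

# Simplified pattern for one N-Triples line: subject, predicate, object, dot.
# It is enough for the lines shown here but does not cover the full grammar.
TRIPLE = re.compile(r'^(\S+)\s+(\S+)\s+(.+?)\s*\.\s*$')

OM = "http://def.isotc211.org/iso19156/2011/Observation#OM_Observation."
PROPERTY = "<" + OM + "observedProperty>"
RESULT = "<" + OM + "result>"
MEASURE = "<http://def.isotc211.org/iso19103/2005/UnitsOfMeasure#Measure.value>"
VALUE = "<https://schema.org/value>"


def parse_ntriples(text):
    """Return {subject: {predicate: object}}; repeated predicates are overwritten."""
    graph = {}
    for line in text.splitlines():
        match = TRIPLE.match(line.strip())
        if match:
            subject, predicate, obj = match.groups()
            graph.setdefault(subject, {})[predicate] = obj
    return graph


def observation_value(graph, observation):
    """Follow result -> Measure.value -> schema:value and return the lexical form."""
    result_node = graph[observation][RESULT]
    measure_node = graph[result_node][MEASURE]
    literal = graph[measure_node][VALUE]  # e.g. "1120"^^<...#integer>
    return literal.split('"')[1]


sample = """
_:B5edf <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.observedProperty> <https://data.vmm.be/concept/waterkwaliteitparameter/conductiviteit> .
_:B5edf <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.result> _:B727a .
_:B727a <http://def.isotc211.org/iso19103/2005/UnitsOfMeasure#Measure.value> _:B9776 .
_:B9776 <https://schema.org/value> "1120"^^<http://www.w3.org/2001/XMLSchema#integer> .
"""

graph = parse_ntriples(sample)
print(graph["_:B5edf"][PROPERTY], observation_value(graph, "_:B5edf"))
```

The two hops through blank nodes (result, then measure) are what the later flattening step has to collapse into a single column per observed property.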

Next in the Apache NiFi flow is a version materialization component. “Version materialization” refers to the process of removing version information from an LDES member to reconstruct a “state” object, which reflects only the current state of the LDES member without historical information about changes.

Version materialization is needed because a consumer typically wants to store only the current state in the database, not every version of each member.
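The idea can be sketched in a few lines of Python. This is an illustrative simplification, not the NiFi component itself: it assumes triples are held as plain string tuples, that the version link is expressed with `dcterms:isVersionOf` (as in the output above), and that the hypothetical helper `materialize` only needs to swap version identifiers for the identifier of the underlying entity.

```python
IS_VERSION_OF = "http://purl.org/dc/terms/isVersionOf"


def materialize(triples):
    """Rewrite version IRIs to the IRI of the entity they are a version of.

    `triples` is a list of (subject, predicate, object) string tuples.
    Simplification: objects are compared as plain strings, so IRIs and
    literals are not distinguished here.
    """
    # Map each version IRI to the IRI of its underlying entity.
    base_of = {s: o for s, p, o in triples if p == IS_VERSION_OF}

    state = []
    for s, p, o in triples:
        if p == IS_VERSION_OF:
            continue  # the versioning triple itself is dropped
        state.append((base_of.get(s, s), p, base_of.get(o, o)))
    return state


entity = "urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn"
version = entity + "/2022-11-09T20:30:00.000Z"
triples = [
    (version, IS_VERSION_OF, entity),
    (version, "http://www.w3.org/ns/sosa/hasFeatureOfInterest", "spt-00029-97"),
    ("https://iow.smartdataspace.beta-vlaanderen.be/water-quality-observations",
     "https://w3id.org/tree#member", version),
]

state = materialize(triples)
for triple in state:
    print(triple)
```

After this step every statement refers to the entity identifier without the timestamp suffix, which is the identifier that appears as `@id` once the member is serialized as JSON-LD.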

The data is first converted to JSON-LD, which makes the later conversion to a tabular structure easier. The JSON-LD file looks like this:

{
  "@id" : "urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-g9kEXxUeP28TNc6wJvFsau",
  "@type" : "Observatieverzameling",
  "Bemonsteringsobjectverzameling.lid" : [ {
    "@id" : "_:b8",
    "@type" : "Meting",
    "Observatie.geobserveerdKenmerk" : "https://data.vmm.be/concept/waterkwaliteitparameter/temperatuur",
    "Observatie:.phenomenonTime" : {
      "@type" : "http://www.w3.org/2001/XMLSchema#datetime",
      "@value" : "2022-11-09T19:30:00.000Z"
    },
    "Observatie.resultaat" : {
      "@id" : "_:b6",
      "Maat.maat" : {
        "@id" : "_:b1",
        "https://schema.org/value" : {
          "@type" : "http://www.w3.org/2001/XMLSchema#double",
          "@value" : "1.153E1"
        }
      }
    },
    "Observatie.uitgevoerdMetSensor" : "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-8JgF3vxoYKoXzwWGUiX3Yc"
  }, {
    "@id" : "_:b3",
    "@type" : "Meting",
    "Observatie.geobserveerdKenmerk" : "https://data.vmm.be/concept/waterkwaliteitparameter/conductiviteit",
    "Observatie:.phenomenonTime" : {
      "@type" : "http://www.w3.org/2001/XMLSchema#datetime",
      "@value" : "2022-11-09T19:30:00.000Z"
    },
    "Observatie.resultaat" : {
      "@id" : "_:b4",
      "Maat.maat" : {
        "@id" : "_:b5",
        "https://schema.org/value" : {
          "@type" : "http://www.w3.org/2001/XMLSchema#integer",
          "@value" : "920"
        }
      }
    },
    "Observatie.uitgevoerdMetSensor" : "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-8JgF3vxoYKoXzwWGUiX3Yc"
  }, {
    "@id" : "_:b7",
    "@type" : "Meting",
    "Observatie.geobserveerdKenmerk" : "https://data.vmm.be/concept/observatieparameter/hydrostatische-druk",
    "Observatie:.phenomenonTime" : {
      "@type" : "http://www.w3.org/2001/XMLSchema#datetime",
      "@value" : "2022-11-09T19:30:00.000Z"
    },
    "Observatie.resultaat" : {
      "@id" : "_:b2",
      "Maat.maat" : {
        "@id" : "_:b0",
        "https://schema.org/value" : {
          "@type" : "http://www.w3.org/2001/XMLSchema#integer",
          "@value" : "11133"
        }
      }
    },
    "Observatie.uitgevoerdMetSensor" : "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-8JgF3vxoYKoXzwWGUiX3Yc"
  } ],
  "http://www.w3.org/ns/prov#generatedAtTime" : {
    "@type" : "http://www.w3.org/2001/XMLSchema#dateTime",
    "@value" : "2022-11-09T19:30:00.000Z"
  },
  "http://www.w3.org/ns/sosa/hasFeatureOfInterest" : "spt-00027-06"
}

Next, the data must be transformed into a tabular structure, which can be done with either a SPARQL query or a JOLT transformation. This article uses a JOLT transformation, which filters out the relevant parameters and puts them in a flat, structured JSON file (see the output underneath).

{
  "id": "urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn",
  "temperature_value": "1.152E1",
  "temperature_date": "2022-11-09T20:30:00.000Z",
  "temperature_sensor": "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j",
  "conductivity_value": "1120",
  "conductivity_date": "2022-11-09T20:30:00.000Z",
  "conductivity_sensor": "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j",
  "hydro_pressure_value": "673",
  "hydro_pressure_date": "2022-11-09T20:30:00.000Z",
  "hydro_pressure_sensor": "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j"
}
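The article performs this step with a JOLT specification inside NiFi. As a plain Python stand-in (not the JOLT spec itself), the sketch below shows the same kind of flattening. The key names are taken from the JSON-LD example earlier in the article; the `PREFIXES` mapping and the `flatten` helper are assumptions for illustration, and the sample member is abbreviated to a single observation.

```python
import json

# Map the last path segment of the observed-property IRI to a column prefix.
PREFIXES = {
    "temperatuur": "temperature",
    "conductiviteit": "conductivity",
    "hydrostatische-druk": "hydro_pressure",
}


def flatten(member):
    """Turn one materialized JSON-LD member into a flat, one-row dictionary."""
    row = {"id": member["@id"]}
    for obs in member["Bemonsteringsobjectverzameling.lid"]:
        prop = obs["Observatie.geobserveerdKenmerk"].rsplit("/", 1)[-1]
        prefix = PREFIXES.get(prop)
        if prefix is None:
            continue  # skip observed properties without a target column
        value = obs["Observatie.resultaat"]["Maat.maat"]["https://schema.org/value"]
        row[prefix + "_value"] = value["@value"]
        row[prefix + "_date"] = obs["Observatie:.phenomenonTime"]["@value"]
        row[prefix + "_sensor"] = obs["Observatie.uitgevoerdMetSensor"]
    return row


# Abbreviated member with a single observation, for brevity.
member = {
    "@id": "urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-g9kEXxUeP28TNc6wJvFsau",
    "Bemonsteringsobjectverzameling.lid": [{
        "Observatie.geobserveerdKenmerk":
            "https://data.vmm.be/concept/waterkwaliteitparameter/temperatuur",
        "Observatie:.phenomenonTime": {"@value": "2022-11-09T19:30:00.000Z"},
        "Observatie.resultaat": {
            "Maat.maat": {"https://schema.org/value": {"@value": "1.153E1"}}
        },
        "Observatie.uitgevoerdMetSensor":
            "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-8JgF3vxoYKoXzwWGUiX3Yc",
    }],
}

row = flatten(member)
print(json.dumps(row, indent=2))
```

Values are kept as strings here, as in the JOLT output above; casting to numeric and timestamp types is left to the database write.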

After this transformation, the data can be written to a PostgreSQL or TimescaleDB database. Analysis that can be done with data in the TimescaleDB database includes but is not limited to time-series visualization, anomaly detection, forecasting, and aggregation of time-series data over various time intervals.
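As a hedged sketch of what the write and a later aggregation might look like, the Python below converts one flat record into typed rows and defines two SQL strings. The table `water_quality` and its columns `time`, `sensor`, `property`, and `value` are assumptions for this example, the `%s` placeholders follow the style used by common PostgreSQL drivers for Python, and no database connection is opened. `time_bucket` is the TimescaleDB function for grouping rows into fixed time intervals.

```python
from datetime import datetime, timezone

PROPERTIES = ("temperature", "conductivity", "hydro_pressure")

# Parameterized insert for a hypothetical narrow table (one row per reading).
INSERT_SQL = (
    "INSERT INTO water_quality (time, sensor, property, value) "
    "VALUES (%s, %s, %s, %s);"
)

# Hourly average per property, using TimescaleDB's time_bucket function.
HOURLY_AVG_SQL = (
    "SELECT time_bucket('1 hour', time) AS bucket, property, avg(value) "
    "FROM water_quality GROUP BY bucket, property ORDER BY bucket;"
)


def to_rows(record):
    """Split one flat record into (time, sensor, property, value) tuples."""
    rows = []
    for name in PROPERTIES:
        if name + "_value" not in record:
            continue
        stamp = datetime.strptime(
            record[name + "_date"], "%Y-%m-%dT%H:%M:%S.%fZ"
        ).replace(tzinfo=timezone.utc)  # the trailing Z denotes UTC
        rows.append(
            (stamp, record[name + "_sensor"], name, float(record[name + "_value"]))
        )
    return rows


record = {
    "id": "urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn",
    "temperature_value": "1.152E1",
    "temperature_date": "2022-11-09T20:30:00.000Z",
    "temperature_sensor": "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j",
    "conductivity_value": "1120",
    "conductivity_date": "2022-11-09T20:30:00.000Z",
    "conductivity_sensor": "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j",
    "hydro_pressure_value": "673",
    "hydro_pressure_date": "2022-11-09T20:30:00.000Z",
    "hydro_pressure_sensor": "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j",
}

rows = to_rows(record)
for row in rows:
    print(row)
```

Each tuple in `rows` could be passed to a driver's `execute` call together with `INSERT_SQL`; the aggregation query then works on whatever has been inserted.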

(image by author)

Full code

To replicate the data flow in this article, please go to the LDES2TimescaleDB GitHub repo. It describes how to set up the dockerized TimescaleDB and Apache NiFi, after which the data flow can be started using the supplied Apache NiFi setup file.

GitHub: samuvack/LDES2TimescaleDB (manage timeseries LDES members in TimescaleDB)

Wrapping Up

A Linked Data Event Stream is the core API of fast and slow-moving data. It is a data event stream of a group of immutable objects described as machine-readable RDF (such as sensor observations, address registers or financial data).

To write an LDES to a PostgreSQL or TimescaleDB database, a data conversion flow is configured in Apache NiFi.

With this Medium article as a guideline, you should be able to write Linked Data Event Streams to a TimescaleDB database.


Contributors to this article are Dwight Van Lancker (ddvlanck on GitHub) and Sander Van Dooren (sandervd on GitHub) at Smart Data Space (Digital Flanders, Belgium). In a rapidly changing society, governments need to be more agile and resilient than ever. Digital Flanders realizes and supervises digital transformation projects for Flemish and local governments.


Linked Data Event Streams and TimescaleDB for Real-time Timeseries Data Management was originally published in Towards AI on Medium.
