
How to Augment Wildfire Datasets with Historical Weather Data using Python and Google Earth Engine
Last Updated on July 4, 2025 by Editorial Team
Author(s): Ruiz Rivera
Originally published on Towards AI.
Picture this: You're a data scientist working with wildfire data, and all you have are basic fire records: location coordinates, timestamps, and maybe a unique fire ID. While this information tells you where and when a fire occurred, it doesn't tell you why it started or spread the way it did.
Weather conditions play a crucial role in wildfire behavior. Temperature, wind speed, humidity, and soil moisture can mean the difference between a small grass fire and a devastating blaze. But manually collecting historical weather data for hundreds or thousands of fire locations? That sounds like a nightmare.
This is where the power of Python and Google Earth Engine comes to the rescue! With this additional context built into our data, we can train more sophisticated, machine-learning-aided fire risk models that incorporate historical weather patterns alongside location and vegetation data, potentially identifying high-risk areas before fire season even begins.
The results of this work matter well beyond the labs and offices thinking about these challenges. Of the wide range of sectors we can impact with this engineered dataset, one that you may not have thought about is the insurance industry. Insurance companies can leverage this enriched dataset to develop more accurate wildfire risk premiums by analyzing how specific weather conditions (like low humidity combined with high winds) correlate with fire severity and property damage, leading to more precise underwriting and fairer pricing models. We can also help emergency management agencies build more effective early warning systems by identifying the weather conditions that historically preceded large fires, enabling them to issue targeted alerts and deploy resources proactively to save not only property but, most importantly, lives.
In this tutorial, we'll build a tool that automatically enriches wildfire datasets with comprehensive weather information, opening up new possibilities for fire risk assessment, insurance modeling, and research applications. By the end of it, you'll have a Python script that can accomplish the following:
- Take location data (coordinates + timestamps)
- Automatically fetch historical weather data from Google Earth Engine's ERA5 dataset
- Process large datasets efficiently using batch processing
- Handle multiple file formats (CSV, Excel, JSON, SQLite)
- Export enriched datasets ready for analysis
The enriched data will include temperature, wind speed/direction, humidity levels, and soil temperature: all the environmental context you need for meaningful wildfire analysis. Ready to make a big impact with data? Right then. Let's begin.
Prerequisites
Before diving in, make sure you have:
- Python 3.8+ installed
- A Google Cloud Project with Earth Engine API enabled
- Basic familiarity with pandas and data manipulation
1. Installing dependencies
First, let's install the required packages by saving them and their pinned versions in a file we'll title requirements.txt:
earthengine-api==0.1.406
geemap==0.32.1 #Backup version: 0.20.4
pandas>=1.3.0
python-dotenv>=0.19.0
requests>=2.25.0
Then run the following command in a terminal such as Bash or Anaconda Prompt (prefix it with ! only if you're running it from a Jupyter notebook cell):
pip install -r requirements.txt
2. Project configuration
If you don't already have a .env file in your repository, create one using the following commands:
touch .env
start .env
If you need help creating a Google Earth Engine project, lucky for you, I've published a tutorial on exactly how to do so. Be sure to check it out if needed!
This approach keeps sensitive information out of your code, which is perfect for sharing or open-source projects. Once you have a Google Earth Engine project name in place, edit the .env file and add your project name to it:
PROJECT_NAME="insert-your-project-name"
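To confirm that the variable loads correctly before moving on, here is a quick throwaway check (not part of the final script):
from dotenv import load_dotenv
import os

load_dotenv()  # Reads key=value pairs from .env into the process environment
print(os.getenv("PROJECT_NAME"))  # Should print your Earth Engine project name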
3. Secure your credentials
Additionally, it's crucial to add a .gitignore file to our repository, using the same commands we saw earlier, in the event that we don't already have one:
touch .gitignore
start .gitignore
The .gitignore file acts as a security guard for your repository, preventing Git from tracking, uploading, and displaying sensitive information when you commit code to version control platforms like GitHub. Adding files like .env to .gitignore is crucial because they contain sensitive information such as your Google Cloud project name, API keys, passwords, and other configuration secrets that should never be publicly visible. Without proper .gitignore protection, you could accidentally expose your project credentials to anyone who views your repository, potentially leading to unauthorized access to your Google Earth Engine resources or unexpected billing charges.
The .gitignore file ensures that only your code gets shared publicly while keeping your personal configuration and credentials safely on your local machine. If you're curious about what a .gitignore file looks like or how it should be structured, feel free to check out the template I've used for this project.
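As a minimal sketch (your own template will likely list more entries), a .gitignore for this project could contain:
.env
temp_downloads/
__pycache__/
*.pyc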
4. Input Data Requirements
The last prerequisite is that our script expects the wildfire data to contain some version of these four essential columns: a latitude column (LATITUDE), a longitude column (LONGITUDE), a date/time column recording when the fire started, and a unique fire ID column.
Note that the date format is flexible: it can handle both YYYYMMDD (date only) and YYYYMMDDHHMMSS (date and time) formats. For our script to run, we'll also need either the file path or the link pointing to the dataset you plan on processing.
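To make this concrete, here is a tiny hypothetical input CSV in the shape the script expects. The FIRE_ID and IGNITION_DATE names are illustrative, since the script prompts you for your dataset's unique ID and datetime columns, while LATITUDE and LONGITUDE are the coordinate names the code looks for:
FIRE_ID,LATITUDE,LONGITUDE,IGNITION_DATE
G80223,49.2827,-123.1207,20230815
K71002,50.1163,-122.9574,20230816143000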
Understanding the core components
Now that we've laid the groundwork to successfully (and safely) run this tutorial, let's go through our Python code piece by piece so we understand how we're extracting this historical weather data:
1. Date Conversion Utility
One challenge with wildfire datasets is inconsistent date formats. With the code below, let's create a robust date converter function that gracefully handles different date formats and missing values, which is essential for real-world data processing:
import ee
import geemap
import logging
import math
import random
import requests
import os
import sqlite3
import time
import datetime as dt
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from dotenv import load_dotenv
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def convert_float_to_datetime(series):
"""
Converts a pandas Series of float-based date values to datetime.
Handles:
- 8-digit formats (YYYYMMDD)
- 14-digit formats (YYYYMMDDHHMMSS)
- Invalid lengths become NaT
"""
# Initialize output series with NaT (same index as input)
datetime_series = pd.Series(index=series.index,
data=pd.NaT,
dtype='datetime64[ns]')
# Process non-null values
non_null = series.dropna()
if non_null.empty:
return datetime_series
try:
# Convert to integer then string (avoids scientific notation)
str_dates = non_null.astype('int64').astype(str)
# Identify valid date lengths
mask_8 = str_dates.str.len() == 8 # Date only
mask_14 = str_dates.str.len() == 14 # Date + time
# Parse valid formats
parsed_dates = pd.concat([
pd.to_datetime(str_dates[mask_8], format='%Y%m%d', errors='coerce'),
pd.to_datetime(str_dates[mask_14], format='%Y%m%d%H%M%S', errors='coerce')
]).sort_index() # Maintain original order
# Update result series with valid dates
datetime_series.update(parsed_dates)
except Exception as e:
print(f"Conversion error: {str(e)}")
return datetime_series
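As a quick sanity check of the converter, you can feed it a small Series of float-encoded dates (the values below are made up purely for illustration):
sample_dates = pd.Series([20230815.0, 20230816143000.0, 1234.0, np.nan])
print(convert_float_to_datetime(sample_dates))
# Expected: 2023-08-15, 2023-08-16 14:30:00, NaT (invalid length), NaT (missing)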
We've also created a create_dataframe() function, which serves as a universal data loader that intelligently handles multiple file formats (CSV, Excel, JSON, SQLite) from both local file paths and remote URLs, making it extremely user-friendly for loading wildfire datasets regardless of how they're stored. It automatically detects the file type based on the file extension and applies the appropriate pandas reading method, while also providing interactive prompts to guide users through the data loading process.
def create_dataframe(hardcoded_path=None, sheet_name=None, db_name=None, table_name=None):
"""
Create a Pandas DataFrame from a file path or a link.
Parameters:
- hardcoded_path (str, optional): The file path or link to be used. If None, the user will be prompted to enter it.
- sheet_name (str, optional): The sheet name for Excel files or table name for SQL databases.
- db_name (str, optional): The database name for SQL databases.
- table_name (str, optional): The table name for SQL databases.
"""
if hardcoded_path is None:
filepath_or_link = input("\n Please enter the file path or link for the data source: ")
else:
filepath_or_link = hardcoded_path
try:
# Handle web URL
if filepath_or_link.startswith(('http://', 'https://')):
if filepath_or_link.endswith('.csv'):
df = pd.read_csv(filepath_or_link)
elif filepath_or_link.endswith(('.xls', '.xlsx')):
df = pd.read_excel(filepath_or_link, sheet_name=sheet_name)
elif filepath_or_link.endswith('.json'):
df = pd.read_json(filepath_or_link)
elif filepath_or_link.endswith('.db'):
response = requests.get(filepath_or_link)
db_name = input("\n Please enter the DATABASE NAME ONLY (do not include the .db extension): ")
table_name = input(f"\n Please enter the specific TABLE NAME you wish to access in the {db_name} database: ")
with open(db_name, "wb") as f:
f.write(response.content)
conn = sqlite3.connect(db_name)
query = f"SELECT * FROM {table_name}" if table_name else "SELECT * FROM sqlite_master WHERE type='table';"
df = pd.read_sql_query(query, conn)
conn.close()
else:
raise ValueError("Unsupported file format from the URL.")
else:
# Handle local file path
filepath_or_link = os.path.expanduser(filepath_or_link)
if not os.path.exists(filepath_or_link):
raise FileNotFoundError(f"The file {filepath_or_link} does not exist.")
if filepath_or_link.endswith('.csv'):
df = pd.read_csv(filepath_or_link)
elif filepath_or_link.endswith(('.xls', '.xlsx')):
df = pd.read_excel(filepath_or_link, sheet_name=sheet_name)
elif filepath_or_link.endswith('.json'):
df = pd.read_json(filepath_or_link)
elif filepath_or_link.endswith('.db'):
conn = sqlite3.connect(filepath_or_link)
query = f"SELECT * FROM {table_name}" if table_name else "SELECT * FROM sqlite_master WHERE type='table';"
df = pd.read_sql_query(query, conn)
conn.close()
else:
raise ValueError("Unsupported file format.")
print("\n ✅ DataFrame created successfully:")
return df
except Exception as e:
print(f"\n 🙄 An error occurred: {e}")
return None
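For example, to skip the interactive prompt and load a local CSV directly, you can pass a hardcoded path (the file name below is hypothetical):
df = create_dataframe(hardcoded_path="~/data/bc_wildfires_2023.csv")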
2. Helper Functions
With our utility functions out of the way, let's talk about accessing Google's Earth Engine API! The heart of our tool is the get_weather_data() function we've written to query Google Earth Engine for weather data. The aim of this function is to handle the complex process of querying satellite data, converting units, and calculating derived metrics like wind speed from component vectors.
And within get_weather_data() are helper functions like the sample_point_data() function, which handles the technical complexity of actually extracting weather data from Google Earth Engine at specific coordinates, including error handling when satellite data isn't available at a location. It abstracts away the Earth Engine API intricacies, allowing get_weather_data() to focus on data processing logic rather than API management details.
def sample_point_data(weather_data, point, lat, lon, date_val):
"""
Sample Earth Engine data at a specific point.
Args:
weather_data: Earth Engine Image with weather variables
point: Earth Engine Geometry point
lat: Latitude value for logging
lon: Longitude value for logging
date_val: Date value for logging
Returns:
Dictionary of sampled data or None if no data available
"""
sample_result = weather_data.sample(point, 30).first()
# Check if sample_result is null
if sample_result is None or ee.Algorithms.IsEqual(sample_result, None).getInfo():
logger.warning(f"No data at point ({lat}, {lon}) for date {date_val}")
return None
# Convert to dictionary
return sample_result.toDictionary().getInfo()
In my opinion, the magic in this process is our retry_with_backoff() decorator function, engineered to support the sample_point_data() function by automatically retrying failed API calls to Google Earth Engine. What's interesting about this function is that it increases the delay between attempts, making the weather data extraction process more robust against temporary network issues or API timeouts. It implements exponential backoff (doubling the wait time after each failure) and adds random jitter to prevent multiple processes from overwhelming the API simultaneously when retrying. This function is crucial for handling the inherent instability of remote API calls, ensuring that temporary hiccups don't derail the entire weather data enrichment process when processing large wildfire datasets.
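Since the decorator itself isn't reproduced above, here is a minimal sketch consistent with the behaviour just described: a configurable number of retries, a delay that doubles after each failure, and random jitter. The parameter names and default values are assumptions rather than the original implementation:
def retry_with_backoff(max_retries=5, base_delay=1, max_delay=60):
    """
    Decorator factory that retries a function when it raises an exception,
    doubling the delay after each failure and adding random jitter.
    (Sketch only; defaults are assumed, not taken from the original script.)
    """
    def decorator(func):
        def wrapper(*args, **kwargs):
            delay = base_delay
            for attempt in range(1, max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries:
                        logger.error(f"All {max_retries} attempts failed: {e}")
                        raise
                    # Exponential backoff with jitter so parallel workers don't retry in lockstep
                    sleep_time = min(delay, max_delay) + random.uniform(0, 1)
                    logger.warning(f"Attempt {attempt} failed ({e}); retrying in {sleep_time:.1f}s...")
                    time.sleep(sleep_time)
                    delay *= 2
        return wrapper
    return decorator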
# Apply the retry decorator to the sampling function
sample_point_data_with_retry = retry_with_backoff()(sample_point_data)
Another important helper function is wind_direction_to_text(), which transforms the calculated wind direction from degrees (like 245.8°) into human-readable cardinal directions (like "Southwest"), making the output dataset more interpretable for analysts and researchers.
def wind_direction_to_text(wind_dir_deg):
"""
Convert wind direction in degrees to 8-point cardinal direction.
Args:
wind_dir_deg (float): Wind direction in degrees (0-360)
Returns:
str: Cardinal direction as text (North, Northeast, East, Southeast, South, Southwest, West, Northwest)
"""
# Define direction ranges and corresponding text
directions = [
(337.5, 360, "North"),
(0, 22.5, "North"),
(22.5, 67.5, "Northeast"),
(67.5, 112.5, "East"),
(112.5, 157.5, "Southeast"),
(157.5, 202.5, "South"),
(202.5, 247.5, "Southwest"),
(247.5, 292.5, "West"),
(292.5, 337.5, "Northwest")
]
# Normalize the degree to be between 0 and 360
wind_dir_deg = wind_dir_deg % 360
# Find the matching direction
for start, end, direction in directions:
if (start <= wind_dir_deg < end) or (start <= wind_dir_deg <= end and end == 360):
return direction
# This should never happen if the ranges are correct
return "Unknown"
Together, these functions separate concerns: the former handles the data extraction complexity while the latter enhances data usability, allowing get_weather_data() to orchestrate the overall weather enrichment process cleanly:
def get_weather_data(row, id_col):
"""
Extract weather data from Google Earth Engine for a specific location and time.
Args:
row: DataFrame row containing 'ignition_datetime', 'LATITUDE', and 'LONGITUDE'
id_col: Name of the column to use as the unique fire identifier
Returns:
dict: Weather data or NaN values if data cannot be retrieved
"""
# Validate input data
date_val = row.get('ignition_datetime')
lat = row.get('LATITUDE')
lon = row.get('LONGITUDE')
fire_label = row.get(id_col)
# Initialize default return values
default_values = {
'temperature_c': np.nan,
'wind_speed_ms': np.nan,
'wind_direction_deg': np.nan,
'wind_direction': 'No data returned',
'humidity_dewpoint_temperature_2m': np.nan,
'soil_temperature_level_1': np.nan,
'fire_label': fire_label,
'ignition_datetime': date_val
}
try:
# Check if we have all required values
if date_val is None or pd.isna(date_val) or not isinstance(date_val, datetime):
logger.warning(f"Fire label {fire_label} has an invalid ignition_datetime: {date_val}")
return default_values
if lat is None or pd.isna(lat) or lon is None or pd.isna(lon):
logger.warning(f"Fire label {fire_label} has an invalid coordinates: lat={lat}, lon={lon}")
return default_values
# Convert datetime to Earth Engine format
date = ee.Date(date_val)
# Create point geometry
point = ee.Geometry.Point([lon, lat])
# Get ERA5 reanalysis data
# era5 = ee.ImageCollection('ECMWF/ERA5/HOURLY')
era5 = ee.ImageCollection('ECMWF/ERA5_LAND/HOURLY')
# Filter to the date (add buffer to ensure we get data)
start_date = date.advance(-1, 'hour')
end_date = date.advance(2, 'hour')
era5_filtered = era5.filterDate(start_date, end_date)
# Check if we have any images
if era5_filtered.size().getInfo() == 0:
logger.warning(f"No ERA5 data found for time range around {date_val} for the {fire_label} fire label")
# return None
return default_values
# Get the image closest to our target time
era5_list = era5_filtered.toList(era5_filtered.size())
era5_img = ee.Image(era5_list.get(0)) # Get first image
# Extract weather variables at the point (using resample for faster computation)
weather_data = era5_img.select(
['temperature_2m', 'u_component_of_wind_10m',
'v_component_of_wind_10m', 'dewpoint_temperature_2m', 'soil_temperature_level_1']).resample("bilinear")
# Sample the point with error handling and retry
try:
data = sample_point_data_with_retry(weather_data, point, lat, lon, date_val)
# Check if data is empty
if not data:
logger.warning(f"Empty data returned for ({lat}, {lon}) at {date_val} for the {fire_label} fire label")
return default_values
# Calculate wind speed and direction from u,v components
u = data.get('u_component_of_wind_10m', 0)
v = data.get('v_component_of_wind_10m', 0)
wind_speed = (u**2 + v**2)**0.5
# Avoid division by zero or undefined math
if u == 0 and v == 0:
wind_dir = 0 # No wind
else:
wind_dir = (270 - (180/3.14159) * math.atan2(v, u)) % 360
# Convert temperature from K to C (handle None values)
temp_k = data.get('temperature_2m')
temp_c = temp_k - 273.15 if temp_k is not None else np.nan
return {
'temperature_c': temp_c,
'wind_speed_ms': wind_speed,
'wind_direction_deg': wind_dir,
'wind_direction': wind_direction_to_text(wind_dir),
'humidity_dewpoint_temperature_2m': data.get('dewpoint_temperature_2m'),
'soil_temperature_level_1': data.get('soil_temperature_level_1'),
'fire_label': fire_label,
'ignition_datetime': date_val
}
except ee.EEException as e:
logger.error(f"Earth Engine sampling error for ({lat}, {lon}) at {date_val}: {str(e)}")
return default_values
except Exception as e:
logger.error(f"Error processing row: {str(e)}")
# For debugging in development
# import traceback
# logger.error(traceback.format_exc())
return default_values
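To spot-check a single record once Earth Engine has been authenticated and initialized (as in the main() script further below), you can build a row by hand; the fire ID and coordinates here are purely illustrative:
test_row = pd.Series({
    "ignition_datetime": datetime(2023, 8, 15, 14, 30),
    "LATITUDE": 49.2827,
    "LONGITUDE": -123.1207,
    "FIRE_ID": "G80223"
})
print(get_weather_data(test_row, id_col="FIRE_ID"))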
3. Data Extraction
Now that we've seen how the data is extracted from Google Earth Engine, let's have a look at the function orchestrating the entire process. The process_dataframe() function is the workhorse that breaks large wildfire datasets into manageable batches (default 100 records) to avoid overwhelming Google Earth Engine's API limits while providing real-time progress updates. It includes smart error handling that lets processing continue even if individual batches fail, and automatically saves each batch's results to prevent data loss if the process gets interrupted. The function also incorporates mandatory delays between batches to respect API rate limits, ensuring smooth and reliable weather data extraction.
def process_dataframe(df, batch_size=100, batch_delay=3, id_col=None):
"""
Process the dataframe in batches to avoid Earth Engine quota issues.
Args:
df: DataFrame with wildfire data
batch_size: Number of rows to process in each batch
batch_delay: Delay in seconds between processing batches
id_col: Name of the column to use as the unique fire identifier
Returns:
DataFrame with added weather data
"""
df = df[df["ignition_datetime"].notna()].sort_values("ignition_datetime")
results = []
total_batches = (len(df) + batch_size - 1) // batch_size
# Process in batches
for i in range(0, len(df), batch_size):
batch = df.iloc[i:i+batch_size]
batch_num = i//batch_size + 1
# Clear progress reporting
print(f"\n Processing batch {batch_num} of {total_batches} (rows {i} to {min(i+batch_size-1, len(df)-1)})")
logger.info(f"\n Processing batch {batch_num} of {total_batches} (rows {i} to {min(i+batch_size-1, len(df)-1)})")
# Apply to each row in this batch
batch_results = batch.apply(get_weather_data, axis=1, result_type='expand', args=(id_col,))
### ADDITIONAL CHECK ###
# Check if 'temp_c' key exists in the dictionary and is not empty # or not(bool(batch_results['temperature_c']))
if 'temperature_c' not in batch_results or (len(batch_results['temperature_c'].value_counts()) == 0):
print(f"Skipping batch {batch_num} of {total_batches} - no temperature data available \n\n")
continue # Skip to the next iteration of the loop
else:
results.append(batch_results)
# Add progress information
print(f"\n Completed batch {batch_num}/{total_batches} ({batch_num/total_batches*100:.1f}%) \n")
logger.info(f"Completed batch {batch_num}/{total_batches} ({batch_num/total_batches*100:.1f}%)")
# Saving each batch to ensure we don't waste computation:
download_name = f"weather_data_batch_{batch_num}_of_{total_batches}.csv"
save_results_to_downloads(batch_results, filename=download_name)
# Add a delay between batches to reduce pressure on the API
if batch_num < total_batches:
print(f"\n Pausing for {batch_delay} second(s) before next batch... \n\n")
logger.info(f"Pausing for {batch_delay} second(s) before next batch...")
time.sleep(batch_delay)
# Combine all batches
if results:
print("\n Concatenating weather results... \n\n")
logger.info("Concatenating weather results...")
weather_data = pd.concat(results)
# Force completion of all pending operations
print("\n Finalizing all Earth Engine operations...")
ee.data.computeValue(ee.Number(1)) # This forces a sync point
print("\n Weather data processing complete.")
logger.info("Weather data processing complete.")
return weather_data
else:
print("No weather data to process.")
logger.warning("No weather data to process.")
return df
And for each batch that's processed, we'll rely on the save_results_to_downloads() function to save our results in case an error occurs while we're extracting data. The function handles the file management aspect by dynamically creating a temp_downloads folder in the user's Downloads directory (or any specified location) so that we get immediate feedback from the output files and continuously save our progress in an easily accessible location.
def save_results_to_downloads(weather_data, filename='weather_data.csv', save_folder='temp_downloads'):
"""
Save results directly to temporary downloads folder
Args:
weather_data: DataFrame to save
filename: Name of the file to save
save_folder: Name of the subfolder created inside Downloads
Returns:
Path where the file was saved
"""
# Save to new folder in Downloads or create it if it doesn't exist
relative_path = '~/Downloads'
expand = os.path.expanduser(relative_path)
save_path = f'{expand}/{save_folder}'
if not os.path.exists(save_path):
os.makedirs(save_path)
# Create full file path
full_path = os.path.join(save_path, filename)
# Save the DataFrame
weather_data.to_csv(full_path, index=False)
print(f"✅ Data successfully saved to: {full_path}")
return full_path
In summary, our batch processing approach provides several benefits:
- Progress tracking: Users can see processing status
- Error resilience: Failed batches don't stop the entire process
- Rate limiting: Prevents overwhelming the API
- Resume capability: Individual batch saves allow restarting from failures (see the sketch below)
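On that last point, because each batch is written to its own CSV, recombining partial results after an interruption is straightforward. A small sketch, assuming the default temp_downloads location and the batch file naming used above:
import glob

batch_files = sorted(glob.glob(os.path.expanduser("~/Downloads/temp_downloads/weather_data_batch_*.csv")))
weather_data = pd.concat((pd.read_csv(f) for f in batch_files), ignore_index=True)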
4. Script Execution
Now that we've discussed how all the individual functions play their role in the data extraction process, we can bring everything together in the main execution script:
def main():
# Load environment variables
load_dotenv()
# Authentication
# Get the secret project name from the environment
project_name = os.getenv('PROJECT_NAME')
if not project_name:
raise ValueError("PROJECT_NAME environment variable is required. Please set it in your .env file.")
# Trigger the authentication flow.
ee.Authenticate()
# Initialize the library.
ee.Initialize(project=project_name)
start_time = time.time()
df = create_dataframe() #[:200]
datetime_col = input("\n Enter the column name of the DATETIME column in your dataset: ")
df["ignition_datetime"] = convert_float_to_datetime(df[datetime_col])
unique_id_col = input("\n Enter the column name of the column you wish to designate as the UNIQUE ID field for your dataset: ")
weather_data = process_dataframe(df, id_col=unique_id_col)
end_time = time.time()
print(f"Execution took {end_time - start_time:.2f} seconds, or {((end_time - start_time) / 60):.2f} minutes, or {((end_time - start_time) / 3600):.2f} hours")
save_results_to_downloads(weather_data)
if __name__ == "__main__":
main()
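Assuming you've saved all of the code above into a single file (the name below is just a placeholder), running it from your terminal is as simple as:
python enrich_wildfire_weather.py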
Conclusion
Voila! You now have a powerful tool for enriching wildfire datasets with weather information. This opens up numerous possibilities for analysis and modeling. Consider extending the tool to include:
- Additional weather variables (precipitation, atmospheric pressure)
- Vegetation indices from satellite imagery
- Topographical data (elevation, slope)
- Historical fire perimeter data
The combination of location, time, weather, and environmental data creates rich datasets perfect for machine learning applications in wildfire research and risk management.
Thank you for reading till the very end! Please feel free to leave us some feedback by contacting us directly or leaving comments in the article's comment section. And if you want even more content to help you get started with your wildfire management research, we invite you to check out the BC Government's Wildfire Predictive Services' (WPS) GitHub page, where we'll continue to post tutorials on this very topic.
Till next time 👋