In this script, we will train a deepSSF model on the training data. In this case the training data was generated using the deepSSF_data_prep_id.qmd script, which crops out local images for each step of the observed telemetry data.
Model overview
Refer to the paper and the Model Overview section for a conceptual overview of the model.
Detect computing environment
If using Google Colab, mount the drive and set the base directory to the working folder. If using local, set the base directory.
Code
import os # Operating system utilitiesimport sys# Detect environmentdef is_colab():"""Returns True if running in Google Colab, False otherwise."""try:import google.colabreturnTrueexceptImportError:returnFalse# Set up environment-specific configurationsif is_colab():# Colab-specific setup!pip install rasteriofrom google.colab import drive drive.mount('/content/drive') sys.path.append('/content/drive/MyDrive/GitHub/deepSSF/Python')# for saving plots etc base_path ='/content/drive/MyDrive/GitHub/deepSSF'print("Running in Google Colab environment")else:# Local environment setup base_path ='..'print("Running in local environment")# Now you can use base_path regardless of environmentprint(f"Using base path: {base_path}")
Running in local environment
Using base path: ..
Import packages
Code
print(sys.version) # Print Python version in useimport numpy as np # Array operationsimport matplotlib.pyplot as plt # Plotting libraryimport mpmath as mp # Math libraryimport torch # Main PyTorch libraryimport torch.optim as optim # Optimization algorithmsimport torch.nn as nn # Neural network modulesimport os # Operating system utilitiesimport glob # Pattern matchingimport imageio.v2 as imageio # Image manipulation - for creating GIFsfrom IPython.display import Image, display # For plotting GIFsimport pandas as pd # Data manipulationimport rasterio # Geospatial raster datafrom torch.utils.data import Dataset, DataLoader # Dataset and batch data loadingfrom datetime import datetime, timedelta # Date/time utilitiesfrom rasterio.plot import show # Plot raster dataimport deepSSF_utils # deepSSF utility functions# Get today's datetoday_date = datetime.today().strftime('%Y-%m-%d')print(today_date)# Set random seed for reproducibility# seed = 42
3.12.5 | packaged by Anaconda, Inc. | (main, Sep 12 2024, 18:18:29) [MSC v.1929 64 bit (AMD64)]
2025-07-09
Set the device (accelerator - cuda for NVIDIA GPU or mps for Mac)
Code
# Set the device to be used (GPU or CPU)if torch.cuda.is_available(): device ="cuda"elif torch.backends.mps.is_available(): # For Mac M1/M2/M3 device ="mps"else: device ="cpu"print(f"Using {device} device")if torch.backends.mps.is_available():# Set default tensor type for PyTorch torch.set_default_dtype(torch.float32)print('Set default tensor type to float32')
Using cpu device
Select individual and create directory to save model weights and outputs
Code
# select the id to train the model onbuffalo_id =2005# in our case the actual dataset will be slightly smaller due to steps being removed that were outside the extentn_samples =10297# n_samples = 100
Create a directory to save the outputs
If we have already run this code today, we will add update index to create a new folder
Code
# Count existing directories with similar patternpattern =f'{base_path}/Python/outputs/model_training/id{buffalo_id}_deepSSF_training_*_{today_date}'existing_dirs = glob.glob(pattern)dir_index =len(existing_dirs) +1# Create directory with indexoutput_dir =f'{base_path}/Python/outputs/model_training/id{buffalo_id}_deepSSF_training_{dir_index}_{today_date}'os.makedirs(output_dir, exist_ok=True)print(f"Created directory: {output_dir}")# To use an existing directory for loading trained model# output_dir = f'{base_path}/Python/outputs/model_training/id2005_2025-04-01'
Created directory: ../Python/outputs/model_training/id2005_deepSSF_training_1_2025-07-09
Set paths to data
Code
# Specify the path to CSV file# csv_file_path = f'{base_path}/buffalo_local_data_id/buffalo_{buffalo_id}_data_df_lag_1hr_n{n_samples}.csv'csv_file_path =f'{base_path}/buffalo_local_data_id/buffalo_temporal_cont_{buffalo_id}_data_df_lag_1hr_n{n_samples}.csv'# Path to your TIF filendvi_path =f'{base_path}/buffalo_local_layers_id/buffalo_{buffalo_id}_ndvi_cent101x101_lag_1hr_n{n_samples}.tif'# Path to your TIF filecanopy_path =f'{base_path}/buffalo_local_layers_id/buffalo_{buffalo_id}_canopy_cent101x101_lag_1hr_n{n_samples}.tif'# Path to your TIF fileherby_path =f'{base_path}/buffalo_local_layers_id/buffalo_{buffalo_id}_herby_cent101x101_lag_1hr_n{n_samples}.tif'# Path to your TIF fileslope_path =f'{base_path}/buffalo_local_layers_id/buffalo_{buffalo_id}_slope_cent101x101_lag_1hr_n{n_samples}.tif'# Path to your TIF file# pres_path = f'{base_path}/buffalo_local_layers_id/buffalo_{buffalo_id}_pres_cent101x101_lag_1hr_n{n_samples}.tif'pres_path =f'{base_path}/buffalo_local_layers_id/fixed_buffalo_{buffalo_id}_pres_cent101x101_lag_1hr_n{n_samples}.tif'# pres_path = f'{base_path}/buffalo_local_layers_id/within_cell_buffalo_{buffalo_id}_pres_cent101x101_lag_1hr_n{n_samples}.tif'
Read buffalo data
Code
# Read the CSV file into a DataFramebuffalo_df = pd.read_csv(csv_file_path)print(buffalo_df.shape)# Lag the values in column 'A' by one index to get the bearing of the previous stepbuffalo_df['bearing_tm1'] = buffalo_df['bearing'].shift(1)# Pad the missing value with a specified value, e.g., 0buffalo_df['bearing_tm1'] = buffalo_df['bearing_tm1'].fillna(0)# Display the first few rows of the DataFrameprint(buffalo_df.head())
# Using rasteriowith rasterio.open(ndvi_path) as ndvi:# Read all layers/channels into a single numpy array# rasterio indexes channels starting from 1, hence the range is 1 to src.count + 1 ndvi_stack = ndvi.read([i for i inrange(1, ndvi.count +1)])print(ndvi_stack.shape)
(10103, 101, 101)
Normalise the layers
Code
# Replace NaNs in the original array with -1, which represents waterndvi_stack = np.nan_to_num(ndvi_stack, nan=-1.0)# Convert the numpy array to a PyTorch tensor, which is the format required for training the modelndvi_tens = torch.from_numpy(ndvi_stack)print(ndvi_tens.dtype)# Print the shape of the PyTorch tensorprint(ndvi_tens.shape)# Print the mean, max, and min values of the NDVI tensorndvi_mean = torch.mean(ndvi_tens)ndvi_max = torch.max(ndvi_tens)ndvi_min = torch.min(ndvi_tens)print("Mean = ", ndvi_mean)print("Max = ", ndvi_max)print("Min = ", ndvi_min)# Normalizing the datandvi_tens_norm = (ndvi_tens - ndvi_min) / (ndvi_max - ndvi_min)print("Mean = ", torch.mean(ndvi_tens_norm))print("Max = ", torch.max(ndvi_tens_norm))print("Min = ", torch.min(ndvi_tens_norm))
torch.float32
torch.Size([10103, 101, 101])
Mean = tensor(0.3048)
Max = tensor(0.8220)
Min = tensor(-0.2894)
Mean = tensor(0.5347)
Max = tensor(1.)
Min = tensor(0.)
Plot a single NDVI layer
Code
for i inrange(0, 1): plt.imshow(ndvi_tens_norm[i].numpy()) plt.colorbar() plt.show()
Canopy cover
Code
# Using rasteriowith rasterio.open(canopy_path) as canopy:# Read all layers/channels into a single numpy array# rasterio indexes channels starting from 1, hence the range is 1 to src.count + 1 canopy_stack = canopy.read([i for i inrange(1, canopy.count +1)])print(canopy_stack.shape)
(10103, 101, 101)
Code
# Convert the numpy array to a PyTorch tensor, which is the format required for training the modelcanopy_tens = torch.from_numpy(canopy_stack)print(canopy_tens.shape)# Print the mean, max, and min values of the canopy tensorprint("Mean = ", torch.mean(canopy_tens))canopy_max = torch.max(canopy_tens)canopy_min = torch.min(canopy_tens)print("Max = ", canopy_max)print("Min = ", canopy_min)# Normalizing the datacanopy_tens_norm = (canopy_tens - canopy_min) / (canopy_max - canopy_min)print("Mean = ", torch.mean(canopy_tens_norm))print("Max = ", torch.max(canopy_tens_norm))print("Min = ", torch.min(canopy_tens_norm))
torch.Size([10103, 101, 101])
Mean = tensor(44.3548)
Max = tensor(82.5000)
Min = tensor(0.)
Mean = tensor(0.5376)
Max = tensor(1.)
Min = tensor(0.)
Code
for i inrange(0, 1): plt.imshow(canopy_tens_norm[i].numpy()) plt.colorbar() plt.show()
Herbaceous vegetation
Code
# Using rasteriowith rasterio.open(herby_path) as herby:# Read all layers/channels into a single numpy array# rasterio indexes channels starting from 1, hence the range is 1 to src.count + 1 herby_stack = herby.read([i for i inrange(1, herby.count +1)])print(herby_stack.shape)
(10103, 101, 101)
Code
# Convert the numpy array to a PyTorch tensor, which is the format required for training the modelherby_tens = torch.from_numpy(herby_stack)print(herby_tens.shape)# Print the mean, max, and min values of the herby tensorprint("Mean = ", torch.mean(herby_tens))herby_max = torch.max(herby_tens)herby_min = torch.min(herby_tens)print("Max = ", herby_max)print("Min = ", herby_min)# Normalizing the dataherby_tens_norm = (herby_tens - herby_min) / (herby_max - herby_min)print("Mean = ", torch.mean(herby_tens_norm))print("Max = ", torch.max(herby_tens_norm))print("Min = ", torch.min(herby_tens_norm))
torch.Size([10103, 101, 101])
Mean = tensor(0.8069)
Max = tensor(1.)
Min = tensor(0.)
Mean = tensor(0.8069)
Max = tensor(1.)
Min = tensor(0.)
Code
for i inrange(0, 1): plt.imshow(herby_tens_norm[i]) plt.colorbar() plt.show()
Slope
Code
# Using rasteriowith rasterio.open(slope_path) as slope:# Read all layers/channels into a single numpy array# rasterio indexes channels starting from 1, hence the range is 1 to src.count + 1 slope_stack = slope.read([i for i inrange(1, slope.count +1)])print(slope_stack.shape)
(10103, 101, 101)
Code
# Convert the numpy array to a PyTorch tensor, which is the format required for training the modelslope_tens = torch.from_numpy(slope_stack)print(slope_tens.shape)# Print the mean, max, and min values of the slope tensorprint("Mean = ", torch.mean(slope_tens))slope_max = torch.max(slope_tens)slope_min = torch.min(slope_tens)print("Max = ", slope_max)print("Min = ", slope_min)# Normalizing the dataslope_tens_norm = (slope_tens - slope_min) / (slope_max - slope_min)print("Mean = ", torch.mean(slope_tens_norm))print("Max = ", torch.max(slope_tens_norm))print("Min = ", torch.min(slope_tens_norm))
torch.Size([10103, 101, 101])
Mean = tensor(0.7779)
Max = tensor(12.2981)
Min = tensor(0.0006)
Mean = tensor(0.0632)
Max = tensor(1.)
Min = tensor(0.)
Code
for i inrange(0, 1): plt.imshow(slope_tens_norm[i]) plt.colorbar() plt.show()
Presence records - target of model
This is what the model is trying to predict, which is the location of the next step.
Code
# Using rasteriowith rasterio.open(pres_path) as pres:# Read all layers/channels into a single numpy array# rasterio indexes channels starting from 1, hence the range is 1 to src.count + 1 pres_stack = pres.read([i for i inrange(1, pres.count +1)])print(pres_stack.shape)
(10103, 101, 101)
Code
for i inrange(0, 1): plt.imshow(pres_stack[i]) plt.show()
Combine the spatial layers into channels
Code
# Stack the channels along a new axis; here, 1 is commonly used for the channel axis in PyTorchcombined_stack = torch.stack([ndvi_tens_norm, canopy_tens_norm, herby_tens_norm, slope_tens_norm], dim=1)print(combined_stack.shape)
torch.Size([10103, 4, 101, 101])
From the size we can see that there are 10103 samples (steps), 4 channels (layers = covariates) and 101x101 pixels.
Defining data sets and data loaders
Creating a dataset class
This custom PyTorch Dataset organizes all your input (spatial data, scalar covariates, bearing, and target) in a single object, allowing you to neatly manage how samples are accessed. The __init__ method prepares and stores all the data, __len__ returns the total number of samples, and __getitem__ retrieves a single sample by index—enabling straightforward batching and iteration when used with a DataLoader.
Code
class buffalo_data(Dataset):def__init__(self):# data loading. Here we are just using the combined_stack as the spatial covariatesself.spatial_data_x = combined_stack# the scalar data that will be converted to spatial data and added as channels to the spatial covariatesself.scalar_to_grid_data = torch.from_numpy(buffalo_df[['hour_t2_sin','hour_t2_cos','yday_t2_sin','yday_t2_cos']].values).float()# the bearing data that will be added as a channel to the spatial covariatesself.bearing_x = torch.from_numpy(buffalo_df[['bearing_tm1']].values).float()# the target dataself.target = torch.tensor(pres_stack)# number of samplesself.n_samples =self.spatial_data_x.shape[0]def__len__(self):# allows for the use of len() functionreturnself.n_samplesdef__getitem__(self, index):# allows for indexing of the datasetreturnself.spatial_data_x[index], self.scalar_to_grid_data[index], self.bearing_x[index], self.target[index]
Now we can create an instance of the dataset class and check that is working as expected.
Code
# Create an instance of our custom buffalo_data Dataset:dataset = buffalo_data()# Print the total number of samples loaded (determined by n_samples in the dataset):n_samples_loaded = dataset.n_samplesprint(n_samples_loaded)# Retrieve *all* samples (using the slice dataset[:] invokes __getitem__ on all indices).# This returns a tuple of (spatial data, scalar-to-grid data, bearing data, target labels).features1, features2, features3, labels = dataset[:]# Examine the dimensions of each returned tensor for verification:# Spatial dataprint(features1.shape)# Scalar-to-grid dataprint(features2.shape)# Bearing dataprint(features3.shape)# Target labelsprint(labels.shape)
Uncomment either the random split or the sequential split to use the one you prefer. A sequential split requires more of an extrapolation, as the individual could have encountered a different environmental region in the latter part of the time series, or it could be from a different time period, such as part of a season that isn’t in the training data.
For testing we will therefore use the sequential split as more of a test for the model.
Code
training_split =0.8# 80% of the data will be used for trainingvalidation_split =0.1# 10% of the data will be used for validation (deciding when to stop training)test_split =0.1# 10% of the data will be used for testing (model evaluation)
To split the samples randomly we can use the random_split function from PyTorch
To split the data sequentially, we can use the Subset function from PyTorch, which allows us to select a subset of the data based on the indices we provide.
Code
# For sequential split (need integers)n_samples =len(dataset)n_train =int(training_split * n_samples)n_val =int(validation_split * n_samples)n_test = n_samples - n_train - n_val # Ensure they sum to total number of samples# Get the start and end indices for each splittrain_end =int(training_split * n_samples)val_end =int((training_split + validation_split) * n_samples)train_indices =list(range(0, train_end))val_indices =list(range(train_end, val_end))test_indices =list(range(val_end, len(dataset)))# to split the samples sequentiallydataset_train = torch.utils.data.Subset(dataset, train_indices)dataset_val = torch.utils.data.Subset(dataset, val_indices)dataset_test = torch.utils.data.Subset(dataset, test_indices)print("Number of training samples: ", len(dataset_train))print("Number of validation samples: ", len(dataset_val))print("Number of testing samples: ", len(dataset_test))
Number of training samples: 8082
Number of validation samples: 1010
Number of testing samples: 1011
Create dataloaders
The DataLoader in PyTorch wraps an iterable around the Dataset to enable easy access to the samples.
Code
# Define the batch size for how many samples to process at once in each step:batch_size =32# Create a DataLoader for the training dataset with a batch size of batch_size, and shuffle samples# so that the model doesn't see data in the same order each epoch.dataloader_train = DataLoader(dataset=dataset_train, batch_size=batch_size, shuffle=True)# Create a DataLoader for the validation dataset, also with a batch size of batch_size and shuffling.# Even though it's not always mandatory to shuffle validation data, some users keep the same setting.dataloader_val = DataLoader(dataset=dataset_val, batch_size=batch_size, shuffle=True)# Create a DataLoader for the test dataset, likewise with a batch size of batch_size and shuffling.# As we want to index the testing data for plotting, we will not shuffle the test data.dataloader_test = DataLoader(dataset=dataset_test, batch_size=batch_size, shuffle=False)
Check that the data loader is working as expected.
Code
# Display image and label.# next(iter(dataloader_train)) returns the next batch of the training datafeatures1, features2, features3, labels =next(iter(dataloader_train))print(f"Feature 1 batch shape: {features1.size()}")print(f"Feature 2 batch shape: {features2.size()}")print(f"Feature 3 batch shape: {features3.size()}")print(f"Labels batch shape: {labels.size()}")
Deep learning can be considered as a sequence of blocks, each of which perform some (typically nonlinear) transformation on input data to produce some output. Providing each block has the appropriate inputs, they can be combined to build a larger network that is capable of achieving complex and abstract transformations and can be used to represent complex processes.
A block is modular component of a neural network, in our case defined as a Python class (type of object with certain functionality described by its definition) inheriting from torch.nn.Module in PyTorch. A block encapsulates a sequence of operations, including layers (such as fully connected layers or convolutional layers) and activation functions, to process input data. Each block has a forward method (i.e. instructions) that defines the data flow through the network during inference or training.
Convolutional block for the habitat selection subnetwork
This block is a convolutional layer that takes in the spatial covariates (including the layers created from the scalar values such as time), goes through a series of convolution operations and ReLU activation functions and outputs a feature map, which is the habitat selection probability surface.
Code
class Conv2d_block_spatial(nn.Module):def__init__(self, params):super(Conv2d_block_spatial, self).__init__()# define the parametersself.batch_size = params.batch_sizeself.input_channels = params.input_channelsself.output_channels = params.output_channelsself.kernel_size = params.kernel_sizeself.stride = params.strideself.padding = params.paddingself.image_dim = params.image_dimself.device = params.device# define the layers - nn.Sequential allows for the definition of layers in a sequential mannerself.conv2d = nn.Sequential(# convolutional layer 1 nn.Conv2d(in_channels=self.input_channels, out_channels=self.output_channels, kernel_size=self.kernel_size, stride=self.stride, padding=self.padding),# ReLU activation function nn.ReLU(),# convolutional layer 2 nn.Conv2d(in_channels=self.output_channels, out_channels=self.output_channels, kernel_size=self.kernel_size, stride=self.stride, padding=self.padding),# ReLU activation function nn.ReLU(),# convolutional layer 3, which outputs a single layer, which is the habitat selection map nn.Conv2d(in_channels=self.output_channels, out_channels=1, kernel_size=self.kernel_size, stride=self.stride, padding=self.padding) )# define the forward pass of the model, i.e. how the data flows through the modeldef forward(self, x):# self.conv2d(x) passes the input through the convolutional layers, and the squeeze function removes the channel dimension, resulting in a 2D tensor (habitat selection map)# print("Shape before squeeze:", self.conv2d(x).shape) # Debugging print conv2d_spatial =self.conv2d(x).squeeze(dim =1)# normalise to sum to 1# print("Shape before logsumexp:", conv2d_spatial.shape) # Debugging print conv2d_spatial = conv2d_spatial - torch.logsumexp(conv2d_spatial, dim = (1, 2), keepdim =True)# output the habitat selection mapreturn conv2d_spatial
Convolutional block for the movement subnetwork
This block is also convolutional layer, with the same inputs, but this block also has max pooling layers to reduce the spatial resolution of the feature maps whilst preserving the most prominent features in the feature maps, and outputs a ‘flattened’ feature map. A flattened feature map is a 1D tensor (a vector) that can be used as input to a fully connected layer.
Code
class Conv2d_block_toFC(nn.Module):def__init__(self, params):super(Conv2d_block_toFC, self).__init__()# define the parametersself.batch_size = params.batch_sizeself.input_channels = params.input_channelsself.output_channels_movement = params.output_channels_movementself.kernel_size = params.kernel_sizeself.stride = params.strideself.kernel_size_mp = params.kernel_size_mpself.stride_mp = params.stride_mpself.padding = params.paddingself.image_dim = params.image_dimself.device = params.device# define the layers - nn.Sequential allows for the definition of layers in a sequential mannerself.conv2d = nn.Sequential(# convolutional layer 1 nn.Conv2d(in_channels=self.input_channels, out_channels=self.output_channels_movement, kernel_size=self.kernel_size, stride=self.stride, padding=self.padding),# ReLU activation function nn.ReLU(),# max pooling layer 1 (reduces the spatial dimensions of the data whilst retaining the most important features) nn.MaxPool2d(kernel_size=self.kernel_size_mp, stride=self.stride_mp),# convolutional layer 2 nn.Conv2d(in_channels=self.output_channels_movement, out_channels=self.output_channels_movement, kernel_size=self.kernel_size, stride=self.stride, padding=self.padding),# ReLU activation function nn.ReLU(),# max pooling layer 2 nn.MaxPool2d(kernel_size=self.kernel_size_mp, stride=self.stride_mp),# # to add a third convolutional layer, uncomment the following lines# # convolutional layer 3# nn.Conv2d(in_channels=self.output_channels_movement,# out_channels=self.output_channels_movement,# kernel_size=self.kernel_size,# stride=self.stride,# padding=self.padding),# # ReLU activation function# nn.ReLU(),# # max pooling layer 3# nn.MaxPool2d(kernel_size=self.kernel_size_mp,# stride=self.stride_mp),# flatten the data to pass through the fully connected layer nn.Flatten())def forward(self, x):# self.conv2d(x) passes the input through the convolutional layers, and outputs a 1D tensorreturnself.conv2d(x)
Fully connected block for the movement subnetwork
This block takes in the flattened feature map from the previous block, passes through several fully connected layers, which extracts information from the spatial covariates that is relevant for movement, and outputs the parameters that define the movement kernel.
Code
class FCN_block_all_movement(nn.Module):def__init__(self, params):super(FCN_block_all_movement, self).__init__()# define the parametersself.batch_size = params.batch_sizeself.dense_dim_in_all = params.dense_dim_in_allself.dense_dim_hidden = params.dense_dim_hiddenself.image_dim = params.image_dimself.device = params.deviceself.num_movement_params = params.num_movement_paramsself.dropout = params.dropout# define the layers - nn.Sequential allows for the definition of layers in a sequential mannerself.ffn = nn.Sequential(# fully connected layer 1 (the dense_dim_in_all is the number of input features,# and should match the output of the Conv2d_block_toFC block).# the dense_dim_hidden is the number of neurons in the hidden layer, and doesn't need to be the same as the input features nn.Linear(self.dense_dim_in_all, self.dense_dim_hidden),# dropout layer (helps to reduce overfitting) nn.Dropout(self.dropout),# ReLU activation function nn.ReLU(),# fully connected layer 2# the number of input neurons should match the output from the previous layer nn.Linear(self.dense_dim_hidden, self.dense_dim_hidden),# dropout layer nn.Dropout(self.dropout),# ReLU activation function nn.ReLU(),# fully connected layer 3# the number of input neurons should match the output from the previous layer,# and the number of output neurons should match the number of movement parameters nn.Linear(self.dense_dim_hidden, self.num_movement_params) )def forward(self, x):# self.ffn(x) passes the input through the fully connected layers, and outputs a 1D tensor (vector of movement parameters)returnself.ffn(x)
Block to convert the movement parameters to a probability distribution
What the block does
This block is a bit longer and more involved, but there are no parameters in here that need to be learned (estimated). It is just a series of operations that are applied to the movement parameters to convert them to a probability distribution.
This block takes in the movement parameters and converts them to a probability distribution. This essentially just applies the appropriate density functions using the parameter values predicted by the movement blocks, which in our case is a finite mixture of Gamma distributions and a finite mixture of von Mises distributions.
The formulation of predicting parameters and converting them to a movement kernel ensures that the movement kernel is very flexible, and can be any combination of distributions, which need not all be the same (e.g., a step length distribution may be combination of a Gamma and a log-normal distribution).
Constraints
One constraint to ensure that we can perform backpropagation is that the entire forward pass, including the block below that produces the density functions, must be differentiable with respect to the parameters of the model. PyTorch’s torch.distributions module and its special functions (e.g., torch.special) provide differentiable implementations for many common distributions. Examples are the
Gamma function for the (log) Gamma distribution, torch.lgamma()
The modified Bessel function of the first kind of order 0 for the von Mises distribution, torch.special.i0()
Some of the movement parameters, such as the shape and scale of the Gamma distribution, must be positive. We therefore exponentiate them in this block to ensure that they are positive. This means that the model is actually learning the log of the shape and scale parameters. For the von Mises mu parameters however, they can be any value, so we do not need to exponentiate them. We could constrain them to be between -pi and pi, but this is not necessary as the von Mises distribution is periodic, so any value will be equivalent to another value that is within the range -pi to pi.
Notes
To help with identifiability, it is possible to fix certain parameter values, such as the mu parameters in the mixture of von Mises distributions to pi and -pi for instance (one would then reduce the number of predicted parameters by the previous block, as these no longer need to be predicted).
We can also transform certain parameters such that they are being estimated in a similar range (analagous to standardising variables in linear regression). In our case we know that the scale parameter of one of the Gamma distributions is around 500. What we can then do after exponentiating is multiply the scale parameter by 500, so the model is learning the log of the scale parameter divided by 500. This will ensure that this parameter is in a similar range to the other parameters, and can help with convergence. To do this we:
Pull out the relevant parameters from the input tensor (output of previous block) - gamma_scale2 = torch.exp(x[:, 4]).unsqueeze(0).unsqueeze(0)
Multiply the scale parameter by 500, so the model is learning the log of the scale parameter divided by 500 - gamma_scale2 = gamma_scale2 * 500
Consideration for the centre cell
As the centre cell has a distance of exactly 0 (using the distance layer we created), this can cause numerical issues for the gamma distribution, as when the shape parameter is less than one the mode approaches infinity. To avoid this, we can add a small value to the central cell of the distance layer, so that the distance is never exactly 0.
To get the value to use in the central cell, we will calculate the average distance from the very centre to any point in the cell (assuming that the distance within the cell is continuous). This comes out to be:
class Params_to_Grid_Block(nn.Module):def__init__(self, params):super(Params_to_Grid_Block, self).__init__()# define the parametersself.batch_size = params.batch_sizeself.image_dim = params.image_dimself.pixel_size = params.pixel_size# create distance and bearing layers# determine the distance of each pixel from the centre of the imageself.center =self.image_dim //2 y, x = np.indices((self.image_dim, self.image_dim))self.distance_layer = torch.from_numpy(np.sqrt((self.pixel_size*(x -self.center))**2+ (self.pixel_size*(y -self.center))**2)).float()# change the centre cell to the average distance from the centre to the edge of the pixel# average distance from the centre to any point within the pixel# calculated as a double integral of sqrt(x^2 + y^2) dx dy over the area of the pixelself.distance_layer[self.center, self.center] =0.3826*self.pixel_size# determine the bearing of each pixel from the centre of the imageself.bearing_layer = torch.from_numpy(np.arctan2(self.center - y, x -self.center)).float()self.device = params.device# Gamma densities (on the log-scale) for the mixture distributiondef gamma_density(self, x, shape, scale):# Ensure all tensors are on the same device as x shape = shape.to(x.device) scale = scale.to(x.device)# return -1*torch.lgamma(shape) -shape*torch.log(scale) + (shape - 1)*torch.log(x) - x/scale# to account for change of variablesreturn (-1*torch.lgamma(shape) -shape*torch.log(scale) + (shape -1)*torch.log(x) - x/scale) - torch.log(x)# log von Mises densities (on the log-scale) for the mixture distributiondef vonmises_density(self, x, kappa, vm_mu):# Ensure all tensors are on the same device as x kappa = kappa.to(x.device) vm_mu = vm_mu.to(x.device)return kappa*torch.cos(x - vm_mu) -1*(np.log(2*torch.pi) + torch.log(torch.special.i0(kappa)))def forward(self, x, bearing):# parameters of the first mixture distribution# x are the outputs from the fully connected layers (vector of movement parameters)# we therefore need to extract the appropriate parameters# the locations are not specific to any specific parameters, as long as any aren't extracted more than once# Gamma distributions# pull out the parameters of the first gamma distribution and exponentiate them to ensure they are positive# the unsqueeze function adds a new dimension to the tensor# we do this twice to match the dimensions of the distance_layer,# and then repeat the parameter value across a grid, such that the density can be calculated at every cell/pixel gamma_shape1 = torch.exp(x[:, 0]).unsqueeze(0).unsqueeze(0) gamma_shape1 = gamma_shape1.repeat(self.image_dim, self.image_dim, 1)# this just changes the order of the dimensions to match the distance_layer gamma_shape1 = gamma_shape1.permute(2, 0, 1) gamma_scale1 = torch.exp(x[:, 1]).unsqueeze(0).unsqueeze(0) gamma_scale1 = gamma_scale1.repeat(self.image_dim, self.image_dim, 1) gamma_scale1 = gamma_scale1.permute(2, 0, 1)# gamma_weight1 = torch.exp(x[:, 2]).unsqueeze(0).unsqueeze(0) gamma_weight1 = x[:, 2].unsqueeze(0).unsqueeze(0) gamma_weight1 = gamma_weight1.repeat(self.image_dim, self.image_dim, 1) gamma_weight1 = gamma_weight1.permute(2, 0, 1)# parameters of the second mixture distribution gamma_shape2 = torch.exp(x[:, 3]).unsqueeze(0).unsqueeze(0) gamma_shape2 = gamma_shape2.repeat(self.image_dim, self.image_dim, 1) gamma_shape2 = gamma_shape2.permute(2, 0, 1) gamma_scale2 = torch.exp(x[:, 4]).unsqueeze(0).unsqueeze(0) gamma_scale2 = gamma_scale2 *500### transform the scale parameter so it can be estimated near the same range as the other parameters gamma_scale2 = gamma_scale2.repeat(self.image_dim, self.image_dim, 1) gamma_scale2 = gamma_scale2.permute(2, 0, 1)# gamma_weight2 = torch.exp(x[:, 5]).unsqueeze(0).unsqueeze(0) gamma_weight2 = x[:, 5].unsqueeze(0).unsqueeze(0) gamma_weight2 = gamma_weight2.repeat(self.image_dim, self.image_dim, 1) gamma_weight2 = gamma_weight2.permute(2, 0, 1)# Apply softmax to the mixture weights to ensure they sum to 1 gamma_weights = torch.stack([gamma_weight1, gamma_weight2], dim=0) gamma_weights = torch.nn.functional.softmax(gamma_weights, dim=0) gamma_weight1 = gamma_weights[0] gamma_weight2 = gamma_weights[1]# calculation of Gamma densities gamma_density_layer1 =self.gamma_density(self.distance_layer, gamma_shape1, gamma_scale1).to(device) gamma_density_layer2 =self.gamma_density(self.distance_layer, gamma_shape2, gamma_scale2).to(device)# combining both densities to create a mixture distribution using logsumexp logsumexp_gamma_corr = torch.max(gamma_density_layer1, gamma_density_layer2) gamma_density_layer = logsumexp_gamma_corr + torch.log(gamma_weight1 * torch.exp(gamma_density_layer1 - logsumexp_gamma_corr) + gamma_weight2 * torch.exp(gamma_density_layer2 - logsumexp_gamma_corr))# print(torch.sum(gamma_density_layer))# print(torch.sum(torch.exp(gamma_density_layer)))## Von Mises Distributions# calculate the new bearing from the turning angle# takes in the bearing from the previous step and adds the turning angle, which is estimated by the model# we do not exponentiate the von Mises mu parameters as we want to allow them to be negative bearing_new1 = x[:, 6] + bearing[:, 0]# the new bearing becomes the mean of the von Mises distribution vonmises_mu1 = bearing_new1.unsqueeze(0).unsqueeze(0) vonmises_mu1 = vonmises_mu1.repeat(self.image_dim, self.image_dim, 1) vonmises_mu1 = vonmises_mu1.permute(2, 0, 1)# parameters of the first von Mises distribution vonmises_kappa1 = torch.exp(x[:, 7]).unsqueeze(0).unsqueeze(0) vonmises_kappa1 = vonmises_kappa1.repeat(self.image_dim, self.image_dim, 1) vonmises_kappa1 = vonmises_kappa1.permute(2, 0, 1)# vonmises_weight1 = torch.exp(x[:, 8]).unsqueeze(0).unsqueeze(0) vonmises_weight1 = x[:, 8].unsqueeze(0).unsqueeze(0) vonmises_weight1 = vonmises_weight1.repeat(self.image_dim, self.image_dim, 1) vonmises_weight1 = vonmises_weight1.permute(2, 0, 1)# vm_mu and weight for the second von Mises distribution bearing_new2 = x[:, 9] + bearing[:, 0] vonmises_mu2 = bearing_new2.unsqueeze(0).unsqueeze(0) vonmises_mu2 = vonmises_mu2.repeat(self.image_dim, self.image_dim, 1) vonmises_mu2 = vonmises_mu2.permute(2, 0, 1)# parameters of the second von Mises distribution vonmises_kappa2 = torch.exp(x[:, 10]).unsqueeze(0).unsqueeze(0) vonmises_kappa2 = vonmises_kappa2.repeat(self.image_dim, self.image_dim, 1) vonmises_kappa2 = vonmises_kappa2.permute(2, 0, 1)# vonmises_weight2 = torch.exp(x[:, 11]).unsqueeze(0).unsqueeze(0) vonmises_weight2 = x[:, 11].unsqueeze(0).unsqueeze(0) vonmises_weight2 = vonmises_weight2.repeat(self.image_dim, self.image_dim, 1) vonmises_weight2 = vonmises_weight2.permute(2, 0, 1)# Apply softmax to the weights vonmises_weights = torch.stack([vonmises_weight1, vonmises_weight2], dim=0) vonmises_weights = torch.nn.functional.softmax(vonmises_weights, dim=0) vonmises_weight1 = vonmises_weights[0] vonmises_weight2 = vonmises_weights[1]# calculation of von Mises densities vonmises_density_layer1 =self.vonmises_density(self.bearing_layer, vonmises_kappa1, vonmises_mu1).to(device) vonmises_density_layer2 =self.vonmises_density(self.bearing_layer, vonmises_kappa2, vonmises_mu2).to(device)# combining both densities to create a mixture distribution using the logsumexp trick logsumexp_vm_corr = torch.max(vonmises_density_layer1, vonmises_density_layer2) vonmises_density_layer = logsumexp_vm_corr + torch.log(vonmises_weight1 * torch.exp(vonmises_density_layer1 - logsumexp_vm_corr) + vonmises_weight2 * torch.exp(vonmises_density_layer2 - logsumexp_vm_corr))# print(torch.sum(vonmises_density_layer))# print(torch.sum(torch.exp(vonmises_density_layer)))# combining the two distributions movement_grid = gamma_density_layer + vonmises_density_layer # Gamma and von Mises densities are on the log-scale# normalise (on the log-scale using the log-sum-exp trick) before combining with the habitat predictions movement_grid = movement_grid - torch.logsumexp(movement_grid, dim = (1, 2), keepdim =True)# print('Movement grid norm ', torch.sum(movement_grid))# print(torch.sum(torch.exp(movement_grid)))return movement_grid
Scalar to grid block
This block takes any scalar value (e.g., time of day, day of year) and converts it to a 2D image, with the same values for all pixels.
This is so that the scalar values can be used as input to the convolutional layers.
Code
class Scalar_to_Grid_Block(nn.Module):def__init__(self, params):super(Scalar_to_Grid_Block, self).__init__()# define the parametersself.batch_size = params.batch_sizeself.image_dim = params.image_dimself.device = params.devicedef forward(self, x):# how many scalar values are being passed in num_scalars = x.shape[1]# expand the scalar values to the spatial dimensions of the image scalar_map = x.view(x.shape[0], num_scalars, 1, 1).expand(x.shape[0], num_scalars,self.image_dim,self.image_dim)# return the scalar mapsreturn scalar_map
Combine the blocks into the deepSSF model
Here is where we combine the blocks into a model. Similarly to the previous blocks, the model is a Python class that inherits from torch.nn.Module, which combines other torch.nn.Module modules.
For example, we can instantiate the habitat selection convolution block using self.conv_habitat = Conv2d_block_spatial(params) in the __init__ method (the ‘constructor’ for a class). We can now access that block using self.conv_habitat in the forward method.
In the forward method, we pass the input data through the habitat selection convolution block using output_habitat = self.conv_habitat(all_spatial), where all_spatial is the input data, which is a combination of the spatial covariates and the scalar values converted to 2D images.
First we instantiate the blocks, and then define the forward method, which defines the data flow through the network during inference or training.
Code
class ConvJointModel(nn.Module):def__init__(self, params):""" ConvJointModel: - Initializes blocks for scalar-to-grid transformation, habitat convolution, movement convolution + movement fully connected, and final parameter-to-grid transformation. - Accepts parameters from the params object, which we will define later. """super(ConvJointModel, self).__init__()# Block to convert scalar features into grid-like (spatial) featuresself.scalar_grid_output = Scalar_to_Grid_Block(params)# Convolutional block for habitat selectionself.conv_habitat = Conv2d_block_spatial(params)# Convolutional block for movement extraction (output fed into fully connected layers)self.conv_movement = Conv2d_block_toFC(params)# Fully connected block for movementself.fcn_movement_all = FCN_block_all_movement(params)# Converts movement distribution parameters into a grid (the 2D movement kernel)self.movement_grid_output = Params_to_Grid_Block(params)# Device information from params (e.g., CPU or GPU)self.device = params.devicedef forward(self, x):""" Forward pass: 1. Extract scalar data and convert to grid features. 2. Concatenate the newly created scalar-based grids with spatial data. 3. Pass this combined input through separate sub-networks for habitat and movement. 4. Convert movement parameters to a grid, then stack the habitat and movement outputs. """# x contains:# - spatial_data_x (image-like layers)# - scalars_to_grid (scalar features needing conversion)# - bearing_x (the bearing from the previous time step, the turning angle is estimated as the deviation from this) spatial_data_x = x[0] scalars_to_grid = x[1] bearing_x = x[2]# Convert scalar data to spatial (grid) form scalar_grids =self.scalar_grid_output(scalars_to_grid)# Combine the original spatial data with the newly generated scalar grids all_spatial = torch.cat([spatial_data_x, scalar_grids], dim=1)# HABITAT SUBNETWORK# Convolutional feature extraction for habitat selection output_habitat =self.conv_habitat(all_spatial)# MOVEMENT SUBNETWORK# Convolutional feature extraction (different architecture for movement) conv_movement =self.conv_movement(all_spatial)# Fully connected layers for movement (processing both spatial features and any extras) output_movement =self.fcn_movement_all(conv_movement)# Transform the movement parameters into a grid, using bearing information output_movement =self.movement_grid_output(output_movement, bearing_x)# Combine (stack) habitat and movement outputs without merging them output = torch.stack((output_habitat, output_movement), dim=-1)return output
Set the parameters for the model which will be specified in a dictionary
This Python class serves as a simple parameter container for a model that involves both spatial (e.g., convolutional layers) and non-spatial inputs. It captures all relevant hyperparameters and settings—such as image dimensions, kernel sizes, and fully connected layer dimensions—along with the target device (CPU or GPU). This structure allows easy configuration of the model without scattering parameters throughout the code.
Here we enter the specific parameter values and hyperparameters for the model. These are the values that will be used to instantiate the model.
Code
n_max_pool_layers =2# used to determine the number of inputs entering the fully connected block - needs to be manually changed if the number of max pooling layers is changedparams_dict = {"batch_size": batch_size, #number of samples in each batch"image_dim": 101, #number of pixels along the edge of each local patch/image"pixel_size": 25, #number of metres along the edge of a pixel"input_channels": 4+4, #number of spatial layers in each image + number of scalar layers that are converted to a grid"dim_in_nonspatial_to_grid": 4, #the number of scalar predictors that are converted to a grid and appended to the spatial features"dense_dim_in_nonspatial": 4, #change this to however many other scalar predictors you have (bearing, velocity etc)"kernel_size": 3, #the size of the 2D moving windows / kernels that are being learned"stride": 1, #the stride used when applying the kernel. This reduces the dimension of the output if set to greater than 1"kernel_size_mp": 2, #the size of the kernel that is used in max pooling operations"stride_mp": 2, #the stride that is used in max pooling operations"padding": 1, #the amount of padding to apply to images prior to applying the 2D convolution"num_movement_params": 12, #number of parameters used to parameterise the movement kernel"dropout": 0.1, #the proportion of nodes that are dropped out in the dropout layers# hyperparameters that change the model architecture"output_channels": 4, #number of convolution filters to learn"output_channels_movement": 2, #number of convolution filters to learn for the movement kernel"dense_dim_hidden": 128, #number of nodes in the hidden layers# this will be updated below"dense_dim_in_all": 1, #number of inputs entering the fully connected block once the nonspatial features have been concatenated to the spatial features"device": device }# Now update the dictionary with calculated valuesparams_dict["dense_dim_in_all"] =int(((params_dict["image_dim"] - (params_dict["image_dim"] %2))**2) * (params_dict["output_channels_movement"] / (4**n_max_pool_layers)))
Note about the model
In future scripts (such as when simulating from the deepSSF model or training the model on Sentinel-2 data) we want to load the same model, so to prevent copying and pasting, we will save the model definition to a Python file and import it into future scripts.
We do this by copying the model definition above to a Python file named deepSSF_model.py, which can be imported into future scripts using import deepSSF_model.
Ideally we would just use that file to define the model in this script, but we include it here as it’s helpful to test components of the model and see how it all works.
Just remember that if you make changes to the model in this script, you will have to copy them across the deepSSF_model.py file, or just change the model definition in the deepSSF_model.py file and call that directly from here to train it.
To call it you would uncomment the lines in the next cell.
Code
# # Import the functions in the deepSSF_model.py file# import deepSSF_model# # Create an instance of the ModelParams class using the params_dict# params = deepSSF_model.ModelParams(deepSSF_model.params_dict)# # Create an instance of the ConvJointModel class using the params# model = deepSSF_model.ConvJointModel(params).to(device)# # Print the model architecture to check that it worked# print(model)
Instantiate the model
Here we instantiate the model using the parameters defined above.
Code
# Initialize the parameter container using the parameters defined in 'params_dict'params = ModelParams(params_dict)# Create an instance of the ConvJointModel using the parameters,# and move the model to the specified device (e.g., CPU or GPU)model = ConvJointModel(params).to(device)# Print the model architectureprint(model)
As we’ve defined the model, we can now test the components to ensure that they are working as expected.
We can do this block by block, or we can test the entire model.
We’ll start by testing a few of the blocks.
Testing the movement parameter to probability distribution block
Change any of the values to try out different movement kernels.
Code
# Create a bearing tensor (e.g., 0 radians) on the desired devicetest_bearing = torch.tensor([[0.0]], device=device)# Instantiate the Params_to_Grid_Block using the given parameterstest_block = Params_to_Grid_Block(params)# Define the parameters for the movement density# These are the parameters that the model will learn to predict# First Gamma distributiongamma_shape1 =0.75gamma_scale1 =100gamma_weight1 =0.05# Second Gamma distributiongamma_shape2 =3gamma_scale2 =500gamma_scale2 = gamma_scale2 /500# divide this by 500 (as this is what the model predicts)gamma_weight2 =0.95# First von Mises distributionvonmises_mu1 =0.0vonmises_kappa1 =0.1vonmises_weight1 =0.75# Second von Mises distributionvonmises_mu2 =-np.pivonmises_kappa2 =0.01vonmises_weight2 =0.25# Provide parameters on a log-scale (since the block exponentiates them internally)# Here, each group of three values represents a Gamma distribution's shape, scale, and weight, respectively.movement_density = test_block( torch.tensor( [[# Gamma 1 np.log(gamma_shape1), np.log(gamma_scale1), gamma_weight1, # Gamma 1 shape, scale, and weight# Gamma 2 np.log(gamma_shape2), np.log(gamma_scale2), gamma_weight2, # Gamma 2 shape, scale, and weight# von Mises 1 vonmises_mu1, np.log(vonmises_kappa1), vonmises_weight1, # von Mises 1 mu, kappa, and weight# von Mises 2 vonmises_mu2, np.log(vonmises_kappa2), vonmises_weight2 # von Mises 2 mu, kappa, and weight ]], device=device, dtype=torch.float32 ), test_bearing)# Alternatively, if you had direct (non-log) values as the model sees them:# movement_density = test_block(torch.tensor([[-.5, -.5, -.5, -.5]], device=device))# print(movement_density)print(movement_density.shape)# Plot the resulting movement density as an imageplt.imshow(np.exp(movement_density.detach().cpu().numpy()[0]))plt.colorbar()plt.show()print(np.sum(np.exp(movement_density.detach().cpu().numpy()[0])))
torch.Size([1, 101, 101])
0.99999976
Pull out some testing data
To test the other blocks, and the full model, we will need some data. We can pull that out from the training set.
Code
# Number of samples in the train datasetprint("Number of samples in the train dataset: ", len(dataloader_train.dataset))print('\n')# Select an index from the test dataset to retrieve a sample, between 0 and number of samples# We picked this fairly arbitrarily, but with some interesting environmental features to illustrate the model's predictionsiteration_index =2700# 2. Retrieve a single sample (features and label) from the test dataset at the specified index# sample_spatial_covs is a sample of the spatial covariates for a single step# sample_temporal_covs is a sample of the temporal covariates for a single step# sample_prev_bearing is a sample bearing of the previous step# sample_next_step is the target label (what we are trying to predict) for the next step# We set these here and will also use them later in the script to check how the model's predictions look,# and when we extract feature maps from the convolutional layerssample_spatial_covs, sample_temporal_covs, sample_prev_bearing, sample_next_step = dataloader_train.dataset[iteration_index]# 3. Reshape data tensors to add a batch dimension (since the model expects batches)sample_spatial_covs = sample_spatial_covs.unsqueeze(0).to(device)sample_temporal_covs = sample_temporal_covs.unsqueeze(0).to(device)sample_prev_bearing = sample_prev_bearing.unsqueeze(0).to(device)sample_next_step = sample_next_step.unsqueeze(0).to(device)print(f'Shape of the sample spatial covariates: {sample_spatial_covs.shape}')print(f'Shape of the sample temporal covariates: {sample_temporal_covs.shape}')print(f'Shape of the sample previous bearing: {sample_prev_bearing.shape}')print(f'Shape of the sample next step: {sample_next_step.shape}')
Number of samples in the train dataset: 8082
Shape of the sample spatial covariates: torch.Size([1, 4, 101, 101])
Shape of the sample temporal covariates: torch.Size([1, 4])
Shape of the sample previous bearing: torch.Size([1, 1])
Shape of the sample next step: torch.Size([1, 101, 101])
For visualisation, we can return the scale of the covariates to their original values.
# Convert the PyTorch tensor x2 to a NumPy array:# 1) Detach from the computation graph so no gradients are tracked.# 2) Move to CPU memory.# 3) Convert to NumPy.# Then extract the first sample (index 0) and its respective channel for each variable:hour_t2_sin = sample_temporal_covs.detach().cpu().numpy()[0, 0]hour_t2_cos = sample_temporal_covs.detach().cpu().numpy()[0, 1]yday_t2_sin = sample_temporal_covs.detach().cpu().numpy()[0, 2]yday_t2_cos = sample_temporal_covs.detach().cpu().numpy()[0, 3]# Convert x3 similarly and extract the bearing from the first sample and channel:bearing = sample_prev_bearing.detach().cpu().numpy()[0, 0]
Helper functions
To return the hour and day of the year to their original values, we can use the following functions.
Calculate the hour, day of year and previous bearing of the test sample
Code
hour_t2 = recover_hour(hour_t2_sin, hour_t2_cos)hour_t2_integer =int(hour_t2) # Convert to integerprint(f'Hour: {hour_t2_integer}')yday_t2 = recover_yday(yday_t2_sin, yday_t2_cos)yday_t2_integer =int(yday_t2) # Convert to integerprint(f'Day of the year: {yday_t2_integer}')bearing_degrees = np.degrees(bearing) %360bearing_degrees =round(bearing_degrees, 1) # Round to 2 decimal placesbearing_degrees =int(bearing_degrees) # Convert to integerprint(f'Bearing (radians): {bearing}')print(f'Bearing (degrees): {bearing_degrees}')
Hour: 17
Day of the year: 324
Bearing (radians): 2.7272613048553467
Bearing (degrees): 156
Grab the row and column of the observed next step (label or target)
Code
# Find the coordinates of the element that is 1target = sample_next_step.detach().cpu().numpy()[0,:,:]coordinates = np.where(target ==1)# Extract the coordinatesrow, column = coordinates[0][0], coordinates[1][0]print(f"The location of the next step is (row, column): ({row}, {column})")
The location of the next step is (row, column): (46, 44)
Plot the sample covariates
Code
# Plot the covariatesfig, axs = plt.subplots(2, 2, figsize=(9, 7.5))# Plot NDVIim1 = axs[0, 0].imshow(ndvi_natural.numpy(), cmap='viridis')axs[0, 0].set_title('NDVI')fig.colorbar(im1, ax=axs[0, 0])# Plot Canopy coverim2 = axs[0, 1].imshow(canopy_natural.numpy(), cmap='viridis')axs[0, 1].set_title('Canopy cover')fig.colorbar(im2, ax=axs[0, 1])# Plot Herbaceous vegetationim3 = axs[1, 0].imshow(herby_natural.numpy(), cmap='viridis')axs[1, 0].set_title('Herbaceous vegetation')fig.colorbar(im3, ax=axs[1, 0])# Plot Slopeim4 = axs[1, 1].imshow(slope_natural.numpy(), cmap='viridis')axs[1, 1].set_title('Slope')fig.colorbar(im4, ax=axs[1, 1])filename_covs =f'{output_dir}/covs_id{buffalo_id}_yday{yday_t2_integer}_hour{hour_t2_integer}_bearing{bearing_degrees}_next_r{row}_c{column}.png'plt.tight_layout()plt.savefig(filename_covs, dpi=300, bbox_inches='tight') # if we want to save the figureplt.show()plt.close() # Close the figure to free memory
Plot the target (observed location of the next step)
The model is trying to maximise the probability at the location of the next step, which is the target.
Code
filename_target =f'{output_dir}/target_id{buffalo_id}_yday{yday_t2_integer}_hour{hour_t2_integer}_bearing{bearing_degrees}_next_r{row}_c{column}.png'plt.imshow(target)plt.tight_layout()plt.savefig(filename_target, dpi=300, bbox_inches='tight') # if we want to save the figureplt.colorbar()plt.show()
Testing the scalar to grid function
This should just create a grid with the same value for all pixels.
Code
# x2 contains the scalar inputsprint(sample_temporal_covs.shape) # Check the shape of the scalar inputprint(sample_temporal_covs[0, :]) # Print out the first set of scalars# Create an instance of the scalar-to-grid block using model parameterstest_block = Scalar_to_Grid_Block(params)# Convert scalars into spatial grid representationscalar_maps = test_block(sample_temporal_covs).detach().cpu()# print(scalar_maps) # Optionally, to inspect raw outputprint(scalar_maps.shape) # Check the shape of the generated spatial maps# Visualize one channel of the first example's scalar map# (Values are should be repeated across the grid for each scalar)scalar_index =2# Change this index to visualize other scalar mapsplt.imshow(scalar_maps[0, scalar_index]) # change the second index to see the other scalar mapsplt.colorbar()plt.clim(-1, 1) # Set the color limits to match the range of the scalar values (sine and cosine of temporal parameters)plt.text(scalar_maps.shape[2] //2, scalar_maps.shape[3] //2,f'Value: {round(sample_temporal_covs[0, scalar_index].item(), 2)}', ha='center', va='center', color='white', fontsize=12)plt.show()
The model is initialised with random weights and hasn’t been trained yet, so the output will not be meaningful. However, we can check that the model runs without errors and that the output is the correct shape.
Code
# Put the model in evaluation mode (affects layers like dropout, batch norm, etc.)model.eval()# Pass the data through the modeltest = model((sample_spatial_covs, sample_temporal_covs, sample_prev_bearing))# Print the shape of the outputprint(test.shape)
# Print input bearingprint(f'Bearing (radians): {bearing}')print(f'Bearing (degrees): {bearing_degrees}')# Plot the movement density on the log-scaleplt.imshow(test.detach().cpu().numpy()[0,:,:,1])plt.colorbar()plt.show()# Plot the movement density on the natural scaleplt.imshow(np.exp(test.detach().cpu().numpy()[0,:,:,1]))plt.colorbar()plt.show()
# Combine the habitat selection and the movement probabilities (unnormalised)next_step = (test[:, :, :, 0] + test[:, :, :, 1])# Plot the combined outputplt.imshow(next_step.detach().cpu().numpy()[0,:,:])plt.colorbar()plt.show()
Prepare for training
Loss function
We use a custom negative log likelihood loss function. Essentially what this does is extracts the next-step log-probability at the location of the observed next step, and then takes the negative of this value. This is the loss that we want to minimise, as we want to maximise the probability of the observed next step.
We will also save this loss function in a script called deepSSF_loss.py so that we can import it into future scripts.
Code
class negativeLogLikeLoss(nn.Module):""" Custom negative log-likelihood loss that operates on a 4D prediction tensor (batch, height, width, channels). The forward pass: 1. Sums across channel 3 (two log-densities, habitat selection and movement predictions) to obtain a combined log-density. 2. Multiplies this log-density by the target, which is 0 everywhere except for at the location of the next step, effectively extracting that value, then multiplies by -1 such that the function can be minimised (and the probabilities maximised). 3. Applies the user-specified reduction (mean, sum, or none). """def__init__(self, reduction='mean'):""" Args: reduction (str): Specifies the reduction to apply to the output: 'mean', 'sum', or 'none'. """super(negativeLogLikeLoss, self).__init__()assert reduction in ['mean', 'sum', 'none'], \"reduction should be 'mean', 'sum', or 'none'"self.reduction = reductiondef forward(self, predict, target):""" Forward pass of the negative log-likelihood loss. Args: predict (Tensor): A tensor of shape (B, H, W, 2) with log-densities across two channels to be summed. target (Tensor): A tensor of the same spatial dimensions (B, H, W) indicating where the log-densities should be evaluated. Returns: Tensor: The computed negative log-likelihood loss. Shape depends on the reduction method. """ habitat_probability_surface = predict[:, :, :, 0] movement_probability_surface = predict[:, :, :, 1] # Sum the log-densities from the two channels predict_prod = habitat_probability_surface + movement_probability_surface# Check for NaNs in the combined predictionsif torch.isnan(predict_prod).any():print("NaNs detected in predict_prod")print("predict_prod:", predict_prod)raiseValueError("NaNs detected in predict_prod")# Normalise the next-step log-densities using the log-sum-exp trick# predict_prod = predict_prod - torch.logsumexp(predict_prod, dim = (1, 2), keepdim = True)# Compute negative log-likelihood by multiplying log-densities with target# and then flipping the sign negLogLike =-1* (predict_prod * target)# Check for NaNs after computing negative log-likelihoodif torch.isnan(negLogLike).any():print("NaNs detected in negLogLike")print("negLogLike:", negLogLike)raiseValueError("NaNs detected in negLogLike")# Just extract the value at the next step negLogLike = negLogLike.sum(dim=(1, 2))# Calculate the loss on the habitat selection surface habitat_loss =-1* (habitat_probability_surface * target) habitat_loss = habitat_loss.sum(dim=(1, 2))# Calculate the loss on the movement surface movement_loss =-1* (movement_probability_surface * target) movement_loss = movement_loss.sum(dim=(1, 2))# Apply the specified reductionifself.reduction =='mean':return torch.mean(negLogLike), torch.mean(habitat_loss), torch.mean(movement_loss)elifself.reduction =='sum':return torch.sum(negLogLike), torch.sum(habitat_loss), torch.sum(movement_loss)elifself.reduction =='none':return negLogLike, habitat_loss, movement_loss# Default return (though it should never reach here without hitting an if)return negLogLike, habitat_loss, movement_loss
Test the loss function
Code
# Define the negative log-likelihood loss function with mean reductionloss_fn = negativeLogLikeLoss(reduction='mean')# Calculate the loss using the model outputs and the targetstotal_loss, habitat_loss, movement_loss = loss_fn(model((sample_spatial_covs, sample_temporal_covs, sample_prev_bearing)), sample_next_step)print(f'Total loss: {total_loss}')print(f'Habitat loss: {habitat_loss}')print(f'Movement loss: {movement_loss}')
Total loss: 15.513991355895996
Habitat loss: 9.28042984008789
Movement loss: 6.2335615158081055
Early stopping code
This code will be used to stop training if the validation loss does not improve after a certain number of epochs.
When the loss of the validation data (which is held out from the training data) decreases (i.e. the model improves), the model weights are saved. Each time the validation loss does not decrease, a counter is incremented. If the counter reaches the patience value, the training loop will break and the model will stop training. The ‘final’ model is then the model that had the lowest validation loss.
We have saved this code in a script called deepSSF_early_stopping.py so that we can import it into future scripts.
Code
class EarlyStopping:def__init__(self, patience=5, verbose=False, delta=0, path='checkpoint.pt', trace_func=print):""" Args: patience (int): How long to wait after last time validation loss improved. Default: 5 verbose (bool): If True, prints a message for each validation loss improvement. Default: False delta (float): Minimum change in the monitored quantity to qualify as an improvement. Default: 0 path (str): Path for the checkpoint to be saved to. Default: 'checkpoint.pt' trace_func (function): trace print function. Default: print """self.patience = patienceself.verbose = verboseself.counter =0self.best_score =Noneself.early_stop =Falseself.val_loss_min =float('inf')self.delta = deltaself.path = pathself.trace_func = trace_funcdef__call__(self, val_loss, model):# takes the validation loss and the model as inputs score =-val_loss# save the model's weights if the validation loss decreasesifself.best_score isNone:self.best_score = scoreself.save_checkpoint(val_loss, model)# if the validation loss does not decrease, increment the counterelif score <self.best_score +self.delta:self.counter +=1self.trace_func(f'EarlyStopping counter: {self.counter} out of {self.patience}')ifself.counter >=self.patience:self.early_stop =Trueelse:self.best_score = scoreself.save_checkpoint(val_loss, model)self.counter =0def save_checkpoint(self, val_loss, model):'''Saves model when validation loss decrease.'''ifself.verbose:self.trace_func(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model ...') torch.save(model.state_dict(), self.path)self.val_loss_min = val_loss
The learning rate is a hyperparameter that controls how much we are adjusting the weights of our network with respect to the loss gradient. A larger learning rate means that the optimiser will take larger steps, but it may overshoot the minimum. A smaller learning rate means that the optimiser will take smaller steps, but it may take a long time to converge.
We can therefore use an adaptive learning rate, which will adjust the learning rate during training. If the loss does not decrease after a certain number of epochs (also called the patience), the learning rate will be reduced by a factor of 10.
The patience of the learning rate should be less than the patience of the early stopping code, as we want to reduce the learning rate before we stop training.
Code
# Set the initial learning rates for each processinitial_learning_rate_movement =1e-5initial_learning_rate_habitat =1e-4
Instantiate the loss function, optimiser, learning rate scheduler and early stopping code
In this chunk, we set up all the components needed for training and validating a neural network model:
Loss Function: Uses a negative log-likelihood loss (negativeLogLikeLoss) with mean reduction.
Optimizer: Implements the Adam optimization algorithm, updating model parameters based on the computed gradients and a specified learning rate.
Scheduler: Automatically reduces the learning rate when the monitored metric (e.g., validation loss) stops improving.
Early Stopping: Monitors validation performance and stops training if the metric fails to improve after a certain number of epochs (patience).
Code
# Define the negative log-likelihood loss function with mean reductionloss_fn = negativeLogLikeLoss(reduction='mean')# Create a combined optimiser for all movement-related parametersmovement_params =list(model.conv_movement.parameters()) +list(model.fcn_movement_all.parameters())# Define separate optimizers for each componentoptimiser_movement = optim.Adam(movement_params, lr=initial_learning_rate_movement)optimiser_habitat = optim.Adam(model.conv_habitat.parameters(), lr=initial_learning_rate_habitat)# Put optimisers into a tuple to call in the training loopoptimisers = (optimiser_movement, optimiser_habitat)# Create separate schedulers for each optimizerscheduler_movement = torch.optim.lr_scheduler.ReduceLROnPlateau( optimiser_movement, 'min', factor=0.1, patience=5)scheduler_habitat = torch.optim.lr_scheduler.ReduceLROnPlateau( optimiser_habitat, 'min', factor=0.1, patience=5)# EarlyStopping stops training after 'patience=10' epochs with no improvement,# optionally saving the best model weightsearly_stopping = EarlyStopping(patience=25, verbose=True, path=path_save_weights)
Training loop
This code defines the main training loop for a single epoch. It iterates over batches from the training dataloader, moves the data to the correct device (e.g., CPU or GPU), calculates the loss, and performs backpropagation to update the model parameters. It also prints periodic updates of the current loss.
Code
def train_loop(dataloader_train, model, loss_fn, optimisers, skip_epoch0_training=False):""" Runs the training process for one epoch using the given dataloader, model, loss function, and optimizer. Prints progress updates every few batches. """# Unpack optimisers optimiser_movement, optimiser_habitat = optimisers# 1. Total number of training examples num_train_batches =len(dataloader_train) size =len(dataloader_train.dataset)# 2. Put model in training mode (affects layers like dropout, batchnorm) model.train()# 3. Variable to accumulate the total loss over the epoch epoch_loss =0.0# 4. Loop over batches in the training dataloaderfor batch, (x1, x2, x3, y) inenumerate(dataloader_train):# Move the batch of data to the specified device (CPU/GPU) x1 = x1.to(device) x2 = x2.to(device) x3 = x3.to(device) y = y.to(device)# Forward pass: compute the model output and losswith torch.set_grad_enabled(not skip_epoch0_training): outputs = model((x1, x2, x3)) total_loss, habitat_loss, movement_loss = loss_fn(outputs, y) epoch_loss += total_loss.detach() # Use detach to prevent memory leaks# Only perform optimization if not skipping trainingifnot skip_epoch0_training:# Backpropagation: compute gradients and update parameters# Reset gradients before the next iteration# Zero all gradients optimiser_movement.zero_grad() optimiser_habitat.zero_grad()# Single backward pass on total loss total_loss.backward()# For movement optimizer: save habitat gradients, then zero them out habitat_grads = []for param in model.conv_habitat.parameters():# Save the gradientif param.grad isnotNone: habitat_grads.append(param.grad.clone())else: habitat_grads.append(None)# Zero out habitat gradient for movement update param.grad =None# Update movement parameters optimiser_movement.step()# For habitat optimizer: restore habitat gradients and zero movement gradientsfor param in model.conv_movement.parameters(): param.grad =Nonefor param in model.fcn_movement_all.parameters(): param.grad =None# Restore habitat gradientsfor i, param inenumerate(model.conv_habitat.parameters()): param.grad = habitat_grads[i]# Update habitat parameters optimiser_habitat.step()# Print an update every 5 batches to keep track of training progressif batch %20==0: loss_val = total_loss.item() current = batch * batch_size +len(x1)if skip_epoch0_training:print(f"[Observation only] loss: {loss_val:>15f} [{current:>5d}/{size:>5d}]")else:print(f"loss: {loss_val:>15f} [{current:>5d}/{size:>5d}]") torch.cuda.empty_cache()# Compute the average training loss and print it epoch_loss /= num_train_batchesif skip_epoch0_training:print(f"\nAvg training loss (observation only): {epoch_loss:>15f}")else:print(f"\nAvg training loss: {epoch_loss:>15f}") train_losses.append(epoch_loss.item())
Test loop
The test loop is similar to the training loop, but it does not perform backpropagation. It calculates the loss on the test set and returns the average loss.
Code
def test_loop(dataloader_test, model, loss_fn):""" Evaluates the model on the provided test dataset by computing the average loss over all batches. No gradients are computed during this process (torch.no_grad()). """# 1. Set the model to evaluation mode (affects layers like dropout, batchnorm). model.eval() size =len(dataloader_test.dataset) num_batches =len(dataloader_test) test_loss =0# 2. Disable gradient computation to speed up evaluation and reduce memory usagewith torch.no_grad():# 3. Loop through each batch in the test dataloaderfor x1, x2, x3, y in dataloader_test:# Move the batch of data to the appropriate device (CPU/GPU)# x1, x2, x3 are the spatial covariates, temporal covariates, and bearing, respectively# y is the label (observed location of the next step) x1 = x1.to(device) x2 = x2.to(device) x3 = x3.to(device) y = y.to(device)# Compute the loss on the test set (no backward pass needed) total_loss, habitat_loss, movement_loss = loss_fn(model((x1, x2, x3)), y) test_loss += total_loss.detach()# 4. Compute average test loss over all batches test_loss /= num_batches torch.cuda.empty_cache()# Print the average test lossprint(f"Avg test loss: {test_loss:>15f}\n")
Train the model
Here we have the main training process that loops over multiple epochs. Each epoch involves:
Training the model on a training dataset.
Validating the model on a validation dataset to monitor its performance and adjust the learning rate (via scheduler).
Checking for early stopping conditions. If triggered, the best model weights are restored, and a test evaluation is performed.
Additionally, commented-out code at the end shows how you might visualise and save intermediate training results (such as predicted probability surfaces) for diagnostic or research purposes. The saved images can then be combined into an animation.
Code
epochs =120train_losses = [] # Track training losses across epochsval_losses = [] # Track validation losses across epochsval_habitat_losses = [] # Track validation habitat losses across epochsval_movement_losses = [] # Track validation movement losses across epochs# Initialize the parameter container using the parameters defined in 'params_dict'params = ModelParams(params_dict)# Create an instance of the ConvJointModel using the parameters,# and move the model to the specified device (e.g., CPU or GPU)model = ConvJointModel(params).to(device)# Print the model architectureprint(model)# Define the negative log-likelihood loss function with mean reductionloss_fn = negativeLogLikeLoss(reduction='mean') #, alpha=0.5# Set the initial learning rates for each processinitial_learning_rate_movement =1e-5initial_learning_rate_habitat =1e-4# Create a combined optimiser for all movement-related parametersmovement_params =list(model.conv_movement.parameters()) +list(model.fcn_movement_all.parameters())# Define separate optimizers for each componentoptimiser_movement = optim.Adam(movement_params, lr=initial_learning_rate_movement)optimiser_habitat = optim.Adam(model.conv_habitat.parameters(), lr=initial_learning_rate_habitat)# Put optimisers into a tuple to call in the training loopoptimisers = (optimiser_movement, optimiser_habitat)# Create separate schedulers for each optimizerscheduler_movement = torch.optim.lr_scheduler.ReduceLROnPlateau( optimiser_movement, 'min', factor=0.1, patience=5)scheduler_habitat = torch.optim.lr_scheduler.ReduceLROnPlateau( optimiser_habitat, 'min', factor=0.1, patience=5)# Reinitialise early stoppingearly_stopping = EarlyStopping(patience=10, verbose=True, path=path_save_weights)# Create directory for saving training imagesos.makedirs(f'{output_dir}/training_images', exist_ok=True)for t inrange(epochs):# Initialise variables to store during training train_loss =0.0 num_train_batches =len(dataloader_train) val_loss =0.0 val_loss_habitat =0.0 val_loss_movement =0.0 num_batches =len(dataloader_val)print(f"Epoch {t+1}\n-------------------------------")# Skip training in the first epoch, but still calculate losses skip_training = (t ==0)# 1. Run the training loop for one epoch using the training dataloader train_loop(dataloader_train, model, loss_fn, optimisers, skip_epoch0_training=skip_training)# 2. Evaluate model performance on the validation dataset model.eval() # Switch to evaluation mode for proper layer behaviorwith torch.no_grad():# Loop through each batch in the validation dataloaderfor x1, x2, x3, y in dataloader_val:# Move data to the chosen device (CPU/GPU) x1 = x1.to(device) x2 = x2.to(device) x3 = x3.to(device) y = y.to(device)# Accumulate validation loss total_loss, habitat_loss, movement_loss = loss_fn(model((x1, x2, x3)), y) val_loss += total_loss.detach() val_loss_habitat += habitat_loss.detach() val_loss_movement += movement_loss.detach()# # 3. Step the scheduler based on the validation loss (adjusts learning rate if needed)# scheduler.step(val_loss) scheduler_movement.step(val_loss_movement) scheduler_habitat.step(val_loss_habitat)# 4. Compute the average validation loss and print it, along with the current learning rate val_loss /= num_batches val_loss_habitat /= num_batches val_loss_movement /= num_batchesprint(f"Avg validation loss: {val_loss:>15f}")print(f"Avg validation habitat loss: {val_loss_habitat:>15f}")print(f"Avg validation movement loss: {val_loss_movement:>15f}")print(f"Movement learning rate: {scheduler_movement.get_last_lr()}")print(f"Habitat learning rate: {scheduler_habitat.get_last_lr()}")# 5. Track the validation loss for plotting or monitoring val_losses.append(val_loss.item()) val_habitat_losses.append(val_loss_habitat.item()) val_movement_losses.append(val_loss_movement.item())# Memory management - add after validation but before early stopping check# torch.cuda.empty_cache()# gc.collect()# 6. Early stopping: if no improvement in validation loss for a set patience, stop training early_stopping(val_loss, model)if early_stopping.early_stop:print("Early stopping")# Restore the best model weights saved by EarlyStopping model.load_state_dict(torch.load(path_save_weights, weights_only=True, map_location=device)) test_loop(dataloader_test, model, loss_fn) # Evaluate on test set once training stopsbreakelse: model.eval()print("\n") torch.cuda.empty_cache()# ----------------------------------------------------# The following code demonstrates how# to optionally visualize or save intermediate results# (e.g., habitat probability surface, movement probability,# and next-step probability surfaces).# uncomment the code all in one go to run it (it should be inside the training loop)# ----------------------------------------------------# Extract training and validation losses for plotting# Convert the list of tensors to a single tensor train_losses_np = torch.tensor(train_losses).detach().cpu().numpy() val_losses_np = torch.tensor(val_losses).detach().cpu().numpy() val_habitat_losses_np = torch.tensor(val_habitat_losses).detach().cpu().numpy() val_movement_losses_np = torch.tensor(val_movement_losses).detach().cpu().numpy()# Number of epochs n_epochs =len(val_losses)# -----------------------------------------------------------# 1. Retrieve a single test example (covariates and labels)# at the specified 'iteration_index' from the test dataset# ----------------------------------------------------------- x1, x2, x3, labels = dataloader_train.dataset[iteration_index]# -----------------------------------------------------------# 2. Add a batch dimension and move tensors to the device# for model inference# ----------------------------------------------------------- x1 = x1.unsqueeze(0).to(device) x2 = x2.unsqueeze(0).to(device) x3 = x3.unsqueeze(0).to(device)# -----------------------------------------------------------# 3. Run the model on the single test example# ----------------------------------------------------------- test = model((x1, x2, x3))# -----------------------------------------------------------# 4. Extract habitat and movement outputs;# convert them to NumPy arrays for visualization# ----------------------------------------------------------- hab_density = test.detach().cpu().numpy()[0, :, :, 0] movement_density = test.detach().cpu().numpy()[0, :, :, 1]# -----------------------------------------------------------# 5. Generate masks to exclude certain border cells for# color scale reasons (setting them to -inf).# ----------------------------------------------------------- x_mask = np.ones_like(hab_density) y_mask = np.ones_like(hab_density)# Mask out a few columns (0-2 and 98-end) and rows (0-2 and 98-end) x_mask[:, :3] =-np.inf x_mask[:, 98:] =-np.inf y_mask[:3, :] =-np.inf y_mask[98:, :] =-np.inf# Apply the masks to habitat density hab_density_mask = hab_density * x_mask * y_mask# Combine habitat and movement densities to represent# next-step probability step_density = hab_density + movement_density step_density_mask = step_density * x_mask * y_mask# Plot the covariates fig, axs = plt.subplots(2, 2, figsize=(9, 7.5))# # Plot NDVI# im1 = axs[0, 0].imshow(ndvi_natural.numpy(), cmap='viridis')# axs[0, 0].set_title('NDVI')# fig.colorbar(im1, ax=axs[0, 0])# Plot Training and Validation Loss axs[0, 0].plot(range(n_epochs), train_losses_np, label='Training Loss', color='blue') axs[0, 0].plot(range(n_epochs), val_losses_np, label='Validation Loss', color='red')# axs[0, 0].plot(range(n_epochs), val_habitat_losses_np, label='Validation Habitat Loss', color='green')# axs[0, 0].plot(range(n_epochs), val_movement_losses_np, label='Validation Movement Loss', color='orange') axs[0, 0].set_xlim(0, 120) axs[0, 0].set_title('Training and validation loss') axs[0, 0].set_xlabel('Epoch') axs[0, 0].set_ylabel('Loss') axs[0, 0].legend()# Plot habitat selection log-probability im2 = axs[0, 1].imshow(hab_density_mask, cmap='viridis') axs[0, 1].set_title('Habitat log-probability') fig.colorbar(im2, ax=axs[0, 1])# Plot movement log-probability im3 = axs[1, 0].imshow(movement_density, cmap='viridis') axs[1, 0].set_title('Movement log-probability') fig.colorbar(im3, ax=axs[1, 0])# Plot next-step log-probability im4 = axs[1, 1].imshow(step_density_mask, cmap='viridis') axs[1, 1].set_title('Next-step log-probability') fig.colorbar(im4, ax=axs[1, 1]) filename_covs =f'{output_dir}/training_images/training_epoch{t}_id{buffalo_id}_yday{yday_t2_integer}_hour{hour_t2_integer}_bearing{bearing_degrees}_next_r{row}_c{column}.png' plt.tight_layout() plt.savefig(filename_covs, dpi=300) # creates inconsistent image sizes >>> , bbox_inches='tight'# plt.show() plt.close() # Close the figure to free memoryprint("Done!")
First, here’s a function to call to make a gif from a given directory.
Code
# Example sorting by the epoch numberdef extract_epoch(filename):# Extract the epoch number from the filename# Adjust the extraction based on your naming patternimport re match = re.search(r'training_epoch(\d+)_', filename)if match:returnint(match.group(1))return0def create_gif(image_folder, output_filename, fps=10):""" Creates a GIF from a sequence of images in a folder. Parameters: - image_folder: Path to the folder containing images - output_filename: Name of the output GIF file - duration: Duration of each frame in seconds """# Get all png files in the specified folder, sorted by name images =sorted(glob.glob(os.path.join(image_folder, '*.png')), key=extract_epoch)# Check if any images were foundifnot images:print(f"No images found in {image_folder}")return# Read all images frames = [imageio.imread(image) for image in images]# Save as GIF imageio.mimsave(output_filename, frames, fps=fps, loop=0) display(Image(filename=output_filename))print(f"GIF created successfully: {output_filename}")
Code
# Path to your imagesimage_folder =f'{output_dir}/training_images'# Output GIF filenameoutput_filename =f'{output_dir}/training_gif_id{buffalo_id}_yday{yday_t2_integer}_hour{hour_t2_integer}_bearing{bearing_degrees}_next_r{row}_c{column}.gif'# Create the GIFcreate_gif(image_folder, output_filename, fps=10)
<IPython.core.display.Image object>
GIF created successfully: ../Python/outputs/model_training/id2005_deepSSF_training_1_2025-07-09/training_gif_id2005_yday324_hour17_bearing156_next_r46_c44.gif
Code
# to look at the parameters (weights and biases) of the model# print(model.state_dict())
Loading in previous models
As we’ve trained the model, the model parameters are already stored in the model object. But as we were training the model, we were saving it to file, and that, and other trained models can be loaded.
The model parameters that are being loaded must match the model object that has been defined above. If the model object has changed, the model parameters will not be able to be loaded.
# Directory for saving the loss dataframefilename_loss_csv =f'{output_dir}/deepSSF_val_loss_buffalo{buffalo_id}.csv'# Check if val_losses is defined (which means a model has been trained in this session)try:# Convert the list of tensors to a single tensor val_losses_tensor = torch.tensor(val_losses)print("val_losses has been defined - storing as csv\n")# Number of epochs n_epochs =len(val_losses)print(f'Number of epochs: {n_epochs}') val_losses_df = pd.DataFrame({"epoch": range(1, n_epochs +1),"val_losses": val_losses_tensor.detach().cpu().numpy() })print(val_losses_df.head())# Save the validation losses to a CSV file val_losses_df.to_csv(filename_loss_csv, index=False)# if val_losses hasn't been defined (for if you are loading model weights from a saved object)exceptNameError:# This code runs if val_losses is not definedprint("val_losses has not been defined - loading from saved csv\n")# Initialize it with a default value# Read the val_losses csv file val_losses_df = pd.read_csv(filename_loss_csv)print(val_losses_df.head())# Number of epochs n_epochs =len(val_losses_df)print(f'\nNumber of epochs: {n_epochs}')
val_losses has been defined - storing as csv
Number of epochs: 58
epoch val_losses
0 1 15.368755
1 2 15.255189
2 3 15.095748
3 4 14.955804
4 5 14.828360
Code
# Directory for saving the loss dataframefilename_train_loss_csv =f'{output_dir}/deepSSF_train_loss_buffalo{buffalo_id}.csv'# Check if train_losses is defined (which means a model has been trained in this session)try:# Convert the list of tensors to a single tensor train_losses_tensor = torch.tensor(train_losses)print("train_losses has been defined - storing as csv\n") train_losses_df = pd.DataFrame({"epoch": np.linspace(1, n_epochs, len(train_losses)),"train_losses": train_losses_tensor.detach().cpu().numpy() })print(train_losses_df.head)# Save the train losses to a CSV file train_losses_df.to_csv(filename_train_loss_csv, index=False)# if train_losses hasn't been defined (for if you are loading model weights from a saved object)exceptNameError:# This code runs if train_losses is not definedprint("train_losses has not been defined - loading from saved csv\n")# Initialize it with a default value# Read the train_losses csv file train_losses_df = pd.read_csv(filename_train_loss_csv)print(train_losses_df.head())
# Directory for saving the loss plotsfilename_loss =f'{output_dir}/val_loss_buffalo{buffalo_id}.png'# Plot the validation lossesplt.plot(train_losses_df['epoch'], train_losses_df['train_losses'], label='Training Loss', color='blue') # Plot training loss in blueplt.plot(val_losses_df['epoch'], val_losses_df['val_losses'], label='Validation Loss', color='red') # Plot validation loss in redplt.title('Validation Losses')plt.xlabel('Epoch')plt.ylabel('Loss')plt.legend() # Show legend to distinguish linesplt.savefig(filename_loss, dpi=300, bbox_inches='tight')plt.show()
Test the model on sample covariates
Use the same sample covariates as above (selected before training the model) from the test dataset, and then run the model on these covariates. To select a different sample, change the iteration_index in the code above and re-run those chunks.
Print the scalar values and plot the sample covariates
Code
# Print relevant information about the current prediction context# such as time of day, day of year, and bearing angles in both radians and degrees.print(f'Hour: {hour_t2_integer}')print(f'Day of the year: {yday_t2_integer}')print(f'Bearing (radians): {bearing}')print(f'Bearing (degrees): {bearing_degrees}')# Plot the covariatesfig, axs = plt.subplots(2, 2, figsize=(9, 7.5))# Plot NDVIim1 = axs[0, 0].imshow(ndvi_natural.numpy(), cmap='viridis')axs[0, 0].set_title('NDVI')fig.colorbar(im1, ax=axs[0, 0])# Plot Canopy coverim2 = axs[0, 1].imshow(canopy_natural.numpy(), cmap='viridis')axs[0, 1].set_title('Canopy cover')fig.colorbar(im2, ax=axs[0, 1])# Plot Herbaceous vegetationim3 = axs[1, 0].imshow(herby_natural.numpy(), cmap='viridis')axs[1, 0].set_title('Herbaceous vegetation')fig.colorbar(im3, ax=axs[1, 0])# Plot Slopeim4 = axs[1, 1].imshow(slope_natural.numpy(), cmap='viridis')axs[1, 1].set_title('Slope')fig.colorbar(im4, ax=axs[1, 1])filename_covs =f'{output_dir}/id{buffalo_id}_sample{iteration_index}_yday{yday_t2_integer}_hour{hour_t2_integer}_bearing{bearing_degrees}_next_r{row}_c{column}.png'plt.tight_layout()plt.savefig(filename_covs, dpi=300, bbox_inches='tight') # if we want to save the figureplt.show()plt.close() # Close the figure to free memory
Hour: 17
Day of the year: 324
Bearing (radians): 2.7272613048553467
Bearing (degrees): 156
Run the model on the sample covariates
Code
# -------------------------------------------------------------------------# Switch the model to evaluation mode (e.g., disables dropout, etc.)# -------------------------------------------------------------------------model.eval()# -------------------------------------------------------------------------# Pass the inputs through the model; 'test' will have shape [batch, H, W, 2]# -------------------------------------------------------------------------test = model((sample_spatial_covs, sample_temporal_covs, sample_prev_bearing))# test = model((x1, x2, x3))print(test.shape)# -------------------------------------------------------------------------# Extract and exponentiate the habitat density channel# (at index 0 in the last dimension)# -------------------------------------------------------------------------hab_density = test.detach().cpu().numpy()[0, :, :, 0]hab_density_exp = np.exp(hab_density)# print(np.sum(hab_density_exp)) # Debug: check the sum of exponentiated values# -------------------------------------------------------------------------# Create masks to remove unwanted edge cells from visualization# (setting them to -∞ affects the color scale in plots)# -------------------------------------------------------------------------x_mask = np.ones_like(hab_density)y_mask = np.ones_like(hab_density)x_mask[:, :3] =-np.infx_mask[:, 98:] =-np.infy_mask[:3, :] =-np.infy_mask[98:, :] =-np.inf# -------------------------------------------------------------------------# Apply the masks to the habitat density (log scale) and exponentiated version# -------------------------------------------------------------------------hab_density_mask = hab_density * x_mask * y_maskhab_density_exp_mask = hab_density_exp * x_mask * y_mask# -------------------------------------------------------------------------# Plot and save the habitat density in log scale# -------------------------------------------------------------------------plt.imshow(hab_density_mask)plt.colorbar()plt.title('Habitat selection probability (log)')plt.savefig(f'{output_dir}/hab_log_prob_id{buffalo_id}.png', dpi=300, bbox_inches='tight')plt.show()plt.close()# -------------------------------------------------------------------------# Plot and save the habitat density in probability (exponentiated) scale# -------------------------------------------------------------------------plt.imshow(hab_density_exp_mask)plt.colorbar()plt.title('Habitat selection probability')plt.savefig(f'{output_dir}/hab_prob_id{buffalo_id}.png', dpi=300, bbox_inches='tight')plt.show()plt.close()# -------------------------------------------------------------------------# Extract and exponentiate the movement density channel# (at index 1 in the last dimension)# -------------------------------------------------------------------------move_density = test.detach().cpu().numpy()[0, :, :, 1]move_density_exp = np.exp(move_density)# print(np.sum(move_density_exp)) # Debug: check the sum of exponentiated values# -------------------------------------------------------------------------# Apply the same masking strategy to movement densities# -------------------------------------------------------------------------move_density_mask = move_density * x_mask * y_maskmove_density_exp_mask = move_density_exp * x_mask * y_mask# -------------------------------------------------------------------------# Plot and save the movement density in log scale# -------------------------------------------------------------------------plt.imshow(move_density_mask)plt.colorbar()plt.title('Movement probability (log)')plt.savefig(f'{output_dir}/move_log_prob_id{buffalo_id}.png', dpi=300, bbox_inches='tight')plt.show()plt.close()# -------------------------------------------------------------------------# Plot and save the movement density in probability (exponentiated) scale# -------------------------------------------------------------------------plt.imshow(move_density_exp_mask)plt.colorbar()plt.title('Movement probability')plt.savefig(f'{output_dir}/move_prob_id{buffalo_id}.png', dpi=300, bbox_inches='tight')plt.show()plt.close()# -------------------------------------------------------------------------# Compute the next-step density by adding habitat + movement (log-space)# Then exponentiate and normalize# -------------------------------------------------------------------------step_density = test[0, :, :, 0] + test[0, :, :, 1]step_density = step_density.detach().cpu().numpy()step_density_exp = np.exp(step_density)# print('Sum of step density exp = ', np.sum(step_density_exp)) # Debugstep_density_exp_norm = step_density_exp / np.sum(step_density_exp)# print('Sum of step density exp norm = ', np.sum(step_density_exp_norm)) # Debug# -------------------------------------------------------------------------# Apply masks to the step densities (log and exponentiated + normalized)# -------------------------------------------------------------------------step_density_mask = step_density * x_mask * y_maskstep_density_exp_norm_mask = step_density_exp_norm * x_mask * y_mask# -------------------------------------------------------------------------# Plot and save the combined next-step probability surface in log scale# -------------------------------------------------------------------------plt.imshow(step_density_mask)plt.colorbar()plt.title('Next-step probability (log)')plt.savefig(f'{output_dir}/step_log_prob_id{buffalo_id}.png', dpi=300, bbox_inches='tight')plt.show()plt.close()# -------------------------------------------------------------------------# Plot and save the combined next-step probability surface in probability scale# -------------------------------------------------------------------------plt.imshow(step_density_exp_norm_mask)plt.colorbar()plt.title('Next-step probability')plt.savefig(f'{output_dir}/step_prob_id{buffalo_id}.png', dpi=300, bbox_inches='tight')plt.show()plt.close()
torch.Size([1, 101, 101, 2])
Extracting convolution layer outputs
In the convolutional blocks, each convolutional layer learns a set of filters (kernels) that extract different features from the input data. In the habitat selection subnetwork, the convolution filters (and their associated bias parameters - not shown below) are the only parameters that are trained, and it is the filters that transform the set of input covariates into the habitat selection probabilities. They do this by maximising features of the inputs that correlate with observed next-steps.
For each convolutional layer, there are typically a number of filters. For the habitat selection subnetwork, we used 4 filters in the first two layers, and a single filter in the last layer. Each of these filters has a number of channels which correspond one-to-one with the input layers. The outputs of the filter channels are then combined to produce a feature map, with a single feature map produced for each filter. In successive layers, the feature maps become the input layers, and the filters operate on these layers. Because there are multiple filters in ech layer, they can ‘specialise’ in extracting different features from the input layers.
By visualizing and inspecting these filters, and the corresponding feature maps, we can:
Gain interpretability: Understand what kind of features the network is detecting—e.g., edges, shapes, or textures.
Debug: Check if the filters have meaningful patterns or if something went wrong (e.g., all zeros or random noise).
Compare layers: See how early layers often learn low-level patterns while deeper layers learn more abstract features.
We will first set up some activation hooks for storing the feature maps. Activation hooks are placed at certain points within the model’s forward pass and store intermediate results. We will also extract the convolution filters (which are weights of the model and as such don’t require hooks - we can access them directly).
We will then run the sample covariates through the model and extract the feature maps from the habitat selection convolutional block, and plot them along with the covariates and convolution filters.
Note that there are also ReLU activation functions in the convolutional blocks, which are not shown below. These are applied to the feature maps, and set all negative values to zero. They are not learned parameters, but are part of the forward pass of the model.
Convolutional layer 1
Activation hook
Code
# -----------------------------------------------------------# Create a dictionary to store activation outputs# -----------------------------------------------------------activation = {}def get_activation(name):""" Returns a hook function that can be registered on a layer to capture its output (i.e., feature maps) after the forward pass. Args: name (str): The key under which the activation is stored in the 'activation' dict. """def hook(model, input, output):# Detach and save the layer's output in the dictionary activation[name] = output.detach()return hook# -----------------------------------------------------------# Register a forward hook on the first convolution layer# in the model's 'conv_habitat' block# -----------------------------------------------------------model.conv_habitat.conv2d[0].register_forward_hook(get_activation("hab_conv1"))# -----------------------------------------------------------# Perform a forward pass through the model with the desired input# The feature maps from the hooked layer will be stored in 'activation'# -----------------------------------------------------------# out = model((x1, x2, x3)) # e.g., model((spatial_data_x, scalars_to_grid, bearing_x))test = model((sample_spatial_covs, sample_temporal_covs, sample_prev_bearing))# -----------------------------------------------------------# Retrieve the captured feature maps from the dictionary# and move them to the CPU for inspection# -----------------------------------------------------------feat_maps1 = activation["hab_conv1"].cpu()print("Feature map shape:", feat_maps1.shape)# Typically shape: (batch_size, out_channels, height, width)# -----------------------------------------------------------# Visualize the feature maps for the first sample in the batch# -----------------------------------------------------------feat_maps1_sample = feat_maps1[0] # Shape: (out_channels, H, W)num_maps1 = feat_maps1_sample.shape[0]print("Number of feature maps:", num_maps1)
Feature map shape: torch.Size([1, 4, 101, 101])
Number of feature maps: 4
Stack spatial and scalar (as grid) covariates
For plotting. Also create a vector of names to index over.
# -------------------------------------------------------------------------# Check or print the convolution layer in conv_habitat (for debugging)# -------------------------------------------------------------------------print(model.conv_habitat.conv2d)# -------------------------------------------------------------------------# Set the model to evaluation mode (disables dropout, etc.)# -------------------------------------------------------------------------model.eval()# -------------------------------------------------------------------------# Extract the weights (filters) from the first convolution layer in conv_habitat# -------------------------------------------------------------------------filters_c1 = model.conv_habitat.conv2d[0].weight.data.clone().cpu()print("Filters shape:", filters_c1.shape)# Typically (out_channels, in_channels, kernel_height, kernel_width)# -------------------------------------------------------------------------# Visualize each filter’s first channel in a grid of subplots# -------------------------------------------------------------------------num_filters_c1 = filters_c1.shape[1]print(num_filters_c1)for z inrange(num_maps1): fig, axes = plt.subplots(2, num_filters_c1, figsize=(2*num_filters_c1, 4))for i inrange(num_filters_c1):# Add the covariates as the first row of subplots axes[0,i].imshow(covariate_stack[0, i].detach().cpu().numpy(), cmap='viridis') axes[0,i].axis('off') axes[0,i].set_title(f'{covariate_names[i]}')if i >3: im1 = axes[0,i].imshow(covariate_stack[0, i].detach().cpu().numpy(), cmap='viridis') im1.set_clim(-1, 1) axes[0,i].text(scalar_maps.shape[2] //2, scalar_maps.shape[3] //2,f'Value: {round(sample_temporal_covs[0, i-4].item(), 2)}', ha='center', va='center', color='white', fontsize=12) kernel = filters_c1[z, i, :, :] # Show the first input channel im = axes[1,i].imshow(kernel, cmap='viridis') axes[1,i].axis('off') axes[1,i].set_title(f'Layer 1, Filter {z+1}')# Annotate each cell with the numeric valuefor (j, k), val in np.ndenumerate(kernel): axes[1,i].text(k, j, f'{val:.2f}', ha='center', va='center', color='white') plt.tight_layout() plt.savefig(f'{output_dir}/id{buffalo_id}_conv_layer1_filters{z}.png', dpi=300, bbox_inches='tight') plt.show()# -----------------------------------------------------------# Loop over each feature map channel and save them as images.# Multiply by x_mask * y_mask if you need to mask out edges.# ----------------------------------------------------------- plt.figure() plt.imshow(feat_maps1_sample[z].numpy() * x_mask * y_mask, cmap='viridis') plt.title(f"Layer 1, Feature Map {z+1}")# Hide axis if you prefer: plt.axis('off') plt.savefig(f'{output_dir}/id{buffalo_id}_conv_layer1_feature_map{z}.png', dpi=300, bbox_inches='tight') plt.show()
# -----------------------------------------------------------# Register a forward hook on the second convolution layer# in the model's 'conv_habitat' block# -----------------------------------------------------------model.conv_habitat.conv2d[2].register_forward_hook(get_activation("hab_conv2"))# -----------------------------------------------------------# Perform a forward pass through the model with the desired input# The feature maps from the hooked layer will be stored in 'activation'# -----------------------------------------------------------# out = model((x1, x2, x3)) # e.g., model((spatial_data_x, scalars_to_grid, bearing_x))test = model((sample_spatial_covs, sample_temporal_covs, sample_prev_bearing))# -----------------------------------------------------------# Retrieve the captured feature maps from the dictionary# and move them to the CPU for inspection# -----------------------------------------------------------feat_maps2 = activation["hab_conv2"].cpu()print("Feature map shape:", feat_maps2.shape)# Typically shape: (batch_size, out_channels, height, width)# -----------------------------------------------------------# Visualize the feature maps for the first sample in the batch# -----------------------------------------------------------feat_maps2_sample = feat_maps2[0] # Shape: (out_channels, H, W)num_maps2 = feat_maps2_sample.shape[0]print("Number of feature maps:", num_maps2)
Feature map shape: torch.Size([1, 4, 101, 101])
Number of feature maps: 4
Extract filters and plot
Code
# -------------------------------------------------------------------------# Extract the weights (filters) from the second convolution layer in conv_habitat# -------------------------------------------------------------------------filters_c2 = model.conv_habitat.conv2d[2].weight.data.clone().cpu()print("Filters shape:", filters_c2.shape)# Typically (out_channels, in_channels, kernel_height, kernel_width)# -------------------------------------------------------------------------# Visualize each filter’s first channel in a grid of subplots# -------------------------------------------------------------------------num_filters_c2 = filters_c2.shape[1]print(num_filters_c2)for z inrange(num_maps2): fig, axes = plt.subplots(2, num_filters_c2, figsize=(2*num_filters_c2, 4))for i inrange(num_filters_c2):# Add the covariates as the first row of subplots axes[0,i].imshow(feat_maps1_sample[i].numpy() * x_mask * y_mask, cmap='viridis') axes[0,i].axis('off') axes[0,i].set_title(f"Layer 1, Map {z+1}")# if i > 3:# im1 = axes[0,i].imshow(covariate_stack[0, i].detach().cpu().numpy(), cmap='viridis')# im1.set_clim(-1, 1)# axes[0,i].text(scalar_maps.shape[2] // 2, scalar_maps.shape[3] // 2,# f'Value: {round(x2[0, i-4].item(), 2)}',# ha='center', va='center', color='white', fontsize=12) kernel = filters_c2[z, i, :, :] # Show the first input channel im = axes[1,i].imshow(kernel, cmap='viridis') axes[1,i].axis('off') axes[1,i].set_title(f'Layer 2, Filter {z+1}')# Annotate each cell with the numeric valuefor (j, k), val in np.ndenumerate(kernel): axes[1,i].text(k, j, f'{val:.2f}', ha='center', va='center', color='white') plt.tight_layout() plt.savefig(f'{output_dir}/id{buffalo_id}_conv_layer2_filters{z}.png', dpi=300, bbox_inches='tight') plt.show()# -----------------------------------------------------------# 6. Loop over each feature map channel and save them as images.# Multiply by x_mask * y_mask if you need to mask out edges.# ----------------------------------------------------------- plt.figure() plt.imshow(feat_maps2_sample[z].numpy() * x_mask * y_mask, cmap='viridis') plt.title(f"Layer 2, Feature Map {z+1}")# Hide axis if you prefer: plt.axis('off') plt.savefig(f'{output_dir}/id{buffalo_id}_conv_layer2_feature_map{z}.png', dpi=300, bbox_inches='tight') plt.show()
Filters shape: torch.Size([4, 4, 3, 3])
4
Convolutional layer 3
Activation hook
Code
# -----------------------------------------------------------# Register a forward hook on the third convolution layer# in the model's 'conv_habitat' block# -----------------------------------------------------------model.conv_habitat.conv2d[4].register_forward_hook(get_activation("hab_conv3"))# -----------------------------------------------------------# Perform a forward pass through the model with the desired input# The feature maps from the hooked layer will be stored in 'activation'# -----------------------------------------------------------# out = model((x1, x2, x3)) # e.g., model((spatial_data_x, scalars_to_grid, bearing_x))test = model((sample_spatial_covs, sample_temporal_covs, sample_prev_bearing))# -----------------------------------------------------------# Retrieve the captured feature maps from the dictionary# and move them to the CPU for inspection# -----------------------------------------------------------feat_maps3 = activation["hab_conv3"].cpu()print("Feature map shape:", feat_maps3.shape)# Typically shape: (batch_size, out_channels, height, width)# -----------------------------------------------------------# Visualize the feature maps for the first sample in the batch# -----------------------------------------------------------feat_maps3_sample = feat_maps3[0] # Shape: (out_channels, H, W)num_maps3 = feat_maps3_sample.shape[0]print("Number of feature maps:", num_maps3)
Feature map shape: torch.Size([1, 1, 101, 101])
Number of feature maps: 1
Extract filters and plot
Code
# -------------------------------------------------------------------------# Extract the weights (filters) from the second convolution layer in conv_habitat# -------------------------------------------------------------------------filters_c3 = model.conv_habitat.conv2d[4].weight.data.clone().cpu()print("Filters shape:", filters_c3.shape)# Typically (out_channels, in_channels, kernel_height, kernel_width)# -------------------------------------------------------------------------# Visualize each filter’s first channel in a grid of subplots# -------------------------------------------------------------------------num_filters_c3 = filters_c3.shape[1]print(num_filters_c3)for z inrange(num_maps3): fig, axes = plt.subplots(2, num_filters_c3, figsize=(2*num_filters_c3, 4))for i inrange(num_filters_c3):# Add the covariates as the first row of subplots axes[0,i].imshow(feat_maps2_sample[i].numpy() * x_mask * y_mask, cmap='viridis') axes[0,i].axis('off') axes[0,i].set_title(f"Layer 2, Map {z+1}") kernel = filters_c3[z, i, :, :] # Show the first input channel im = axes[1,i].imshow(kernel, cmap='viridis') axes[1,i].axis('off') axes[1,i].set_title(f'Layer 3, Filter {z+1}')# Annotate each cell with the numeric valuefor (j, k), val in np.ndenumerate(kernel): axes[1,i].text(k, j, f'{val:.2f}', ha='center', va='center', color='white') plt.tight_layout() plt.savefig(f'{output_dir}/id{buffalo_id}_conv_layer3_filters{z}.png', dpi=300, bbox_inches='tight') plt.show()# -----------------------------------------------------------# 6. Loop over each feature map channel and save them as images.# Multiply by x_mask * y_mask if you need to mask out edges.# ----------------------------------------------------------- plt.figure() plt.imshow(feat_maps3_sample[z].numpy() * x_mask * y_mask, cmap='viridis') plt.title(f"Habitat selection log probability")# Hide axis if you prefer: plt.axis('off') plt.savefig(f'{output_dir}/id{buffalo_id}_conv_layer3_feature_map{z}.png', dpi=300, bbox_inches='tight') plt.show()
Filters shape: torch.Size([1, 4, 3, 3])
4
Checking estimated movement parameters
Similarly to the convolutional layers, we can set hooks to extract the predicted movement parameters from the model, and assess how variable that is across samples.
Code
torch.cuda.empty_cache()# -------------------------------------------------------------------------# Create a list to store the intermediate output from the fully connected# movement sub-network (fcn_movement_all)# -------------------------------------------------------------------------intermediate_output = []def hook(module, input, output):""" Hook function that captures the output of the specified layer (fcn_movement_all) during the forward pass. """ intermediate_output.append(output)# -------------------------------------------------------------------------# Register the forward hook on 'fcn_movement_all', so its outputs# are recorded every time the model does a forward pass.# -------------------------------------------------------------------------hook_handle = model.fcn_movement_all.register_forward_hook(hook)# -------------------------------------------------------------------------# Perform a forward pass with the model in evaluation mode,# disabling gradient computation.# -------------------------------------------------------------------------model.eval()with torch.no_grad(): final_output = model((sample_spatial_covs, sample_temporal_covs, sample_prev_bearing))# -------------------------------------------------------------------------# Inspect the captured intermediate output# 'intermediate_output[0]' corresponds to the first (and only) forward pass.# -------------------------------------------------------------------------print("Intermediate output shape:", intermediate_output[0].shape)print("Intermediate output values:", intermediate_output[0][0])# -------------------------------------------------------------------------# Remove the hook to avoid repeated capturing in subsequent passes# -------------------------------------------------------------------------hook_handle.remove()# -------------------------------------------------------------------------# Unpack the parameters from the FCN output (assumes a specific ordering)# -------------------------------------------------------------------------gamma_shape1, gamma_scale1, gamma_weight1, \gamma_shape2, gamma_scale2, gamma_weight2, \vonmises_mu1, vonmises_kappa1, vonmises_weight1, \vonmises_mu2, vonmises_kappa2, vonmises_weight2 = intermediate_output[0][0]# -------------------------------------------------------------------------# Convert parameters from log-space (if applicable) and print them# Gamma and von Mises parameters# -------------------------------------------------------------------------# --- Gamma #1 ---print("Gamma shape 1:", torch.exp(gamma_shape1))print("Gamma scale 1:", torch.exp(gamma_scale1))print("Gamma weight 1:", torch.exp(gamma_weight1) / (torch.exp(gamma_weight1) + torch.exp(gamma_weight2)))# --- Gamma #2 ---print("Gamma shape 2:", torch.exp(gamma_shape2))print("Gamma scale 2:", torch.exp(gamma_scale2) *500) # scale factor 500print("Gamma weight 2:", torch.exp(gamma_weight2) / (torch.exp(gamma_weight1) + torch.exp(gamma_weight2)))# --- von Mises #1 ---# % (2*np.pi) ensures the mu (angle) is wrapped within [0, 2π)print("Von Mises mu 1:", vonmises_mu1 % (2*np.pi))print("Von Mises kappa 1:", torch.exp(vonmises_kappa1))print("Von Mises weight 1:", torch.exp(vonmises_weight1) / (torch.exp(vonmises_weight1) + torch.exp(vonmises_weight2)))# --- von Mises #2 ---print("Von Mises mu 2:", vonmises_mu2 % (2*np.pi))print("Von Mises kappa 2:", torch.exp(vonmises_kappa2))print("Von Mises weight 2:", torch.exp(vonmises_weight2) / (torch.exp(vonmises_weight1) + torch.exp(vonmises_weight2)))
Intermediate output shape: torch.Size([1, 12])
Intermediate output values: tensor([ 0.2050, 0.4531, -0.0247, -0.4483, -0.1241, 0.0866, -0.1144, -0.5838,
-0.1406, 0.1756, -0.6430, 0.2865])
Gamma shape 1: tensor(1.2275)
Gamma scale 1: tensor(1.5732)
Gamma weight 1: tensor(0.4722)
Gamma shape 2: tensor(0.6387)
Gamma scale 2: tensor(441.6361)
Gamma weight 2: tensor(0.5278)
Von Mises mu 1: tensor(6.1687)
Von Mises kappa 1: tensor(0.5578)
Von Mises weight 1: tensor(0.3948)
Von Mises mu 2: tensor(0.1756)
Von Mises kappa 2: tensor(0.5257)
Von Mises weight 2: tensor(0.6052)
Plot the movement distributions
We can use the movement parameters to plot the step length and turning angle distributions for the sample covariates.
Code
# -------------------------------------------------------------------------# Define helper functions for calculating Gamma and von Mises log-densities# -------------------------------------------------------------------------def gamma_density(x, shape, scale):""" Computes the log of the Gamma density for each value in x. Args: x (Tensor): Input values for which to compute the density. shape (float): Gamma shape parameter scale (float): Gamma scale parameter Returns: Tensor: The log of the Gamma probability density at each x. """return-1*torch.lgamma(shape) - shape*torch.log(scale) \+ (shape -1)*torch.log(x) - x/scaledef vonmises_density(x, kappa, vm_mu):""" Computes the log of the von Mises density for each value in x. Args: x (Tensor): Input angles in radians. kappa (float): Concentration parameter (kappa) vm_mu (float): Mean direction parameter (mu) Returns: Tensor: The log of the von Mises probability density at each x. """return kappa*torch.cos(x - vm_mu) -1*(np.log(2*torch.pi) + torch.log(torch.special.i0(kappa)))# -------------------------------------------------------------------------# Round and display the mixture weights for the Gamma distributions# -------------------------------------------------------------------------gamma_weight1_recovered = torch.exp(gamma_weight1)/(torch.exp(gamma_weight1) + torch.exp(gamma_weight2))rounded_gamma_weight1 =round(gamma_weight1_recovered.item(), 2)gamma_weight2_recovered = torch.exp(gamma_weight2)/(torch.exp(gamma_weight1) + torch.exp(gamma_weight2))rounded_gamma_weight2 =round(gamma_weight2_recovered.item(), 2)# -------------------------------------------------------------------------# Round and display the mixture weights for the von Mises distributions# -------------------------------------------------------------------------vonmises_weight1_recovered = torch.exp(vonmises_weight1)/(torch.exp(vonmises_weight1) + torch.exp(vonmises_weight2))rounded_vm_weight1 =round(vonmises_weight1_recovered.item(), 2)vonmises_weight2_recovered = torch.exp(vonmises_weight2)/(torch.exp(vonmises_weight1) + torch.exp(vonmises_weight2))rounded_vm_weight2 =round(vonmises_weight2_recovered.item(), 2)# -------------------------------------------------------------------------# 1. Plotting the Gamma mixture distribution# a) Generate x values# b) Compute individual Gamma log densities# c) Exponentiate and combine using recovered weights# -------------------------------------------------------------------------x_values = torch.linspace(1, 101, 1000).to(device)gamma1_density = gamma_density(x_values, torch.exp(gamma_shape1), torch.exp(gamma_scale1))gamma2_density = gamma_density(x_values, torch.exp(gamma_shape2), torch.exp(gamma_scale2)*500)gamma_mixture_density = gamma_weight1_recovered*torch.exp(gamma1_density) \+ gamma_weight2_recovered*torch.exp(gamma2_density)# Move results to CPU and convert to NumPy for plottingx_values_np = x_values.cpu().numpy()gamma1_density_np = np.exp(gamma1_density.cpu().numpy())gamma2_density_np = np.exp(gamma2_density.cpu().numpy())gamma_mixture_density_np = gamma_mixture_density.cpu().numpy()# -------------------------------------------------------------------------# 2. Plot the Gamma distributions and their mixture# -------------------------------------------------------------------------filename_gamma_distributions =f'{output_dir}/id{buffalo_id}_gamma_distributions.png'plt.plot(x_values_np, gamma1_density_np, label=f'Gamma 1 Density: weight = {rounded_gamma_weight1}')plt.plot(x_values_np, gamma2_density_np, label=f'Gamma 2 Density: weight = {rounded_gamma_weight2}')plt.plot(x_values_np, gamma_mixture_density_np, label='Gamma Mixture Density')plt.xlabel('x')plt.ylabel('Density')plt.title('Gamma Density Function')plt.legend()plt.savefig(filename_gamma_distributions, dpi=300, bbox_inches='tight')plt.show()# -------------------------------------------------------------------------# 3. Plotting the von Mises mixture distribution# a) Generate x values from -π to π# b) Compute individual von Mises log densities# c) Exponentiate and combine using recovered weights# -------------------------------------------------------------------------x_values = torch.linspace(-np.pi, np.pi, 1000).to(device)vonmises1_density = vonmises_density(x_values, torch.exp(vonmises_kappa1), vonmises_mu1)vonmises2_density = vonmises_density(x_values, torch.exp(vonmises_kappa2), vonmises_mu2)vonmises_mixture_density = vonmises_weight1_recovered*torch.exp(vonmises1_density) \+ vonmises_weight2_recovered*torch.exp(vonmises2_density)# Move results to CPU and convert to NumPy for plottingx_values_np = x_values.cpu().numpy()vonmises1_density_np = np.exp(vonmises1_density.cpu().numpy())vonmises2_density_np = np.exp(vonmises2_density.cpu().numpy())vonmises_mixture_density_np = vonmises_mixture_density.cpu().numpy()# -------------------------------------------------------------------------# 4. Plot the von Mises distributions and their mixture# -------------------------------------------------------------------------filename_vonmises_distributions =f'{output_dir}/id{buffalo_id}_vonmises_distributions.png'plt.plot(x_values_np, vonmises1_density_np, label=f'Von Mises 1 Density: weight = {rounded_vm_weight1}')plt.plot(x_values_np, vonmises2_density_np, label=f'Von Mises 2 Density: weight = {rounded_vm_weight2}')plt.plot(x_values_np, vonmises_mixture_density_np, label='Von Mises Mixture Density')plt.xlabel('x (radians)')plt.ylabel('Density')plt.title('Von Mises Density Function')plt.ylim(0, 0.5) # Set a limit for the y-axisplt.legend()plt.savefig(filename_vonmises_distributions, dpi=300, bbox_inches='tight')plt.show()
Generate a distribution of movement parameters
To see how variable the movement parameters are across samples, we can generate a distribution of movement parameters from a batch of samples.
We take the code from above that we used to create the DataLoader for the test data and increase the batch size (to get more samples to create the distribution from).
As we’re not using the test dataset any more, we’ll just put all of the samples in the same batch, and generate movement parameters for all of them.
Code
# To use all of the test samplesprint(f'There are {len(dataset_test)} samples in the test dataset')bs =len(dataset_test) # batch sizedataloader_test = DataLoader(dataset=dataset_test, batch_size=bs, shuffle=True)torch.cuda.empty_cache()
There are 1011 samples in the test dataset
Take all of the samples from the test dataset and put them in a single batch.
Code
# -----------------------------------------------------------# Fetch a batch of data from the training dataloader# -----------------------------------------------------------x1_batch, x2_batch, x3_batch, labels =next(iter(dataloader_test))# Move the input batches to the same device as the modelx1_batch = x1_batch.to(device)x2_batch = x2_batch.to(device)x3_batch = x3_batch.to(device)# -----------------------------------------------------------# Register a forward hook to capture the outputs# from 'fcn_movement_all' during the forward pass# -----------------------------------------------------------hook_handle = model.fcn_movement_all.register_forward_hook(hook)# -----------------------------------------------------------# Perform a forward pass in evaluation mode to generate# and capture the sub-network's outputs in 'intermediate_output'# -----------------------------------------------------------model.eval() # Disables certain layers like dropout# Pass the batch through the modelfinal_output = model((x1_batch, x2_batch, x3_batch))# -----------------------------------------------------------# Prepare lists to store the distribution parameters# for each sample in the batch# -----------------------------------------------------------gamma_shape1_list = []gamma_scale1_list = []gamma_weight1_list = []gamma_shape2_list = []gamma_scale2_list = []gamma_weight2_list = []vonmises_mu1_list = []vonmises_kappa1_list = []vonmises_weight1_list = []vonmises_mu2_list = []vonmises_kappa2_list = []vonmises_weight2_list = []# -----------------------------------------------------------# Extract parameters from 'intermediate_output'# for every sample in the batch# -----------------------------------------------------------for batch_output in intermediate_output:# Each 'batch_output' corresponds to one forward pass;# it might contain multiple samples if the batch size > 1for sample_output in batch_output:# Unpack the 12 parameters of the Gamma and von Mises mixtures gamma_shape1, gamma_scale1, gamma_weight1, \ gamma_shape2, gamma_scale2, gamma_weight2, \ vonmises_mu1, vonmises_kappa1, vonmises_weight1, \ vonmises_mu2, vonmises_kappa2, vonmises_weight2 = sample_output# Convert log-space parameters to real space, then store gamma_shape1_list.append(torch.exp(gamma_shape1).item()) gamma_scale1_list.append(torch.exp(gamma_scale1).item()) gamma_weight1_list.append( (torch.exp(gamma_weight1)/(torch.exp(gamma_weight1) + torch.exp(gamma_weight2))).item() ) gamma_shape2_list.append(torch.exp(gamma_shape2).item()) gamma_scale2_list.append((torch.exp(gamma_scale2)*500).item()) # scale factor 500 gamma_weight2_list.append( (torch.exp(gamma_weight2)/(torch.exp(gamma_weight1) + torch.exp(gamma_weight2))).item() ) vonmises_mu1_list.append((vonmises_mu1 % (2*np.pi)).item()) vonmises_kappa1_list.append(torch.exp(vonmises_kappa1).item()) vonmises_weight1_list.append( (torch.exp(vonmises_weight1)/(torch.exp(vonmises_weight1) + torch.exp(vonmises_weight2))).item() ) vonmises_mu2_list.append((vonmises_mu2 % (2*np.pi)).item()) vonmises_kappa2_list.append(torch.exp(vonmises_kappa2).item()) vonmises_weight2_list.append( (torch.exp(vonmises_weight2)/(torch.exp(vonmises_weight1) + torch.exp(vonmises_weight2))).item() )
Plot the distribution of movement parameters
Code
# -----------------------------------------------------------# Define a helper function to plot histograms# for the collected parameters# -----------------------------------------------------------def plot_histogram(data, title, xlabel):""" Plots a histogram of the provided data. Args: data (list): Data points to plot in a histogram. title (str): Title of the histogram plot. xlabel (str): X-axis label. """ plt.figure() plt.hist(data, bins=30, alpha=0.75) plt.title(title) plt.xlabel(xlabel) plt.ylabel('Frequency')# Generate a filename from the title (replace spaces with underscores) filename = title.replace(' ', '_') +'.png' filepath = os.path.join(output_dir, filename)# Save the figure plt.savefig(filepath, dpi=300, bbox_inches='tight') plt.show() plt.close() # Close the figure to free memory\# -----------------------------------------------------------# Plot histograms for each parameter distribution# -----------------------------------------------------------plot_histogram(gamma_shape1_list, 'Gamma Shape 1 Distribution', 'Shape 1')plot_histogram(gamma_scale1_list, 'Gamma Scale 1 Distribution', 'Scale 1')plot_histogram(gamma_weight1_list, 'Gamma Weight 1 Distribution', 'Weight 1')plot_histogram(gamma_shape2_list, 'Gamma Shape 2 Distribution', 'Shape 2')plot_histogram(gamma_scale2_list, 'Gamma Scale 2 Distribution', 'Scale 2')plot_histogram(gamma_weight2_list, 'Gamma Weight 2 Distribution', 'Weight 2')plot_histogram(vonmises_mu1_list, 'Von Mises Mu 1 Distribution', 'Mu 1')plot_histogram(vonmises_kappa1_list, 'Von Mises Kappa 1 Distribution', 'Kappa 1')plot_histogram(vonmises_weight1_list, 'Von Mises Weight 1 Distribution', 'Weight 1')plot_histogram(vonmises_mu2_list, 'Von Mises Mu 2 Distribution', 'Mu 2')plot_histogram(vonmises_kappa2_list, 'Von Mises Kappa 2 Distribution', 'Kappa 2')plot_histogram(vonmises_weight2_list, 'Von Mises Weight 2 Distribution', 'Weight 2')# -----------------------------------------------------------# Remove the hook to stop capturing outputs# in subsequent forward passes# -----------------------------------------------------------hook_handle.remove()
Validate the predictions from the model
We want to assess how well the model is generating movement, habitat selection and next-step probabilities, so we will extract those probabilities at the location of each observed step.
Importing spatial data
Instead of importing the stacks of local layers (one for each step), here we want to import the spatial covariates for the extent we want to simulate over. We use an extent that covers all of the observed locations, which refer to as the ‘landscape’.
NDVI
We have monthly NDVI layers for 2018 and 2019, which we import as a stack. The files don’t import with a time component, so we will use a function further down that indexes them correctly.
Code
# for monthly NDVIfile_path =f'{base_path}/mapping/cropped rasters/ndvi_monthly.tif'# read the raster filewith rasterio.open(file_path) as src:# Read the raster band as separate variable ndvi_landscape = src.read([i for i inrange(1, src.count +1)])# Get the metadata of the raster ndvi_meta = src.meta raster_transform = src.transform# Print the metadata to check for time componentprint("Metadata:", ndvi_meta)# Check for specific time-related metadataif'TIFFTAG_DATETIME'in src.tags():print("Time component found:", src.tags()['TIFFTAG_DATETIME'])else:print("No explicit time component found in metadata.")# the rasters don't contain a time component, so we will use a function later to index the layers correctly
Metadata: {'driver': 'GTiff', 'dtype': 'float32', 'nodata': nan, 'width': 2400, 'height': 2280, 'count': 24, 'crs': CRS.from_wkt('LOCAL_CS["GDA94 / Geoscience Australia Lambert",UNIT["metre",1,AUTHORITY["EPSG","9001"]],AXIS["Easting",EAST],AXIS["Northing",NORTH]]'), 'transform': Affine(25.0, 0.0, 0.0,
0.0, -25.0, -1406000.0)}
No explicit time component found in metadata.
Prepare the NDVI data
There are a few things we need to do to prepare the landscape layers.
First, we need to ensure that there are no NA values in the data. For NDVI we will replace any NA values with -1 (which denotes water), as in our case that is typically why they were set to NA.
Secondly, the model expects the covariates to on the same scale as the training data. We will therefore scale the NDVI data using the same max and min scaling parameters as the training data. To get these, there are some min and max print statements in the deepSSF_train.ipynb script. When we plot the NDVI data below we will see that the values will no longer range from 0 to 1, which is because there are values in the landscape layers that are outside of the range of the training data.
Code
# Check the coordinate reference systemprint("NDVI metadata:")print(ndvi_meta)print("\n")# Have a look at the affine transformation parameters that are used to convert pixel# coordinates to geographic coordinates and vice versaprint("Affine transformation parameters:")print(raster_transform)print("\n")# Check the shape (layers, row, columns) of the rasterprint("Shape of the raster:")print(ndvi_landscape.shape)# Replace NaNs in the original array with -1, which represents waterndvi_landscape = np.nan_to_num(ndvi_landscape, nan=-1.0)# from the stack of local layers (training data)ndvi_max =0.8220ndvi_min =-0.2772# Convert the numpy array to a PyTorch tensorndvi_landscape_tens = torch.from_numpy(ndvi_landscape)# Normalizing the datandvi_landscape_norm = (ndvi_landscape_tens - ndvi_min) / (ndvi_max - ndvi_min)# Show two example layers of the scaled NDVI datalayer_index =1plt.imshow(ndvi_landscape_norm[layer_index,:,:].numpy())plt.colorbar()plt.title(f'NDVI layer index {layer_index}')plt.show()layer_index =8plt.imshow(ndvi_landscape_norm[layer_index,:,:].numpy())plt.colorbar()plt.title(f'NDVI layer index {layer_index}')plt.show()
# Path to the canopy cover raster filefile_path =f'{base_path}/mapping/cropped rasters/canopy_cover.tif'# read the raster filewith rasterio.open(file_path) as src:# Read the raster band as separate variable canopy_landscape = src.read(1)# Get the metadata of the raster canopy_meta = src.meta
Prepare the canopy cover data
As with the NDVI data, we need to ensure that there are no NA values in the data.
As the canopy cover values in the landscape layer are within the same range as the training data, we see that the values range from 0 to 1.
Code
# Check the canopy metadata:print("Canopy metadata:")print(canopy_meta)print("\n")# Check the shape (rows, columns) of the canopy raster:print("Shape of canopy raster:")print(canopy_landscape.shape)print("\n")# Check for NA values in the canopy raster:print("Number of NA values in the canopy raster:")print(np.isnan(canopy_landscape).sum())# Define the maximum and minimum canopy values from the stack of local layers:canopy_max =82.5000canopy_min =0.0# Convert the canopy data from a NumPy array to a PyTorch tensor:canopy_landscape_tens = torch.from_numpy(canopy_landscape)# Normalise the canopy data:canopy_landscape_norm = (canopy_landscape_tens - canopy_min) / (canopy_max - canopy_min)# Visualise the normalised canopy cover:plt.imshow(canopy_landscape_norm.numpy())plt.colorbar()plt.title('Canopy Cover')plt.show()
Canopy metadata:
{'driver': 'GTiff', 'dtype': 'float32', 'nodata': -3.3999999521443642e+38, 'width': 2400, 'height': 2280, 'count': 1, 'crs': CRS.from_wkt('LOCAL_CS["GDA94 / Geoscience Australia Lambert",UNIT["metre",1,AUTHORITY["EPSG","9001"]],AXIS["Easting",EAST],AXIS["Northing",NORTH]]'), 'transform': Affine(25.0, 0.0, 0.0,
0.0, -25.0, -1406000.0)}
Shape of canopy raster:
(2280, 2400)
Number of NA values in the canopy raster:
0
Herbaceous vegetation
Code
# Path to the herbaceous vegetation raster filefile_path =f'{base_path}/mapping/cropped rasters/veg_herby.tif'# read the raster filewith rasterio.open(file_path) as src:# Read the raster band as separate variable herby_landscape = src.read(1)# Get the metadata of the raster herby_meta = src.meta
Code
# Check the herbaceous metadata:print("Herbaceous metadata:")print(herby_meta)print("\n")# Check the shape (rows, columns) of the herbaceous raster:print("Shape of herbaceous raster:")print(herby_landscape.shape)print("\n")# Check for NA values in the herby raster:print("Number of NA values in the herbaceous vegetation raster:")print(np.isnan(herby_landscape).sum())# Define the maximum and minimum herbaceous values from the stack of local layers:herby_max =1.0herby_min =0.0# Convert the herbaceous data from a NumPy array to a PyTorch tensor:herby_landscape_tens = torch.from_numpy(herby_landscape)# Normalize the herbaceous data:herby_landscape_norm = (herby_landscape_tens - herby_min) / (herby_max - herby_min)# Visualize the normalised herbaceous cover:plt.imshow(herby_landscape_norm.numpy())plt.colorbar()plt.show()
Herbaceous metadata:
{'driver': 'GTiff', 'dtype': 'float32', 'nodata': -3.3999999521443642e+38, 'width': 2400, 'height': 2280, 'count': 1, 'crs': CRS.from_wkt('LOCAL_CS["GDA94 / Geoscience Australia Lambert",UNIT["metre",1,AUTHORITY["EPSG","9001"]],AXIS["Easting",EAST],AXIS["Northing",NORTH]]'), 'transform': Affine(25.0, 0.0, 0.0,
0.0, -25.0, -1406000.0)}
Shape of herbaceous raster:
(2280, 2400)
Number of NA values in the herbaceous vegetation raster:
0
Slope
Code
# Path to the slope raster filefile_path =f'{base_path}/mapping/cropped rasters/slope.tif'# read the raster filewith rasterio.open(file_path) as src:# Read the raster band as separate variable slope_landscape = src.read(1)# Get the metadata of the raster slope_meta = src.meta
Code
# Check the slope metadata:print("Slope metadata:")print(slope_meta)print("\n")# Check the shape (rows, columns) of the slope landscape raster:print("Shape of slope landscape raster:")print(slope_landscape.shape)print("\n")# Check for NA values in the slope raster:print("Number of NA values in the slope raster:")print(np.isnan(slope_landscape).sum())# Replace NaNs in the slope array with 0.0:slope_landscape = np.nan_to_num(slope_landscape, nan=0.0)# Define the maximum and minimum slope values from the stack of local layers:slope_max =12.2981slope_min =0.0006# Convert the slope landscape data from a NumPy array to a PyTorch tensor:slope_landscape_tens = torch.from_numpy(slope_landscape)# Normalize the slope landscape data:slope_landscape_norm = (slope_landscape_tens - slope_min) / (slope_max - slope_min)# Visualize the slope landscape (note: displaying the original tensor, not the normalised data):plt.imshow(slope_landscape_tens.numpy())plt.colorbar()plt.show()
Slope metadata:
{'driver': 'GTiff', 'dtype': 'float32', 'nodata': nan, 'width': 2400, 'height': 2280, 'count': 1, 'crs': CRS.from_wkt('LOCAL_CS["GDA94 / Geoscience Australia Lambert",UNIT["metre",1,AUTHORITY["EPSG","9001"]],AXIS["Easting",EAST],AXIS["Northing",NORTH]]'), 'transform': Affine(25.0, 0.0, 0.0,
0.0, -25.0, -1406000.0)}
Shape of slope landscape raster:
(2280, 2400)
Number of NA values in the slope raster:
9356
Subset function (with padding)
Now that we have our landscape layers imported, we need a way to crop out the local layers that can be fed into the deepSSF model as covariates.
We will use the same subset function as in the simulation script, which we stored in the deepSSF_utils.py script. This function will take the landscape layers and a set of coordinates, and return the local layers for those coordinates.
This function also has padding for if the simulated individual was to go off the edge of the landscape, which we retain here (although we won’t need that functionality).
Use the subset function to crop out the local layers for all covariates. Try different locations using the x and y coordinates, which are in geographic coordinates (x = easting/longitude, y = northing/latitude).
Code
# Pick a location (x, y) from the buffalo DataFramex = buffalo_df['x1_'].iloc[0]y = buffalo_df['y1_'].iloc[0]# Define the size of the window to extractwindow_size =101# Select the NDVI layer indexwhich_ndvi =1# Extract subsets from various raster layers using the custom function.# Each call centres the window at the specified (x, y) location and applies padding where necessary.ndvi_subset, origin_x, origin_y = subset_raster(ndvi_landscape_norm[which_ndvi, :, :], x, y, window_size, raster_transform)canopy_subset, origin_x, origin_y = subset_raster(canopy_landscape_norm, x, y, window_size, raster_transform)herby_subset, origin_x, origin_y = subset_raster(herby_landscape_norm, x, y, window_size, raster_transform)slope_subset, origin_x, origin_y = subset_raster(slope_landscape_norm, x, y, window_size, raster_transform)# Create a 2x2 grid of subplots with a fixed figure size.fig, axs = plt.subplots(2, 2, figsize=(10, 8))# Plot the NDVI subset.im0 = axs[0, 0].imshow(ndvi_subset.numpy(), cmap='viridis')fig.colorbar(im0, ax=axs[0, 0], shrink=0.8)axs[0, 0].set_title('NDVI Subset')# Plot the Canopy Cover subset.im1 = axs[0, 1].imshow(canopy_subset.numpy(), cmap='viridis')fig.colorbar(im1, ax=axs[0, 1], shrink=0.8)axs[0, 1].set_title('Canopy Cover Subset')# Plot the Herbaceous Vegetation subset.im2 = axs[1, 0].imshow(herby_subset.numpy(), cmap='viridis')fig.colorbar(im2, ax=axs[1, 0], shrink=0.8)axs[1, 0].set_title('Herbaceous Vegetation Subset')# Plot the Slope subset.im3 = axs[1, 1].imshow(slope_subset.numpy(), cmap='viridis')fig.colorbar(im3, ax=axs[1, 1], shrink=0.8)axs[1, 1].set_title('Slope Subset')
Text(0.5, 1.0, 'Slope Subset')
Create a mask for edge cells
Due to the padding at the edges of the covariates, convolutional layers create artifacts that can affect the colour scale of the predictions when plotting. To avoid this, we will create a mask that we can apply to the predictions to remove the edge cells.
Code
# Create a mask to remove the edge values for plotting# (as it affects the colour scale)x_mask = np.ones_like(ndvi_subset)y_mask = np.ones_like(ndvi_subset)# Mask out bordering cellsx_mask[:, :3] =-np.infx_mask[:, 98:] =-np.infy_mask[:3, :] =-np.infy_mask[98:, :] =-np.inf
Setup simulation parameters
To get the simulation running we need a few extra functions.
Firstly, we need to index the NDVI layers correctly, based on the time of the simulated location. We’ll do this by creating a function that takes day of the year of the simulated location and returns the correct index for the NDVI layers.
Recall that Python indexes from 0, so when the month_index is equal to 2 for instance, this will index the third layer, which is for March.
Code
# Create a mapping from day of the year to month indexdef day_to_month_index(day_of_year):# Calculate the year and the day within that year base_date = datetime(2018, 1, 1) # base date for the calculation, which is when the NDVI layers start date = base_date + timedelta(days=int(day_of_year) -1) year_diff = date.year - base_date.year month_index = (date.month -1) + (year_diff *12) # month index (0-based, accounting for year change)return month_indexyday =70# day of the year, which is March 11thmonth_index = day_to_month_index(yday)print(month_index)
2
Next-step probability values
We can now calculate the next-step probabilities for each observed step. As we generate habitat selection, movement and next-step probability surfaces, we can get the predicted probability values for each one, which can be compared to the respective process in the SSF.
The process for generating the next-step probabilities is as follows:
Get the current location of the individual
Crop out the local layers for the current location
Run the model of the local layers to get the habitat selection, movement and next-step probability surfaces
Get the predicted probability values at the location of the next step
Store the predicted probability values and export them as a csv for comparison with the SSF
First, select the data to generate prediction values for. For testing the function we can select a subset.
Code
# To select a subset of samples to test the function# test_data = buffalo_df.iloc[0:10]# To select all of the datatest_data = buffalo_df# Get the number of samples in the test datan_samples =len(test_data)print(f'Number of samples: {n_samples}')# Create empty vectors to store the predicted probabilitieshabitat_probs = np.repeat(0., n_samples)move_probs = np.repeat(0., n_samples)next_step_probs = np.repeat(0., n_samples)
Number of samples: 10103
Loop over each step
Code
# Create directory for saving prediction imagesos.makedirs(f'{output_dir}/prediction_images', exist_ok=True)# Start at 1 so the bearing at t - 1 is availablefor i inrange(1, n_samples):# for i in range(1, 4): sample = test_data.iloc[i]# Current location (x1, y1) x = sample['x1_'] y = sample['y1_']# Convert geographic coordinates to pixel coordinates px, py =~raster_transform * (x, y)# print('px and py are ', px, py) # Debugging# Next step location (x2, y2) x2 = sample['x2_'] y2 = sample['y2_']# Convert geographic coordinates to pixel coordinates px2, py2 =~raster_transform * (x2, y2)# print('px2 and py2 are ', px2, py2) # Debugging# The difference in x and y coordinates d_x = x2 - x d_y = y2 - y# print('d_x and d_y are ', d_x, d_y) # Debugging# The difference in pixel coordinates d_px = px2 - px d_py = py2 - py# print('d_px and d_py are ', d_px, d_py) # Debugging# Temporal covariates hour_t2_sin = sample['hour_t2_sin'] hour_t2_cos = sample['hour_t2_cos'] yday_t2_sin = sample['yday_t2_sin'] yday_t2_cos = sample['yday_t2_cos']# Bearing of previous step (t - 1) bearing = sample['bearing_tm1']# Hour of the day (for saving the plot) hour_t2 = sample['hour_t2']# Day of the year yday = sample['yday_t2']# Convert day of the year to month index month_index = day_to_month_index(yday)# print(month_index)# Extract the subset of the covariates at the location of x1, y1# NDVI ndvi_subset, origin_x, origin_y = subset_raster(ndvi_landscape_norm[month_index,:,:], x, y, window_size, raster_transform)# Canopy cover canopy_subset, origin_x, origin_y = subset_raster(canopy_landscape_norm, x, y, window_size, raster_transform)# Herbaceous vegetation herby_subset, origin_x, origin_y = subset_raster(herby_landscape_norm, x, y, window_size, raster_transform)# Slope slope_subset, origin_x, origin_y = subset_raster(slope_landscape_norm, x, y, window_size, raster_transform)# Location of the current step in local pixel coordinates px_subset = px - origin_x py_subset = py - origin_y# print('px_subset and py_subset are ', px_subset, py_subset) # Debugging# Location of the next step in local pixel coordinates px2_subset = px2 - origin_x py2_subset = py2 - origin_y# print('px2_subset and py2_subset are ', px2_subset, py2_subset, '\n') # Debugging# print(int(py2_subset), int(px2_subset))# Location of the next step in local pixel coordinates px2_subset_corrected = (px2 - px) + (px - origin_x) py2_subset_corrected = (py2 - py) + (py - origin_y)# print('px2_subset_corrected and py2_subset_corrected are ', px2_subset_corrected, py2_subset_corrected, '\n') # Debugging# Extract the value of the covariates at the location of x2, y2# value = ndvi_subset.detach().cpu().numpy()[(int(py2_subset), int(px2_subset))]# Stack the channels along a new axis x1 = torch.stack([ndvi_subset, canopy_subset, herby_subset, slope_subset], dim=0)# Add a batch dimension (required to be the correct dimension for the model) x1 = x1.unsqueeze(0).to(device)# print(x1.shape)# Convert lists to PyTorch tensors hour_t2_sin_tensor = torch.tensor(hour_t2_sin).float() hour_t2_cos_tensor = torch.tensor(hour_t2_cos).float() yday_t2_sin_tensor = torch.tensor(yday_t2_sin).float() yday_t2_cos_tensor = torch.tensor(yday_t2_cos).float()# Stack tensors x2 = torch.stack((hour_t2_sin_tensor.unsqueeze(0), hour_t2_cos_tensor.unsqueeze(0), yday_t2_sin_tensor.unsqueeze(0), yday_t2_cos_tensor.unsqueeze(0)), dim=1)# print(x2)# print(x2.shape) x2 = x2.to(device)# Put bearing in the correct dimension (batch_size, 1) bearing = torch.tensor(bearing).float().unsqueeze(0).unsqueeze(0)# print(bearing)# print(bearing.shape) bearing = bearing.to(device)# -------------------------------------------------------------------------# Run the model# ------------------------------------------------------------------------- model_output = model((x1, x2, bearing))# -------------------------------------------------------------------------# Habitat selection probability# ------------------------------------------------------------------------- hab_density = model_output.detach().cpu().numpy()[0,:,:,0] hab_density_exp = np.exp(hab_density)# Normalise the probability surface to sum to 1 hab_density_exp_norm = hab_density_exp / np.sum(hab_density_exp)# print(np.sum(hab_density_exp_norm)) # Should be 1# Store the probability of habitat selection at the location of x2, y2# These probabilities are normalised in the model function habitat_probs[i] = hab_density_exp_norm[(int(py2_subset), int(px2_subset))]# print('Habitat probability value = ', habitat_probs[i])# -------------------------------------------------------------------------# Movement probability# ------------------------------------------------------------------------- move_density = model_output.detach().cpu().numpy()[0,:,:,1] move_density_exp = np.exp(move_density)# Normalise the probability surface to sum to 1 move_density_exp_norm = move_density_exp / np.sum(move_density_exp)# print(np.sum(move_density_exp_norm)) # Should be 1# Store the movement probability at the location of x2, y2# These probabilities are normalised in the model function move_probs[i] = move_density_exp_norm[(int(py2_subset), int(px2_subset))]# print('Movement probability value = ', move_probs[i])# -------------------------------------------------------------------------# Next step probability# ------------------------------------------------------------------------- step_density = hab_density + move_density step_density_exp = np.exp(step_density)# print('Sum of step density exp = ', np.sum(step_density_exp)) # Won't be 1 step_density_exp_norm = step_density_exp / np.sum(step_density_exp)# print('Sum of step density exp norm = ', np.sum(step_density_exp_norm)) # Should be 1# Extract the value of the covariates at the location of x2, y2 next_step_probs[i] = step_density_exp_norm[(int(py2_subset), int(px2_subset))]# print('Next-step probability value = ', next_step_probs[i])# -------------------------------------------------------------------------# Plot the next-step predictions# -------------------------------------------------------------------------# Plot the first few probability surfaces - change the condition to i < n_steps to plot allif i <51:# Mask out bordering cells hab_density_mask = np.log(hab_density_exp_norm) * x_mask * y_mask move_density_mask = np.log(move_density_exp_norm) * x_mask * y_mask step_density_mask = np.log(step_density_exp_norm) * x_mask * y_mask# Create a mask for the next step next_step_mask = np.ones_like(hab_density) next_step_mask[int(py2_subset), int(px2_subset)] =-np.inf# Plot the outputs fig_out, axs_out = plt.subplots(2, 2, figsize=(10, 8))# Plot NDVI im1 = axs_out[0, 0].imshow(ndvi_subset.numpy(), cmap='viridis') axs_out[0, 0].set_title('NDVI') fig_out.colorbar(im1, ax=axs_out[0, 0], shrink=0.7)# Plot habitat selection log-probability im2 = axs_out[0, 1].imshow(hab_density_mask * next_step_mask, cmap='viridis') axs_out[0, 1].set_title('Habitat selection log-probability') fig_out.colorbar(im2, ax=axs_out[0, 1], shrink=0.7)# Movement density log-probability im3 = axs_out[1, 0].imshow(move_density_mask * next_step_mask, cmap='viridis') axs_out[1, 0].set_title('Movement log-probability') fig_out.colorbar(im3, ax=axs_out[1, 0], shrink=0.7)# Next-step probability im4 = axs_out[1, 1].imshow(step_density_mask * next_step_mask, cmap='viridis') axs_out[1, 1].set_title('Next-step log-probability') fig_out.colorbar(im4, ax=axs_out[1, 1], shrink=0.7) filename_covs =f'{output_dir}/prediction_images/id{buffalo_id}_step{i+1}_yday{yday}_hour{hour_t2}.png' plt.tight_layout() plt.savefig(filename_covs, dpi=300)# plt.show() plt.close() # Close the figure to free memory
Create gif of next-step predictions against the observed data
Change the extract epoch to extract step for ordering the images.
Code
# Example sorting by the epoch numberdef extract_step(filename):# Extract the epoch number from the filename# Adjust the extraction based on your naming patternimport re match = re.search(r'_step(\d+)_', filename)if match:returnint(match.group(1))return0def create_gif(image_folder, output_filename, fps=10):""" Creates a GIF from a sequence of images in a folder. Parameters: - image_folder: Path to the folder containing images - output_filename: Name of the output GIF file - duration: Duration of each frame in seconds """# Get all png files in the specified folder, sorted by name images =sorted(glob.glob(os.path.join(image_folder, '*.png')), key=extract_step)# Check if any images were foundifnot images:print(f"No images found in {image_folder}")return# Read all images frames = [imageio.imread(image) for image in images]# Save as GIF imageio.mimsave(output_filename, frames, fps=fps, loop=0) display(Image(filename=output_filename))print(f"GIF created successfully: {output_filename}")
Code
# Path to your imagesimage_folder =f'{output_dir}/prediction_images'# Output GIF filenameoutput_filename =f'{output_dir}/prediction_gif_id{buffalo_id}_yday{yday_t2_integer}_hour{hour_t2_integer}_bearing{bearing_degrees}_next_r{row}_c{column}.gif'# Create the GIFcreate_gif(image_folder, output_filename, fps=5)
<IPython.core.display.Image object>
GIF created successfully: ../Python/outputs/model_training/id2005_deepSSF_training_1_2025-07-09/prediction_gif_id2005_yday324_hour17_bearing156_next_r46_c44.gif
Calculate the null probabilities
As each cell has a probability values, we can calculate what the probability would be if the model provided no information at all, and each cell was equally likely to be the next step. This is just 1 divided by the total number of cells.
rolling_window_size =100# Rolling window size# Convert to pandas Series and compute rolling meanrolling_mean_habitat = pd.Series(habitat_probs).rolling(window=window_size, center=True).mean()rolling_mean_movement = pd.Series(move_probs).rolling(window=window_size, center=True).mean()rolling_mean_next_step = pd.Series(next_step_probs).rolling(window=window_size, center=True).mean()
Plot the probabilities
We can get an idea of how variable the probabilities are for the habitat selection and movement surfaces, and for the next-step probabilities, by plotting them across the trajectory
Code
# Plot the habitat probs through time as a line graphplt.plot(habitat_probs[habitat_probs >0], color='blue', label='Habitat Probabilities')plt.plot(rolling_mean_habitat[rolling_mean_habitat >0], color='red', label='Rolling Mean')plt.axhline(y=null_prob, color='black', linestyle='--', label='Null Probability') # null probsplt.xlabel('Index')plt.ylabel('Probability')plt.title('Habitat Probability')plt.legend() # Add legend to differentiate linesplt.savefig(f'{output_dir}/id{buffalo_id}_habitat_probs.png', dpi=300, bbox_inches='tight')plt.show()# Plot the movement probs through time as a line graphplt.plot(move_probs[move_probs >0], color='blue', label='Movement Probabilities')plt.plot(rolling_mean_movement[rolling_mean_movement >0], color='red', label='Rolling Mean')plt.axhline(y=null_prob, color='black', linestyle='--', label='Null Probability') # null probsplt.xlabel('Index')plt.ylabel('Probability')plt.title('Movement Probability')plt.legend() # Add legend to differentiate linesplt.savefig(f'{output_dir}/id{buffalo_id}_movement_probs.png', dpi=300, bbox_inches='tight')plt.show()# Plot the next step probs through time as a line graphplt.plot(next_step_probs[next_step_probs >0], color='blue', label='Next Step Probabilities')plt.plot(rolling_mean_next_step[rolling_mean_next_step >0], color='red', label='Rolling Mean')plt.axhline(y=null_prob, color='black', linestyle='--', label='Null Probability') # null probsplt.xlabel('Index')plt.ylabel('Probability')plt.title('Next Step Probability')plt.legend() # Add legend to differentiate linesplt.savefig(f'{output_dir}/id{buffalo_id}_next_step_probs.png', dpi=300, bbox_inches='tight')plt.show()
Save the probabilities
We can save the probabilities to a csv file to compare with the SSF probabilities.
Code
# Append the probabilities to the dataframebuffalo_df['habitat_probs'] = habitat_probsbuffalo_df['move_probs'] = move_probsbuffalo_df['next_step_probs'] = next_step_probscsv_filename =f'{output_dir}/deepSSF_id{buffalo_id}_n{len(test_data)}_{today_date}.csv'print(csv_filename)buffalo_df.to_csv(csv_filename, index=True)