Skip to content

API Reference

NickySpatial: An open-source object-based image analysis library for remote sensing

NickySpatial is a Python package for object-based image analysis, providing functionality similar to commercial software like eCognition.

Key features: - Multiresolution segmentation - Object-based analysis - Rule-based classification - Statistics calculation - Integration with geospatial data formats

nickyspatial.core

The core package encompasses fundamental data structures and algorithms for nickyspatial.

It helps define the building blocks like layers, segmentation methods, and rule-based logic for object analysis workflows.

Implements supervised classification algorithms to classify the segments.

SupervisedClassifier

Implementation of Supervised Classification algorithm.

Source code in nickyspatial/core/classifier.py
class SupervisedClassifier:
    """Implementation of Supervised Classification algorithm."""

    # TODO: name vs layer_name

    def __init__(self, name=None, classifier_type="Random Forests", classifier_params=None):
        """Initialize the segmentation algorithm.

        Parameters:
        -----------
        scale : str
            classifier type name eg: RF for Random Forest, SVC for Support Vector Classifier
        classifier_params : dict
           additional parameters relayed to classifier
        """
        self.classifier_type = classifier_type
        self.classifier_params = classifier_params
        self.training_layer = None
        self.classifier = None
        self.name = name if name else "Supervised_Classification"
        self.features = None

    def _training_sample(self, layer, samples):
        """Create vector objects from segments.

        Parameters:
        -----------
        samples : dict
            key: class_name
            values: list of segment_ids
            eg: {"cropland":[1,2,3],"built-up":[4,5,6]}

        Returns:
        --------
        segment_objects : geopandas.GeoDataFrame
            GeoDataFrame with segment polygons
        """
        layer["classification"] = None

        for class_name in samples.keys():
            layer.loc[layer["segment_id"].isin(samples[class_name]), "classification"] = class_name

        layer = layer[layer["classification"].notna()]
        self.training_layer = layer
        return layer

    def _train(self, features):
        """Train the classifier using the training samples and compute accuracy and feature importances.

        Parameters
        ----------
        features : list of str or None
            List of feature column names to use. If None, all columns except segment_id, geometry, and classification are used.

        Returns:
        -------
        classifier : sklearn classifier object
            The trained classifier.
        test_accuracy : float
            Accuracy score on training data.
        feature_importances : pd.Series or None
            Feature importances (only for Random Forest), else None.
        """
        self.features = features
        if not self.features:
            self.features = self.training_layer.columns
        self.features = [col for col in self.features if col not in ["segment_id", "classification", "geometry"]]

        x = self.training_layer[self.features]
        y = self.training_layer["classification"]

        # Random Forest
        if self.classifier_type == "Random Forest":
            self.classifier = RandomForestClassifier(**self.classifier_params)
            self.classifier.fit(x, y)
            test_accuracy = self.classifier.oob_score_
            feature_importances = pd.Series(self.classifier.feature_importances_, index=self.features) * 100
            feature_importances = feature_importances.sort_values(ascending=False)

        # Support Vector Machine (SVC)
        elif self.classifier_type == "SVC":
            self.classifier = SVC(**self.classifier_params)
            self.classifier.fit(x, y)
            predictions = self.classifier.predict(x)
            test_accuracy = accuracy_score(y, predictions)
            feature_importances = None  # SVM does not support feature importances

        # K-Nearest Neighbors (KNN)
        elif self.classifier_type == "KNN":
            self.classifier = KNeighborsClassifier(**self.classifier_params)
            self.classifier.fit(x, y)
            predictions = self.classifier.predict(x)
            test_accuracy = accuracy_score(y, predictions)
            feature_importances = None  # KNN does not support feature importances

        else:
            raise ValueError(f"Unsupported classifier type: {self.classifier_type}")

        return self.classifier, test_accuracy, feature_importances

    def _prediction(self, layer):
        """Perform classification prediction on input layer features.

        Parameters
        ----------
        layer : geopandas.GeoDataFrame
            Input data containing at least a 'segment_id' and 'geometry' column, along with
            feature columns required by the classifier. If a 'classification' column does not
            exist, it will be created.

        Returns:
        -------
        The input layer with an updated 'classification' column containing predicted labels.

        """
        layer["classification"] = ""
        # if not features:
        #     x = layer.drop(columns=["segment_id", "classification", "geometry"], errors="ignore")
        # else:
        x = layer[self.features]

        # print(layer.columns)
        # x = layer.drop(columns=["segment_id", "classification", "geometry"], errors="ignore")

        predictions = self.classifier.predict(x)
        layer.loc[layer["classification"] == "", "classification"] = predictions
        return layer

    def execute(self, source_layer, samples, layer_manager=None, layer_name=None, features=None):
        """Execute the supervised classification workflow on the source layer.

        This method creates a new layer by copying the input source layer, training a classifier
        using provided samples, predicting classifications, and storing the results in a new layer.
        Optionally, the resulting layer can be added to a layer manager.

        Parameters
        ----------
        source_layer : Layer
            The input layer containing spatial objects and metadata (transform, CRS, raster).
        samples : dict
            A dictionary of training samples where keys are class labels and values are lists
            of segment IDs or features used for training. Default is an empty dictionary.
        layer_manager : LayerManager, optional
            An optional layer manager object used to manage and store the resulting layer.
        layer_name : str, optional
            The name to assign to the resulting classified layer.

        Returns:
        -------
        Layer
            A new Layer object containing the predicted classifications, copied metadata from
            the source layer, and updated attributes.
        """
        result_layer = Layer(name=layer_name, parent=source_layer, type="merged")
        result_layer.transform = source_layer.transform
        result_layer.crs = source_layer.crs
        result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

        layer = source_layer.objects.copy()
        self._training_sample(layer, samples)
        _, accuracy, feature_importances = self._train(features)

        layer = self._prediction(layer)

        result_layer.objects = layer

        result_layer.metadata = {
            "supervised classification": self.name,
        }

        if layer_manager:
            layer_manager.add_layer(result_layer)

        return result_layer, accuracy, feature_importances

__init__(name=None, classifier_type='Random Forests', classifier_params=None)

Initialize the segmentation algorithm.

Parameters:

scale : str classifier type name eg: RF for Random Forest, SVC for Support Vector Classifier classifier_params : dict additional parameters relayed to classifier

Source code in nickyspatial/core/classifier.py
def __init__(self, name=None, classifier_type="Random Forests", classifier_params=None):
    """Initialize the segmentation algorithm.

    Parameters:
    -----------
    scale : str
        classifier type name eg: RF for Random Forest, SVC for Support Vector Classifier
    classifier_params : dict
       additional parameters relayed to classifier
    """
    self.classifier_type = classifier_type
    self.classifier_params = classifier_params
    self.training_layer = None
    self.classifier = None
    self.name = name if name else "Supervised_Classification"
    self.features = None

execute(source_layer, samples, layer_manager=None, layer_name=None, features=None)

Execute the supervised classification workflow on the source layer.

This method creates a new layer by copying the input source layer, training a classifier using provided samples, predicting classifications, and storing the results in a new layer. Optionally, the resulting layer can be added to a layer manager.

Parameters

source_layer : Layer The input layer containing spatial objects and metadata (transform, CRS, raster). samples : dict A dictionary of training samples where keys are class labels and values are lists of segment IDs or features used for training. Default is an empty dictionary. layer_manager : LayerManager, optional An optional layer manager object used to manage and store the resulting layer. layer_name : str, optional The name to assign to the resulting classified layer.

Returns:

Layer A new Layer object containing the predicted classifications, copied metadata from the source layer, and updated attributes.

Source code in nickyspatial/core/classifier.py
def execute(self, source_layer, samples, layer_manager=None, layer_name=None, features=None):
    """Execute the supervised classification workflow on the source layer.

    This method creates a new layer by copying the input source layer, training a classifier
    using provided samples, predicting classifications, and storing the results in a new layer.
    Optionally, the resulting layer can be added to a layer manager.

    Parameters
    ----------
    source_layer : Layer
        The input layer containing spatial objects and metadata (transform, CRS, raster).
    samples : dict
        A dictionary of training samples where keys are class labels and values are lists
        of segment IDs or features used for training. Default is an empty dictionary.
    layer_manager : LayerManager, optional
        An optional layer manager object used to manage and store the resulting layer.
    layer_name : str, optional
        The name to assign to the resulting classified layer.

    Returns:
    -------
    Layer
        A new Layer object containing the predicted classifications, copied metadata from
        the source layer, and updated attributes.
    """
    result_layer = Layer(name=layer_name, parent=source_layer, type="merged")
    result_layer.transform = source_layer.transform
    result_layer.crs = source_layer.crs
    result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

    layer = source_layer.objects.copy()
    self._training_sample(layer, samples)
    _, accuracy, feature_importances = self._train(features)

    layer = self._prediction(layer)

    result_layer.objects = layer

    result_layer.metadata = {
        "supervised classification": self.name,
    }

    if layer_manager:
        layer_manager.add_layer(result_layer)

    return result_layer, accuracy, feature_importances

Defines the Layer class and related functionality for organizing geospatial data.

A layer can represent a conceptual container for a vector object which is tightly coupled with underlying raster data, allowing additional metadata or processing logic to be attached making sure heirarchical relationships are maintained. This module provides the Layer and LayerManager classes, which manage layers of geospatial data, including segmentation results, classifications,and filters. Layers can be created, copied, and manipulated, and they support attaching functions to calculate additional properties.

Layer

A Layer represents a set of objects (segments or classification results) with associated properties.

Layers can be derived from segmentation, rule application, or filters. Each layer can have functions attached to calculate additional properties.

Source code in nickyspatial/core/layer.py
class Layer:
    """A Layer represents a set of objects (segments or classification results) with associated properties.

    Layers can be derived from segmentation, rule application, or filters.
    Each layer can have functions attached to calculate additional properties.
    """

    def __init__(self, name=None, parent=None, type="generic"):
        """Initialize a Layer.

        Parameters:
        -----------
        name : str, optional
            Name of the layer. If None, a unique name will be generated.
        parent : Layer, optional
            Parent layer that this layer is derived from.
        type : str
            Type of layer: "segmentation", "classification", "filter", or "generic"
        """
        self.id = str(uuid.uuid4())
        self.name = name if name else f"Layer_{self.id[:8]}"
        self.parent = parent
        self.type = type
        self.created_at = pd.Timestamp.now()

        self.raster = None
        self.objects = None
        self.metadata = {}
        self.transform = None
        self.crs = None

        self.attached_functions = {}

    def attach_function(self, function, name=None, **kwargs):
        """Attach a function to this layer and execute it.

        Parameters:
        -----------
        function : callable
            Function to attach and execute
        name : str, optional
            Name for this function. If None, uses function.__name__
        **kwargs : dict
            Arguments to pass to the function

        Returns:
        --------
        self : Layer
            Returns self for chaining
        """
        func_name = name if name else function.__name__

        result = function(self, **kwargs)

        self.attached_functions[func_name] = {
            "function": function,
            "args": kwargs,
            "result": result,
        }

        return self

    def get_function_result(self, function_name):
        """Get the result of an attached function.

        Parameters:
        -----------
        function_name : str
            Name of the attached function

        Returns:
        --------
        result : any
            Result of the function
        """
        if function_name not in self.attached_functions:
            raise ValueError(f"Function '{function_name}' not attached to this layer")

        return self.attached_functions[function_name]["result"]

    def copy(self):
        """Create a copy of this layer.

        Returns:
        --------
        layer_copy : Layer
            Copy of this layer
        """
        new_layer = Layer(name=f"{self.name}_copy", parent=self.parent, type=self.type)

        if self.raster is not None:
            new_layer.raster = self.raster.copy()

        if self.objects is not None:
            new_layer.objects = self.objects.copy()

        new_layer.metadata = self.metadata.copy()
        new_layer.transform = self.transform
        new_layer.crs = self.crs

        return new_layer

    def __str__(self):
        """String representation of the layer."""
        if self.objects is not None:
            num_objects = len(self.objects)
        else:
            num_objects = 0

        parent_name = self.parent.name if self.parent else "None"

        return f"Layer '{self.name}' (type: {self.type}, parent: {parent_name}, objects: {num_objects})"

__init__(name=None, parent=None, type='generic')

Initialize a Layer.

Parameters:

name : str, optional Name of the layer. If None, a unique name will be generated. parent : Layer, optional Parent layer that this layer is derived from. type : str Type of layer: "segmentation", "classification", "filter", or "generic"

Source code in nickyspatial/core/layer.py
def __init__(self, name=None, parent=None, type="generic"):
    """Initialize a Layer.

    Parameters:
    -----------
    name : str, optional
        Name of the layer. If None, a unique name will be generated.
    parent : Layer, optional
        Parent layer that this layer is derived from.
    type : str
        Type of layer: "segmentation", "classification", "filter", or "generic"
    """
    self.id = str(uuid.uuid4())
    self.name = name if name else f"Layer_{self.id[:8]}"
    self.parent = parent
    self.type = type
    self.created_at = pd.Timestamp.now()

    self.raster = None
    self.objects = None
    self.metadata = {}
    self.transform = None
    self.crs = None

    self.attached_functions = {}

__str__()

String representation of the layer.

Source code in nickyspatial/core/layer.py
def __str__(self):
    """String representation of the layer."""
    if self.objects is not None:
        num_objects = len(self.objects)
    else:
        num_objects = 0

    parent_name = self.parent.name if self.parent else "None"

    return f"Layer '{self.name}' (type: {self.type}, parent: {parent_name}, objects: {num_objects})"

attach_function(function, name=None, **kwargs)

Attach a function to this layer and execute it.

Parameters:

function : callable Function to attach and execute name : str, optional Name for this function. If None, uses function.name **kwargs : dict Arguments to pass to the function

Returns:

self : Layer Returns self for chaining

Source code in nickyspatial/core/layer.py
def attach_function(self, function, name=None, **kwargs):
    """Attach a function to this layer and execute it.

    Parameters:
    -----------
    function : callable
        Function to attach and execute
    name : str, optional
        Name for this function. If None, uses function.__name__
    **kwargs : dict
        Arguments to pass to the function

    Returns:
    --------
    self : Layer
        Returns self for chaining
    """
    func_name = name if name else function.__name__

    result = function(self, **kwargs)

    self.attached_functions[func_name] = {
        "function": function,
        "args": kwargs,
        "result": result,
    }

    return self

copy()

Create a copy of this layer.

Returns:

layer_copy : Layer Copy of this layer

Source code in nickyspatial/core/layer.py
def copy(self):
    """Create a copy of this layer.

    Returns:
    --------
    layer_copy : Layer
        Copy of this layer
    """
    new_layer = Layer(name=f"{self.name}_copy", parent=self.parent, type=self.type)

    if self.raster is not None:
        new_layer.raster = self.raster.copy()

    if self.objects is not None:
        new_layer.objects = self.objects.copy()

    new_layer.metadata = self.metadata.copy()
    new_layer.transform = self.transform
    new_layer.crs = self.crs

    return new_layer

get_function_result(function_name)

Get the result of an attached function.

Parameters:

function_name : str Name of the attached function

Returns:

result : any Result of the function

Source code in nickyspatial/core/layer.py
def get_function_result(self, function_name):
    """Get the result of an attached function.

    Parameters:
    -----------
    function_name : str
        Name of the attached function

    Returns:
    --------
    result : any
        Result of the function
    """
    if function_name not in self.attached_functions:
        raise ValueError(f"Function '{function_name}' not attached to this layer")

    return self.attached_functions[function_name]["result"]

LayerManager

Manages a collection of layers and their relationships.

Source code in nickyspatial/core/layer.py
class LayerManager:
    """Manages a collection of layers and their relationships."""

    def __init__(self):
        """Initialize the layer manager."""
        self.layers = {}
        self.active_layer = None

    def add_layer(self, layer, set_active=True):
        """Add a layer to the manager.

        Parameters:
        -----------
        layer : Layer
            Layer to add
        set_active : bool
            Whether to set this layer as the active layer

        Returns:
        --------
        layer : Layer
            The added layer
        """
        self.layers[layer.id] = layer

        if set_active:
            self.active_layer = layer

        return layer

    def get_layer(self, layer_id_or_name):
        """Get a layer by ID or name.

        Parameters:
        -----------
        layer_id_or_name : str
            Layer ID or name

        Returns:
        --------
        layer : Layer
            The requested layer
        """
        if layer_id_or_name in self.layers:
            return self.layers[layer_id_or_name]

        for layer in self.layers.values():
            if layer.name == layer_id_or_name:
                return layer

        raise ValueError(f"Layer '{layer_id_or_name}' not found")

    def get_layer_names(self):
        """Get a list of all layer names.

        Returns:
        --------
        names : list
            List of layer names
        """
        return [layer.name for layer in self.layers.values()]

    def remove_layer(self, layer_id_or_name):
        """Remove a layer from the manager.

        Parameters:
        -----------
        layer_id_or_name : str
            Layer ID or name
        """
        layer = self.get_layer(layer_id_or_name)

        if layer.id in self.layers:
            del self.layers[layer.id]

        if self.active_layer and self.active_layer.id == layer.id:
            if self.layers:
                self.active_layer = list(self.layers.values())[-1]
            else:
                self.active_layer = None

__init__()

Initialize the layer manager.

Source code in nickyspatial/core/layer.py
def __init__(self):
    """Initialize the layer manager."""
    self.layers = {}
    self.active_layer = None

add_layer(layer, set_active=True)

Add a layer to the manager.

Parameters:

layer : Layer Layer to add set_active : bool Whether to set this layer as the active layer

Returns:

layer : Layer The added layer

Source code in nickyspatial/core/layer.py
def add_layer(self, layer, set_active=True):
    """Add a layer to the manager.

    Parameters:
    -----------
    layer : Layer
        Layer to add
    set_active : bool
        Whether to set this layer as the active layer

    Returns:
    --------
    layer : Layer
        The added layer
    """
    self.layers[layer.id] = layer

    if set_active:
        self.active_layer = layer

    return layer

get_layer(layer_id_or_name)

Get a layer by ID or name.

Parameters:

layer_id_or_name : str Layer ID or name

Returns:

layer : Layer The requested layer

Source code in nickyspatial/core/layer.py
def get_layer(self, layer_id_or_name):
    """Get a layer by ID or name.

    Parameters:
    -----------
    layer_id_or_name : str
        Layer ID or name

    Returns:
    --------
    layer : Layer
        The requested layer
    """
    if layer_id_or_name in self.layers:
        return self.layers[layer_id_or_name]

    for layer in self.layers.values():
        if layer.name == layer_id_or_name:
            return layer

    raise ValueError(f"Layer '{layer_id_or_name}' not found")

get_layer_names()

Get a list of all layer names.

Returns:

names : list List of layer names

Source code in nickyspatial/core/layer.py
def get_layer_names(self):
    """Get a list of all layer names.

    Returns:
    --------
    names : list
        List of layer names
    """
    return [layer.name for layer in self.layers.values()]

remove_layer(layer_id_or_name)

Remove a layer from the manager.

Parameters:

layer_id_or_name : str Layer ID or name

Source code in nickyspatial/core/layer.py
def remove_layer(self, layer_id_or_name):
    """Remove a layer from the manager.

    Parameters:
    -----------
    layer_id_or_name : str
        Layer ID or name
    """
    layer = self.get_layer(layer_id_or_name)

    if layer.id in self.layers:
        del self.layers[layer.id]

    if self.active_layer and self.active_layer.id == layer.id:
        if self.layers:
            self.active_layer = list(self.layers.values())[-1]
        else:
            self.active_layer = None

Provides a rule engine for object-based analysis, where segments or layers are processed according to custom logic.

Main idea here is to allow encode expert rules that can be applied to object segments which are layers in a nickyspatial context. So rules are tied up to the layers , they can be attached or revoked or executed multiple items

Developers can define domain-specific rules to classify or merge features based on attributes. This module includes the Rule and RuleSet classes, which allow users to create, manage, and apply rules to layers. The RuleSet class can be used to group multiple rules together, and the execute method applies these rules to a given layer. The rules can be defined using string expressions that can be evaluated using the numexpr library for performance.

CommonBase

A shared utility base class for spatial rule sets.

This class provides common methods used by multiple rule sets to preprocess layer data and determine spatial relationships between segments.

Source code in nickyspatial/core/rules.py
class CommonBase:
    """A shared utility base class for spatial rule sets.

    This class provides common methods used by multiple rule sets
    to preprocess layer data and determine spatial relationships
    between segments.
    """

    @staticmethod
    def _preprocess_layer(layer, class_column_name):
        """Prepare geometry and class maps from a spatial layer.

        Parameters:
        -----------
        layer : Layer
            The spatial layer containing objects with segment geometry and class labels.
        class_column_name : str
            The column name that stores class values (e.g., "veg_class", "land_use").

        Returns:
        --------
        geom_map : dict
            A dictionary mapping segment IDs to shapely geometry objects.
        class_map : dict
            A dictionary mapping segment IDs to their respective class values.
        """
        df = layer.objects
        geom_map = {sid: shape(geom) for sid, geom in zip(df["segment_id"], df["geometry"], strict=False)}
        class_map = dict(zip(df["segment_id"], df[class_column_name], strict=False))
        return geom_map, class_map

    @staticmethod
    def _find_neighbors(segment_id, geom_map):
        """Find neighboring segments based on spatial intersection.

        Parameters:
        -----------
        segment_id : int or str
            The ID of the segment whose neighbors are to be found.
        geom_map : dict
            A dictionary mapping segment IDs to shapely geometry objects.

        Returns:
        --------
        neighbors : list
            A list of segment IDs that intersect with the given segment.
        """
        segment_geom = geom_map[segment_id]
        neighbors = []
        for other_id, other_geom in geom_map.items():
            if other_id != segment_id and segment_geom.intersects(other_geom):
                neighbors.append(other_id)
        return neighbors

EnclosedByRuleSet

Bases: CommonBase

A rule set to reclassify segments based on spatial enclosure.

This rule set identifies segments of one class (A) that are entirely surrounded by segments of another class (B), and reclassifies them into a new class.

Source code in nickyspatial/core/rules.py
class EnclosedByRuleSet(CommonBase):
    """A rule set to reclassify segments based on spatial enclosure.

    This rule set identifies segments of one class (A) that are entirely surrounded
    by segments of another class (B), and reclassifies them into a new class.
    """

    def __init__(self, name=None):
        """Initialize the merge rule set.

        Parameters:
        -----------
        name : str, optional
            Name of the merge rule set
        """
        self.name = name if name else "EnclosedByRuleSet"

    def execute(
        self, source_layer, class_column_name, class_value_a, class_value_b, new_class_name, layer_manager=None, layer_name=None
    ):
        """Apply enclosed-by logic to identify and reclassify segments.

        Parameters:
        -----------
        source_layer : Layer
            The source spatial layer containing segments.
        class_column_name : str
            The name of the column containing class labels (e.g., "veg_class").
        class_value_a : str
            The class value to check for enclosure (target to reclassify).
        class_value_b : str
            The class value expected to surround class A segments.
        new_class_name : str
            The new class name to assign to enclosed segments.
        layer_manager : LayerManager, optional
            Optional manager to register the resulting layer.
        layer_name : str, optional
            Optional name for the result layer.

        Returns:
        --------
        result_layer : Layer
            A new layer with updated class values for enclosed segments.
        """
        if not layer_name:
            layer_name = f"{source_layer.name}_{self.name}"

        result_layer = Layer(name=layer_name, parent=source_layer, type="merged")
        result_layer.transform = source_layer.transform
        result_layer.crs = source_layer.crs
        result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

        df = source_layer.objects.copy()
        surrounded_segments = []
        geom_map, class_map = self._preprocess_layer(source_layer, class_column_name)

        for sid in df["segment_id"].unique():
            if class_map.get(sid) != class_value_a:
                continue

            neighbors = self._find_neighbors(sid, geom_map)
            if neighbors and all(class_map.get(n_id) == class_value_b for n_id in neighbors):
                surrounded_segments.append(sid)

        df.loc[(df["segment_id"].isin(surrounded_segments)), class_column_name] = new_class_name

        result_layer.objects = df

        result_layer.metadata = {
            "enclosed_by_ruleset_name": self.name,
        }

        if layer_manager:
            layer_manager.add_layer(result_layer)

        return result_layer

__init__(name=None)

Initialize the merge rule set.

Parameters:

name : str, optional Name of the merge rule set

Source code in nickyspatial/core/rules.py
def __init__(self, name=None):
    """Initialize the merge rule set.

    Parameters:
    -----------
    name : str, optional
        Name of the merge rule set
    """
    self.name = name if name else "EnclosedByRuleSet"

execute(source_layer, class_column_name, class_value_a, class_value_b, new_class_name, layer_manager=None, layer_name=None)

Apply enclosed-by logic to identify and reclassify segments.

Parameters:

source_layer : Layer The source spatial layer containing segments. class_column_name : str The name of the column containing class labels (e.g., "veg_class"). class_value_a : str The class value to check for enclosure (target to reclassify). class_value_b : str The class value expected to surround class A segments. new_class_name : str The new class name to assign to enclosed segments. layer_manager : LayerManager, optional Optional manager to register the resulting layer. layer_name : str, optional Optional name for the result layer.

Returns:

result_layer : Layer A new layer with updated class values for enclosed segments.

Source code in nickyspatial/core/rules.py
def execute(
    self, source_layer, class_column_name, class_value_a, class_value_b, new_class_name, layer_manager=None, layer_name=None
):
    """Apply enclosed-by logic to identify and reclassify segments.

    Parameters:
    -----------
    source_layer : Layer
        The source spatial layer containing segments.
    class_column_name : str
        The name of the column containing class labels (e.g., "veg_class").
    class_value_a : str
        The class value to check for enclosure (target to reclassify).
    class_value_b : str
        The class value expected to surround class A segments.
    new_class_name : str
        The new class name to assign to enclosed segments.
    layer_manager : LayerManager, optional
        Optional manager to register the resulting layer.
    layer_name : str, optional
        Optional name for the result layer.

    Returns:
    --------
    result_layer : Layer
        A new layer with updated class values for enclosed segments.
    """
    if not layer_name:
        layer_name = f"{source_layer.name}_{self.name}"

    result_layer = Layer(name=layer_name, parent=source_layer, type="merged")
    result_layer.transform = source_layer.transform
    result_layer.crs = source_layer.crs
    result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

    df = source_layer.objects.copy()
    surrounded_segments = []
    geom_map, class_map = self._preprocess_layer(source_layer, class_column_name)

    for sid in df["segment_id"].unique():
        if class_map.get(sid) != class_value_a:
            continue

        neighbors = self._find_neighbors(sid, geom_map)
        if neighbors and all(class_map.get(n_id) == class_value_b for n_id in neighbors):
            surrounded_segments.append(sid)

    df.loc[(df["segment_id"].isin(surrounded_segments)), class_column_name] = new_class_name

    result_layer.objects = df

    result_layer.metadata = {
        "enclosed_by_ruleset_name": self.name,
    }

    if layer_manager:
        layer_manager.add_layer(result_layer)

    return result_layer

MergeRuleSet

Bases: CommonBase

A rule set for merging segments of the same class based on specified class values.

Source code in nickyspatial/core/rules.py
class MergeRuleSet(CommonBase):
    """A rule set for merging segments of the same class based on specified class values."""

    def __init__(self, name=None):
        """Initialize the merge rule set.

        Parameters:
        -----------
        name : str, optional
            Name of the merge rule set
        """
        self.name = name if name else "MergeRuleSet"

    def execute(self, source_layer, class_column_name, class_value, layer_manager=None, layer_name=None):
        """Merge segments of the same class in a layer.

        Parameters:
        -----------
        source_layer : Layer
            Source layer with segments to merge
        class_value : str or list of str
            One or more attribute field names to group and merge segments
        layer_manager : LayerManager, optional
            Layer manager to add the result layer to
        layer_name : str, optional
            Name for the result layer

        Returns:
        --------
        result_layer : Layer
            Layer with merged geometries
        """
        if not layer_name:
            layer_name = f"{source_layer.name}_{self.name}"

        result_layer = Layer(name=layer_name, parent=source_layer, type="merged")
        result_layer.transform = source_layer.transform
        result_layer.crs = source_layer.crs
        result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

        df = source_layer.objects.copy()

        # Handle single or multiple class fields
        if isinstance(class_value, str):
            class_values = [class_value]
        else:
            class_values = class_value

        new_rows = []
        to_drop = set()
        geom_map, class_map = self._preprocess_layer(source_layer, class_column_name)

        for class_value in class_values:
            visited = set()

            for sid in df["segment_id"].unique():
                if sid in visited or class_map[sid] != class_value:
                    continue

                group_geom = [geom_map[sid]]
                group_ids = [sid]
                queue = [sid]
                visited.add(sid)

                while queue:
                    current_id = queue.pop()
                    neighbors = self._find_neighbors(current_id, geom_map)
                    for n_id in neighbors:
                        if n_id not in visited and class_map.get(n_id) == class_value:
                            visited.add(n_id)
                            group_geom.append(geom_map[n_id])
                            group_ids.append(n_id)
                            queue.append(n_id)

                merged_geom = unary_union(group_geom)
                row_data = {"segment_id": min(group_ids), class_column_name: class_value, "geometry": merged_geom}

                new_rows.append(row_data)
                to_drop.update(group_ids)

        df = df[~df["segment_id"].isin(to_drop)]
        df = pd.concat([df, pd.DataFrame(new_rows)], ignore_index=True)
        result_layer.objects = df

        result_layer.metadata = {
            "mergeruleset_name": self.name,
            "merged_fields": class_values,
        }

        if layer_manager:
            layer_manager.add_layer(result_layer)

        return result_layer

__init__(name=None)

Initialize the merge rule set.

Parameters:

name : str, optional Name of the merge rule set

Source code in nickyspatial/core/rules.py
def __init__(self, name=None):
    """Initialize the merge rule set.

    Parameters:
    -----------
    name : str, optional
        Name of the merge rule set
    """
    self.name = name if name else "MergeRuleSet"

execute(source_layer, class_column_name, class_value, layer_manager=None, layer_name=None)

Merge segments of the same class in a layer.

Parameters:

source_layer : Layer Source layer with segments to merge class_value : str or list of str One or more attribute field names to group and merge segments layer_manager : LayerManager, optional Layer manager to add the result layer to layer_name : str, optional Name for the result layer

Returns:

result_layer : Layer Layer with merged geometries

Source code in nickyspatial/core/rules.py
def execute(self, source_layer, class_column_name, class_value, layer_manager=None, layer_name=None):
    """Merge segments of the same class in a layer.

    Parameters:
    -----------
    source_layer : Layer
        Source layer with segments to merge
    class_value : str or list of str
        One or more attribute field names to group and merge segments
    layer_manager : LayerManager, optional
        Layer manager to add the result layer to
    layer_name : str, optional
        Name for the result layer

    Returns:
    --------
    result_layer : Layer
        Layer with merged geometries
    """
    if not layer_name:
        layer_name = f"{source_layer.name}_{self.name}"

    result_layer = Layer(name=layer_name, parent=source_layer, type="merged")
    result_layer.transform = source_layer.transform
    result_layer.crs = source_layer.crs
    result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

    df = source_layer.objects.copy()

    # Handle single or multiple class fields
    if isinstance(class_value, str):
        class_values = [class_value]
    else:
        class_values = class_value

    new_rows = []
    to_drop = set()
    geom_map, class_map = self._preprocess_layer(source_layer, class_column_name)

    for class_value in class_values:
        visited = set()

        for sid in df["segment_id"].unique():
            if sid in visited or class_map[sid] != class_value:
                continue

            group_geom = [geom_map[sid]]
            group_ids = [sid]
            queue = [sid]
            visited.add(sid)

            while queue:
                current_id = queue.pop()
                neighbors = self._find_neighbors(current_id, geom_map)
                for n_id in neighbors:
                    if n_id not in visited and class_map.get(n_id) == class_value:
                        visited.add(n_id)
                        group_geom.append(geom_map[n_id])
                        group_ids.append(n_id)
                        queue.append(n_id)

            merged_geom = unary_union(group_geom)
            row_data = {"segment_id": min(group_ids), class_column_name: class_value, "geometry": merged_geom}

            new_rows.append(row_data)
            to_drop.update(group_ids)

    df = df[~df["segment_id"].isin(to_drop)]
    df = pd.concat([df, pd.DataFrame(new_rows)], ignore_index=True)
    result_layer.objects = df

    result_layer.metadata = {
        "mergeruleset_name": self.name,
        "merged_fields": class_values,
    }

    if layer_manager:
        layer_manager.add_layer(result_layer)

    return result_layer

Rule

A rule defines a condition to classify segments.

Source code in nickyspatial/core/rules.py
class Rule:
    """A rule defines a condition to classify segments."""

    def __init__(self, name, condition, class_value=None):
        """Initialize a rule.

        Parameters:
        -----------
        name : str
            Name of the rule
        condition : str
            Condition as a string expression that can be evaluated using numexpr
        class_value : str, optional
            Value to assign when the condition is met.
            If None, uses the rule name.
        """
        self.name = name
        self.condition = condition
        self.class_value = class_value if class_value is not None else name

    def __str__(self):
        """String representation of the rule."""
        return f"Rule '{self.name}': {self.condition} -> {self.class_value}"

__init__(name, condition, class_value=None)

Initialize a rule.

Parameters:

name : str Name of the rule condition : str Condition as a string expression that can be evaluated using numexpr class_value : str, optional Value to assign when the condition is met. If None, uses the rule name.

Source code in nickyspatial/core/rules.py
def __init__(self, name, condition, class_value=None):
    """Initialize a rule.

    Parameters:
    -----------
    name : str
        Name of the rule
    condition : str
        Condition as a string expression that can be evaluated using numexpr
    class_value : str, optional
        Value to assign when the condition is met.
        If None, uses the rule name.
    """
    self.name = name
    self.condition = condition
    self.class_value = class_value if class_value is not None else name

__str__()

String representation of the rule.

Source code in nickyspatial/core/rules.py
def __str__(self):
    """String representation of the rule."""
    return f"Rule '{self.name}': {self.condition} -> {self.class_value}"

RuleSet

A collection of rules to apply to a layer.

Source code in nickyspatial/core/rules.py
class RuleSet:
    """A collection of rules to apply to a layer."""

    def __init__(self, name=None):
        """Initialize a rule set.

        Parameters:
        -----------
        name : str, optional
            Name of the rule set
        """
        self.name = name if name else "RuleSet"
        self.rules = []

    @staticmethod
    def wrap_condition_parts_simple(self, condition):
        """Wrap condition parts with parentheses for evaluation."""
        parts = condition.split("&")
        parts = [f"({part.strip()})" for part in parts]
        return " & ".join(parts)

    def add_rule(self, name, condition, class_value=None):
        """Add a rule to the rule set.

        Parameters:
        -----------
        name : str
            Name of the rule
        condition : str
            Condition as a string expression that can be evaluated using numexpr
        class_value : str, optional
            Value to assign when the condition is met

        Returns:
        --------
        rule : Rule
            The added rule
        """
        rule = Rule(name, condition, class_value)
        self.rules.append(rule)
        return rule

    def get_rules(self):
        """Get the list of rules in the rule set.

        Returns:
        --------
        list of tuples
            List of (name, condition) tuples for each rule
        """
        return [(rule.name, rule.condition) for rule in self.rules]

    def execute(
        self,
        source_layer,
        layer_manager=None,
        layer_name=None,
        result_field="classification",
    ):
        """Apply rules to classify segments in a layer.

        Parameters:
        -----------
        source_layer : Layer
            Source layer with segments to classify
        layer_manager : LayerManager, optional
            Layer manager to add the result layer to
        layer_name : str, optional
            Name for the result layer
        result_field : str
            Field name to store classification results

        Returns:
        --------
        result_layer : Layer
            Layer with classification results
        """
        if not layer_name:
            layer_name = f"{source_layer.name}_{self.name}"

        result_layer = Layer(name=layer_name, parent=source_layer, type="classification")
        result_layer.transform = source_layer.transform
        result_layer.crs = source_layer.crs
        result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

        result_layer.objects = source_layer.objects.copy()

        if result_field not in result_layer.objects.columns:
            result_layer.objects[result_field] = None

        result_layer.metadata = {
            "ruleset_name": self.name,
            "rules": [
                {
                    "name": rule.name,
                    "condition": rule.condition,
                    "class_value": rule.class_value,
                }
                for rule in self.rules
            ],
            "result_field": result_field,
        }

        for rule in self.rules:
            try:
                if result_field in result_layer.objects.columns and (
                    f"{result_field} ==" in rule.condition
                    or f"{result_field}==" in rule.condition
                    or f"{result_field} !=" in rule.condition
                    or f"{result_field}!=" in rule.condition
                ):
                    ## TODO : better way to handle this , because & searching in string is not a good idea,
                    # this might produce bug for complex rules
                    eval_condition = rule.condition.replace("&", " and ").replace("|", " or ")

                    mask = result_layer.objects.apply(
                        lambda row, cond=eval_condition: eval(
                            cond,
                            {"__builtins__": {}},
                            {col: row[col] for col in result_layer.objects.columns if col != "geometry"},
                        ),
                        axis=1,
                    )

                else:
                    try:
                        local_dict = {
                            col: result_layer.objects[col].values for col in result_layer.objects.columns if col != "geometry"
                        }

                        mask = ne.evaluate(rule.condition, local_dict=local_dict)
                        mask = pd.Series(mask, index=result_layer.objects.index).fillna(False)
                    except Exception:
                        mask = result_layer.objects.eval(rule.condition, engine="python")

                result_layer.objects.loc[mask, result_field] = rule.class_value

            except Exception as e:
                print(f"Error applying rule '{rule.name}': {str(e)}")
                continue

        if layer_manager:
            layer_manager.add_layer(result_layer)

        return result_layer

__init__(name=None)

Initialize a rule set.

Parameters:

name : str, optional Name of the rule set

Source code in nickyspatial/core/rules.py
def __init__(self, name=None):
    """Initialize a rule set.

    Parameters:
    -----------
    name : str, optional
        Name of the rule set
    """
    self.name = name if name else "RuleSet"
    self.rules = []

add_rule(name, condition, class_value=None)

Add a rule to the rule set.

Parameters:

name : str Name of the rule condition : str Condition as a string expression that can be evaluated using numexpr class_value : str, optional Value to assign when the condition is met

Returns:

rule : Rule The added rule

Source code in nickyspatial/core/rules.py
def add_rule(self, name, condition, class_value=None):
    """Add a rule to the rule set.

    Parameters:
    -----------
    name : str
        Name of the rule
    condition : str
        Condition as a string expression that can be evaluated using numexpr
    class_value : str, optional
        Value to assign when the condition is met

    Returns:
    --------
    rule : Rule
        The added rule
    """
    rule = Rule(name, condition, class_value)
    self.rules.append(rule)
    return rule

execute(source_layer, layer_manager=None, layer_name=None, result_field='classification')

Apply rules to classify segments in a layer.

Parameters:

source_layer : Layer Source layer with segments to classify layer_manager : LayerManager, optional Layer manager to add the result layer to layer_name : str, optional Name for the result layer result_field : str Field name to store classification results

Returns:

result_layer : Layer Layer with classification results

Source code in nickyspatial/core/rules.py
def execute(
    self,
    source_layer,
    layer_manager=None,
    layer_name=None,
    result_field="classification",
):
    """Apply rules to classify segments in a layer.

    Parameters:
    -----------
    source_layer : Layer
        Source layer with segments to classify
    layer_manager : LayerManager, optional
        Layer manager to add the result layer to
    layer_name : str, optional
        Name for the result layer
    result_field : str
        Field name to store classification results

    Returns:
    --------
    result_layer : Layer
        Layer with classification results
    """
    if not layer_name:
        layer_name = f"{source_layer.name}_{self.name}"

    result_layer = Layer(name=layer_name, parent=source_layer, type="classification")
    result_layer.transform = source_layer.transform
    result_layer.crs = source_layer.crs
    result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

    result_layer.objects = source_layer.objects.copy()

    if result_field not in result_layer.objects.columns:
        result_layer.objects[result_field] = None

    result_layer.metadata = {
        "ruleset_name": self.name,
        "rules": [
            {
                "name": rule.name,
                "condition": rule.condition,
                "class_value": rule.class_value,
            }
            for rule in self.rules
        ],
        "result_field": result_field,
    }

    for rule in self.rules:
        try:
            if result_field in result_layer.objects.columns and (
                f"{result_field} ==" in rule.condition
                or f"{result_field}==" in rule.condition
                or f"{result_field} !=" in rule.condition
                or f"{result_field}!=" in rule.condition
            ):
                ## TODO : better way to handle this , because & searching in string is not a good idea,
                # this might produce bug for complex rules
                eval_condition = rule.condition.replace("&", " and ").replace("|", " or ")

                mask = result_layer.objects.apply(
                    lambda row, cond=eval_condition: eval(
                        cond,
                        {"__builtins__": {}},
                        {col: row[col] for col in result_layer.objects.columns if col != "geometry"},
                    ),
                    axis=1,
                )

            else:
                try:
                    local_dict = {
                        col: result_layer.objects[col].values for col in result_layer.objects.columns if col != "geometry"
                    }

                    mask = ne.evaluate(rule.condition, local_dict=local_dict)
                    mask = pd.Series(mask, index=result_layer.objects.index).fillna(False)
                except Exception:
                    mask = result_layer.objects.eval(rule.condition, engine="python")

            result_layer.objects.loc[mask, result_field] = rule.class_value

        except Exception as e:
            print(f"Error applying rule '{rule.name}': {str(e)}")
            continue

    if layer_manager:
        layer_manager.add_layer(result_layer)

    return result_layer

get_rules()

Get the list of rules in the rule set.

Returns:

list of tuples List of (name, condition) tuples for each rule

Source code in nickyspatial/core/rules.py
def get_rules(self):
    """Get the list of rules in the rule set.

    Returns:
    --------
    list of tuples
        List of (name, condition) tuples for each rule
    """
    return [(rule.name, rule.condition) for rule in self.rules]

wrap_condition_parts_simple(condition) staticmethod

Wrap condition parts with parentheses for evaluation.

Source code in nickyspatial/core/rules.py
@staticmethod
def wrap_condition_parts_simple(self, condition):
    """Wrap condition parts with parentheses for evaluation."""
    parts = condition.split("&")
    parts = [f"({part.strip()})" for part in parts]
    return " & ".join(parts)

TouchedByRuleSet

Bases: CommonBase

A rule set to reclassify segments based on spatial enclosure.

This rule set identifies segments of one class (A) that are entirely surrounded by segments of another class (B), and reclassifies them into a new class.

Source code in nickyspatial/core/rules.py
class TouchedByRuleSet(CommonBase):
    """A rule set to reclassify segments based on spatial enclosure.

    This rule set identifies segments of one class (A) that are entirely surrounded
    by segments of another class (B), and reclassifies them into a new class.
    """

    def __init__(self, name=None):
        """Initialize the merge rule set.

        Parameters:
        -----------
        name : str, optional
            Name of the merge rule set
        """
        self.name = name if name else "TouchedByRuleSet"

    def execute(
        self, source_layer, class_column_name, class_value_a, class_value_b, new_class_name, layer_manager=None, layer_name=None
    ):
        """Executes the merge rule set by identifying and updating segments of a given class that are adjacent to another class!

        Parameters:
        - source_layer: Layer
            The input layer containing segment geometries and attributes.
        - class_column_name: str
            The name of the column containing class labels.
        - class_value_a: str or int
            The class value of segments to be checked for touching neighbors.
        - class_value_b: str or int
            The class value of neighboring segments that would trigger a merge.
        - new_class_name: str
            The new class value to assign to segments of class_value_a that touch class_value_b.
        - layer_manager: optional
            An optional manager for adding the resulting layer to a collection or interface.
        - layer_name: optional
            Optional custom name for the resulting layer. Defaults to "<source_layer_name>_<ruleset_name>".

        Returns:
        - result_layer: Layer
            A new Layer object with updated segment classifications where applicable.

        Logic:
        - Copies the source layer and initializes a new result layer.
        - Preprocesses the source layer to build geometry and class lookup maps.
        - Iterates through each segment of class_value_a, checking if any of its neighbors belong to class_value_b.
        - If so, updates the segment's class to new_class_name.
        - Stores the modified DataFrame in the result layer and optionally registers it via the layer_manager.

        """
        if not layer_name:
            layer_name = f"{source_layer.name}_{self.name}"

        result_layer = Layer(name=layer_name, parent=source_layer, type="merged")
        result_layer.transform = source_layer.transform
        result_layer.crs = source_layer.crs
        result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

        df = source_layer.objects.copy()
        touched_segments = []
        geom_map, class_map = self._preprocess_layer(source_layer, class_column_name)

        for sid in df["segment_id"].unique():
            if class_map.get(sid) != class_value_a:
                continue

            neighbors = self._find_neighbors(sid, geom_map)
            if neighbors and any(class_map.get(n_id) == class_value_b for n_id in neighbors):
                touched_segments.append(sid)

        df.loc[(df["segment_id"].isin(touched_segments)), class_column_name] = new_class_name

        result_layer.objects = df

        result_layer.metadata = {
            "enclosed_by_ruleset_name": self.name,
        }

        if layer_manager:
            layer_manager.add_layer(result_layer)

        return result_layer

__init__(name=None)

Initialize the merge rule set.

Parameters:

name : str, optional Name of the merge rule set

Source code in nickyspatial/core/rules.py
def __init__(self, name=None):
    """Initialize the merge rule set.

    Parameters:
    -----------
    name : str, optional
        Name of the merge rule set
    """
    self.name = name if name else "TouchedByRuleSet"

execute(source_layer, class_column_name, class_value_a, class_value_b, new_class_name, layer_manager=None, layer_name=None)

Executes the merge rule set by identifying and updating segments of a given class that are adjacent to another class!

  • source_layer: Layer The input layer containing segment geometries and attributes.
  • class_column_name: str The name of the column containing class labels.
  • class_value_a: str or int The class value of segments to be checked for touching neighbors.
  • class_value_b: str or int The class value of neighboring segments that would trigger a merge.
  • new_class_name: str The new class value to assign to segments of class_value_a that touch class_value_b.
  • layer_manager: optional An optional manager for adding the resulting layer to a collection or interface.
  • layer_name: optional Optional custom name for the resulting layer. Defaults to "_".
  • result_layer: Layer A new Layer object with updated segment classifications where applicable.

Logic: - Copies the source layer and initializes a new result layer. - Preprocesses the source layer to build geometry and class lookup maps. - Iterates through each segment of class_value_a, checking if any of its neighbors belong to class_value_b. - If so, updates the segment's class to new_class_name. - Stores the modified DataFrame in the result layer and optionally registers it via the layer_manager.

Source code in nickyspatial/core/rules.py
def execute(
    self, source_layer, class_column_name, class_value_a, class_value_b, new_class_name, layer_manager=None, layer_name=None
):
    """Executes the merge rule set by identifying and updating segments of a given class that are adjacent to another class!

    Parameters:
    - source_layer: Layer
        The input layer containing segment geometries and attributes.
    - class_column_name: str
        The name of the column containing class labels.
    - class_value_a: str or int
        The class value of segments to be checked for touching neighbors.
    - class_value_b: str or int
        The class value of neighboring segments that would trigger a merge.
    - new_class_name: str
        The new class value to assign to segments of class_value_a that touch class_value_b.
    - layer_manager: optional
        An optional manager for adding the resulting layer to a collection or interface.
    - layer_name: optional
        Optional custom name for the resulting layer. Defaults to "<source_layer_name>_<ruleset_name>".

    Returns:
    - result_layer: Layer
        A new Layer object with updated segment classifications where applicable.

    Logic:
    - Copies the source layer and initializes a new result layer.
    - Preprocesses the source layer to build geometry and class lookup maps.
    - Iterates through each segment of class_value_a, checking if any of its neighbors belong to class_value_b.
    - If so, updates the segment's class to new_class_name.
    - Stores the modified DataFrame in the result layer and optionally registers it via the layer_manager.

    """
    if not layer_name:
        layer_name = f"{source_layer.name}_{self.name}"

    result_layer = Layer(name=layer_name, parent=source_layer, type="merged")
    result_layer.transform = source_layer.transform
    result_layer.crs = source_layer.crs
    result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

    df = source_layer.objects.copy()
    touched_segments = []
    geom_map, class_map = self._preprocess_layer(source_layer, class_column_name)

    for sid in df["segment_id"].unique():
        if class_map.get(sid) != class_value_a:
            continue

        neighbors = self._find_neighbors(sid, geom_map)
        if neighbors and any(class_map.get(n_id) == class_value_b for n_id in neighbors):
            touched_segments.append(sid)

    df.loc[(df["segment_id"].isin(touched_segments)), class_column_name] = new_class_name

    result_layer.objects = df

    result_layer.metadata = {
        "enclosed_by_ruleset_name": self.name,
    }

    if layer_manager:
        layer_manager.add_layer(result_layer)

    return result_layer

Implements segmentation algorithms to partition images into meaningful region objects.

The functions here might apply clustering or region-growing techniques, aiding object-based remote sensing analysis. This module includes the SlicSegmentation class, which implements a bottom-up region-growing algorithm

SlicSegmentation

Implementation of Multiresolution segmentation algorithm.

This algorithm segments an image using a bottom-up region-growing approach that optimizes the homogeneity of pixel values within segments while considering shape compactness.

Source code in nickyspatial/core/segmentation.py
class SlicSegmentation:
    """Implementation of Multiresolution segmentation algorithm.

    This algorithm segments an image using a bottom-up region-growing approach
    that optimizes the homogeneity of pixel values within segments while
    considering shape compactness.
    """

    def __init__(self, scale=15, compactness=0.6):
        """Initialize the segmentation algorithm.

        Parameters:
        -----------
        scale : float
            Scale parameter that influences the size of the segments.
            Higher values create larger segments.
        shape : float, range [0, 1]
            Weight of shape criterion vs. color criterion.
            Higher values give more weight to shape.
        compactness : float, range [0, 1]
            Weight of compactness criterion vs. smoothness criterion.
            Higher values create more compact segments.
        """
        self.scale = scale
        self.compactness = compactness

    def execute(self, image_data, transform, crs, layer_manager=None, layer_name=None):
        """Perform segmentation and create a layer with the results.

        Parameters:
        -----------
        image_data : numpy.ndarray
            Array with raster data values (bands, height, width)
        transform : affine.Affine
            Affine transformation for the raster
        crs : rasterio.crs.CRS
            Coordinate reference system
        layer_manager : LayerManager, optional
            Layer manager to add the result layer to
        layer_name : str, optional
            Name for the result layer

        Returns:
        --------
        layer : Layer
            Layer containing the segmentation results
        """
        num_bands, height, width = image_data.shape

        normalized_bands = []
        for i in range(num_bands):
            band = image_data[i]

            if band.max() == band.min():
                normalized_bands.append(np.zeros_like(band))
                continue

            norm_band = (band - band.min()) / (band.max() - band.min())
            normalized_bands.append(norm_band)

        multichannel_image = np.stack(normalized_bands, axis=-1)

        n_segments = int(width * height / (self.scale * self.scale))
        print(f"Number of segments: {n_segments}")

        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            segments = segmentation.slic(
                multichannel_image,
                n_segments=n_segments,
                compactness=self.compactness,
                sigma=1.0,
                start_label=1,
                channel_axis=-1,
            )

        if not layer_name:
            layer_name = f"Segmentation_scale{self.scale}_comp{self.compactness}"

        layer = Layer(name=layer_name, type="segmentation")
        layer.raster = segments
        layer.transform = transform
        layer.crs = crs
        layer.metadata = {
            "scale": self.scale,
            "compactness": self.compactness,
            "n_segments": n_segments,
            "num_segments_actual": len(np.unique(segments)),
        }

        segment_objects = self._create_segment_objects(segments, transform, crs)
        layer.objects = segment_objects

        bands = [f"band_{i + 1}" for i in range(num_bands)]
        self._calculate_statistics(layer, image_data, bands)

        if layer_manager:
            layer_manager.add_layer(layer)

        return layer

    def _create_segment_objects(self, segments, transform, crs):
        """Create vector objects from segments.

        Parameters:
        -----------
        segments : numpy.ndarray
            Array with segment IDs
        transform : affine.Affine
            Affine transformation for the raster
        crs : rasterio.crs.CRS
            Coordinate reference system

        Returns:
        --------
        segment_objects : geopandas.GeoDataFrame
            GeoDataFrame with segment polygons
        """
        segment_ids = np.unique(segments)

        geometries = []
        properties = []

        for segment_id in segment_ids:
            mask = segments == segment_id

            if not np.any(mask):
                continue

            shapes = rasterio.features.shapes(mask.astype(np.int16), mask=mask, transform=transform)

            segment_polygons = []
            for geom, val in shapes:
                if val == 1:
                    try:
                        polygon = Polygon(geom["coordinates"][0])
                        if polygon.is_valid:
                            segment_polygons.append(polygon)
                    except Exception:
                        continue

            if not segment_polygons:
                continue

            largest_polygon = max(segment_polygons, key=lambda p: p.area)

            area_pixels = np.sum(mask)

            pixel_width = abs(transform.a)
            pixel_height = abs(transform.e)
            area_units = area_pixels * pixel_width * pixel_height

            prop = {
                "segment_id": int(segment_id),
                "area_pixels": int(area_pixels),
                "area_units": float(area_units),
            }

            geometries.append(largest_polygon)
            properties.append(prop)

        gdf = gpd.GeoDataFrame(properties, geometry=geometries, crs=crs)
        return gdf

    def _calculate_statistics(self, layer, image_data, bands):
        """Calculate statistics for segments based on image data.

        Parameters:
        -----------
        layer : Layer
            Layer containing segments
        image_data : numpy.ndarray
            Array with raster data values (bands, height, width)
        bands : list of str
            Names of the bands
        """
        segments = layer.raster
        segment_objects = layer.objects

        segment_ids = segment_objects["segment_id"].values

        for i, band_name in enumerate(bands):
            if i >= image_data.shape[0]:
                break

            band_data = image_data[i]

            for segment_id in segment_ids:
                mask = segments == segment_id

                if segment_id not in segment_objects["segment_id"].values:
                    continue

                segment_pixels = band_data[mask]

                if len(segment_pixels) == 0:
                    continue

                mean_val = float(np.mean(segment_pixels))
                std_val = float(np.std(segment_pixels))
                min_val = float(np.min(segment_pixels))
                max_val = float(np.max(segment_pixels))
                median_val = float(np.median(segment_pixels))

                idx = segment_objects.index[segment_objects["segment_id"] == segment_id].tolist()[0]
                segment_objects.at[idx, f"{band_name}_mean"] = mean_val
                segment_objects.at[idx, f"{band_name}_std"] = std_val
                segment_objects.at[idx, f"{band_name}_min"] = min_val
                segment_objects.at[idx, f"{band_name}_max"] = max_val
                segment_objects.at[idx, f"{band_name}_median"] = median_val

__init__(scale=15, compactness=0.6)

Initialize the segmentation algorithm.

Parameters:

scale : float Scale parameter that influences the size of the segments. Higher values create larger segments. shape : float, range [0, 1] Weight of shape criterion vs. color criterion. Higher values give more weight to shape. compactness : float, range [0, 1] Weight of compactness criterion vs. smoothness criterion. Higher values create more compact segments.

Source code in nickyspatial/core/segmentation.py
def __init__(self, scale=15, compactness=0.6):
    """Initialize the segmentation algorithm.

    Parameters:
    -----------
    scale : float
        Scale parameter that influences the size of the segments.
        Higher values create larger segments.
    shape : float, range [0, 1]
        Weight of shape criterion vs. color criterion.
        Higher values give more weight to shape.
    compactness : float, range [0, 1]
        Weight of compactness criterion vs. smoothness criterion.
        Higher values create more compact segments.
    """
    self.scale = scale
    self.compactness = compactness

execute(image_data, transform, crs, layer_manager=None, layer_name=None)

Perform segmentation and create a layer with the results.

Parameters:

image_data : numpy.ndarray Array with raster data values (bands, height, width) transform : affine.Affine Affine transformation for the raster crs : rasterio.crs.CRS Coordinate reference system layer_manager : LayerManager, optional Layer manager to add the result layer to layer_name : str, optional Name for the result layer

Returns:

layer : Layer Layer containing the segmentation results

Source code in nickyspatial/core/segmentation.py
def execute(self, image_data, transform, crs, layer_manager=None, layer_name=None):
    """Perform segmentation and create a layer with the results.

    Parameters:
    -----------
    image_data : numpy.ndarray
        Array with raster data values (bands, height, width)
    transform : affine.Affine
        Affine transformation for the raster
    crs : rasterio.crs.CRS
        Coordinate reference system
    layer_manager : LayerManager, optional
        Layer manager to add the result layer to
    layer_name : str, optional
        Name for the result layer

    Returns:
    --------
    layer : Layer
        Layer containing the segmentation results
    """
    num_bands, height, width = image_data.shape

    normalized_bands = []
    for i in range(num_bands):
        band = image_data[i]

        if band.max() == band.min():
            normalized_bands.append(np.zeros_like(band))
            continue

        norm_band = (band - band.min()) / (band.max() - band.min())
        normalized_bands.append(norm_band)

    multichannel_image = np.stack(normalized_bands, axis=-1)

    n_segments = int(width * height / (self.scale * self.scale))
    print(f"Number of segments: {n_segments}")

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        segments = segmentation.slic(
            multichannel_image,
            n_segments=n_segments,
            compactness=self.compactness,
            sigma=1.0,
            start_label=1,
            channel_axis=-1,
        )

    if not layer_name:
        layer_name = f"Segmentation_scale{self.scale}_comp{self.compactness}"

    layer = Layer(name=layer_name, type="segmentation")
    layer.raster = segments
    layer.transform = transform
    layer.crs = crs
    layer.metadata = {
        "scale": self.scale,
        "compactness": self.compactness,
        "n_segments": n_segments,
        "num_segments_actual": len(np.unique(segments)),
    }

    segment_objects = self._create_segment_objects(segments, transform, crs)
    layer.objects = segment_objects

    bands = [f"band_{i + 1}" for i in range(num_bands)]
    self._calculate_statistics(layer, image_data, bands)

    if layer_manager:
        layer_manager.add_layer(layer)

    return layer

nickyspatial.filters

The filters package provides modules for applying transformations to raster data.

It includes spatial filters (e.g., smoothing) as well as spectral filters (e.g., band math). Main idea is to further manipulate the objects such as merging segments or applying pre-defined rules to filter objects based on their attributes.

Implements spatial operations like smoothing and morphological transformations.

These filters can modify the geometry or arrangement of pixel values to enhance or simplify data for object analysis. The functions here include smoothing boundaries, merging small segments, and selecting segments based on area. These operations are essential for preparing data for object-based image analysis, especially in remote sensing applications. The functions are designed to work with raster data and can be applied to layers created from segmentation algorithms.

merge_small_segments(source_layer, min_size, attribute='area_pixels', layer_manager=None, layer_name=None)

Merge small segments with their largest neighbor.

Parameters:

source_layer : Layer Source layer with segments to merge min_size : float Minimum segment size threshold attribute : str Attribute to use for size comparison layer_manager : LayerManager, optional Layer manager to add the result layer to layer_name : str, optional Name for the result layer

Returns:

result_layer : Layer Layer with merged segments

Source code in nickyspatial/filters/spatial.py
def merge_small_segments(source_layer, min_size, attribute="area_pixels", layer_manager=None, layer_name=None):
    """Merge small segments with their largest neighbor.

    Parameters:
    -----------
    source_layer : Layer
        Source layer with segments to merge
    min_size : float
        Minimum segment size threshold
    attribute : str
        Attribute to use for size comparison
    layer_manager : LayerManager, optional
        Layer manager to add the result layer to
    layer_name : str, optional
        Name for the result layer

    Returns:
    --------
    result_layer : Layer
        Layer with merged segments
    """
    if not layer_name:
        layer_name = f"{source_layer.name}_merged"

    result_layer = Layer(name=layer_name, parent=source_layer, type="filter")
    result_layer.transform = source_layer.transform
    result_layer.crs = source_layer.crs

    result_layer.metadata = {
        "filter_type": "merge_small_segments",
        "min_size": min_size,
        "attribute": attribute,
    }

    objects = source_layer.objects.copy()
    small_segments = objects[objects[attribute] < min_size]

    if len(small_segments) == 0:
        result_layer.objects = objects
        result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

        if layer_manager:
            layer_manager.add_layer(result_layer)

        return result_layer

    for idx, small_segment in small_segments.iterrows():
        if idx not in objects.index:
            continue

        neighbors = objects[objects.index != idx].overlay(
            gpd.GeoDataFrame(geometry=[small_segment.geometry], crs=objects.crs),
            how="intersection",
        )

        if len(neighbors) == 0:
            continue

        largest_neighbor_idx = neighbors[attribute].idxmax()

        largest_neighbor = objects.loc[largest_neighbor_idx]
        merged_geometry = largest_neighbor.geometry.union(small_segment.geometry)

        objects.at[largest_neighbor_idx, "geometry"] = merged_geometry
        objects.at[largest_neighbor_idx, attribute] += small_segment[attribute]

        objects = objects.drop(idx)

    if source_layer.raster is not None:
        segments_raster = source_layer.raster.copy()

        old_to_new = {}
        for _idx, obj in objects.iterrows():
            old_id = obj["segment_id"]
            old_to_new[old_id] = old_id

        for idx, small_segment in small_segments.iterrows():
            if idx not in objects.index:
                old_id = small_segment["segment_id"]

                touching_segments = objects.intersects(small_segment.geometry)
                if any(touching_segments):
                    new_id = objects[touching_segments].iloc[0]["segment_id"]
                    old_to_new[old_id] = new_id

        for old_id, new_id in old_to_new.items():
            if old_id != new_id:
                segments_raster[segments_raster == old_id] = new_id

        result_layer.raster = segments_raster

    result_layer.objects = objects

    if layer_manager:
        layer_manager.add_layer(result_layer)

    return result_layer

select_by_area(source_layer, min_area=None, max_area=None, area_column='area_units', layer_manager=None, layer_name=None)

Select segments based on area.

Parameters:

source_layer : Layer Source layer with segments to filter min_area : float, optional Minimum area threshold max_area : float, optional Maximum area threshold area_column : str Column containing area values layer_manager : LayerManager, optional Layer manager to add the result layer to layer_name : str, optional Name for the result layer

Returns:

result_layer : Layer Layer with filtered segments

Source code in nickyspatial/filters/spatial.py
def select_by_area(
    source_layer,
    min_area=None,
    max_area=None,
    area_column="area_units",
    layer_manager=None,
    layer_name=None,
):
    """Select segments based on area.

    Parameters:
    -----------
    source_layer : Layer
        Source layer with segments to filter
    min_area : float, optional
        Minimum area threshold
    max_area : float, optional
        Maximum area threshold
    area_column : str
        Column containing area values
    layer_manager : LayerManager, optional
        Layer manager to add the result layer to
    layer_name : str, optional
        Name for the result layer

    Returns:
    --------
    result_layer : Layer
        Layer with filtered segments
    """
    if not layer_name:
        layer_name = f"{source_layer.name}_area_filtered"

    result_layer = Layer(name=layer_name, parent=source_layer, type="filter")
    result_layer.transform = source_layer.transform
    result_layer.crs = source_layer.crs

    result_layer.metadata = {
        "filter_type": "select_by_area",
        "min_area": min_area,
        "max_area": max_area,
        "area_column": area_column,
    }

    objects = source_layer.objects.copy()

    if min_area is not None:
        objects = objects[objects[area_column] >= min_area]

    if max_area is not None:
        objects = objects[objects[area_column] <= max_area]

    result_layer.objects = objects

    if source_layer.raster is not None:
        kept_ids = set(objects["segment_id"])

        segments_raster = source_layer.raster.copy()
        mask = np.isin(segments_raster, list(kept_ids))

        segments_raster[~mask] = 0

        result_layer.raster = segments_raster

    if layer_manager:
        layer_manager.add_layer(result_layer)

    return result_layer

smooth_boundaries(source_layer, iterations=1, layer_manager=None, layer_name=None)

Smooth segment boundaries by applying morphological operations.

Parameters:

source_layer : Layer Source layer with segments to smooth iterations : int Number of smoothing iterations to apply layer_manager : LayerManager, optional Layer manager to add the result layer to layer_name : str, optional Name for the result layer

Returns:

result_layer : Layer Layer with smoothed segment boundaries

Source code in nickyspatial/filters/spatial.py
def smooth_boundaries(source_layer, iterations=1, layer_manager=None, layer_name=None):
    """Smooth segment boundaries by applying morphological operations.

    Parameters:
    -----------
    source_layer : Layer
        Source layer with segments to smooth
    iterations : int
        Number of smoothing iterations to apply
    layer_manager : LayerManager, optional
        Layer manager to add the result layer to
    layer_name : str, optional
        Name for the result layer

    Returns:
    --------
    result_layer : Layer
        Layer with smoothed segment boundaries
    """
    if not layer_name:
        layer_name = f"{source_layer.name}_smoothed"

    result_layer = Layer(name=layer_name, parent=source_layer, type="filter")
    result_layer.transform = source_layer.transform
    result_layer.crs = source_layer.crs
    result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

    result_layer.metadata = {
        "filter_type": "smooth_boundaries",
        "iterations": iterations,
    }

    objects = source_layer.objects.copy()

    smoothed_geometries = []
    for geom in objects.geometry:
        smoothed_geom = geom
        for _ in range(iterations):
            buffer_distance = np.sqrt(smoothed_geom.area) * 0.01
            smoothed_geom = smoothed_geom.buffer(-buffer_distance).buffer(buffer_distance * 2)

        if not smoothed_geom.is_valid:
            smoothed_geom = smoothed_geom.buffer(0)

        smoothed_geometries.append(smoothed_geom)

    objects.geometry = smoothed_geometries
    result_layer.objects = objects

    if layer_manager:
        layer_manager.add_layer(result_layer)

    return result_layer

Performs spectral-based manipulations of imagery, including band arithmetic and transformations.

It supports generating new spectral bands or combinations to highlight specific features. It also includes functions for enhancing contrast and applying spectral filters based on mathematical expressions. This module is designed to work with raster . The functions here include contrast enhancement, spectral filtering, and band arithmetic. Not a great fan of these but might be handy sometime

enhance_contrast(source_layer, percentile_min=2, percentile_max=98, layer_manager=None, layer_name=None)

Enhance contrast in source layer raster data.

Parameters:

source_layer : Layer Source layer with raster data percentile_min : float Lower percentile for contrast stretching percentile_max : float Upper percentile for contrast stretching layer_manager : LayerManager, optional Layer manager to add the result layer to layer_name : str, optional Name for the result layer

Returns:

result_layer : Layer Layer with enhanced contrast

Source code in nickyspatial/filters/spectral.py
def enhance_contrast(
    source_layer,
    percentile_min=2,
    percentile_max=98,
    layer_manager=None,
    layer_name=None,
):
    """Enhance contrast in source layer raster data.

    Parameters:
    -----------
    source_layer : Layer
        Source layer with raster data
    percentile_min : float
        Lower percentile for contrast stretching
    percentile_max : float
        Upper percentile for contrast stretching
    layer_manager : LayerManager, optional
        Layer manager to add the result layer to
    layer_name : str, optional
        Name for the result layer

    Returns:
    --------
    result_layer : Layer
        Layer with enhanced contrast
    """
    if source_layer.raster is None:
        raise ValueError("Source layer must have raster data")

    if not layer_name:
        layer_name = f"{source_layer.name}_enhanced"

    result_layer = Layer(name=layer_name, parent=source_layer, type="filter")
    result_layer.transform = source_layer.transform
    result_layer.crs = source_layer.crs
    result_layer.objects = source_layer.objects.copy() if source_layer.objects is not None else None

    result_layer.metadata = {
        "filter_type": "enhance_contrast",
        "percentile_min": percentile_min,
        "percentile_max": percentile_max,
    }

    enhanced_raster = source_layer.raster.copy()

    p_min = np.percentile(enhanced_raster, percentile_min)
    p_max = np.percentile(enhanced_raster, percentile_max)

    enhanced_raster = np.clip(enhanced_raster, p_min, p_max)
    enhanced_raster = (enhanced_raster - p_min) / (p_max - p_min)

    result_layer.raster = enhanced_raster

    if layer_manager:
        layer_manager.add_layer(result_layer)

    return result_layer

spectral_filter(source_layer, expression, layer_manager=None, layer_name=None)

Apply a spectral filter based on a mathematical expression.

Parameters:

source_layer : Layer Source layer with segment statistics expression : str Mathematical expression to apply (e.g., "NDVI > 0.5") layer_manager : LayerManager, optional Layer manager to add the result layer to layer_name : str, optional Name for the result layer

Returns:

result_layer : Layer Layer with filtered segments

Source code in nickyspatial/filters/spectral.py
def spectral_filter(source_layer, expression, layer_manager=None, layer_name=None):
    """Apply a spectral filter based on a mathematical expression.

    Parameters:
    -----------
    source_layer : Layer
        Source layer with segment statistics
    expression : str
        Mathematical expression to apply (e.g., "NDVI > 0.5")
    layer_manager : LayerManager, optional
        Layer manager to add the result layer to
    layer_name : str, optional
        Name for the result layer

    Returns:
    --------
    result_layer : Layer
        Layer with filtered segments
    """
    import numexpr as ne

    if not layer_name:
        layer_name = f"{source_layer.name}_spectral_filtered"

    result_layer = Layer(name=layer_name, parent=source_layer, type="filter")
    result_layer.transform = source_layer.transform
    result_layer.crs = source_layer.crs

    result_layer.metadata = {"filter_type": "spectral_filter", "expression": expression}

    objects = source_layer.objects.copy()

    try:
        local_dict = {col: objects[col].values for col in objects.columns if col != "geometry"}
        mask = ne.evaluate(expression, local_dict=local_dict)
        mask = np.array(mask, dtype=bool)

        filtered_objects = objects.iloc[mask]
        result_layer.objects = filtered_objects

        if source_layer.raster is not None:
            kept_ids = set(filtered_objects["segment_id"])
            segments_raster = source_layer.raster.copy()
            raster_mask = np.isin(segments_raster, list(kept_ids))
            segments_raster[~raster_mask] = 0
            result_layer.raster = segments_raster

    except Exception as e:
        raise ValueError(f"Error applying spectral filter: {str(e)}") from e

    if layer_manager:
        layer_manager.add_layer(result_layer)

    return result_layer

nickyspatial.io

The io package contains modules for reading and writing both raster and vector data.

It abstracts file operations and coordinate system handling to facilitate I/O tasks.

Handles raster input and output operations, including reading and saving multi-band images.

Functions in this module may also provide metadata parsing and coordinate transform tools.

layer_to_raster(layer, output_path, column=None, nodata=0)

Save a layer to a raster file.

Parameters:

layer : Layer Layer to save output_path : str Path to the output raster file column : str, optional Column to rasterize (if saving from vector objects) nodata : int or float, optional No data value

Source code in nickyspatial/io/raster.py
def layer_to_raster(layer, output_path, column=None, nodata=0):
    """Save a layer to a raster file.

    Parameters:
    -----------
    layer : Layer
        Layer to save
    output_path : str
        Path to the output raster file
    column : str, optional
        Column to rasterize (if saving from vector objects)
    nodata : int or float, optional
        No data value
    """
    from rasterio import features

    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    if layer.raster is not None and column is None:
        write_raster(
            output_path,
            layer.raster.reshape(1, *layer.raster.shape),
            layer.transform,
            layer.crs,
            nodata,
        )
        return

    if layer.objects is not None and column is not None:
        if column not in layer.objects.columns:
            raise ValueError(f"Column '{column}' not found in layer objects")

        objects = layer.objects
        col_values = objects[column]
        # Check if values are numeric
        if np.issubdtype(col_values.dtype, np.number):
            shapes = [(geom, float(val)) for geom, val in zip(objects.geometry, col_values, strict=False)]
        else:
            unique_vals = col_values.unique()
            val_map = {val: idx for idx, val in enumerate(unique_vals)}
            print(f"Mapping categorical values: {val_map}")
            shapes = [(geom, val_map[val]) for geom, val in zip(objects.geometry, col_values, strict=False)]
        if layer.raster is not None:
            if len(layer.raster.shape) == 3:
                height, width = layer.raster.shape[1], layer.raster.shape[2]
            else:
                height, width = layer.raster.shape
            out_shape = (height, width)
        else:
            bounds = objects.total_bounds
            resolution = 10
            if layer.transform:
                resolution = abs(layer.transform.a)
            width = int((bounds[2] - bounds[0]) / resolution)
            height = int((bounds[3] - bounds[1]) / resolution)
            out_shape = (height, width)
            if layer.transform is None:
                layer.transform = from_origin(bounds[0], bounds[3], resolution, resolution)

        output = np.ones(out_shape, dtype=np.float32) * nodata

        features.rasterize(shapes, out=output, transform=layer.transform, fill=nodata)

        write_raster(
            output_path,
            output.reshape(1, *out_shape),
            layer.transform,
            layer.crs,
            nodata,
        )
    else:
        raise ValueError("Layer must have either raster data or objects with a specified column")

read_raster(raster_path)

Read a raster file and return its data, transform, and CRS.

Parameters:

raster_path : str Path to the raster file

Returns:

image_data : numpy.ndarray Array with raster data values transform : affine.Affine Affine transformation for the raster crs : rasterio.crs.CRS Coordinate reference system

Source code in nickyspatial/io/raster.py
def read_raster(raster_path):
    """Read a raster file and return its data, transform, and CRS.

    Parameters:
    -----------
    raster_path : str
        Path to the raster file

    Returns:
    --------
    image_data : numpy.ndarray
        Array with raster data values
    transform : affine.Affine
        Affine transformation for the raster
    crs : rasterio.crs.CRS
        Coordinate reference system
    """
    with rasterio.open(raster_path) as src:
        image_data = src.read()
        transform = src.transform
        crs = src.crs

    return image_data, transform, crs

write_raster(output_path, data, transform, crs, nodata=None)

Write raster data to a file.

Parameters:

output_path : str Path to the output raster file data : numpy.ndarray Array with raster data values transform : affine.Affine Affine transformation for the raster crs : rasterio.crs.CRS Coordinate reference system nodata : int or float, optional No data value

Source code in nickyspatial/io/raster.py
def write_raster(output_path, data, transform, crs, nodata=None):
    """Write raster data to a file.

    Parameters:
    -----------
    output_path : str
        Path to the output raster file
    data : numpy.ndarray
        Array with raster data values
    transform : affine.Affine
        Affine transformation for the raster
    crs : rasterio.crs.CRS
        Coordinate reference system
    nodata : int or float, optional
        No data value
    """
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    if len(data.shape) == 2:
        data = data.reshape(1, *data.shape)

    height, width = data.shape[-2], data.shape[-1]
    count = data.shape[0]

    with rasterio.open(
        output_path,
        "w",
        driver="GTiff",
        height=height,
        width=width,
        count=count,
        dtype=data.dtype,
        crs=crs,
        transform=transform,
        nodata=nodata,
    ) as dst:
        dst.write(data)

Manages vector data I/O, supporting formats like Shapefile and GeoJSON.

This module typically offers utilities for handling attributes, geometries, and coordinate reference systems.

layer_to_vector(layer, output_path)

Save a layer's objects to a vector file.

Parameters:

layer : Layer Layer to save output_path : str Path to the output vector file

Source code in nickyspatial/io/vector.py
def layer_to_vector(layer, output_path):
    """Save a layer's objects to a vector file.

    Parameters:
    -----------
    layer : Layer
        Layer to save
    output_path : str
        Path to the output vector file
    """
    if layer.objects is None:
        raise ValueError("Layer has no vector objects")

    write_vector(layer.objects, output_path)

read_vector(vector_path)

Read a vector file into a GeoDataFrame.

Parameters:

vector_path : str Path to the vector file

Returns:

gdf : geopandas.GeoDataFrame GeoDataFrame with vector data

Source code in nickyspatial/io/vector.py
def read_vector(vector_path):
    """Read a vector file into a GeoDataFrame.

    Parameters:
    -----------
    vector_path : str
        Path to the vector file

    Returns:
    --------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame with vector data
    """
    return gpd.read_file(vector_path)

write_vector(gdf, output_path)

Write a GeoDataFrame to a vector file.

Parameters:

gdf : geopandas.GeoDataFrame GeoDataFrame to write output_path : str Path to the output vector file

Source code in nickyspatial/io/vector.py
def write_vector(gdf, output_path):
    """Write a GeoDataFrame to a vector file.

    Parameters:
    -----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame to write
    output_path : str
        Path to the output vector file
    """
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    file_extension = os.path.splitext(output_path)[1].lower()

    if file_extension == ".shp":
        gdf.to_file(output_path)
    elif file_extension == ".geojson":
        gdf.to_file(output_path, driver="GeoJSON")
    else:
        raise ValueError(f"Unsupported vector format: {file_extension}")

nickyspatial.stats

The stats package includes modules for calculating statistical metrics on objects.

This is to mimic the stats module in ecognition because they will be necessary in order to apply rules later on and extremely essential to work with objects

Basic statistics for layers in NickySpatial.

attach_basic_stats(layer, column, prefix=None)

Attach basic statistics for a column to a layer.

Parameters:

layer : Layer Layer to attach statistics to column : str Column to calculate statistics for prefix : str, optional Prefix for result names

Returns:

stats : dict Dictionary with calculated statistics

Source code in nickyspatial/stats/basic.py
def attach_basic_stats(layer, column, prefix=None):
    """Attach basic statistics for a column to a layer.

    Parameters:
    -----------
    layer : Layer
        Layer to attach statistics to
    column : str
        Column to calculate statistics for
    prefix : str, optional
        Prefix for result names

    Returns:
    --------
    stats : dict
        Dictionary with calculated statistics
    """
    if layer.objects is None or column not in layer.objects.columns:
        raise ValueError(f"Column '{column}' not found in layer objects")

    prefix = f"{prefix}_" if prefix else ""

    values = layer.objects[column]
    stats = {
        f"{prefix}min": values.min(),
        f"{prefix}max": values.max(),
        f"{prefix}mean": values.mean(),
        f"{prefix}median": values.median(),
        f"{prefix}std": values.std(),
        f"{prefix}sum": values.sum(),
        f"{prefix}count": len(values),
    }

    percentiles = [10, 25, 50, 75, 90]
    for p in percentiles:
        stats[f"{prefix}percentile_{p}"] = np.percentile(values, p)

    return stats

attach_class_distribution(layer, class_column='classification')

Calculate the distribution of classes in a layer.

Parameters:

layer : Layer Layer to analyze class_column : str Column containing class values

Returns:

distribution : dict Dictionary with class counts and percentages

Source code in nickyspatial/stats/basic.py
def attach_class_distribution(layer, class_column="classification"):
    """Calculate the distribution of classes in a layer.

    Parameters:
    -----------
    layer : Layer
        Layer to analyze
    class_column : str
        Column containing class values

    Returns:
    --------
    distribution : dict
        Dictionary with class counts and percentages
    """
    if layer.objects is None or class_column not in layer.objects.columns:
        return {}

    class_counts = layer.objects[class_column].value_counts()
    total_count = len(layer.objects)
    class_percentages = (class_counts / total_count * 100).round(2)

    distribution = {
        "counts": class_counts.to_dict(),
        "percentages": class_percentages.to_dict(),
        "total": total_count,
    }

    return distribution

attach_count(layer, class_column='classification', class_value=None)

Count objects in a layer, optionally filtered by class.

Parameters:

layer : Layer Layer to count objects in class_column : str Column containing class values class_value : str, optional Class value to filter by

Returns:

count : int Number of objects

Source code in nickyspatial/stats/basic.py
def attach_count(layer, class_column="classification", class_value=None):
    """Count objects in a layer, optionally filtered by class.

    Parameters:
    -----------
    layer : Layer
        Layer to count objects in
    class_column : str
        Column containing class values
    class_value : str, optional
        Class value to filter by

    Returns:
    --------
    count : int
        Number of objects
    """
    if layer.objects is None:
        return 0

    if class_value is not None and class_column in layer.objects.columns:
        count = layer.objects[layer.objects[class_column] == class_value].shape[0]
    else:
        count = layer.objects.shape[0]

    return count

Spatial statistics for layers in NickySpatial.

attach_area_stats(layer, area_column='area_units', by_class=None)

Calculate area statistics for objects in a layer.

Parameters:

layer : Layer Layer to calculate statistics for area_column : str Column containing area values by_class : str, optional Column to group by (e.g., 'classification')

Returns:

stats : dict Dictionary with area statistics

Source code in nickyspatial/stats/spatial.py
def attach_area_stats(layer, area_column="area_units", by_class=None):
    """Calculate area statistics for objects in a layer.

    Parameters:
    -----------
    layer : Layer
        Layer to calculate statistics for
    area_column : str
        Column containing area values
    by_class : str, optional
        Column to group by (e.g., 'classification')

    Returns:
    --------
    stats : dict
        Dictionary with area statistics
    """
    if layer.objects is None or area_column not in layer.objects.columns:
        return {}

    total_area = layer.objects[area_column].sum()

    if by_class and by_class in layer.objects.columns:
        class_areas = {}
        class_percentages = {}

        for class_value, group in layer.objects.groupby(by_class):
            if class_value is None:
                continue

            class_area = group[area_column].sum()
            class_percentage = (class_area / total_area * 100).round(2)

            class_areas[class_value] = class_area
            class_percentages[class_value] = class_percentage

        stats = {
            "total_area": total_area,
            "class_areas": class_areas,
            "class_percentages": class_percentages,
        }
    else:
        areas = layer.objects[area_column]

        stats = {
            "total_area": total_area,
            "min_area": areas.min(),
            "max_area": areas.max(),
            "mean_area": areas.mean(),
            "median_area": areas.median(),
            "std_area": areas.std(),
        }

    return stats

attach_neighbor_stats(layer)

Calculate neighborhood statistics for objects in a layer.

Parameters:

layer : Layer Layer to calculate statistics for

Returns:

stats : dict Dictionary with neighborhood statistics

Source code in nickyspatial/stats/spatial.py
def attach_neighbor_stats(layer):
    """Calculate neighborhood statistics for objects in a layer.

    Parameters:
    -----------
    layer : Layer
        Layer to calculate statistics for

    Returns:
    --------
    stats : dict
        Dictionary with neighborhood statistics
    """
    if layer.objects is None:
        return {}

    neighbors = {}
    neighbor_counts = []

    for idx, obj in layer.objects.iterrows():
        touching = layer.objects[layer.objects.index != idx].intersects(obj.geometry)
        neighbor_ids = layer.objects[touching].index.tolist()

        neighbors[idx] = neighbor_ids
        neighbor_counts.append(len(neighbor_ids))

    layer.objects["neighbor_count"] = neighbor_counts

    stats = {
        "neighbor_count": {
            "mean": np.mean(neighbor_counts),
            "min": np.min(neighbor_counts),
            "max": np.max(neighbor_counts),
            "std": np.std(neighbor_counts),
        }
    }

    return stats

attach_shape_metrics(layer)

Calculate shape metrics for objects in a layer.

Parameters:

layer : Layer Layer to calculate metrics for

Returns:

metrics : dict Dictionary with shape metrics

Source code in nickyspatial/stats/spatial.py
def attach_shape_metrics(layer):
    """Calculate shape metrics for objects in a layer.

    Parameters:
    -----------
    layer : Layer
        Layer to calculate metrics for

    Returns:
    --------
    metrics : dict
        Dictionary with shape metrics
    """
    if layer.objects is None:
        return {}

    layer.objects["perimeter"] = layer.objects.geometry.length

    if "area_units" not in layer.objects.columns:
        layer.objects["area_units"] = layer.objects.geometry.area

    layer.objects["shape_index"] = (
        (layer.objects["perimeter"] / (2 * np.sqrt(np.pi * layer.objects["area_units"])))
        .replace([np.inf, -np.inf], np.nan)
        .fillna(0)
    )

    layer.objects["compactness"] = (
        (4 * np.pi * layer.objects["area_units"] / (layer.objects["perimeter"] ** 2)).replace([np.inf, -np.inf], np.nan).fillna(0)
    )

    metrics = {
        "shape_index": {
            "mean": layer.objects["shape_index"].mean(),
            "min": layer.objects["shape_index"].min(),
            "max": layer.objects["shape_index"].max(),
            "std": layer.objects["shape_index"].std(),
        },
        "compactness": {
            "mean": layer.objects["compactness"].mean(),
            "min": layer.objects["compactness"].min(),
            "max": layer.objects["compactness"].max(),
            "std": layer.objects["compactness"].std(),
        },
    }

    return metrics

Spectral indices calculation module.

attach_ndvi(layer, nir_column='NIR_mean', red_column='Red_mean', output_column='NDVI')

Calculate NDVI (Normalized Difference Vegetation Index) for objects in a layer.

Parameters:

layer : Layer Layer to calculate NDVI for nir_column : str Column containing NIR band values red_column : str Column containing Red band values output_column : str Column to store NDVI values

Returns:

ndvi_stats : dict Dictionary with NDVI statistics

Source code in nickyspatial/stats/spectral.py
def attach_ndvi(layer, nir_column="NIR_mean", red_column="Red_mean", output_column="NDVI"):
    """Calculate NDVI (Normalized Difference Vegetation Index) for objects in a layer.

    Parameters:
    -----------
    layer : Layer
        Layer to calculate NDVI for
    nir_column : str
        Column containing NIR band values
    red_column : str
        Column containing Red band values
    output_column : str
        Column to store NDVI values

    Returns:
    --------
    ndvi_stats : dict
        Dictionary with NDVI statistics
    """
    if layer.objects is None or nir_column not in layer.objects.columns or red_column not in layer.objects.columns:
        return {}

    nir = layer.objects[nir_column]
    red = layer.objects[red_column]

    denominator = nir + red
    mask = denominator != 0

    ndvi = np.zeros(len(layer.objects))
    ndvi[mask] = (nir[mask] - red[mask]) / denominator[mask]

    layer.objects[output_column] = ndvi

    ndvi_stats = {
        "mean": ndvi.mean(),
        "min": ndvi.min(),
        "max": ndvi.max(),
        "std": np.std(ndvi),
        "median": np.median(ndvi),
    }

    return ndvi_stats

attach_spectral_indices(layer, bands=None)

Calculate multiple spectral indices for objects in a layer.

Parameters:

layer : Layer Layer to calculate indices for bands : dict, optional Dictionary mapping band names to column names

Returns:

indices : dict Dictionary with calculated indices

Source code in nickyspatial/stats/spectral.py
def attach_spectral_indices(layer, bands=None):
    """Calculate multiple spectral indices for objects in a layer.

    Parameters:
    -----------
    layer : Layer
        Layer to calculate indices for
    bands : dict, optional
        Dictionary mapping band names to column names

    Returns:
    --------
    indices : dict
        Dictionary with calculated indices
    """
    if layer.objects is None:
        return {}

    if bands is None:
        bands = {
            "blue": "Blue_mean",
            "green": "Green_mean",
            "red": "Red_mean",
            "nir": "NIR_mean",
        }

    for _band_name, column in bands.items():
        if column not in layer.objects.columns:
            print(f"Warning: Band column '{column}' not found. Some indices may not be calculated.")

    indices = {}

    # NDVI (Normalized Difference Vegetation Index)
    if "nir" in bands and "red" in bands:
        if bands["nir"] in layer.objects.columns and bands["red"] in layer.objects.columns:
            ndvi = attach_ndvi(layer, bands["nir"], bands["red"], "NDVI")
            indices["NDVI"] = ndvi

    # NDWI (Normalized Difference Water Index)
    if "green" in bands and "nir" in bands:
        if bands["green"] in layer.objects.columns and bands["nir"] in layer.objects.columns:
            green = layer.objects[bands["green"]]
            nir = layer.objects[bands["nir"]]

            denominator = green + nir
            mask = denominator != 0

            ndwi = np.zeros(len(layer.objects))
            ndwi[mask] = (green[mask] - nir[mask]) / denominator[mask]

            layer.objects["NDWI"] = ndwi

            indices["NDWI"] = {
                "mean": ndwi.mean(),
                "min": ndwi.min(),
                "max": ndwi.max(),
                "std": np.std(ndwi),
            }

    return indices

nickyspatial.utils

This modules contains utility functions for handling attributes, geometries, and coordinate reference systems.

Helpers , Aren't they useful ?

calculate_statistics_summary(layer_manager, output_file=None)

Calculate summary statistics for all layers in a layer manager.

Parameters:

layer_manager : LayerManager Layer manager containing layers output_file : str, optional Path to save the summary to (as JSON)

Returns:

summary : dict Dictionary with summary statistics

Source code in nickyspatial/utils/helpers.py
def calculate_statistics_summary(layer_manager, output_file=None):
    """Calculate summary statistics for all layers in a layer manager.

    Parameters:
    -----------
    layer_manager : LayerManager
        Layer manager containing layers
    output_file : str, optional
        Path to save the summary to (as JSON)

    Returns:
    --------
    summary : dict
        Dictionary with summary statistics
    """
    summary = {}

    for layer_name in layer_manager.get_layer_names():
        layer = layer_manager.get_layer(layer_name)

        layer_summary = {
            "type": layer.type,
            "created_at": str(layer.created_at),
            "parent": layer.parent.name if layer.parent else None,
        }

        if layer.objects is not None:
            layer_summary["object_count"] = len(layer.objects)

            if "area_units" in layer.objects.columns:
                layer_summary["total_area"] = float(layer.objects["area_units"].sum())
                layer_summary["mean_area"] = float(layer.objects["area_units"].mean())

            for col in layer.objects.columns:
                if col.lower().endswith("class") or col.lower() == "classification":
                    class_counts = layer.objects[col].value_counts().to_dict()
                    layer_summary[f"{col}_counts"] = {str(k): int(v) for k, v in class_counts.items() if k is not None}

        if layer.attached_functions:
            layer_summary["functions"] = list(layer.attached_functions.keys())

        summary[layer_name] = layer_summary

    if output_file:
        os.makedirs(os.path.dirname(output_file), exist_ok=True)
        with open(output_file, "w") as f:
            json.dump(summary, f, indent=2)

    return summary

create_sample_data()

Create a synthetic 4-band (B, G, R, NIR) image for testing.

Returns:

image_data : numpy.ndarray Synthetic image data transform : affine.Affine Affine transformation for the raster crs : rasterio.crs.CRS Coordinate reference system

Source code in nickyspatial/utils/helpers.py
def create_sample_data():
    """Create a synthetic 4-band (B, G, R, NIR) image for testing.

    Returns:
    --------
    image_data : numpy.ndarray
        Synthetic image data
    transform : affine.Affine
        Affine transformation for the raster
    crs : rasterio.crs.CRS
        Coordinate reference system
    """
    ## need to write logic for this one to create a pseduo image

    # return image_data, transform, crs
    return None

get_band_statistics(image_data, band_names=None)

Calculate statistics for each band in a raster image.

Parameters:

image_data : numpy.ndarray Raster image data (bands, height, width) band_names : list of str, optional Names of the bands

Returns:

stats : dict Dictionary with band statistics

Source code in nickyspatial/utils/helpers.py
def get_band_statistics(image_data, band_names=None):
    """Calculate statistics for each band in a raster image.

    Parameters:
    -----------
    image_data : numpy.ndarray
        Raster image data (bands, height, width)
    band_names : list of str, optional
        Names of the bands

    Returns:
    --------
    stats : dict
        Dictionary with band statistics
    """
    num_bands = image_data.shape[0]

    if band_names is None:
        band_names = [f"Band_{i + 1}" for i in range(num_bands)]

    stats = {}

    for i, band_name in enumerate(band_names):
        if i >= num_bands:
            break

        band_data = image_data[i]
        stats[band_name] = {
            "min": float(np.min(band_data)),
            "max": float(np.max(band_data)),
            "mean": float(np.mean(band_data)),
            "std": float(np.std(band_data)),
            "median": float(np.median(band_data)),
            "percentile_5": float(np.percentile(band_data, 5)),
            "percentile_95": float(np.percentile(band_data, 95)),
        }

    return stats

memory_usage(layer)

Estimate memory usage of a layer in MB.

Parameters:

layer : Layer Layer to calculate memory usage for

Returns:

memory_mb : float Estimated memory usage in MB

Source code in nickyspatial/utils/helpers.py
def memory_usage(layer):
    """Estimate memory usage of a layer in MB.

    Parameters:
    -----------
    layer : Layer
        Layer to calculate memory usage for

    Returns:
    --------
    memory_mb : float
        Estimated memory usage in MB
    """
    import sys

    memory = 0

    if layer.raster is not None:
        memory += layer.raster.nbytes

    if layer.objects is not None:
        for col in layer.objects.columns:
            if col != "geometry":
                memory += sys.getsizeof(layer.objects[col].values)

        memory += len(layer.objects) * 1000
    memory_mb = memory / (1024 * 1024)

    return memory_mb

nickyspatial.viz

Alrighty , let's get this visualization party started!

No matter what you do you need to see it and present it , this is the module for it to contain all code about visualizaing the layers , object rule results etc.

Visualization functions for plotting histograms, statistics, and scatter plots.

plot_histogram(layer, attribute, bins=20, figsize=(10, 6), by_class=None)

Plot a histogram of attribute values.

Parameters:

layer : Layer Layer containing data attribute : str Attribute to plot bins : int Number of bins figsize : tuple Figure size by_class : str, optional Column to group by (e.g., 'classification')

Returns:

fig : matplotlib.figure.Figure Figure object

Source code in nickyspatial/viz/charts.py
def plot_histogram(layer, attribute, bins=20, figsize=(10, 6), by_class=None):
    """Plot a histogram of attribute values.

    Parameters:
    -----------
    layer : Layer
        Layer containing data
    attribute : str
        Attribute to plot
    bins : int
        Number of bins
    figsize : tuple
        Figure size
    by_class : str, optional
        Column to group by (e.g., 'classification')

    Returns:
    --------
    fig : matplotlib.figure.Figure
        Figure object
    """
    if layer.objects is None or attribute not in layer.objects.columns:
        raise ValueError(f"Attribute '{attribute}' not found in layer objects")

    fig, ax = plt.subplots(figsize=figsize)

    if by_class and by_class in layer.objects.columns:
        data = layer.objects[[attribute, by_class]].copy()

        for class_value, group in data.groupby(by_class):
            if class_value is None:
                continue

            sns.histplot(group[attribute], bins=bins, alpha=0.6, label=str(class_value), ax=ax)

        ax.legend(title=by_class)
    else:
        sns.histplot(layer.objects[attribute], bins=bins, ax=ax)

    ax.set_title(f"Histogram of {attribute}")
    ax.set_xlabel(attribute)
    ax.set_ylabel("Count")

    return fig

plot_scatter(layer, x_attribute, y_attribute, color_by=None, figsize=(10, 8))

Create a scatter plot of two attributes.

Parameters:

layer : Layer Layer containing data x_attribute : str Attribute for x-axis y_attribute : str Attribute for y-axis color_by : str, optional Attribute to color points by figsize : tuple Figure size

Returns:

fig : matplotlib.figure.Figure Figure object

Source code in nickyspatial/viz/charts.py
def plot_scatter(layer, x_attribute, y_attribute, color_by=None, figsize=(10, 8)):
    """Create a scatter plot of two attributes.

    Parameters:
    -----------
    layer : Layer
        Layer containing data
    x_attribute : str
        Attribute for x-axis
    y_attribute : str
        Attribute for y-axis
    color_by : str, optional
        Attribute to color points by
    figsize : tuple
        Figure size

    Returns:
    --------
    fig : matplotlib.figure.Figure
        Figure object
    """
    if layer.objects is None or x_attribute not in layer.objects.columns or y_attribute not in layer.objects.columns:
        raise ValueError("Attributes not found in layer objects")

    fig, ax = plt.subplots(figsize=figsize)

    if color_by and color_by in layer.objects.columns:
        scatter = ax.scatter(
            layer.objects[x_attribute],
            layer.objects[y_attribute],
            c=layer.objects[color_by],
            cmap="viridis",
            alpha=0.7,
            s=50,
            edgecolor="k",
        )
        cbar = plt.colorbar(scatter, ax=ax)
        cbar.set_label(color_by)
    else:
        ax.scatter(
            layer.objects[x_attribute],
            layer.objects[y_attribute],
            alpha=0.7,
            s=50,
            edgecolor="k",
        )

    ax.set_title(f"{y_attribute} vs {x_attribute}")
    ax.set_xlabel(x_attribute)
    ax.set_ylabel(y_attribute)
    ax.grid(alpha=0.3)

    return fig

plot_statistics(layer, stats_dict, figsize=(12, 8), kind='bar', y_log=False)

Plot statistics from a statistics dictionary.

Parameters:

layer : Layer Layer the statistics are calculated for stats_dict : dict Dictionary with statistics (from attach_* functions) figsize : tuple Figure size kind : str Plot type: 'bar', 'line', or 'pie' y_log : bool Whether to use logarithmic scale for y-axis

Returns:

fig : matplotlib.figure.Figure Figure object

Source code in nickyspatial/viz/charts.py
def plot_statistics(layer, stats_dict, figsize=(12, 8), kind="bar", y_log=False):
    """Plot statistics from a statistics dictionary.

    Parameters:
    -----------
    layer : Layer
        Layer the statistics are calculated for
    stats_dict : dict
        Dictionary with statistics (from attach_* functions)
    figsize : tuple
        Figure size
    kind : str
        Plot type: 'bar', 'line', or 'pie'
    y_log : bool
        Whether to use logarithmic scale for y-axis

    Returns:
    --------
    fig : matplotlib.figure.Figure
        Figure object
    """
    flat_stats = {}

    def _flatten_dict(d, prefix=""):
        for key, value in d.items():
            if isinstance(value, dict):
                _flatten_dict(value, f"{prefix}{key}_")
            else:
                flat_stats[f"{prefix}{key}"] = value

    _flatten_dict(stats_dict)

    fig, ax = plt.subplots(figsize=figsize)

    if kind == "pie" and "class_percentages" in stats_dict:
        percentages = stats_dict["class_percentages"]
        values = list(percentages.values())
        labels = list(percentages.keys())

        ax.pie(values, labels=labels, autopct="%1.1f%%", startangle=90, shadow=True)
        ax.axis("equal")
        ax.set_title("Class Distribution")

    elif kind == "pie" and "percentages" in flat_stats:
        percentages = pd.Series(flat_stats).filter(like="percentage")
        values = percentages.values
        labels = [label.replace("_percentage", "") for label in percentages.index]

        ax.pie(values, labels=labels, autopct="%1.1f%%", startangle=90, shadow=True)
        ax.axis("equal")
        ax.set_title("Distribution")

    else:
        stats_df = pd.DataFrame({"Metric": list(flat_stats.keys()), "Value": list(flat_stats.values())})

        if kind != "line":
            stats_df = stats_df.sort_values("Value", ascending=False)

        if kind == "bar":
            sns.barplot(x="Metric", y="Value", data=stats_df, ax=ax)
            ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha="right")
        elif kind == "line":
            sns.lineplot(x="Metric", y="Value", data=stats_df, ax=ax, marker="o")
            ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha="right")

        if y_log:
            ax.set_yscale("log")

        ax.set_title("Statistics Summary")

    plt.tight_layout()
    return fig

Functions to create maps and visualize layers.

plot_classification(layer, class_field='classification', figsize=(12, 10), legend=True, class_color=None)

Plot classified segments with different colors for each class.

Source code in nickyspatial/viz/maps.py
def plot_classification(layer, class_field="classification", figsize=(12, 10), legend=True, class_color=None):
    """Plot classified segments with different colors for each class."""
    fig, ax = plt.subplots(figsize=figsize)
    if not class_color:
        class_color = {}

    if class_field not in layer.objects.columns:
        raise ValueError(f"Class field '{class_field}' not found in layer objects")

    class_values = [v for v in layer.objects[class_field].unique() if v is not None]

    # generate base colormap
    base_colors = plt.cm.tab20(np.linspace(0, 1, max(len(class_values), 1)))

    colors_list = []
    for idx, class_value in enumerate(class_values):
        if class_color and class_value in list(class_color.keys()):
            # reuse stored color
            color_hex = class_color[class_value]
        else:
            # assign new color (from tab20 or random if exceeds)
            if idx < len(base_colors):
                rgb = base_colors[idx][:3]
                color_hex = "#{:02x}{:02x}{:02x}".format(int(rgb[0] * 255), int(rgb[1] * 255), int(rgb[2] * 255))
            else:
                color_hex = "#{:06x}".format(random.randint(0, 0xFFFFFF))
            class_color[class_value] = color_hex

        # convert hex → RGB tuple for ListedColormap
        rgb_tuple = tuple(int(color_hex[i : i + 2], 16) / 255 for i in (1, 3, 5))
        colors_list.append(rgb_tuple)

    # create colormap
    cmap = ListedColormap(colors_list)

    # map class values to indices
    class_map = {value: i for i, value in enumerate(class_values)}
    layer.objects["_class_id"] = layer.objects[class_field].map(class_map)

    layer.objects.plot(
        column="_class_id",
        cmap=cmap,
        ax=ax,
        edgecolor="black",
        linewidth=0.5,
        legend=False,
    )

    if legend and len(class_values) > 0:
        patches = [mpatches.Patch(color=class_color[value], label=value) for value in class_values]
        ax.legend(handles=patches, loc="upper right", title=class_field)

    # ax.set_title(f"Classification by {class_field}")
    ax.set_title("Classification Map")

    ax.set_xlabel("X Coordinate")
    ax.set_ylabel("Y Coordinate")

    # cleanup temporary column
    if "_class_id" in layer.objects.columns:
        layer.objects = layer.objects.drop(columns=["_class_id"])

    return fig

plot_comparison(before_layer, after_layer, attribute=None, class_field=None, figsize=(16, 8), title=None)

Plot before and after views of layers for comparison.

Source code in nickyspatial/viz/maps.py
def plot_comparison(
    before_layer,
    after_layer,
    attribute=None,
    class_field=None,
    figsize=(16, 8),
    title=None,
):
    """Plot before and after views of layers for comparison."""
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=figsize)

    if title:
        fig.suptitle(title)

    if attribute and attribute in before_layer.objects.columns:
        before_layer.objects.plot(column=attribute, ax=ax1, legend=True)
        ax1.set_title(f"Before: {attribute}")
    elif class_field and class_field in before_layer.objects.columns:
        class_values = [v for v in before_layer.objects[class_field].unique() if v is not None]
        num_classes = len(class_values)
        colors = plt.cm.tab20(np.linspace(0, 1, max(num_classes, 1)))
        cmap = ListedColormap(colors)
        class_map = {value: i for i, value in enumerate(class_values)}
        before_layer.objects["_class_id"] = before_layer.objects[class_field].map(class_map)

        before_layer.objects.plot(
            column="_class_id",
            cmap=cmap,
            ax=ax1,
            edgecolor="black",
            linewidth=0.5,
            legend=False,
        )

        patches = [mpatches.Patch(color=colors[i], label=value) for i, value in enumerate(class_values)]
        ax1.legend(handles=patches, loc="upper right", title=class_field)
        ax1.set_title(f"Before: {class_field}")
    else:
        before_layer.objects.plot(ax=ax1)
        ax1.set_title("Before")

    if attribute and attribute in after_layer.objects.columns:
        after_layer.objects.plot(column=attribute, ax=ax2, legend=True)
        ax2.set_title(f"After: {attribute}")
    elif class_field and class_field in after_layer.objects.columns:
        class_values = [v for v in after_layer.objects[class_field].unique() if v is not None]
        num_classes = len(class_values)
        colors = plt.cm.tab20(np.linspace(0, 1, max(num_classes, 1)))
        cmap = ListedColormap(colors)
        class_map = {value: i for i, value in enumerate(class_values)}
        after_layer.objects["_class_id"] = after_layer.objects[class_field].map(class_map)

        after_layer.objects.plot(
            column="_class_id",
            cmap=cmap,
            ax=ax2,
            edgecolor="black",
            linewidth=0.5,
            legend=False,
        )

        patches = [mpatches.Patch(color=colors[i], label=value) for i, value in enumerate(class_values)]
        ax2.legend(handles=patches, loc="upper right", title=class_field)
        ax2.set_title(f"After: {class_field}")
    else:
        after_layer.objects.plot(ax=ax2)
        ax2.set_title("After")

    if "_class_id" in before_layer.objects.columns:
        before_layer.objects = before_layer.objects.drop(columns=["_class_id"])
    if "_class_id" in after_layer.objects.columns:
        after_layer.objects = after_layer.objects.drop(columns=["_class_id"])

    return fig

plot_layer(layer, image_data=None, attribute=None, title=None, rgb_bands=(2, 1, 0), figsize=(12, 10), cmap='viridis', show_boundaries=False)

Plot a layer, optionally with an attribute or image backdrop.

Source code in nickyspatial/viz/maps.py
def plot_layer(
    layer,
    image_data=None,
    attribute=None,
    title=None,
    rgb_bands=(2, 1, 0),
    figsize=(12, 10),
    cmap="viridis",
    show_boundaries=False,
):
    """Plot a layer, optionally with an attribute or image backdrop."""
    fig, ax = plt.subplots(figsize=figsize)

    if title:
        ax.set_title(title)
    elif attribute:
        ax.set_title(f"{attribute} by Segment")
    else:
        ax.set_title("Layer Visualization")

    if image_data is not None:
        num_bands = image_data.shape[0]
        if num_bands >= 3 and max(rgb_bands) < num_bands:
            r = image_data[rgb_bands[0]]
            g = image_data[rgb_bands[1]]
            b = image_data[rgb_bands[2]]

            r_norm = np.clip((r - r.min()) / (r.max() - r.min() + 1e-10), 0, 1)
            g_norm = np.clip((g - g.min()) / (g.max() - g.min() + 1e-10), 0, 1)
            b_norm = np.clip((b - b.min()) / (b.max() - b.min() + 1e-10), 0, 1)

            rgb = np.stack([r_norm, g_norm, b_norm], axis=2)

            ax.imshow(rgb)
        else:
            gray = image_data[0]
            gray_norm = (gray - gray.min()) / (gray.max() - gray.min() + 1e-10)
            ax.imshow(gray_norm, cmap="gray")

    if attribute and attribute in layer.objects.columns:
        layer.objects.plot(
            column=attribute,
            cmap=cmap,
            ax=ax,
            legend=True,
            alpha=0.7 if image_data is not None else 1.0,
        )

    if show_boundaries and layer.raster is not None:
        from skimage.segmentation import mark_boundaries

        if image_data is not None:
            if "num_bands" in locals() and num_bands >= 3:
                base_img = rgb
            else:
                gray = image_data[0]
                gray_norm = (gray - gray.min()) / (gray.max() - gray.min() + 1e-10)
                base_img = np.stack([gray_norm, gray_norm, gray_norm], axis=2)

            bounded = mark_boundaries(base_img, layer.raster, color=(1, 1, 0), mode="thick")

            if attribute is None:
                ax.imshow(bounded)
        else:
            ax.imshow(
                mark_boundaries(
                    np.zeros((layer.raster.shape[0], layer.raster.shape[1], 3)),
                    layer.raster,
                    color=(1, 1, 0),
                    mode="thick",
                )
            )

    ax.grid(alpha=0.3)
    return fig

plot_layer_interactive(layer, image_data=None, figsize=(10, 8))

Interactive plot of a layer with widgets and working click.

Source code in nickyspatial/viz/maps.py
def plot_layer_interactive(layer, image_data=None, figsize=(10, 8)):
    """Interactive plot of a layer with widgets and working click."""
    """
    %matplotlib widget
    plot_layer_interactive(layer=segmentation_layer,image_data=image_data,figsize=(10,8))
    Not supported in google collab
    """
    # attribute_options = [None] + list(layer.objects.columns)
    # attribute_widget = widgets.Dropdown(
    #     options=attribute_options,
    #     value=None,
    #     description='Attribute:'
    # )

    title_widget = widgets.Text(value="Layer Visualization", description="Title:")

    # cmap_widget = widgets.Dropdown(
    #     options=plt.colormaps(),
    #     value='viridis',
    #     description='Colormap:'
    # )

    rgb_band_max = image_data.shape[0] - 1 if image_data is not None else 2

    # # Ensure that the default value is within the valid range
    # default_value = (2, 1, 0) if rgb_band_max >= 2 else (0,)

    # # Create the widget with the correct range and default value
    # red_band_widget = widgets.Select(
    #     options=list(range(rgb_band_max + 1)),  # This ensures valid options based on the shape of the image
    #     value=default_value[:rgb_band_max + 1],  # This makes sure the default value matches the options available
    #     description='Red Band:'
    # )

    # rgb_band_max = image_data.shape[0] - 1 if image_data is not None else 2
    red_band_widget = widgets.Select(
        options=list(range(rgb_band_max + 1)), value=0 if rgb_band_max >= 2 else 0, description="Red Band:"
    )
    green_band_widget = widgets.Select(
        options=list(range(rgb_band_max + 1)), value=1 if rgb_band_max >= 2 else 0, description="Green Band:"
    )
    blue_band_widget = widgets.Select(
        options=list(range(rgb_band_max + 1)), value=2 if rgb_band_max >= 2 else 0, description="Blue Band:"
    )

    show_boundaries_widget = widgets.Checkbox(value=True, description="Show Boundaries")

    # Create a figure and output widget
    fig, ax = plt.subplots(figsize=figsize)
    out_fig = widgets.Output()

    # with out_fig:
    #     display(fig)

    def onclick(event):
        if event.xdata is None or event.ydata is None:
            return
        x_pix, y_pix = int(event.xdata), int(event.ydata)
        if (0 <= x_pix < layer.raster.shape[1]) and (0 <= y_pix < layer.raster.shape[0]):
            segment_id = layer.raster[y_pix, x_pix]
            msg = f"Clicked at (x={x_pix}, y={y_pix}) → Segment ID: {segment_id}"
            title_widget.value = msg
            ax.set_title(msg)
            fig.canvas.draw_idle()
        else:
            msg = "Clicked outside raster bounds"
            title_widget.value = msg
            ax.set_title(msg)
            fig.canvas.draw_idle()

    fig.canvas.mpl_connect("button_press_event", onclick)

    def update_plot(red_band, green_band, blue_band, show_boundaries):
        # def update_plot(attribute, title, cmap, rgb_bands, show_boundaries):

        ax.clear()
        # ax.set_title(title)

        if image_data is not None:
            num_bands = image_data.shape[0]
            if red_band >= 0 and green_band >= 0 and blue_band >= 0:
                r = image_data[red_band].astype(float)
                g = image_data[green_band].astype(float)
                b = image_data[blue_band].astype(float)

                r_norm = np.clip((r - r.min()) / (r.max() - r.min() + 1e-10), 0, 1)
                g_norm = np.clip((g - g.min()) / (g.max() - g.min() + 1e-10), 0, 1)
                b_norm = np.clip((b - b.min()) / (b.max() - b.min() + 1e-10), 0, 1)

                rgb = np.stack([r_norm, g_norm, b_norm], axis=2)
                ax.imshow(rgb)
            else:
                gray = image_data[0]
                gray_norm = (gray - gray.min()) / (gray.max() - gray.min() + 1e-10)
                ax.imshow(gray_norm, cmap="gray")

        # if attribute and attribute in layer.objects.columns:
        #     layer.objects.plot(
        #         column=attribute,
        #         cmap=cmap,
        #         ax=ax,
        #         legend=True,
        #         alpha=0.7 if image_data is not None else 1.0,
        #     )

        if show_boundaries and layer.raster is not None:  # show_boundaries and
            if image_data is not None:
                if num_bands >= 3:  # and len(rgb_bands) >= 3:
                    base_img = rgb
                else:
                    gray = image_data[0]
                    gray_norm = (gray - gray.min()) / (gray.max() - gray.min() + 1e-10)
                    base_img = np.stack([gray_norm, gray_norm, gray_norm], axis=2)

                bounded = mark_boundaries(base_img, layer.raster, color=(1, 1, 0), mode="thick")
                # if attribute is None:
                ax.imshow(bounded)
            else:
                ax.imshow(
                    mark_boundaries(
                        np.zeros((layer.raster.shape[0], layer.raster.shape[1], 3)),
                        layer.raster,
                        color=(1, 1, 0),
                        mode="thick",
                    )
                )

        ax.grid(alpha=0.3)
        fig.canvas.draw_idle()

    # Zoom control widgets (manual)
    # zoom_in_button = widgets.Button(description="Zoom In")
    # zoom_out_button = widgets.Button(description="Zoom Out")

    # def zoom_in(change):
    #     xlim, ylim = ax.get_xlim(), ax.get_ylim()
    #     ax.set_xlim(xlim[0] * 0.9, xlim[1] * 0.9)
    #     ax.set_ylim(ylim[0] * 0.9, ylim[1] * 0.9)
    #     fig.canvas.draw_idle()

    # def zoom_out(change):
    #     xlim, ylim = ax.get_xlim(), ax.get_ylim()
    #     ax.set_xlim(xlim[0] * 1.1, xlim[1] * 1.1)
    #     ax.set_ylim(ylim[0] * 1.1, ylim[1] * 1.1)
    #     fig.canvas.draw_idle()

    # zoom_in_button.on_click(zoom_in)
    # zoom_out_button.on_click(zoom_out)

    ui = widgets.VBox(
        [
            # attribute_widget,
            # title_widget,
            # cmap_widget,
            red_band_widget,
            green_band_widget,
            blue_band_widget,
            show_boundaries_widget,
            # zoom_in_button,
            # zoom_out_button,
        ]
    )

    controls = widgets.interactive_output(
        update_plot,
        {
            # 'attribute': attribute_widget,
            # 'title': title_widget,
            # 'cmap': cmap_widget,
            "red_band": red_band_widget,
            "green_band": green_band_widget,
            "blue_band": blue_band_widget,
            "show_boundaries": show_boundaries_widget,
        },
    )

    display(ui, out_fig, controls)

plot_layer_interactive_plotly(layer, image_data, rgb_bands=(0, 1, 2), show_boundaries=True, figsize=(800, 400))

Display an interactive RGB image with segment boundaries and hoverable segment IDs using Plotly.

Run in google collab as well.

Parameters:

layer : object An object with a .raster attribute representing the labeled segmentation layer (e.g., output from a segmentation algorithm, such as SLIC). image_data : image data to be visualized. rgb_bands : tuple of int, optional Tuple of three integers specifying which bands to use for the RGB composite (default is (0, 1, 2)). show_boundaries : bool, optional Whether to overlay the segment boundaries on the RGB image (default is True). figsize : tuple of int, optional Tuple specifying the width and height of the interactive Plotly figure in pixels (default is (800, 400)).

Returns:

None The function displays the interactive plot directly in the output cell in a Jupyter Notebook.

Notes:

  • Segment boundaries are drawn using skimage.segmentation.mark_boundaries.
  • Hovering over the image displays the segment ID from layer.raster.
Source code in nickyspatial/viz/maps.py
def plot_layer_interactive_plotly(layer, image_data, rgb_bands=(0, 1, 2), show_boundaries=True, figsize=(800, 400)):
    """Display an interactive RGB image with segment boundaries and hoverable segment IDs using Plotly.

    Run in google collab as well.

    Parameters:
    ----------
    layer : object
        An object with a `.raster` attribute representing the labeled segmentation layer
        (e.g., output from a segmentation algorithm, such as SLIC).
    image_data : image data to be visualized.
    rgb_bands : tuple of int, optional
        Tuple of three integers specifying which bands to use for the RGB composite (default is (0, 1, 2)).
    show_boundaries : bool, optional
        Whether to overlay the segment boundaries on the RGB image (default is True).
    figsize : tuple of int, optional
        Tuple specifying the width and height of the interactive Plotly figure in pixels (default is (800, 400)).

    Returns:
    -------
    None
        The function displays the interactive plot directly in the output cell in a Jupyter Notebook.

    Notes:
    -----
    - Segment boundaries are drawn using `skimage.segmentation.mark_boundaries`.
    - Hovering over the image displays the segment ID from `layer.raster`.

    """

    def get_rgb_image(r, g, b):
        r_norm = np.clip((r - r.min()) / (r.max() - r.min() + 1e-10), 0, 1)
        g_norm = np.clip((g - g.min()) / (g.max() - g.min() + 1e-10), 0, 1)
        b_norm = np.clip((b - b.min()) / (b.max() - b.min() + 1e-10), 0, 1)
        return np.stack([r_norm, g_norm, b_norm], axis=2)

    def update_plot(rgb_bands, show_boundaries=True):
        rgb_image = get_rgb_image(image_data[rgb_bands[0]], image_data[rgb_bands[1]], image_data[rgb_bands[2]])

        if show_boundaries:
            rgb_image = mark_boundaries(rgb_image, layer.raster, color=(1, 1, 0), mode="thick")

        fig = go.Figure(data=go.Image(z=(rgb_image * 255).astype(np.uint8)))

        # Add segment ID overlay with hover
        fig.add_trace(
            go.Heatmap(
                z=layer.raster,
                opacity=0,
                hoverinfo="z",
                showscale=False,
                hovertemplate="Segment ID: %{z}<extra></extra>",
                colorscale="gray",
            )
        )

        fig.update_layout(
            title="Hover to see Segment ID", dragmode="pan", margin=dict(l=0, r=0, t=30, b=0), height=figsize[1], width=figsize[0]
        )
        fig.update_xaxes(showticklabels=False)
        fig.update_yaxes(showticklabels=False, scaleanchor="x")

        fig.show()

    update_plot(rgb_bands=rgb_bands, show_boundaries=show_boundaries)

plot_sample(layer, image_data=None, transform=None, rgb_bands=None, class_field='classification', figsize=(8, 6), class_color=None, legend=True)

Plot classified segments on top of RGB or grayscale image data.

Parameters: - layer: Layer object with .objects (GeoDataFrame) - image_data: 3D numpy array (bands, height, width) - transform: Affine transform for the image (needed to compute extent) - red_band, green_band, blue_band: indices for RGB bands

Source code in nickyspatial/viz/maps.py
def plot_sample(
    layer,
    image_data=None,
    transform=None,
    rgb_bands=None,
    class_field="classification",
    figsize=(8, 6),
    class_color=None,
    legend=True,
):
    """Plot classified segments on top of RGB or grayscale image data.

    Parameters:
    - layer: Layer object with .objects (GeoDataFrame)
    - image_data: 3D numpy array (bands, height, width)
    - transform: Affine transform for the image (needed to compute extent)
    - red_band, green_band, blue_band: indices for RGB bands
    """
    fig, ax = plt.subplots(figsize=figsize)

    # ---- Plot RGB or Grayscale image from array ----
    if image_data is not None:
        num_bands = image_data.shape[0]
        if rgb_bands and num_bands >= 3:
            r = image_data[rgb_bands[0]].astype(float)
            g = image_data[rgb_bands[1]].astype(float)
            b = image_data[rgb_bands[2]].astype(float)

            r_norm = np.clip((r - r.min()) / (r.max() - r.min() + 1e-10), 0, 1)
            g_norm = np.clip((g - g.min()) / (g.max() - g.min() + 1e-10), 0, 1)
            b_norm = np.clip((b - b.min()) / (b.max() - b.min() + 1e-10), 0, 1)

            rgb = np.stack([r_norm, g_norm, b_norm], axis=2)
            if transform:
                from rasterio.plot import plotting_extent

                extent = plotting_extent(image_data[0], transform=transform)
                ax.imshow(rgb, extent=extent)
            else:
                ax.imshow(rgb)
        else:
            gray = image_data[0]
            gray_norm = (gray - gray.min()) / (gray.max() - gray.min() + 1e-10)
            if transform:
                from rasterio.plot import plotting_extent

                extent = plotting_extent(gray, transform=transform)
                ax.imshow(gray_norm, cmap="gray", extent=extent)
            else:
                ax.imshow(gray_norm, cmap="gray")

    # ---- Plot classification overlay ----
    gdf = layer.objects.copy()
    if gdf.crs is None:
        raise ValueError("GeoDataFrame has no CRS")

    if not class_color:
        class_color = {}
    if class_field not in gdf.columns:
        raise ValueError(f"Class field '{class_field}' not found")

    class_values = [v for v in gdf[class_field].unique() if v is not None]
    base_colors = plt.cm.tab20(np.linspace(0, 1, max(len(class_values), 1)))
    class_map = {}

    for idx, class_value in enumerate(class_values):
        if class_value in class_color:
            color_hex = class_color[class_value]
        else:
            rgb_val = base_colors[idx % len(base_colors)][:3]
            color_hex = "#{:02x}{:02x}{:02x}".format(int(rgb_val[0] * 255), int(rgb_val[1] * 255), int(rgb_val[2] * 255))
            class_color[class_value] = color_hex
        class_map[class_value] = color_hex

    for class_value in class_values:
        gdf[gdf[class_field] == class_value].plot(ax=ax, facecolor=class_map[class_value], edgecolor="black", linewidth=0.5)

    if legend:
        handles = [mpatches.Patch(color=class_map[val], label=val) for val in class_values]
        ax.legend(handles=handles, loc="upper right", title=class_field)

    ax.set_title("Sample Data Visualization")
    ax.set_axis_off()
    return fig