Skip to content

API Reference

NickySpatial: An open-source object-based image analysis library for remote sensing

NickySpatial is a Python package for object-based image analysis, providing functionality similar to commercial software like eCognition.

Key features: - Multiresolution segmentation - Object-based analysis - Rule-based classification - Statistics calculation - Integration with geospatial data formats

nickyspatial.core

The core package encompasses fundamental data structures and algorithms for nickyspatial.

It helps define the building blocks like layers, segmentation methods, and rule-based logic for object analysis workflows.

Implements supervised classification algorithms to classify the segments.

SupervisedClassifier

Implementation of Supervised Classification algorithm.

Source code in nickyspatial/core/classifier.py
class SupervisedClassifier:
    """Implementation of Supervised Classification algorithm."""

    def __init__(self, name=None, classifier_type="Random Forests", classifier_params=None):
        """Initialize the segmentation algorithm.

        Parameters:
        -----------
        scale : str
            classifier type name eg: RF for Random Forest, SVC for Support Vector Classifier
        classifier_params : dict
           additional parameters relayed to classifier
        """
        self.classifier_type = classifier_type
        self.classifier_params = classifier_params
        self.training_layer = None
        self.classifier = None
        self.name = name if name else "Supervised_Classification"
        self.features = None

    def _training_sample(self, layer, samples):
        """Create vector objects from segments.

        Parameters:
        -----------
        samples : dict
            key: class_name
            values: list of segment_ids
            eg: {"cropland":[1,2,3],"built-up":[4,5,6]}

        Returns:
        --------
        segment_objects : geopandas.GeoDataFrame
            GeoDataFrame with segment polygons
        """
        layer["classification"] = None

        for class_name in samples.keys():
            layer.loc[layer["segment_id"].isin(samples[class_name]), "classification"] = class_name

        layer = layer[layer["classification"].notna()]
        self.training_layer = layer
        return layer

    def _train(self, features):
        """Train the classifier using the training samples and compute accuracy and feature importances.

        Parameters
        ----------
        features : list of str or None
            List of feature column names to use. If None, all columns except segment_id, geometry, and classification are used.

        Returns:
        -------
        classifier : sklearn classifier object
            The trained classifier.
        test_accuracy : float
            Accuracy score on training data.
        feature_importances : pd.Series or None
            Feature importances (only for Random Forest), else None.
        """
        self.features = features
        if not self.features:
            self.features = self.training_layer.columns
        self.features = [col for col in self.features if col not in ["segment_id", "classification", "geometry"]]

        x = self.training_layer[self.features]
        y = self.training_layer["classification"]

        if self.classifier_type == "Random Forest":
            self.classifier = RandomForestClassifier(**self.classifier_params)
            self.classifier.fit(x, y)
            if hasattr(self.classifier, "oob_score_") and self.classifier.oob_score_:
                test_accuracy = self.classifier.oob_score_
            else:
                test_accuracy = self.classifier.score(x, y)
            feature_importances = pd.Series(self.classifier.feature_importances_, index=self.features) * 100
            feature_importances = feature_importances.sort_values(ascending=False)

        elif self.classifier_type == "SVC":
            self.classifier = SVC(**self.classifier_params)
            self.classifier.fit(x, y)
            predictions = self.classifier.predict(x)
            test_accuracy = accuracy_score(y, predictions)
            feature_importances = None

        elif self.classifier_type == "KNN":
            self.classifier = KNeighborsClassifier(**self.classifier_params)
            self.classifier.fit(x, y)
            predictions = self.classifier.predict(x)
            test_accuracy = accuracy_score(y, predictions)
            feature_importances = None

        else:
            raise ValueError(f"Unsupported classifier type: {self.classifier_type}")

        return self.classifier, test_accuracy, feature_importances

    def _prediction(self, layer):
        """Perform classification prediction on input layer features.

        Parameters
        ----------
        layer : geopandas.GeoDataFrame
            Input data containing at least a 'segment_id' and 'geometry' column, along with
            feature columns required by the classifier. If a 'classification' column does not
            exist, it will be created.

        Returns:
        -------
        The input layer with an updated 'classification' column containing predicted labels.

        """
        layer["classification"] = ""
        x = layer[self.features]

        predictions = self.classifier.predict(x)
        layer.loc[layer["classification"] == "", "classification"] = predictions
        return layer

    def execute(self, source_layer, samples, layer_manager=None, layer_name=None, features=None):
        """Execute the supervised classification workflow on the source layer.

        This method creates a new layer by copying the input source layer, training a classifier
        using provided samples, predicting classifications, and storing the results in a new layer.
        Optionally, the resulting layer can be added to a layer manager.

        Parameters
        ----------
        source_layer : Layer
            The input layer containing spatial objects and metadata (transform, CRS, raster).
        samples : dict
            A dictionary of training samples where keys are class labels and values are lists
            of segment IDs or features used for training. Default is an empty dictionary.
        layer_manager : LayerManager, optional
            An optional layer manager object used to manage and store the resulting layer.
        layer_name : str, optional
            The name to assign to the resulting classified layer.

        Returns:
        -------
        Layer
            A new Layer object containing the predicted classifications, copied metadata from
            the source layer, and updated attributes.
        """
        result_layer = Layer(name=layer_name, parent=source_layer, layer_type="merged")
        result_layer.transform = source_layer.transform
        result_layer.crs = source_layer.crs
        result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

        layer = source_layer.objects.copy()
        self._training_sample(layer, samples)
        _, accuracy, feature_importances = self._train(features)

        layer = self._prediction(layer)

        result_layer.objects = layer

        result_layer.metadata = {"supervised classification": self.name, "classifier_params": self.classifier_params}

        if layer_manager:
            layer_manager.add_layer(result_layer)

        return result_layer, accuracy, feature_importances

__init__(name=None, classifier_type='Random Forests', classifier_params=None)

Initialize the segmentation algorithm.

Parameters:

scale : str classifier type name eg: RF for Random Forest, SVC for Support Vector Classifier classifier_params : dict additional parameters relayed to classifier

Source code in nickyspatial/core/classifier.py
def __init__(self, name=None, classifier_type="Random Forests", classifier_params=None):
    """Initialize the segmentation algorithm.

    Parameters:
    -----------
    scale : str
        classifier type name eg: RF for Random Forest, SVC for Support Vector Classifier
    classifier_params : dict
       additional parameters relayed to classifier
    """
    self.classifier_type = classifier_type
    self.classifier_params = classifier_params
    self.training_layer = None
    self.classifier = None
    self.name = name if name else "Supervised_Classification"
    self.features = None

execute(source_layer, samples, layer_manager=None, layer_name=None, features=None)

Execute the supervised classification workflow on the source layer.

This method creates a new layer by copying the input source layer, training a classifier using provided samples, predicting classifications, and storing the results in a new layer. Optionally, the resulting layer can be added to a layer manager.

Parameters

source_layer : Layer The input layer containing spatial objects and metadata (transform, CRS, raster). samples : dict A dictionary of training samples where keys are class labels and values are lists of segment IDs or features used for training. Default is an empty dictionary. layer_manager : LayerManager, optional An optional layer manager object used to manage and store the resulting layer. layer_name : str, optional The name to assign to the resulting classified layer.

Returns:

Layer A new Layer object containing the predicted classifications, copied metadata from the source layer, and updated attributes.

Source code in nickyspatial/core/classifier.py
def execute(self, source_layer, samples, layer_manager=None, layer_name=None, features=None):
    """Execute the supervised classification workflow on the source layer.

    This method creates a new layer by copying the input source layer, training a classifier
    using provided samples, predicting classifications, and storing the results in a new layer.
    Optionally, the resulting layer can be added to a layer manager.

    Parameters
    ----------
    source_layer : Layer
        The input layer containing spatial objects and metadata (transform, CRS, raster).
    samples : dict
        A dictionary of training samples where keys are class labels and values are lists
        of segment IDs or features used for training. Default is an empty dictionary.
    layer_manager : LayerManager, optional
        An optional layer manager object used to manage and store the resulting layer.
    layer_name : str, optional
        The name to assign to the resulting classified layer.

    Returns:
    -------
    Layer
        A new Layer object containing the predicted classifications, copied metadata from
        the source layer, and updated attributes.
    """
    result_layer = Layer(name=layer_name, parent=source_layer, layer_type="merged")
    result_layer.transform = source_layer.transform
    result_layer.crs = source_layer.crs
    result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

    layer = source_layer.objects.copy()
    self._training_sample(layer, samples)
    _, accuracy, feature_importances = self._train(features)

    layer = self._prediction(layer)

    result_layer.objects = layer

    result_layer.metadata = {"supervised classification": self.name, "classifier_params": self.classifier_params}

    if layer_manager:
        layer_manager.add_layer(result_layer)

    return result_layer, accuracy, feature_importances

SupervisedClassifierDL

Implementation of deep learning based supervised classification.

Source code in nickyspatial/core/classifier.py
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
class SupervisedClassifierDL:
    """Implementation of deep learning based supervised classification."""

    def __init__(self, name="CNN_Classification", classifier_type="Convolution Neural Network (CNN)", classifier_params=None):
        """Initialize a Convolutional Neural Network (CNN) classifier.

        Parameters
        ----------
        name : str, optional
            Custom name for the classification layer. Defaults to "CNN_Classification" if None.
        classifier_type : str, optional
            Type of classifier. Defaults to "Convolution Neural Network (CNN)".
        classifier_params : dict, optional
            Dictionary of training parameters for the CNN.
            If None, the following defaults are used:
                - epochs (int): 50
                - batch_size (int): 32
                - patch_size (tuple of int): (5, 5)
                - early_stopping_patience (int): 5
        """
        if not HAS_TENSORFLOW:
            raise ImportError("TensorFlow is required for CNN classification. Install it with: uv add --group cnn tensorflow")

        self.name = name
        self.classifier_type = classifier_type
        self.classifier_params = (
            classifier_params
            if classifier_params
            else {"epochs": 50, "batch_size": 32, "patch_size": (5, 5), "early_stopping_patience": 5}
        )
        self.model = None
        self.le = LabelEncoder()

    def _extract_training_patches(self, image, segments, samples):
        """Extract fixed-size training patches from an image based on segment IDs and labeled samples.

        This method iterates over the provided labeled segments, extract all the possible image patches
        of the specified patch size.

        Parameters
        ----------
        image : np.ndarray
            Input image as a NumPy array of shape (C, H, W) where C is the number of channels.
        segments : object
            Segmentation object containing a `raster` attribute (2D array of segment IDs).
        samples : dict
            Dictionary mapping class labels to lists of segment IDs, e.g.,
            {
                "class_1": [1, 5, 9],
                "class_2": [2, 6, 10]
            }.

        Returns:
        -------
        patches : np.ndarray
            Array of extracted patches of shape (N, patch_height, patch_width, C),
            where N is the number of valid patches.
        labels : list
            List of class labels corresponding to each patch.
        counts_dict : dict
             Dictionary mapping each unique class label to the number of occurrences in `labels`.

        Notes:
        -----
        - Patch size is taken from `self.classifier_params["patch_size"]` if provided;
          otherwise defaults to (5, 5).
        - Only patches where all pixels belong to the same segment ID are included.
        """
        image = np.moveaxis(image, 0, -1)
        patches = []
        labels = []
        patch_size = self.classifier_params["patch_size"] if "patch_size" in self.classifier_params.keys() else (5, 5)

        props = regionprops(segments.raster)
        segment_id_to_region = {prop.label: prop for prop in props}

        for key in samples.keys():
            segment_ids = samples[key]
            for seg_id in segment_ids:
                incount = 0
                outcount = 0
                region = segment_id_to_region.get(seg_id)
                if region is None:
                    print(f"Segment id {seg_id} not found, skipping.")
                    continue

                bbox = region.bbox
                min_row, min_col, max_row, max_col = bbox[0], bbox[1], bbox[2], bbox[3]

                n_row_patches = (max_row - min_row) // patch_size[0]
                n_col_patches = (max_col - min_col) // patch_size[1]

                for i in range(n_row_patches):
                    for j in range(n_col_patches):
                        row_start = min_row + i * patch_size[0]
                        row_end = row_start + patch_size[0]

                        col_start = min_col + j * patch_size[1]
                        col_end = col_start + patch_size[1]

                        mask = segments.raster[row_start:row_end, col_start:col_end] == seg_id
                        if np.all(mask):
                            incount += 1
                            patch = image[row_start:row_end, col_start:col_end]
                            patches.append(patch)
                            labels.append(key)
                        else:
                            outcount += 1
        patches = np.array(patches)
        print(f"** Extracted {len(patches)} training patches of shape {patches.shape[1:]} **")
        counts = Counter(labels)
        counts_dict = dict(counts)
        print("** Class distribution: **")
        for cls, count in counts_dict.items():
            print(f"  - {cls}: {count} patches")
        return patches, labels, counts_dict

    def _extract_patches_for_prediction(self, image, segments):
        """Extract fixed-size patches from an image for prediction.

        This method iterates over each segment in the segmentation raster, identifies
        rectangular regions that match the specified patch size, and extracts those
        patches where all pixels belong to the same segment.

        Parameters
        ----------
        image : np.ndarray
            Input image as a NumPy array of shape (C, H, W), where C is the number of channels.
        segments : object
            Segmentation object containing a `raster` attribute (2D array of segment IDs).

        Returns:
        -------
        patches : np.ndarray
            List of extracted patches, each of shape (patch_height, patch_width, Channels).
        segment_ids : list of ids(int)
            List of segment IDs corresponding to each extracted patch.
        invalid_patches_segments_ids : list of int
            List of segment IDs for which no valid patches were found.

        Notes:
        -----
        - Patch size is taken from `self.classifier_params["patch_size"]`.
        - Only patches where all pixels belong to the same segment ID are included.
        - `segment_ids` and `patches` maintain the same ordering so that each patch
        can be mapped back to its original segment.
        """
        image = np.moveaxis(image, 0, -1)
        patches = []
        segment_ids = []
        invalid_patches_segments_ids = []
        patch_size = self.classifier_params["patch_size"]

        props = regionprops(segments.raster)

        for prop in props:
            incount = 0
            outcount = 0
            bbox = prop.bbox
            min_row, min_col, max_row, max_col = bbox[0], bbox[1], bbox[2], bbox[3]

            n_row_patches = (max_row - min_row) // patch_size[0]
            n_col_patches = (max_col - min_col) // patch_size[1]

            for i in range(n_row_patches):
                for j in range(n_col_patches):
                    row_start = min_row + i * patch_size[0]
                    row_end = row_start + patch_size[0]

                    col_start = min_col + j * patch_size[1]
                    col_end = col_start + patch_size[1]

                    mask = segments.raster[row_start:row_end, col_start:col_end] == prop.label
                    if np.all(mask):
                        incount += 1
                        patch = image[row_start:row_end, col_start:col_end]
                        patches.append(patch)
                        segment_ids.append(prop.label)
                    else:
                        outcount += 1
            if incount == 0:
                invalid_patches_segments_ids.append(prop.label)
        patches = np.array(patches)
        if invalid_patches_segments_ids:
            print(
                f"Error: Could not create patch for the following segments: {invalid_patches_segments_ids}",
                "\n Possible reasons: large patch_size or small segements",
            )
        return patches, segment_ids, invalid_patches_segments_ids

    def _create_cnn_model(self, input_shape, num_classes):
        """Define a CNN model."""
        hidden_layers_default = [
            {"filters": 32, "kernel_size": 3, "max_pooling": True},
            {"filters": 32, "kernel_size": 3, "max_pooling": True},
        ]

        hidden_layers_config = (
            self.classifier_params["hidden_layers_config"]
            if "hidden_layers_config" in self.classifier_params.keys()
            else hidden_layers_default
        )
        use_batch_norm = self.classifier_params["use_batch_norm"] if "use_batch_norm" in self.classifier_params.keys() else True
        dense_units = self.classifier_params["dense_units"] if "dense_units" in self.classifier_params.keys() else 64
        model = models.Sequential()
        model.add(layers.Input(shape=input_shape))

        current_height, current_width = input_shape[0], input_shape[1]

        for layer_cfg in hidden_layers_config:
            model.add(
                layers.Conv2D(
                    layer_cfg["filters"], (layer_cfg["kernel_size"], layer_cfg["kernel_size"]), activation="relu", padding="same"
                )
            )

            if use_batch_norm:
                model.add(layers.BatchNormalization())

            if layer_cfg.get("max_pooling", False) and current_height >= 4 and current_width >= 4:
                model.add(layers.MaxPooling2D((2, 2)))
                current_height = current_height // 2
                current_width = current_width // 2

        model.add(layers.Flatten())
        model.add(layers.Dense(dense_units, activation="relu"))
        model.add(layers.Dense(num_classes, activation="softmax"))
        model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
        return model

    def _train_model(self, patches_train, labels_train, patches_val, labels_val):
        """Train the CNN model using the provided training and validation datasets.

        This method fits the CNN model with early stopping and learning rate reduction
        callbacks to prevent overfitting and improve convergence. The best model weights
        are restored based on the lowest validation loss.

        Parameters
        ----------
        patches_train : np.ndarray
            Training image patches of shape (N, H, W, C).
        labels_train : np.ndarray
            Training labels corresponding to `patches_train`.
        patches_val : np.ndarray
            Validation image patches of shape (N, H, W, C).
        labels_val : np.ndarray
            Validation labels corresponding to `patches_val`.

        Returns:
        -------
        history : tensorflow.python.keras.callbacks.History
            Keras History object containing training and validation loss/accuracy metrics
            for each epoch.

        Notes:
        -----
        - Early stopping patience is taken from `self.classifier_params["early_stopping_patience"]`
          if available; otherwise defaults to 5.
        - The learning rate is reduced by a factor of 0.5 after `early_stopping_patience`
          epochs without improvement, with a minimum learning rate of 1e-7.
        - The method assumes that `self.model` has already been compiled.
        """
        early_stopping_patience = (
            self.classifier_params["early_stopping_patience"] if "early_stopping_patience" in self.classifier_params.keys() else 5
        )

        early_stopping = EarlyStopping(
            monitor="val_loss",
            patience=early_stopping_patience,
            restore_best_weights=True,
            verbose=1,
        )

        reduce_lr = ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=early_stopping_patience, min_lr=1e-7)

        history = self.model.fit(
            patches_train,
            labels_train,
            epochs=self.classifier_params["epochs"],
            batch_size=self.classifier_params["batch_size"],
            validation_data=(patches_val, labels_val),
            callbacks=[early_stopping, reduce_lr],
        )
        return history

    def _prediction(self, patches, segment_ids):
        """Predict class labels for image patches and aggregate results by segment ID.

        This method uses the trained CNN model to predict class probabilities for each
        patch, determines the most probable class, and then assigns a final label to
        each segment based on the majority vote of its corresponding patches.

        Parameters
        ----------
        patches : np.ndarray
            Image patches to classify, of shape (N, patch_height, patch_width, channels).
        segment_ids : list of int
            List of segment IDs corresponding to each patch in `patches`.

        Returns:
        -------
        final_segment_ids : list of int
            Unique segment IDs that received predictions.
        final_labels : list
            Predicted class labels for each segment, determined by majority voting.

        Notes:
        -----
        - For each segment, the label assigned is the one with the highest occurrence
        among its patches.
        """
        predictions = self.model.predict(patches)
        predicted_classes = predictions.argmax(axis=1)
        predicted_labels = self.le.inverse_transform(predicted_classes)

        segment_label_map = defaultdict(list)

        for seg_id, label in zip(segment_ids, predicted_labels, strict=False):
            segment_label_map[seg_id].append(label)

        final_segment_ids = []
        final_labels = []

        for seg_id, labels in segment_label_map.items():
            most_common_label = Counter(labels).most_common(1)[0][0]
            final_segment_ids.append(seg_id)
            final_labels.append(most_common_label)
        return final_segment_ids, final_labels

    def _evaluate(self, patches_test, labels_test):
        """Evaluate the trained CNN model on test data.

        This method predicts class labels for test patches, calculates accuracy,
        confusion matrix, and generates a detailed classification report. Results
        are printed and also returned as a dictionary.

        Parameters
        ----------
        patches_test : np.ndarray
            Test image patches of shape (N_test, patch_height, patch_width, channels).
        labels_test : np.ndarray
            Test labels corresponding to `patches_test`.

        Returns:
        -------
        results : dict
            Dictionary containing evaluation metrics:
            - "accuracy" : float, classification accuracy score.
            - "loss" : float, classification loss score.
            - "confusion_matrix" : np.ndarray, confusion matrix array.
            - "report" : str, text summary of precision, recall, f1-score for each class.

        """
        predictions = self.model.predict(patches_test)
        predicted_classes = predictions.argmax(axis=1)

        loss, accuracy_keras = self.model.evaluate(patches_test, labels_test, verbose=0)

        accuracy = accuracy_score(labels_test, predicted_classes)

        conf_matrix = confusion_matrix(labels_test, predicted_classes)

        report = classification_report(labels_test, predicted_classes, target_names=self.le.classes_)

        print("Evaluation Results:")
        print(f"Accuracy: {accuracy}")
        print(f"Loss: {loss}")
        print("Confusion Matrix:")
        print(conf_matrix)
        print("Classification Report:")
        print(report)
        return {"accuracy": accuracy, "loss": loss, "confusion_matrix": conf_matrix, "report": report}

    def execute(self, source_layer, samples, image_data, layer_manager=None, layer_name=None):
        """Perform CNN-based classification on image segments using labeled training samples.

        This method extracts training patches from the input image based on provided
        samples, trains a CNN model, evaluates it on test data, predicts labels for all
        segments, and stores the classification results in a new output layer.

        Parameters
        ----------
        source_layer : Layer
            Input spatial layer containing segments/objects to classify.
        samples : dict
            Dictionary mapping class names to lists of segment IDs used for training.
        image_data : np.ndarray
            Raster image data array from which patches are extracted for classification.
        layer_manager : LayerManager, optional
            manager object to register the output classification layer.
        layer_name : str, optional
            The name to assign to the resulting classified layer.

        Returns:
        -------
        result_layer : Layer
            New layer containing the original segments with a "classification" attribute
            representing predicted class labels.
        history : keras.callbacks.History
            Training history object containing loss and accuracy metrics per epoch.
        eval_result : dict
            Dictionary containing evaluation metrics such as accuracy, confusion matrix,
            and classification report on the test dataset.
        count_dict : dict
            Dictionary mapping class labels to the count of training patches extracted.
        invalid_patches_segments_ids : list
            List of segment IDs for which no valid patches could be extracted for prediction.
        """
        result_layer = Layer(name=layer_name, parent=source_layer, layer_type="merged")
        result_layer.transform = source_layer.transform
        result_layer.crs = source_layer.crs

        layer = source_layer.objects.copy()
        patches, labels, count_dict = self._extract_training_patches(image=image_data, segments=source_layer, samples=samples)

        indices = np.arange(len(patches))
        np.random.shuffle(indices)
        patches = patches[indices]
        labels = np.array(labels)[indices]

        labels_encoded = self.le.fit_transform(labels)
        num_classes = len(self.le.classes_)
        patches = patches.astype("float32") / 255.0

        patches_temp, patches_test, labels_temp, labels_test = train_test_split(
            patches, labels_encoded, test_size=0.3, random_state=42
        )

        patches_train, patches_val, labels_train, labels_val = train_test_split(
            patches_temp, labels_temp, test_size=0.2, random_state=42
        )

        input_shape = patches.shape[1:]
        num_classes = len(np.unique(labels))

        if self.classifier_type == "Convolution Neural Network (CNN)":
            self.model = self._create_cnn_model(input_shape, num_classes)

        history = self._train_model(patches_train, labels_train, patches_val, labels_val)

        patches_all, segment_ids, invalid_patches_segments_ids = self._extract_patches_for_prediction(
            image=image_data, segments=source_layer
        )

        eval_result = self._evaluate(patches_test, labels_test)

        patches_all = patches_all.astype("float32") / 255.0
        final_segment_ids, final_labels = self._prediction(patches_all, segment_ids)

        segment_to_label = dict(zip(final_segment_ids, final_labels, strict=False))
        layer["classification"] = ""
        layer["classification"] = layer["segment_id"].map(segment_to_label)

        result_layer.objects = layer
        result_layer.metadata = {"cnn_classification": self.name}

        if layer_manager:
            layer_manager.add_layer(result_layer)

        return result_layer, history, eval_result, count_dict, invalid_patches_segments_ids

__init__(name='CNN_Classification', classifier_type='Convolution Neural Network (CNN)', classifier_params=None)

Initialize a Convolutional Neural Network (CNN) classifier.

Parameters

name : str, optional Custom name for the classification layer. Defaults to "CNN_Classification" if None. classifier_type : str, optional Type of classifier. Defaults to "Convolution Neural Network (CNN)". classifier_params : dict, optional Dictionary of training parameters for the CNN. If None, the following defaults are used: - epochs (int): 50 - batch_size (int): 32 - patch_size (tuple of int): (5, 5) - early_stopping_patience (int): 5

Source code in nickyspatial/core/classifier.py
def __init__(self, name="CNN_Classification", classifier_type="Convolution Neural Network (CNN)", classifier_params=None):
    """Initialize a Convolutional Neural Network (CNN) classifier.

    Parameters
    ----------
    name : str, optional
        Custom name for the classification layer. Defaults to "CNN_Classification" if None.
    classifier_type : str, optional
        Type of classifier. Defaults to "Convolution Neural Network (CNN)".
    classifier_params : dict, optional
        Dictionary of training parameters for the CNN.
        If None, the following defaults are used:
            - epochs (int): 50
            - batch_size (int): 32
            - patch_size (tuple of int): (5, 5)
            - early_stopping_patience (int): 5
    """
    if not HAS_TENSORFLOW:
        raise ImportError("TensorFlow is required for CNN classification. Install it with: uv add --group cnn tensorflow")

    self.name = name
    self.classifier_type = classifier_type
    self.classifier_params = (
        classifier_params
        if classifier_params
        else {"epochs": 50, "batch_size": 32, "patch_size": (5, 5), "early_stopping_patience": 5}
    )
    self.model = None
    self.le = LabelEncoder()

execute(source_layer, samples, image_data, layer_manager=None, layer_name=None)

Perform CNN-based classification on image segments using labeled training samples.

This method extracts training patches from the input image based on provided samples, trains a CNN model, evaluates it on test data, predicts labels for all segments, and stores the classification results in a new output layer.

Parameters

source_layer : Layer Input spatial layer containing segments/objects to classify. samples : dict Dictionary mapping class names to lists of segment IDs used for training. image_data : np.ndarray Raster image data array from which patches are extracted for classification. layer_manager : LayerManager, optional manager object to register the output classification layer. layer_name : str, optional The name to assign to the resulting classified layer.

Returns:

result_layer : Layer New layer containing the original segments with a "classification" attribute representing predicted class labels. history : keras.callbacks.History Training history object containing loss and accuracy metrics per epoch. eval_result : dict Dictionary containing evaluation metrics such as accuracy, confusion matrix, and classification report on the test dataset. count_dict : dict Dictionary mapping class labels to the count of training patches extracted. invalid_patches_segments_ids : list List of segment IDs for which no valid patches could be extracted for prediction.

Source code in nickyspatial/core/classifier.py
def execute(self, source_layer, samples, image_data, layer_manager=None, layer_name=None):
    """Perform CNN-based classification on image segments using labeled training samples.

    This method extracts training patches from the input image based on provided
    samples, trains a CNN model, evaluates it on test data, predicts labels for all
    segments, and stores the classification results in a new output layer.

    Parameters
    ----------
    source_layer : Layer
        Input spatial layer containing segments/objects to classify.
    samples : dict
        Dictionary mapping class names to lists of segment IDs used for training.
    image_data : np.ndarray
        Raster image data array from which patches are extracted for classification.
    layer_manager : LayerManager, optional
        manager object to register the output classification layer.
    layer_name : str, optional
        The name to assign to the resulting classified layer.

    Returns:
    -------
    result_layer : Layer
        New layer containing the original segments with a "classification" attribute
        representing predicted class labels.
    history : keras.callbacks.History
        Training history object containing loss and accuracy metrics per epoch.
    eval_result : dict
        Dictionary containing evaluation metrics such as accuracy, confusion matrix,
        and classification report on the test dataset.
    count_dict : dict
        Dictionary mapping class labels to the count of training patches extracted.
    invalid_patches_segments_ids : list
        List of segment IDs for which no valid patches could be extracted for prediction.
    """
    result_layer = Layer(name=layer_name, parent=source_layer, layer_type="merged")
    result_layer.transform = source_layer.transform
    result_layer.crs = source_layer.crs

    layer = source_layer.objects.copy()
    patches, labels, count_dict = self._extract_training_patches(image=image_data, segments=source_layer, samples=samples)

    indices = np.arange(len(patches))
    np.random.shuffle(indices)
    patches = patches[indices]
    labels = np.array(labels)[indices]

    labels_encoded = self.le.fit_transform(labels)
    num_classes = len(self.le.classes_)
    patches = patches.astype("float32") / 255.0

    patches_temp, patches_test, labels_temp, labels_test = train_test_split(
        patches, labels_encoded, test_size=0.3, random_state=42
    )

    patches_train, patches_val, labels_train, labels_val = train_test_split(
        patches_temp, labels_temp, test_size=0.2, random_state=42
    )

    input_shape = patches.shape[1:]
    num_classes = len(np.unique(labels))

    if self.classifier_type == "Convolution Neural Network (CNN)":
        self.model = self._create_cnn_model(input_shape, num_classes)

    history = self._train_model(patches_train, labels_train, patches_val, labels_val)

    patches_all, segment_ids, invalid_patches_segments_ids = self._extract_patches_for_prediction(
        image=image_data, segments=source_layer
    )

    eval_result = self._evaluate(patches_test, labels_test)

    patches_all = patches_all.astype("float32") / 255.0
    final_segment_ids, final_labels = self._prediction(patches_all, segment_ids)

    segment_to_label = dict(zip(final_segment_ids, final_labels, strict=False))
    layer["classification"] = ""
    layer["classification"] = layer["segment_id"].map(segment_to_label)

    result_layer.objects = layer
    result_layer.metadata = {"cnn_classification": self.name}

    if layer_manager:
        layer_manager.add_layer(result_layer)

    return result_layer, history, eval_result, count_dict, invalid_patches_segments_ids

Layer class and related functionality for organizing geospatial data.

Layer

A Layer represents a set of objects with associated properties.

Source code in nickyspatial/core/layer.py
class Layer:
    """A Layer represents a set of objects with associated properties."""

    def __init__(self, name=None, parent=None, layer_type="generic"):
        """Initialize Layer with unique ID and optional name/parent."""
        self.id = str(uuid.uuid4())
        self.name = name if name else f"Layer_{self.id[:8]}"
        self.parent = parent
        self.type = layer_type
        self.created_at = pd.Timestamp.now()

        self.raster = None
        self.objects = None
        self.metadata = {}
        self.transform = None
        self.crs = None

        self.attached_functions = {}

    def attach_function(self, function, name=None, **kwargs):
        """Attach and execute function, store result for later retrieval."""
        func_name = name if name else function.__name__
        result = function(self, **kwargs)
        self.attached_functions[func_name] = {
            "function": function,
            "args": kwargs,
            "result": result,
        }
        return self

    def get_function_result(self, function_name):
        """Retrieve stored result from previously attached function."""
        if function_name not in self.attached_functions:
            raise ValueError(f"Function '{function_name}' not attached to this layer")
        return self.attached_functions[function_name]["result"]

    def copy(self):
        """Create independent copy with deep-copied raster/objects data."""
        new_layer = Layer(name=f"{self.name}_copy", parent=self.parent, layer_type=self.type)
        if self.raster is not None:
            new_layer.raster = self.raster.copy()
        if self.objects is not None:
            new_layer.objects = self.objects.copy()
        new_layer.metadata = self.metadata.copy()
        new_layer.transform = self.transform
        new_layer.crs = self.crs
        return new_layer

    def __str__(self):
        """Return layer info: name, type, and data availability."""
        num_objects = len(self.objects) if self.objects is not None else 0
        parent_name = self.parent.name if self.parent else "None"
        return f"Layer '{self.name}' (type: {self.type}, parent: {parent_name}, objects: {num_objects})"

__init__(name=None, parent=None, layer_type='generic')

Initialize Layer with unique ID and optional name/parent.

Source code in nickyspatial/core/layer.py
def __init__(self, name=None, parent=None, layer_type="generic"):
    """Initialize Layer with unique ID and optional name/parent."""
    self.id = str(uuid.uuid4())
    self.name = name if name else f"Layer_{self.id[:8]}"
    self.parent = parent
    self.type = layer_type
    self.created_at = pd.Timestamp.now()

    self.raster = None
    self.objects = None
    self.metadata = {}
    self.transform = None
    self.crs = None

    self.attached_functions = {}

__str__()

Return layer info: name, type, and data availability.

Source code in nickyspatial/core/layer.py
def __str__(self):
    """Return layer info: name, type, and data availability."""
    num_objects = len(self.objects) if self.objects is not None else 0
    parent_name = self.parent.name if self.parent else "None"
    return f"Layer '{self.name}' (type: {self.type}, parent: {parent_name}, objects: {num_objects})"

attach_function(function, name=None, **kwargs)

Attach and execute function, store result for later retrieval.

Source code in nickyspatial/core/layer.py
def attach_function(self, function, name=None, **kwargs):
    """Attach and execute function, store result for later retrieval."""
    func_name = name if name else function.__name__
    result = function(self, **kwargs)
    self.attached_functions[func_name] = {
        "function": function,
        "args": kwargs,
        "result": result,
    }
    return self

copy()

Create independent copy with deep-copied raster/objects data.

Source code in nickyspatial/core/layer.py
def copy(self):
    """Create independent copy with deep-copied raster/objects data."""
    new_layer = Layer(name=f"{self.name}_copy", parent=self.parent, layer_type=self.type)
    if self.raster is not None:
        new_layer.raster = self.raster.copy()
    if self.objects is not None:
        new_layer.objects = self.objects.copy()
    new_layer.metadata = self.metadata.copy()
    new_layer.transform = self.transform
    new_layer.crs = self.crs
    return new_layer

get_function_result(function_name)

Retrieve stored result from previously attached function.

Source code in nickyspatial/core/layer.py
def get_function_result(self, function_name):
    """Retrieve stored result from previously attached function."""
    if function_name not in self.attached_functions:
        raise ValueError(f"Function '{function_name}' not attached to this layer")
    return self.attached_functions[function_name]["result"]

LayerManager

Manages a collection of layers and their relationships.

Source code in nickyspatial/core/layer.py
class LayerManager:
    """Manages a collection of layers and their relationships."""

    def __init__(self):
        """Initialize empty layer manager with no active layer."""
        self.layers = {}
        self.active_layer = None

    def add_layer(self, layer, set_active=True):
        """Add layer to manager, optionally set as active layer."""
        self.layers[layer.id] = layer
        if set_active:
            self.active_layer = layer
        return layer

    def get_layer(self, layer_id_or_name):
        """Find layer by ID first, then by name if not found."""
        if layer_id_or_name in self.layers:
            return self.layers[layer_id_or_name]
        for layer in self.layers.values():
            if layer.name == layer_id_or_name:
                return layer
        raise ValueError(f"Layer '{layer_id_or_name}' not found")

    def get_layer_names(self):
        """Get a list of all layer names."""
        return [layer.name for layer in self.layers.values()]

    def remove_layer(self, layer_id_or_name):
        """Remove a layer from the manager."""
        layer = self.get_layer(layer_id_or_name)

        if layer.id in self.layers:
            del self.layers[layer.id]

        if self.active_layer and self.active_layer.id == layer.id:
            if self.layers:
                self.active_layer = list(self.layers.values())[-1]
            else:
                self.active_layer = None

__init__()

Initialize empty layer manager with no active layer.

Source code in nickyspatial/core/layer.py
def __init__(self):
    """Initialize empty layer manager with no active layer."""
    self.layers = {}
    self.active_layer = None

add_layer(layer, set_active=True)

Add layer to manager, optionally set as active layer.

Source code in nickyspatial/core/layer.py
def add_layer(self, layer, set_active=True):
    """Add layer to manager, optionally set as active layer."""
    self.layers[layer.id] = layer
    if set_active:
        self.active_layer = layer
    return layer

get_layer(layer_id_or_name)

Find layer by ID first, then by name if not found.

Source code in nickyspatial/core/layer.py
def get_layer(self, layer_id_or_name):
    """Find layer by ID first, then by name if not found."""
    if layer_id_or_name in self.layers:
        return self.layers[layer_id_or_name]
    for layer in self.layers.values():
        if layer.name == layer_id_or_name:
            return layer
    raise ValueError(f"Layer '{layer_id_or_name}' not found")

get_layer_names()

Get a list of all layer names.

Source code in nickyspatial/core/layer.py
def get_layer_names(self):
    """Get a list of all layer names."""
    return [layer.name for layer in self.layers.values()]

remove_layer(layer_id_or_name)

Remove a layer from the manager.

Source code in nickyspatial/core/layer.py
def remove_layer(self, layer_id_or_name):
    """Remove a layer from the manager."""
    layer = self.get_layer(layer_id_or_name)

    if layer.id in self.layers:
        del self.layers[layer.id]

    if self.active_layer and self.active_layer.id == layer.id:
        if self.layers:
            self.active_layer = list(self.layers.values())[-1]
        else:
            self.active_layer = None

Provides a rule engine for object-based analysis, where segments or layers are processed according to custom logic.

Main idea here is to allow encode expert rules that can be applied to object segments which are layers in a nickyspatial context. So rules are tied up to the layers , they can be attached or revoked or executed multiple items

Developers can define domain-specific rules to classify or merge features based on attributes. This module includes the Rule and RuleSet classes, which allow users to create, manage, and apply rules to layers. The RuleSet class can be used to group multiple rules together, and the execute method applies these rules to a given layer. The rules can be defined using string expressions that can be evaluated using the numexpr library for performance.

CommonBase

A shared utility base class for spatial rule sets.

This class provides common methods used by multiple rule sets to preprocess layer data and determine spatial relationships between segments.

Source code in nickyspatial/core/rules.py
class CommonBase:
    """A shared utility base class for spatial rule sets.

    This class provides common methods used by multiple rule sets
    to preprocess layer data and determine spatial relationships
    between segments.
    """

    @staticmethod
    def _preprocess_layer(layer, class_column_name):
        """Prepare geometry and class maps from a spatial layer.

        Parameters:
        -----------
        layer : Layer
            The spatial layer containing objects with segment geometry and class labels.
        class_column_name : str
            The column name that stores class values (e.g., "veg_class", "land_use").

        Returns:
        --------
        geom_map : dict
            A dictionary mapping segment IDs to shapely geometry objects.
        class_map : dict
            A dictionary mapping segment IDs to their respective class values.
        """
        df = layer.objects
        geom_map = {sid: shape(geom) for sid, geom in zip(df["segment_id"], df["geometry"], strict=False)}
        class_map = dict(zip(df["segment_id"], df[class_column_name], strict=False))
        return geom_map, class_map

    @staticmethod
    def _find_neighbors(segment_id, geom_map):
        """Find neighboring segments based on spatial intersection.

        Parameters:
        -----------
        segment_id : int or str
            The ID of the segment whose neighbors are to be found.
        geom_map : dict
            A dictionary mapping segment IDs to shapely geometry objects.

        Returns:
        --------
        neighbors : list
            A list of segment IDs that intersect with the given segment.
        """
        segment_geom = geom_map[segment_id]
        neighbors = []
        for other_id, other_geom in geom_map.items():
            if other_id != segment_id and segment_geom.intersects(other_geom):
                neighbors.append(other_id)
        return neighbors

EnclosedByRuleSet

Bases: CommonBase

A rule set to reclassify segments based on spatial enclosure.

This rule set identifies segments of one class (A) that are entirely surrounded by segments of another class (B), and reclassifies them into a new class.

Source code in nickyspatial/core/rules.py
class EnclosedByRuleSet(CommonBase):
    """A rule set to reclassify segments based on spatial enclosure.

    This rule set identifies segments of one class (A) that are entirely surrounded
    by segments of another class (B), and reclassifies them into a new class.
    """

    def __init__(self, name=None):
        """Initialize the merge rule set.

        Parameters:
        -----------
        name : str, optional
            Name of the merge rule set
        """
        self.name = name if name else "EnclosedByRuleSet"

    def execute(
        self, source_layer, class_column_name, class_value_a, class_value_b, new_class_name, layer_manager=None, layer_name=None
    ):
        """Apply enclosed-by logic to identify and reclassify segments.

        Parameters:
        -----------
        source_layer : Layer
            The source spatial layer containing segments.
        class_column_name : str
            The name of the column containing class labels (e.g., "veg_class").
        class_value_a : str
            The class value to check for enclosure (target to reclassify).
        class_value_b : str
            The class value expected to surround class A segments.
        new_class_name : str
            The new class name to assign to enclosed segments.
        layer_manager : LayerManager, optional
            Optional manager to register the resulting layer.
        layer_name : str, optional
            Optional name for the result layer.

        Returns:
        --------
        result_layer : Layer
            A new layer with updated class values for enclosed segments.
        """
        if not layer_name:
            layer_name = f"{source_layer.name}_{self.name}"

        result_layer = Layer(name=layer_name, parent=source_layer, layer_type="merged")
        result_layer.transform = source_layer.transform
        result_layer.crs = source_layer.crs
        result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

        df = source_layer.objects.copy()
        surrounded_segments = []
        geom_map, class_map = self._preprocess_layer(source_layer, class_column_name)

        for sid in df["segment_id"].unique():
            if class_map.get(sid) != class_value_a:
                continue

            neighbors = self._find_neighbors(sid, geom_map)
            if neighbors and all(class_map.get(n_id) == class_value_b for n_id in neighbors):
                surrounded_segments.append(sid)

        df.loc[(df["segment_id"].isin(surrounded_segments)), class_column_name] = new_class_name

        result_layer.objects = df

        result_layer.metadata = {
            "enclosed_by_ruleset_name": self.name,
        }

        if layer_manager:
            layer_manager.add_layer(result_layer)

        return result_layer

__init__(name=None)

Initialize the merge rule set.

Parameters:

name : str, optional Name of the merge rule set

Source code in nickyspatial/core/rules.py
def __init__(self, name=None):
    """Initialize the merge rule set.

    Parameters:
    -----------
    name : str, optional
        Name of the merge rule set
    """
    self.name = name if name else "EnclosedByRuleSet"

execute(source_layer, class_column_name, class_value_a, class_value_b, new_class_name, layer_manager=None, layer_name=None)

Apply enclosed-by logic to identify and reclassify segments.

Parameters:

source_layer : Layer The source spatial layer containing segments. class_column_name : str The name of the column containing class labels (e.g., "veg_class"). class_value_a : str The class value to check for enclosure (target to reclassify). class_value_b : str The class value expected to surround class A segments. new_class_name : str The new class name to assign to enclosed segments. layer_manager : LayerManager, optional Optional manager to register the resulting layer. layer_name : str, optional Optional name for the result layer.

Returns:

result_layer : Layer A new layer with updated class values for enclosed segments.

Source code in nickyspatial/core/rules.py
def execute(
    self, source_layer, class_column_name, class_value_a, class_value_b, new_class_name, layer_manager=None, layer_name=None
):
    """Apply enclosed-by logic to identify and reclassify segments.

    Parameters:
    -----------
    source_layer : Layer
        The source spatial layer containing segments.
    class_column_name : str
        The name of the column containing class labels (e.g., "veg_class").
    class_value_a : str
        The class value to check for enclosure (target to reclassify).
    class_value_b : str
        The class value expected to surround class A segments.
    new_class_name : str
        The new class name to assign to enclosed segments.
    layer_manager : LayerManager, optional
        Optional manager to register the resulting layer.
    layer_name : str, optional
        Optional name for the result layer.

    Returns:
    --------
    result_layer : Layer
        A new layer with updated class values for enclosed segments.
    """
    if not layer_name:
        layer_name = f"{source_layer.name}_{self.name}"

    result_layer = Layer(name=layer_name, parent=source_layer, layer_type="merged")
    result_layer.transform = source_layer.transform
    result_layer.crs = source_layer.crs
    result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

    df = source_layer.objects.copy()
    surrounded_segments = []
    geom_map, class_map = self._preprocess_layer(source_layer, class_column_name)

    for sid in df["segment_id"].unique():
        if class_map.get(sid) != class_value_a:
            continue

        neighbors = self._find_neighbors(sid, geom_map)
        if neighbors and all(class_map.get(n_id) == class_value_b for n_id in neighbors):
            surrounded_segments.append(sid)

    df.loc[(df["segment_id"].isin(surrounded_segments)), class_column_name] = new_class_name

    result_layer.objects = df

    result_layer.metadata = {
        "enclosed_by_ruleset_name": self.name,
    }

    if layer_manager:
        layer_manager.add_layer(result_layer)

    return result_layer

MergeRuleSet

Bases: CommonBase

A rule set for merging segments of the same class based on specified class values.

Source code in nickyspatial/core/rules.py
class MergeRuleSet(CommonBase):
    """A rule set for merging segments of the same class based on specified class values."""

    def __init__(self, name=None):
        """Initialize the merge rule set.

        Parameters:
        -----------
        name : str, optional
            Name of the merge rule set
        """
        self.name = name if name else "MergeRuleSet"

    def execute(self, source_layer, class_column_name, class_value, layer_manager=None, layer_name=None):
        """Merge segments of the same class in a layer.

        Parameters:
        -----------
        source_layer : Layer
            Source layer with segments to merge
        class_value : str or list of str
            One or more attribute field names to group and merge segments
        layer_manager : LayerManager, optional
            Layer manager to add the result layer to
        layer_name : str, optional
            Name for the result layer

        Returns:
        --------
        result_layer : Layer
            Layer with merged geometries
        """
        if not layer_name:
            layer_name = f"{source_layer.name}_{self.name}"

        result_layer = Layer(name=layer_name, parent=source_layer, layer_type="merged")
        result_layer.transform = source_layer.transform
        result_layer.crs = source_layer.crs
        result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

        df = source_layer.objects.copy()

        if isinstance(class_value, str):
            class_values = [class_value]
        else:
            class_values = class_value

        new_rows = []
        to_drop = set()
        geom_map, class_map = self._preprocess_layer(source_layer, class_column_name)

        for class_value in class_values:
            visited = set()

            for sid in df["segment_id"].unique():
                if sid in visited or class_map[sid] != class_value:
                    continue

                group_geom = [geom_map[sid]]
                group_ids = [sid]
                queue = [sid]
                visited.add(sid)

                while queue:
                    current_id = queue.pop()
                    neighbors = self._find_neighbors(current_id, geom_map)
                    for n_id in neighbors:
                        if n_id not in visited and class_map.get(n_id) == class_value:
                            visited.add(n_id)
                            group_geom.append(geom_map[n_id])
                            group_ids.append(n_id)
                            queue.append(n_id)

                merged_geom = unary_union(group_geom)
                row_data = {"segment_id": min(group_ids), class_column_name: class_value, "geometry": merged_geom}

                new_rows.append(row_data)
                to_drop.update(group_ids)

        df = df[~df["segment_id"].isin(to_drop)]
        df = pd.concat([df, pd.DataFrame(new_rows)], ignore_index=True)
        result_layer.objects = df

        result_layer.metadata = {
            "mergeruleset_name": self.name,
            "merged_fields": class_values,
        }

        if layer_manager:
            layer_manager.add_layer(result_layer)

        return result_layer

__init__(name=None)

Initialize the merge rule set.

Parameters:

name : str, optional Name of the merge rule set

Source code in nickyspatial/core/rules.py
def __init__(self, name=None):
    """Initialize the merge rule set.

    Parameters:
    -----------
    name : str, optional
        Name of the merge rule set
    """
    self.name = name if name else "MergeRuleSet"

execute(source_layer, class_column_name, class_value, layer_manager=None, layer_name=None)

Merge segments of the same class in a layer.

Parameters:

source_layer : Layer Source layer with segments to merge class_value : str or list of str One or more attribute field names to group and merge segments layer_manager : LayerManager, optional Layer manager to add the result layer to layer_name : str, optional Name for the result layer

Returns:

result_layer : Layer Layer with merged geometries

Source code in nickyspatial/core/rules.py
def execute(self, source_layer, class_column_name, class_value, layer_manager=None, layer_name=None):
    """Merge segments of the same class in a layer.

    Parameters:
    -----------
    source_layer : Layer
        Source layer with segments to merge
    class_value : str or list of str
        One or more attribute field names to group and merge segments
    layer_manager : LayerManager, optional
        Layer manager to add the result layer to
    layer_name : str, optional
        Name for the result layer

    Returns:
    --------
    result_layer : Layer
        Layer with merged geometries
    """
    if not layer_name:
        layer_name = f"{source_layer.name}_{self.name}"

    result_layer = Layer(name=layer_name, parent=source_layer, layer_type="merged")
    result_layer.transform = source_layer.transform
    result_layer.crs = source_layer.crs
    result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

    df = source_layer.objects.copy()

    if isinstance(class_value, str):
        class_values = [class_value]
    else:
        class_values = class_value

    new_rows = []
    to_drop = set()
    geom_map, class_map = self._preprocess_layer(source_layer, class_column_name)

    for class_value in class_values:
        visited = set()

        for sid in df["segment_id"].unique():
            if sid in visited or class_map[sid] != class_value:
                continue

            group_geom = [geom_map[sid]]
            group_ids = [sid]
            queue = [sid]
            visited.add(sid)

            while queue:
                current_id = queue.pop()
                neighbors = self._find_neighbors(current_id, geom_map)
                for n_id in neighbors:
                    if n_id not in visited and class_map.get(n_id) == class_value:
                        visited.add(n_id)
                        group_geom.append(geom_map[n_id])
                        group_ids.append(n_id)
                        queue.append(n_id)

            merged_geom = unary_union(group_geom)
            row_data = {"segment_id": min(group_ids), class_column_name: class_value, "geometry": merged_geom}

            new_rows.append(row_data)
            to_drop.update(group_ids)

    df = df[~df["segment_id"].isin(to_drop)]
    df = pd.concat([df, pd.DataFrame(new_rows)], ignore_index=True)
    result_layer.objects = df

    result_layer.metadata = {
        "mergeruleset_name": self.name,
        "merged_fields": class_values,
    }

    if layer_manager:
        layer_manager.add_layer(result_layer)

    return result_layer

Rule

A rule defines a condition to classify segments.

Source code in nickyspatial/core/rules.py
class Rule:
    """A rule defines a condition to classify segments."""

    def __init__(self, name, condition, class_value=None):
        """Initialize a rule.

        Parameters:
        -----------
        name : str
            Name of the rule
        condition : str
            Condition as a string expression that can be evaluated using numexpr
        class_value : str, optional
            Value to assign when the condition is met.
            If None, uses the rule name.
        """
        self.name = name
        self.condition = condition
        self.class_value = class_value if class_value is not None else name

    def __str__(self):
        """String representation of the rule."""
        return f"Rule '{self.name}': {self.condition} -> {self.class_value}"

__init__(name, condition, class_value=None)

Initialize a rule.

Parameters:

name : str Name of the rule condition : str Condition as a string expression that can be evaluated using numexpr class_value : str, optional Value to assign when the condition is met. If None, uses the rule name.

Source code in nickyspatial/core/rules.py
def __init__(self, name, condition, class_value=None):
    """Initialize a rule.

    Parameters:
    -----------
    name : str
        Name of the rule
    condition : str
        Condition as a string expression that can be evaluated using numexpr
    class_value : str, optional
        Value to assign when the condition is met.
        If None, uses the rule name.
    """
    self.name = name
    self.condition = condition
    self.class_value = class_value if class_value is not None else name

__str__()

String representation of the rule.

Source code in nickyspatial/core/rules.py
def __str__(self):
    """String representation of the rule."""
    return f"Rule '{self.name}': {self.condition} -> {self.class_value}"

RuleSet

A collection of rules to apply to a layer.

Source code in nickyspatial/core/rules.py
class RuleSet:
    """A collection of rules to apply to a layer."""

    def __init__(self, name=None):
        """Initialize a rule set.

        Parameters:
        -----------
        name : str, optional
            Name of the rule set
        """
        self.name = name if name else "RuleSet"
        self.rules = []

    @staticmethod
    def wrap_condition_parts_simple(self, condition):
        """Wrap condition parts with parentheses for evaluation."""
        parts = condition.split("&")
        parts = [f"({part.strip()})" for part in parts]
        return " & ".join(parts)

    def add_rule(self, name, condition, class_value=None):
        """Add a rule to the rule set.

        Parameters:
        -----------
        name : str
            Name of the rule
        condition : str
            Condition as a string expression that can be evaluated using numexpr
        class_value : str, optional
            Value to assign when the condition is met

        Returns:
        --------
        rule : Rule
            The added rule
        """
        rule = Rule(name, condition, class_value)
        self.rules.append(rule)
        return rule

    def get_rules(self):
        """Get the list of rules in the rule set.

        Returns:
        --------
        list of tuples
            List of (name, condition) tuples for each rule
        """
        return [(rule.name, rule.condition) for rule in self.rules]

    def execute(
        self,
        source_layer,
        layer_manager=None,
        layer_name=None,
        result_field="classification",
    ):
        """Apply rules to classify segments in a layer.

        Parameters:
        -----------
        source_layer : Layer
            Source layer with segments to classify
        layer_manager : LayerManager, optional
            Layer manager to add the result layer to
        layer_name : str, optional
            Name for the result layer
        result_field : str
            Field name to store classification results

        Returns:
        --------
        result_layer : Layer
            Layer with classification results
        """
        if not layer_name:
            layer_name = f"{source_layer.name}_{self.name}"

        result_layer = Layer(name=layer_name, parent=source_layer, layer_type="classification")
        result_layer.transform = source_layer.transform
        result_layer.crs = source_layer.crs
        result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

        result_layer.objects = source_layer.objects.copy()

        if result_field not in result_layer.objects.columns:
            result_layer.objects[result_field] = None

        result_layer.metadata = {
            "ruleset_name": self.name,
            "rules": [
                {
                    "name": rule.name,
                    "condition": rule.condition,
                    "class_value": rule.class_value,
                }
                for rule in self.rules
            ],
            "result_field": result_field,
        }

        for rule in self.rules:
            try:
                if result_field in result_layer.objects.columns and (
                    f"{result_field} ==" in rule.condition
                    or f"{result_field}==" in rule.condition
                    or f"{result_field} !=" in rule.condition
                    or f"{result_field}!=" in rule.condition
                ):
                    eval_condition = rule.condition.replace("&", " and ").replace("|", " or ")

                    mask = result_layer.objects.apply(
                        lambda row, cond=eval_condition: eval(
                            cond,
                            {"__builtins__": {}},
                            {col: row[col] for col in result_layer.objects.columns if col != "geometry"},
                        ),
                        axis=1,
                    )

                else:
                    try:
                        local_dict = {
                            col: result_layer.objects[col].values for col in result_layer.objects.columns if col != "geometry"
                        }

                        mask = ne.evaluate(rule.condition, local_dict=local_dict)
                        mask = pd.Series(mask, index=result_layer.objects.index).fillna(False)
                    except Exception:
                        mask = result_layer.objects.eval(rule.condition, engine="python")

                result_layer.objects.loc[mask, result_field] = rule.class_value

            except Exception as e:
                print(f"Error applying rule '{rule.name}': {str(e)}")
                continue

        if layer_manager:
            layer_manager.add_layer(result_layer)

        return result_layer

__init__(name=None)

Initialize a rule set.

Parameters:

name : str, optional Name of the rule set

Source code in nickyspatial/core/rules.py
def __init__(self, name=None):
    """Initialize a rule set.

    Parameters:
    -----------
    name : str, optional
        Name of the rule set
    """
    self.name = name if name else "RuleSet"
    self.rules = []

add_rule(name, condition, class_value=None)

Add a rule to the rule set.

Parameters:

name : str Name of the rule condition : str Condition as a string expression that can be evaluated using numexpr class_value : str, optional Value to assign when the condition is met

Returns:

rule : Rule The added rule

Source code in nickyspatial/core/rules.py
def add_rule(self, name, condition, class_value=None):
    """Add a rule to the rule set.

    Parameters:
    -----------
    name : str
        Name of the rule
    condition : str
        Condition as a string expression that can be evaluated using numexpr
    class_value : str, optional
        Value to assign when the condition is met

    Returns:
    --------
    rule : Rule
        The added rule
    """
    rule = Rule(name, condition, class_value)
    self.rules.append(rule)
    return rule

execute(source_layer, layer_manager=None, layer_name=None, result_field='classification')

Apply rules to classify segments in a layer.

Parameters:

source_layer : Layer Source layer with segments to classify layer_manager : LayerManager, optional Layer manager to add the result layer to layer_name : str, optional Name for the result layer result_field : str Field name to store classification results

Returns:

result_layer : Layer Layer with classification results

Source code in nickyspatial/core/rules.py
def execute(
    self,
    source_layer,
    layer_manager=None,
    layer_name=None,
    result_field="classification",
):
    """Apply rules to classify segments in a layer.

    Parameters:
    -----------
    source_layer : Layer
        Source layer with segments to classify
    layer_manager : LayerManager, optional
        Layer manager to add the result layer to
    layer_name : str, optional
        Name for the result layer
    result_field : str
        Field name to store classification results

    Returns:
    --------
    result_layer : Layer
        Layer with classification results
    """
    if not layer_name:
        layer_name = f"{source_layer.name}_{self.name}"

    result_layer = Layer(name=layer_name, parent=source_layer, layer_type="classification")
    result_layer.transform = source_layer.transform
    result_layer.crs = source_layer.crs
    result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

    result_layer.objects = source_layer.objects.copy()

    if result_field not in result_layer.objects.columns:
        result_layer.objects[result_field] = None

    result_layer.metadata = {
        "ruleset_name": self.name,
        "rules": [
            {
                "name": rule.name,
                "condition": rule.condition,
                "class_value": rule.class_value,
            }
            for rule in self.rules
        ],
        "result_field": result_field,
    }

    for rule in self.rules:
        try:
            if result_field in result_layer.objects.columns and (
                f"{result_field} ==" in rule.condition
                or f"{result_field}==" in rule.condition
                or f"{result_field} !=" in rule.condition
                or f"{result_field}!=" in rule.condition
            ):
                eval_condition = rule.condition.replace("&", " and ").replace("|", " or ")

                mask = result_layer.objects.apply(
                    lambda row, cond=eval_condition: eval(
                        cond,
                        {"__builtins__": {}},
                        {col: row[col] for col in result_layer.objects.columns if col != "geometry"},
                    ),
                    axis=1,
                )

            else:
                try:
                    local_dict = {
                        col: result_layer.objects[col].values for col in result_layer.objects.columns if col != "geometry"
                    }

                    mask = ne.evaluate(rule.condition, local_dict=local_dict)
                    mask = pd.Series(mask, index=result_layer.objects.index).fillna(False)
                except Exception:
                    mask = result_layer.objects.eval(rule.condition, engine="python")

            result_layer.objects.loc[mask, result_field] = rule.class_value

        except Exception as e:
            print(f"Error applying rule '{rule.name}': {str(e)}")
            continue

    if layer_manager:
        layer_manager.add_layer(result_layer)

    return result_layer

get_rules()

Get the list of rules in the rule set.

Returns:

list of tuples List of (name, condition) tuples for each rule

Source code in nickyspatial/core/rules.py
def get_rules(self):
    """Get the list of rules in the rule set.

    Returns:
    --------
    list of tuples
        List of (name, condition) tuples for each rule
    """
    return [(rule.name, rule.condition) for rule in self.rules]

wrap_condition_parts_simple(condition) staticmethod

Wrap condition parts with parentheses for evaluation.

Source code in nickyspatial/core/rules.py
@staticmethod
def wrap_condition_parts_simple(self, condition):
    """Wrap condition parts with parentheses for evaluation."""
    parts = condition.split("&")
    parts = [f"({part.strip()})" for part in parts]
    return " & ".join(parts)

TouchedByRuleSet

Bases: CommonBase

A rule set to reclassify segments based on spatial enclosure.

This rule set identifies segments of one class (A) that are entirely surrounded by segments of another class (B), and reclassifies them into a new class.

Source code in nickyspatial/core/rules.py
class TouchedByRuleSet(CommonBase):
    """A rule set to reclassify segments based on spatial enclosure.

    This rule set identifies segments of one class (A) that are entirely surrounded
    by segments of another class (B), and reclassifies them into a new class.
    """

    def __init__(self, name=None):
        """Initialize the merge rule set.

        Parameters:
        -----------
        name : str, optional
            Name of the merge rule set
        """
        self.name = name if name else "TouchedByRuleSet"

    def execute(
        self, source_layer, class_column_name, class_value_a, class_value_b, new_class_name, layer_manager=None, layer_name=None
    ):
        """Executes the merge rule set by identifying and updating segments of a given class that are adjacent to another class!

        Parameters:
        - source_layer: Layer
            The input layer containing segment geometries and attributes.
        - class_column_name: str
            The name of the column containing class labels.
        - class_value_a: str or int
            The class value of segments to be checked for touching neighbors.
        - class_value_b: str or int
            The class value of neighboring segments that would trigger a merge.
        - new_class_name: str
            The new class value to assign to segments of class_value_a that touch class_value_b.
        - layer_manager: optional
            An optional manager for adding the resulting layer to a collection or interface.
        - layer_name: optional
            Optional custom name for the resulting layer. Defaults to "<source_layer_name>_<ruleset_name>".

        Returns:
        - result_layer: Layer
            A new Layer object with updated segment classifications where applicable.

        Logic:
        - Copies the source layer and initializes a new result layer.
        - Preprocesses the source layer to build geometry and class lookup maps.
        - Iterates through each segment of class_value_a, checking if any of its neighbors belong to class_value_b.
        - If so, updates the segment's class to new_class_name.
        - Stores the modified DataFrame in the result layer and optionally registers it via the layer_manager.

        """
        if not layer_name:
            layer_name = f"{source_layer.name}_{self.name}"

        result_layer = Layer(name=layer_name, parent=source_layer, layer_type="merged")
        result_layer.transform = source_layer.transform
        result_layer.crs = source_layer.crs
        result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

        df = source_layer.objects.copy()
        touched_segments = []
        geom_map, class_map = self._preprocess_layer(source_layer, class_column_name)

        for sid in df["segment_id"].unique():
            if class_map.get(sid) != class_value_a:
                continue

            neighbors = self._find_neighbors(sid, geom_map)
            if neighbors and any(class_map.get(n_id) == class_value_b for n_id in neighbors):
                touched_segments.append(sid)

        df.loc[(df["segment_id"].isin(touched_segments)), class_column_name] = new_class_name

        result_layer.objects = df

        result_layer.metadata = {
            "enclosed_by_ruleset_name": self.name,
        }

        if layer_manager:
            layer_manager.add_layer(result_layer)

        return result_layer

__init__(name=None)

Initialize the merge rule set.

Parameters:

name : str, optional Name of the merge rule set

Source code in nickyspatial/core/rules.py
def __init__(self, name=None):
    """Initialize the merge rule set.

    Parameters:
    -----------
    name : str, optional
        Name of the merge rule set
    """
    self.name = name if name else "TouchedByRuleSet"

execute(source_layer, class_column_name, class_value_a, class_value_b, new_class_name, layer_manager=None, layer_name=None)

Executes the merge rule set by identifying and updating segments of a given class that are adjacent to another class!

  • source_layer: Layer The input layer containing segment geometries and attributes.
  • class_column_name: str The name of the column containing class labels.
  • class_value_a: str or int The class value of segments to be checked for touching neighbors.
  • class_value_b: str or int The class value of neighboring segments that would trigger a merge.
  • new_class_name: str The new class value to assign to segments of class_value_a that touch class_value_b.
  • layer_manager: optional An optional manager for adding the resulting layer to a collection or interface.
  • layer_name: optional Optional custom name for the resulting layer. Defaults to "_".
  • result_layer: Layer A new Layer object with updated segment classifications where applicable.

Logic: - Copies the source layer and initializes a new result layer. - Preprocesses the source layer to build geometry and class lookup maps. - Iterates through each segment of class_value_a, checking if any of its neighbors belong to class_value_b. - If so, updates the segment's class to new_class_name. - Stores the modified DataFrame in the result layer and optionally registers it via the layer_manager.

Source code in nickyspatial/core/rules.py
def execute(
    self, source_layer, class_column_name, class_value_a, class_value_b, new_class_name, layer_manager=None, layer_name=None
):
    """Executes the merge rule set by identifying and updating segments of a given class that are adjacent to another class!

    Parameters:
    - source_layer: Layer
        The input layer containing segment geometries and attributes.
    - class_column_name: str
        The name of the column containing class labels.
    - class_value_a: str or int
        The class value of segments to be checked for touching neighbors.
    - class_value_b: str or int
        The class value of neighboring segments that would trigger a merge.
    - new_class_name: str
        The new class value to assign to segments of class_value_a that touch class_value_b.
    - layer_manager: optional
        An optional manager for adding the resulting layer to a collection or interface.
    - layer_name: optional
        Optional custom name for the resulting layer. Defaults to "<source_layer_name>_<ruleset_name>".

    Returns:
    - result_layer: Layer
        A new Layer object with updated segment classifications where applicable.

    Logic:
    - Copies the source layer and initializes a new result layer.
    - Preprocesses the source layer to build geometry and class lookup maps.
    - Iterates through each segment of class_value_a, checking if any of its neighbors belong to class_value_b.
    - If so, updates the segment's class to new_class_name.
    - Stores the modified DataFrame in the result layer and optionally registers it via the layer_manager.

    """
    if not layer_name:
        layer_name = f"{source_layer.name}_{self.name}"

    result_layer = Layer(name=layer_name, parent=source_layer, layer_type="merged")
    result_layer.transform = source_layer.transform
    result_layer.crs = source_layer.crs
    result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

    df = source_layer.objects.copy()
    touched_segments = []
    geom_map, class_map = self._preprocess_layer(source_layer, class_column_name)

    for sid in df["segment_id"].unique():
        if class_map.get(sid) != class_value_a:
            continue

        neighbors = self._find_neighbors(sid, geom_map)
        if neighbors and any(class_map.get(n_id) == class_value_b for n_id in neighbors):
            touched_segments.append(sid)

    df.loc[(df["segment_id"].isin(touched_segments)), class_column_name] = new_class_name

    result_layer.objects = df

    result_layer.metadata = {
        "enclosed_by_ruleset_name": self.name,
    }

    if layer_manager:
        layer_manager.add_layer(result_layer)

    return result_layer

Implements segmentation algorithms to partition images into meaningful region objects.

The functions here might apply clustering or region-growing techniques, aiding object-based remote sensing analysis. This module includes the SlicSegmentation class, which implements a bottom-up region-growing algorithm Algorithms: - SlicSegmentation: Bottom-up region-growing algorithm - FelzenszwalbSegmentation: Graph-based segmentation - WatershedSegmentation: Topographic watershed algorithm - RegularGridSegmentation: Simple grid-based segmentation

BaseSegmentation

Base class for segmentation algorithms.

Source code in nickyspatial/core/segmentation.py
class BaseSegmentation:
    """Base class for segmentation algorithms."""

    def __init__(self):
        """Initialize the base segmentation class."""
        pass

    def _validate_inputs(self, image_data, transform, crs, raster_path=None):
        """Validate common inputs across all segmentation algorithms."""
        # If raster_path is provided, other params are optional (will be auto-extracted)
        if raster_path is None:
            if image_data is None:
                raise ValueError("Either image_data or raster_path must be provided")
            if transform is None:
                raise ValueError("transform cannot be None when image_data is provided")
            if crs is None:
                raise ValueError("crs cannot be None when image_data is provided")

        if image_data is not None:
            if len(image_data.shape) != 3:
                raise ValueError("image_data must be 3D array (bands, height, width)")
            if image_data.size == 0:
                raise ValueError("image_data cannot be empty")

    def _prepare_inputs(self, image_data=None, transform=None, crs=None, raster_path=None, target_crs=None):
        """Prepare and validate inputs for segmentation algorithms.

        Handles automatic raster loading and CRS reprojection.

        Returns:
        --------
        tuple : (image_data, transform, crs)
            Processed inputs ready for segmentation
        """
        if raster_path is not None:
            from ..io.raster import read_raster

            image_data, transform, crs = read_raster(raster_path)

        if target_crs is not None and target_crs != crs:
            from rasterio.warp import reproject, Resampling, calculate_default_transform
            from rasterio.crs import CRS
            from rasterio.transform import array_bounds

            if isinstance(target_crs, str):
                target_crs = CRS.from_string(target_crs)

            new_transform, new_width, new_height = calculate_default_transform(
                crs,
                target_crs,
                image_data.shape[2],
                image_data.shape[1],
                *array_bounds(image_data.shape[1], image_data.shape[2], transform),
            )

            reprojected_data = np.zeros((image_data.shape[0], new_height, new_width), dtype=image_data.dtype)

            for band_idx in range(image_data.shape[0]):
                reproject(
                    source=image_data[band_idx],
                    destination=reprojected_data[band_idx],
                    src_transform=transform,
                    src_crs=crs,
                    dst_transform=new_transform,
                    dst_crs=target_crs,
                    resampling=Resampling.bilinear,
                )

            image_data = reprojected_data
            transform = new_transform
            crs = target_crs

        self._validate_inputs(image_data, transform, crs, raster_path)

        return image_data, transform, crs

    def _create_segment_objects(self, segments, transform, crs):
        segment_ids = np.unique(segments)
        geometries = []
        properties = []

        for segment_id in segment_ids:
            mask = segments == segment_id

            if not np.any(mask):
                continue

            shapes = rasterio.features.shapes(mask.astype(np.int16), mask=mask, transform=transform)

            segment_polygons = []
            for geom, val in shapes:
                if val == 1:
                    try:
                        polygon = Polygon(geom["coordinates"][0])
                        if polygon.is_valid:
                            segment_polygons.append(polygon)
                    except Exception:
                        continue

            if not segment_polygons:
                continue

            largest_polygon = max(segment_polygons, key=lambda p: p.area)

            area_pixels = np.sum(mask)

            pixel_width = abs(transform.a)
            pixel_height = abs(transform.e)
            area_units = area_pixels * pixel_width * pixel_height

            prop = {
                "segment_id": int(segment_id),
                "area_pixels": int(area_pixels),
                "area_units": float(area_units),
            }

            geometries.append(largest_polygon)
            properties.append(prop)

        gdf = gpd.GeoDataFrame(properties, geometry=geometries, crs=crs)
        return gdf

    def _calculate_statistics(self, layer, image_data, bands):
        """Calculate statistics for segments based on image data.

        Parameters:
        -----------
        layer : Layer
            Layer containing segments
        image_data : numpy.ndarray
            Array with raster data values (bands, height, width)
        bands : list of str
            Names of the bands
        """
        segments = layer.raster
        segment_objects = layer.objects

        segment_ids = segment_objects["segment_id"].values

        for i, band_name in enumerate(bands):
            if i >= image_data.shape[0]:
                break

            band_data = image_data[i]

            for segment_id in segment_ids:
                mask = segments == segment_id

                if segment_id not in segment_objects["segment_id"].values:
                    continue

                segment_pixels = band_data[mask]

                if len(segment_pixels) == 0:
                    continue

                mean_val = float(np.mean(segment_pixels))
                std_val = float(np.std(segment_pixels))
                min_val = float(np.min(segment_pixels))
                max_val = float(np.max(segment_pixels))
                median_val = float(np.median(segment_pixels))

                idx = segment_objects.index[segment_objects["segment_id"] == segment_id].tolist()[0]
                segment_objects.at[idx, f"{band_name}_mean"] = mean_val
                segment_objects.at[idx, f"{band_name}_std"] = std_val
                segment_objects.at[idx, f"{band_name}_min"] = min_val
                segment_objects.at[idx, f"{band_name}_max"] = max_val
                segment_objects.at[idx, f"{band_name}_median"] = median_val

    def _normalize_bands(self, image_data):
        """Normalize image bands to [0, 1] range.

        Parameters:
        -----------
        image_data : numpy.ndarray
            Array with raster data values (bands, height, width)

        Returns:
        --------
        numpy.ndarray
            Normalized multichannel image (height, width, channels)
        """
        num_bands, height, width = image_data.shape
        normalized_bands = []

        for i in range(num_bands):
            band = image_data[i]

            if band.max() == band.min():
                normalized_bands.append(np.zeros_like(band))
                continue

            norm_band = (band - band.min()) / (band.max() - band.min())
            normalized_bands.append(norm_band)

        return np.stack(normalized_bands, axis=-1)

__init__()

Initialize the base segmentation class.

Source code in nickyspatial/core/segmentation.py
def __init__(self):
    """Initialize the base segmentation class."""
    pass

FelzenszwalbSegmentation

Bases: BaseSegmentation

Implementation of Felzenszwalb's efficient graph-based segmentation.

This algorithm builds a graph of pixel similarities and uses a minimum spanning tree approach to segment the image into regions of similar characteristics.

Source code in nickyspatial/core/segmentation.py
class FelzenszwalbSegmentation(BaseSegmentation):
    """Implementation of Felzenszwalb's efficient graph-based segmentation.

    This algorithm builds a graph of pixel similarities and uses a minimum
    spanning tree approach to segment the image into regions of similar
    characteristics.
    """

    def __init__(self, scale=100, sigma=0.5, min_size=50, **kwargs):
        """Initialize the Felzenszwalb segmentation algorithm.

        Parameters:
        -----------
        scale : float
            Free parameter that influences the size of the segments.
            Higher values create larger segments.
        sigma : float
            Width (standard deviation) of Gaussian kernel for pre-processing. Higher values
            give more smoothing.
        min_size : int
            Minimum component size. Smaller components are merged with
            neighboring larger components.
        **kwargs : dict
            Additional parameters passed to skimage.segmentation.felzenszwalb()
        """
        super().__init__()
        self.scale = scale
        self.sigma = sigma
        self.min_size = min_size
        self.fz_kwargs = kwargs

    def execute(
        self, image_data=None, transform=None, crs=None, raster_path=None, target_crs=None, layer_manager=None, layer_name=None
    ):
        """Perform Felzenszwalb segmentation and create a layer with the results."""
        start_time = time.time()

        image_data, transform, crs = self._prepare_inputs(image_data, transform, crs, raster_path, target_crs)
        num_bands, height, width = image_data.shape
        multichannel_image = self._normalize_bands(image_data)

        print(f"Felzenszwalb - Processing {height}x{width} image with {num_bands} bands")

        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            segments = segmentation.felzenszwalb(
                multichannel_image, scale=self.scale, sigma=self.sigma, min_size=self.min_size, channel_axis=-1, **self.fz_kwargs
            )

        segments = segments + 1

        if not layer_name:
            layer_name = f"Felzenszwalb_scale{self.scale}_sigma{self.sigma}"

        layer = Layer(name=layer_name, layer_type="segmentation")
        layer.raster = segments
        layer.transform = transform
        layer.crs = crs
        layer.metadata = {
            "algorithm": "Felzenszwalb",
            "scale": self.scale,
            "sigma": self.sigma,
            "min_size": self.min_size,
            "num_segments_actual": len(np.unique(segments)),
            "execution_time_seconds": round(time.time() - start_time, 3),
        }

        if self.fz_kwargs:
            layer.metadata["kwargs"] = self.fz_kwargs

        segment_objects = self._create_segment_objects(segments, transform, crs)
        layer.objects = segment_objects

        bands = [f"band_{i + 1}" for i in range(num_bands)]
        self._calculate_statistics(layer, image_data, bands)

        if layer_manager:
            layer_manager.add_layer(layer)

        return layer

__init__(scale=100, sigma=0.5, min_size=50, **kwargs)

Initialize the Felzenszwalb segmentation algorithm.

Parameters:

scale : float Free parameter that influences the size of the segments. Higher values create larger segments. sigma : float Width (standard deviation) of Gaussian kernel for pre-processing. Higher values give more smoothing. min_size : int Minimum component size. Smaller components are merged with neighboring larger components. **kwargs : dict Additional parameters passed to skimage.segmentation.felzenszwalb()

Source code in nickyspatial/core/segmentation.py
def __init__(self, scale=100, sigma=0.5, min_size=50, **kwargs):
    """Initialize the Felzenszwalb segmentation algorithm.

    Parameters:
    -----------
    scale : float
        Free parameter that influences the size of the segments.
        Higher values create larger segments.
    sigma : float
        Width (standard deviation) of Gaussian kernel for pre-processing. Higher values
        give more smoothing.
    min_size : int
        Minimum component size. Smaller components are merged with
        neighboring larger components.
    **kwargs : dict
        Additional parameters passed to skimage.segmentation.felzenszwalb()
    """
    super().__init__()
    self.scale = scale
    self.sigma = sigma
    self.min_size = min_size
    self.fz_kwargs = kwargs

execute(image_data=None, transform=None, crs=None, raster_path=None, target_crs=None, layer_manager=None, layer_name=None)

Perform Felzenszwalb segmentation and create a layer with the results.

Source code in nickyspatial/core/segmentation.py
def execute(
    self, image_data=None, transform=None, crs=None, raster_path=None, target_crs=None, layer_manager=None, layer_name=None
):
    """Perform Felzenszwalb segmentation and create a layer with the results."""
    start_time = time.time()

    image_data, transform, crs = self._prepare_inputs(image_data, transform, crs, raster_path, target_crs)
    num_bands, height, width = image_data.shape
    multichannel_image = self._normalize_bands(image_data)

    print(f"Felzenszwalb - Processing {height}x{width} image with {num_bands} bands")

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        segments = segmentation.felzenszwalb(
            multichannel_image, scale=self.scale, sigma=self.sigma, min_size=self.min_size, channel_axis=-1, **self.fz_kwargs
        )

    segments = segments + 1

    if not layer_name:
        layer_name = f"Felzenszwalb_scale{self.scale}_sigma{self.sigma}"

    layer = Layer(name=layer_name, layer_type="segmentation")
    layer.raster = segments
    layer.transform = transform
    layer.crs = crs
    layer.metadata = {
        "algorithm": "Felzenszwalb",
        "scale": self.scale,
        "sigma": self.sigma,
        "min_size": self.min_size,
        "num_segments_actual": len(np.unique(segments)),
        "execution_time_seconds": round(time.time() - start_time, 3),
    }

    if self.fz_kwargs:
        layer.metadata["kwargs"] = self.fz_kwargs

    segment_objects = self._create_segment_objects(segments, transform, crs)
    layer.objects = segment_objects

    bands = [f"band_{i + 1}" for i in range(num_bands)]
    self._calculate_statistics(layer, image_data, bands)

    if layer_manager:
        layer_manager.add_layer(layer)

    return layer

RegularGridSegmentation

Bases: BaseSegmentation

Implementation of regular grid segmentation algorithm.

This algorithm divides the image into regular rectangular segments of specified dimensions, creating a uniform grid pattern across the entire image.

Source code in nickyspatial/core/segmentation.py
class RegularGridSegmentation(BaseSegmentation):
    """Implementation of regular grid segmentation algorithm.

    This algorithm divides the image into regular rectangular segments
    of specified dimensions, creating a uniform grid pattern across
    the entire image.
    """

    def __init__(self, grid_size=(10, 10), overlap=0, boundary_handling="pad"):
        """Initialize the regular grid segmentation algorithm.

        Parameters:
        -----------
        grid_size : tuple of int
            Size of each grid cell (height, width) in pixels.
            For example, (10, 10) creates 10x10 pixel squares.
        overlap : int, optional
            Number of pixels to overlap between adjacent segments.
            Default is 0 (no overlap).
        boundary_handling : str, optional
            How to handle boundary segments that don't fit exactly:
            - 'pad': Pad the image to fit complete grid cells
            - 'truncate': Allow partial segments at boundaries
            - 'stretch': Stretch boundary segments to fill remaining space
        """
        super().__init__()
        self.grid_size = grid_size
        self.overlap = overlap
        self.boundary_handling = boundary_handling

    def _calculate_grid_dimensions(self, image_shape):
        """Calculate grid dimensions and segment layout."""
        height, width = image_shape
        grid_h, grid_w = self.grid_size

        # Calculate effective grid size considering overlap
        effective_h = grid_h - self.overlap
        effective_w = grid_w - self.overlap

        if self.boundary_handling == "pad":
            # Calculate number of complete segments needed
            n_rows = int(np.ceil(height / effective_h))
            n_cols = int(np.ceil(width / effective_w))

            # Calculate padded dimensions
            padded_height = n_rows * effective_h + self.overlap
            padded_width = n_cols * effective_w + self.overlap

            pad_h = max(0, padded_height - height)
            pad_w = max(0, padded_width - width)

        elif self.boundary_handling == "truncate":
            # Allow partial segments
            n_rows = int(np.ceil(height / effective_h))
            n_cols = int(np.ceil(width / effective_w))

            padded_height = height
            padded_width = width
            pad_h = pad_w = 0

        else:  # stretch
            # Calculate based on fitting segments exactly
            n_rows = max(1, int(height / effective_h))
            n_cols = max(1, int(width / effective_w))

            padded_height = height
            padded_width = width
            pad_h = pad_w = 0

        return {
            "n_rows": n_rows,
            "n_cols": n_cols,
            "padded_height": padded_height,
            "padded_width": padded_width,
            "pad_h": pad_h,
            "pad_w": pad_w,
            "effective_h": effective_h,
            "effective_w": effective_w,
        }

    def _create_grid_segments(self, image_shape, grid_info):
        """Create the segmentation array with regular grid."""
        height, width = image_shape
        segments = np.zeros((height, width), dtype=np.int32)

        segment_id = 1

        for row in range(grid_info["n_rows"]):
            for col in range(grid_info["n_cols"]):
                # Calculate segment boundaries
                if self.boundary_handling == "stretch" and (row == grid_info["n_rows"] - 1 or col == grid_info["n_cols"] - 1):
                    # Stretch last row/column to image boundary
                    start_row = row * grid_info["effective_h"]
                    start_col = col * grid_info["effective_w"]

                    if row == grid_info["n_rows"] - 1:
                        end_row = height
                    else:
                        end_row = min(start_row + self.grid_size[0], height)

                    if col == grid_info["n_cols"] - 1:
                        end_col = width
                    else:
                        end_col = min(start_col + self.grid_size[1], width)
                else:
                    # Regular grid cell
                    start_row = row * grid_info["effective_h"]
                    start_col = col * grid_info["effective_w"]
                    end_row = min(start_row + self.grid_size[0], height)
                    end_col = min(start_col + self.grid_size[1], width)

                # Skip if segment is too small (in truncate mode)
                if end_row <= start_row or end_col <= start_col:
                    continue

                # Assign segment ID
                segments[start_row:end_row, start_col:end_col] = segment_id
                segment_id += 1

        return segments

    def execute(
        self, image_data=None, transform=None, crs=None, raster_path=None, target_crs=None, layer_manager=None, layer_name=None
    ):
        """Perform regular grid segmentation and create a layer with the results."""
        start_time = time.time()

        image_data, transform, crs = self._prepare_inputs(image_data, transform, crs, raster_path, target_crs)
        num_bands, height, width = image_data.shape

        print(f"RegularGrid - Processing {height}x{width} image with {num_bands} bands")
        print(f"Grid size: {self.grid_size}, Overlap: {self.overlap}, Boundary: {self.boundary_handling}")

        grid_info = self._calculate_grid_dimensions((height, width))

        print(f"Creating {grid_info['n_rows']}x{grid_info['n_cols']} = {grid_info['n_rows'] * grid_info['n_cols']} segments")

        if self.boundary_handling == "pad" and (grid_info["pad_h"] > 0 or grid_info["pad_w"] > 0):
            padded_image = np.pad(image_data, ((0, 0), (0, grid_info["pad_h"]), (0, grid_info["pad_w"])), mode="reflect")
            working_shape = (grid_info["padded_height"], grid_info["padded_width"])
            print(f"Padded image to {working_shape} (added {grid_info['pad_h']}x{grid_info['pad_w']} pixels)")
        else:
            padded_image = image_data
            working_shape = (height, width)

        segments = self._create_grid_segments(working_shape, grid_info)

        # If we padded, crop back to original size
        if self.boundary_handling == "pad" and (grid_info["pad_h"] > 0 or grid_info["pad_w"] > 0):
            segments = segments[:height, :width]
            # Also crop the padded image data for statistics calculation
            padded_image = padded_image[:, :height, :width]

        # Ensure we're using the original image data for statistics
        working_image = image_data

        if not layer_name:
            layer_name = f"RegularGrid_{self.grid_size[0]}x{self.grid_size[1]}_overlap{self.overlap}"

        # Create layer
        layer = Layer(name=layer_name, layer_type="segmentation")
        layer.raster = segments
        layer.transform = transform
        layer.crs = crs
        layer.metadata = {
            "algorithm": "RegularGrid",
            "grid_size": self.grid_size,
            "overlap": self.overlap,
            "boundary_handling": self.boundary_handling,
            "n_rows": grid_info["n_rows"],
            "n_cols": grid_info["n_cols"],
            "num_segments_actual": len(np.unique(segments[segments > 0])),
            "image_shape": (height, width),
            "execution_time_seconds": round(time.time() - start_time, 3),
        }

        # Create segment objects
        segment_objects = self._create_segment_objects(segments, transform, crs)
        layer.objects = segment_objects

        # Calculate statistics
        bands = [f"band_{i + 1}" for i in range(num_bands)]
        self._calculate_statistics(layer, working_image, bands)

        if layer_manager:
            layer_manager.add_layer(layer)

        return layer

__init__(grid_size=(10, 10), overlap=0, boundary_handling='pad')

Initialize the regular grid segmentation algorithm.

Parameters:

grid_size : tuple of int Size of each grid cell (height, width) in pixels. For example, (10, 10) creates 10x10 pixel squares. overlap : int, optional Number of pixels to overlap between adjacent segments. Default is 0 (no overlap). boundary_handling : str, optional How to handle boundary segments that don't fit exactly: - 'pad': Pad the image to fit complete grid cells - 'truncate': Allow partial segments at boundaries - 'stretch': Stretch boundary segments to fill remaining space

Source code in nickyspatial/core/segmentation.py
def __init__(self, grid_size=(10, 10), overlap=0, boundary_handling="pad"):
    """Initialize the regular grid segmentation algorithm.

    Parameters:
    -----------
    grid_size : tuple of int
        Size of each grid cell (height, width) in pixels.
        For example, (10, 10) creates 10x10 pixel squares.
    overlap : int, optional
        Number of pixels to overlap between adjacent segments.
        Default is 0 (no overlap).
    boundary_handling : str, optional
        How to handle boundary segments that don't fit exactly:
        - 'pad': Pad the image to fit complete grid cells
        - 'truncate': Allow partial segments at boundaries
        - 'stretch': Stretch boundary segments to fill remaining space
    """
    super().__init__()
    self.grid_size = grid_size
    self.overlap = overlap
    self.boundary_handling = boundary_handling

execute(image_data=None, transform=None, crs=None, raster_path=None, target_crs=None, layer_manager=None, layer_name=None)

Perform regular grid segmentation and create a layer with the results.

Source code in nickyspatial/core/segmentation.py
def execute(
    self, image_data=None, transform=None, crs=None, raster_path=None, target_crs=None, layer_manager=None, layer_name=None
):
    """Perform regular grid segmentation and create a layer with the results."""
    start_time = time.time()

    image_data, transform, crs = self._prepare_inputs(image_data, transform, crs, raster_path, target_crs)
    num_bands, height, width = image_data.shape

    print(f"RegularGrid - Processing {height}x{width} image with {num_bands} bands")
    print(f"Grid size: {self.grid_size}, Overlap: {self.overlap}, Boundary: {self.boundary_handling}")

    grid_info = self._calculate_grid_dimensions((height, width))

    print(f"Creating {grid_info['n_rows']}x{grid_info['n_cols']} = {grid_info['n_rows'] * grid_info['n_cols']} segments")

    if self.boundary_handling == "pad" and (grid_info["pad_h"] > 0 or grid_info["pad_w"] > 0):
        padded_image = np.pad(image_data, ((0, 0), (0, grid_info["pad_h"]), (0, grid_info["pad_w"])), mode="reflect")
        working_shape = (grid_info["padded_height"], grid_info["padded_width"])
        print(f"Padded image to {working_shape} (added {grid_info['pad_h']}x{grid_info['pad_w']} pixels)")
    else:
        padded_image = image_data
        working_shape = (height, width)

    segments = self._create_grid_segments(working_shape, grid_info)

    # If we padded, crop back to original size
    if self.boundary_handling == "pad" and (grid_info["pad_h"] > 0 or grid_info["pad_w"] > 0):
        segments = segments[:height, :width]
        # Also crop the padded image data for statistics calculation
        padded_image = padded_image[:, :height, :width]

    # Ensure we're using the original image data for statistics
    working_image = image_data

    if not layer_name:
        layer_name = f"RegularGrid_{self.grid_size[0]}x{self.grid_size[1]}_overlap{self.overlap}"

    # Create layer
    layer = Layer(name=layer_name, layer_type="segmentation")
    layer.raster = segments
    layer.transform = transform
    layer.crs = crs
    layer.metadata = {
        "algorithm": "RegularGrid",
        "grid_size": self.grid_size,
        "overlap": self.overlap,
        "boundary_handling": self.boundary_handling,
        "n_rows": grid_info["n_rows"],
        "n_cols": grid_info["n_cols"],
        "num_segments_actual": len(np.unique(segments[segments > 0])),
        "image_shape": (height, width),
        "execution_time_seconds": round(time.time() - start_time, 3),
    }

    # Create segment objects
    segment_objects = self._create_segment_objects(segments, transform, crs)
    layer.objects = segment_objects

    # Calculate statistics
    bands = [f"band_{i + 1}" for i in range(num_bands)]
    self._calculate_statistics(layer, working_image, bands)

    if layer_manager:
        layer_manager.add_layer(layer)

    return layer

SlicSegmentation

Bases: BaseSegmentation

Implementation of Multiresolution segmentation algorithm.

This algorithm segments an image using a bottom-up region-growing approach that optimizes the homogeneity of pixel values within segments while considering shape compactness.

Source code in nickyspatial/core/segmentation.py
class SlicSegmentation(BaseSegmentation):
    """Implementation of Multiresolution segmentation algorithm.

    This algorithm segments an image using a bottom-up region-growing approach
    that optimizes the homogeneity of pixel values within segments while
    considering shape compactness.
    """

    def __init__(self, scale=15, compactness=0.6, **kwargs):
        """Initialize the segmentation algorithm.

        Parameters:
        -----------
        scale : float
            Scale parameter that influences the size of the segments.
            Higher values create larger segments.
        shape : float, range [0, 1]
            Weight of shape criterion vs. color criterion.
            Higher values give more weight to shape.
        compactness : float, range [0, 1]
            Weight of compactness criterion vs. smoothness criterion.
            Higher values create more compact segments.
        **kwargs : dict
            Additional parameters passed to skimage.segmentation.slic()
        """
        super().__init__()
        self.scale = scale
        self.compactness = compactness
        self.slic_kwargs = kwargs

    def execute(
        self, image_data=None, transform=None, crs=None, raster_path=None, target_crs=None, layer_manager=None, layer_name=None
    ):
        """Perform segmentation and create a layer with the results.

        Parameters:
        -----------
        image_data : numpy.ndarray, optional
            Array with raster data values (bands, height, width).
            If not provided, must specify raster_path.
        transform : affine.Affine, optional
            Affine transformation for the raster.
            If not provided, will be extracted from raster_path.
        crs : rasterio.crs.CRS, optional
            Coordinate reference system.
            If not provided, will be extracted from raster_path.
        raster_path : str, optional
            Path to raster file. If provided, image_data, transform, and crs
            will be extracted automatically.
        target_crs : str or rasterio.crs.CRS, optional
            Target CRS for the output. If different from input CRS,
            automatic reprojection will be performed.
        layer_manager : LayerManager, optional
            Layer manager to add the result layer to
        layer_name : str, optional
            Name for the result layer

        Returns:
        --------
        layer : Layer
            Layer containing the segmentation results
        """
        start_time = time.time()

        image_data, transform, crs = self._prepare_inputs(image_data, transform, crs, raster_path, target_crs)
        num_bands, height, width = image_data.shape

        multichannel_image = self._normalize_bands(image_data)

        n_segments = int(width * height / (self.scale * self.scale))
        print(f"Number of segments: {n_segments}")

        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            segments = segmentation.slic(
                multichannel_image,
                n_segments=n_segments,
                compactness=self.compactness,
                sigma=1.0,
                start_label=1,
                channel_axis=-1,
                **self.slic_kwargs,
            )

        if not layer_name:
            layer_name = f"Segmentation_scale{self.scale}_comp{self.compactness}"

        layer = Layer(name=layer_name, layer_type="segmentation")
        layer.raster = segments
        layer.transform = transform
        layer.crs = crs
        layer.metadata = {
            "scale": self.scale,
            "compactness": self.compactness,
            "n_segments": n_segments,
            "num_segments_actual": len(np.unique(segments)),
            "execution_time_seconds": round(time.time() - start_time, 3),
        }

        if self.slic_kwargs:
            layer.metadata["kwargs"] = self.slic_kwargs

        segment_objects = self._create_segment_objects(segments, transform, crs)
        layer.objects = segment_objects

        bands = [f"band_{i + 1}" for i in range(num_bands)]
        self._calculate_statistics(layer, image_data, bands)

        if layer_manager:
            layer_manager.add_layer(layer)

        return layer

__init__(scale=15, compactness=0.6, **kwargs)

Initialize the segmentation algorithm.

Parameters:

scale : float Scale parameter that influences the size of the segments. Higher values create larger segments. shape : float, range [0, 1] Weight of shape criterion vs. color criterion. Higher values give more weight to shape. compactness : float, range [0, 1] Weight of compactness criterion vs. smoothness criterion. Higher values create more compact segments. **kwargs : dict Additional parameters passed to skimage.segmentation.slic()

Source code in nickyspatial/core/segmentation.py
def __init__(self, scale=15, compactness=0.6, **kwargs):
    """Initialize the segmentation algorithm.

    Parameters:
    -----------
    scale : float
        Scale parameter that influences the size of the segments.
        Higher values create larger segments.
    shape : float, range [0, 1]
        Weight of shape criterion vs. color criterion.
        Higher values give more weight to shape.
    compactness : float, range [0, 1]
        Weight of compactness criterion vs. smoothness criterion.
        Higher values create more compact segments.
    **kwargs : dict
        Additional parameters passed to skimage.segmentation.slic()
    """
    super().__init__()
    self.scale = scale
    self.compactness = compactness
    self.slic_kwargs = kwargs

execute(image_data=None, transform=None, crs=None, raster_path=None, target_crs=None, layer_manager=None, layer_name=None)

Perform segmentation and create a layer with the results.

Parameters:

image_data : numpy.ndarray, optional Array with raster data values (bands, height, width). If not provided, must specify raster_path. transform : affine.Affine, optional Affine transformation for the raster. If not provided, will be extracted from raster_path. crs : rasterio.crs.CRS, optional Coordinate reference system. If not provided, will be extracted from raster_path. raster_path : str, optional Path to raster file. If provided, image_data, transform, and crs will be extracted automatically. target_crs : str or rasterio.crs.CRS, optional Target CRS for the output. If different from input CRS, automatic reprojection will be performed. layer_manager : LayerManager, optional Layer manager to add the result layer to layer_name : str, optional Name for the result layer

Returns:

layer : Layer Layer containing the segmentation results

Source code in nickyspatial/core/segmentation.py
def execute(
    self, image_data=None, transform=None, crs=None, raster_path=None, target_crs=None, layer_manager=None, layer_name=None
):
    """Perform segmentation and create a layer with the results.

    Parameters:
    -----------
    image_data : numpy.ndarray, optional
        Array with raster data values (bands, height, width).
        If not provided, must specify raster_path.
    transform : affine.Affine, optional
        Affine transformation for the raster.
        If not provided, will be extracted from raster_path.
    crs : rasterio.crs.CRS, optional
        Coordinate reference system.
        If not provided, will be extracted from raster_path.
    raster_path : str, optional
        Path to raster file. If provided, image_data, transform, and crs
        will be extracted automatically.
    target_crs : str or rasterio.crs.CRS, optional
        Target CRS for the output. If different from input CRS,
        automatic reprojection will be performed.
    layer_manager : LayerManager, optional
        Layer manager to add the result layer to
    layer_name : str, optional
        Name for the result layer

    Returns:
    --------
    layer : Layer
        Layer containing the segmentation results
    """
    start_time = time.time()

    image_data, transform, crs = self._prepare_inputs(image_data, transform, crs, raster_path, target_crs)
    num_bands, height, width = image_data.shape

    multichannel_image = self._normalize_bands(image_data)

    n_segments = int(width * height / (self.scale * self.scale))
    print(f"Number of segments: {n_segments}")

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        segments = segmentation.slic(
            multichannel_image,
            n_segments=n_segments,
            compactness=self.compactness,
            sigma=1.0,
            start_label=1,
            channel_axis=-1,
            **self.slic_kwargs,
        )

    if not layer_name:
        layer_name = f"Segmentation_scale{self.scale}_comp{self.compactness}"

    layer = Layer(name=layer_name, layer_type="segmentation")
    layer.raster = segments
    layer.transform = transform
    layer.crs = crs
    layer.metadata = {
        "scale": self.scale,
        "compactness": self.compactness,
        "n_segments": n_segments,
        "num_segments_actual": len(np.unique(segments)),
        "execution_time_seconds": round(time.time() - start_time, 3),
    }

    if self.slic_kwargs:
        layer.metadata["kwargs"] = self.slic_kwargs

    segment_objects = self._create_segment_objects(segments, transform, crs)
    layer.objects = segment_objects

    bands = [f"band_{i + 1}" for i in range(num_bands)]
    self._calculate_statistics(layer, image_data, bands)

    if layer_manager:
        layer_manager.add_layer(layer)

    return layer

WatershedSegmentation

Bases: BaseSegmentation

Implementation of watershed segmentation algorithm using regular grid seeding.

The watershed algorithm treats the image as a topographic surface where pixel intensities represent elevation. It finds watershed lines that separate different catchment basins, effectively segmenting the image into distinct regions.

Source code in nickyspatial/core/segmentation.py
class WatershedSegmentation(BaseSegmentation):
    """Implementation of watershed segmentation algorithm using regular grid seeding.

    The watershed algorithm treats the image as a topographic surface where
    pixel intensities represent elevation. It finds watershed lines that
    separate different catchment basins, effectively segmenting the image
    into distinct regions.
    """

    def __init__(self, n_points=468, compactness=0, watershed_line=False, preprocessing="sobel", mask=None, **kwargs):
        """Initialize the watershed segmentation algorithm.

        Parameters:
        -----------
        n_points : int
            Number of seed points to generate using regular grid.
            Higher values create more segments.
        compactness : float, optional
            Use compact watershed with given compactness parameter.
            Higher values result in more regularly-shaped watershed basins.
        watershed_line : bool, optional
            If watershed_line is True, a one-pixel wide line separates the regions
            obtained by the watershed algorithm. The line has the label 0.
        preprocessing : str, optional
            Method for preprocessing the image before watershed segmentation.
            Options: 'sobel', 'prewitt', 'scharr'
        mask : ndarray of bools or 0s and 1s, optional
            Array of same shape as image. Only points at which mask == True
            will be labeled.
        **kwargs : dict
            Additional parameters passed to watershed function
        """
        super().__init__()
        self.n_points = n_points
        self.compactness = compactness
        self.watershed_line = watershed_line
        self.preprocessing = preprocessing
        self.mask = mask
        self.watershed_kwargs = kwargs

    def _preprocess_image(self, image):
        """Apply edge detection preprocessing to the image before watershed segmentation."""
        if len(image.shape) == 3:
            image = np.mean(image, axis=-1)

        if self.preprocessing == "sobel":
            processed = filters.sobel(image)
        elif self.preprocessing == "prewitt":
            processed = filters.prewitt(image)
        elif self.preprocessing == "scharr":
            processed = filters.scharr(image)
        else:
            raise ValueError(f"Unknown preprocessing method: {self.preprocessing}. Available options: 'sobel', 'prewitt', 'scharr'")

        return processed

    def _generate_seeds(self, image_shape, n_points):
        """Generate seed points using regular grid approach."""
        grid = util.regular_grid(image_shape, n_points=n_points)

        seeds = np.zeros(image_shape, dtype=int)
        seeds[grid] = np.arange(seeds[grid].size).reshape(seeds[grid].shape) + 1

        return seeds

    def execute(
        self, image_data=None, transform=None, crs=None, raster_path=None, target_crs=None, layer_manager=None, layer_name=None
    ):
        """Perform watershed segmentation and create a layer with the results."""
        start_time = time.time()

        image_data, transform, crs = self._prepare_inputs(image_data, transform, crs, raster_path, target_crs)
        num_bands, height, width = image_data.shape

        multichannel_image = self._normalize_bands(image_data)

        print(f"Watershed - Processing {height}x{width} image with {num_bands} bands")
        print(f"Using {self.n_points} seed points with {self.preprocessing} edge detection")

        edges = self._preprocess_image(multichannel_image)

        seeds = self._generate_seeds((height, width), self.n_points)
        actual_seeds = len(np.unique(seeds)) - 1
        print(f"Generated {actual_seeds} seed points")

        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            segments = segmentation.watershed(
                edges,
                markers=seeds,
                mask=self.mask,
                compactness=self.compactness,
                watershed_line=self.watershed_line,
                **self.watershed_kwargs,
            )

        # Handle watershed lines if present
        if self.watershed_line and 0 in np.unique(segments):
            print("Watershed lines preserved (label 0)")
        elif np.min(segments) == 0:
            # Relabel to start from 1 if no watershed lines
            segments = segments + 1

        if not layer_name:
            layer_name = f"Watershed_{self.preprocessing}_n{self.n_points}_comp{self.compactness}"

        # Create layer
        layer = Layer(name=layer_name, layer_type="segmentation")
        layer.raster = segments
        layer.transform = transform
        layer.crs = crs
        layer.metadata = {
            "algorithm": "Watershed",
            "preprocessing": self.preprocessing,
            "n_points": self.n_points,
            "compactness": self.compactness,
            "watershed_line": self.watershed_line,
            "num_seeds": actual_seeds,
            "num_segments_actual": len(np.unique(segments)),
            "execution_time_seconds": round(time.time() - start_time, 3),
        }

        if self.watershed_kwargs:
            layer.metadata["kwargs"] = self.watershed_kwargs

        # Create segment objects
        segment_objects = self._create_segment_objects(segments, transform, crs)
        layer.objects = segment_objects

        # Calculate statistics
        bands = [f"band_{i + 1}" for i in range(num_bands)]
        self._calculate_statistics(layer, image_data, bands)

        if layer_manager:
            layer_manager.add_layer(layer)

        return layer

__init__(n_points=468, compactness=0, watershed_line=False, preprocessing='sobel', mask=None, **kwargs)

Initialize the watershed segmentation algorithm.

Parameters:

n_points : int Number of seed points to generate using regular grid. Higher values create more segments. compactness : float, optional Use compact watershed with given compactness parameter. Higher values result in more regularly-shaped watershed basins. watershed_line : bool, optional If watershed_line is True, a one-pixel wide line separates the regions obtained by the watershed algorithm. The line has the label 0. preprocessing : str, optional Method for preprocessing the image before watershed segmentation. Options: 'sobel', 'prewitt', 'scharr' mask : ndarray of bools or 0s and 1s, optional Array of same shape as image. Only points at which mask == True will be labeled. **kwargs : dict Additional parameters passed to watershed function

Source code in nickyspatial/core/segmentation.py
def __init__(self, n_points=468, compactness=0, watershed_line=False, preprocessing="sobel", mask=None, **kwargs):
    """Initialize the watershed segmentation algorithm.

    Parameters:
    -----------
    n_points : int
        Number of seed points to generate using regular grid.
        Higher values create more segments.
    compactness : float, optional
        Use compact watershed with given compactness parameter.
        Higher values result in more regularly-shaped watershed basins.
    watershed_line : bool, optional
        If watershed_line is True, a one-pixel wide line separates the regions
        obtained by the watershed algorithm. The line has the label 0.
    preprocessing : str, optional
        Method for preprocessing the image before watershed segmentation.
        Options: 'sobel', 'prewitt', 'scharr'
    mask : ndarray of bools or 0s and 1s, optional
        Array of same shape as image. Only points at which mask == True
        will be labeled.
    **kwargs : dict
        Additional parameters passed to watershed function
    """
    super().__init__()
    self.n_points = n_points
    self.compactness = compactness
    self.watershed_line = watershed_line
    self.preprocessing = preprocessing
    self.mask = mask
    self.watershed_kwargs = kwargs

execute(image_data=None, transform=None, crs=None, raster_path=None, target_crs=None, layer_manager=None, layer_name=None)

Perform watershed segmentation and create a layer with the results.

Source code in nickyspatial/core/segmentation.py
def execute(
    self, image_data=None, transform=None, crs=None, raster_path=None, target_crs=None, layer_manager=None, layer_name=None
):
    """Perform watershed segmentation and create a layer with the results."""
    start_time = time.time()

    image_data, transform, crs = self._prepare_inputs(image_data, transform, crs, raster_path, target_crs)
    num_bands, height, width = image_data.shape

    multichannel_image = self._normalize_bands(image_data)

    print(f"Watershed - Processing {height}x{width} image with {num_bands} bands")
    print(f"Using {self.n_points} seed points with {self.preprocessing} edge detection")

    edges = self._preprocess_image(multichannel_image)

    seeds = self._generate_seeds((height, width), self.n_points)
    actual_seeds = len(np.unique(seeds)) - 1
    print(f"Generated {actual_seeds} seed points")

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        segments = segmentation.watershed(
            edges,
            markers=seeds,
            mask=self.mask,
            compactness=self.compactness,
            watershed_line=self.watershed_line,
            **self.watershed_kwargs,
        )

    # Handle watershed lines if present
    if self.watershed_line and 0 in np.unique(segments):
        print("Watershed lines preserved (label 0)")
    elif np.min(segments) == 0:
        # Relabel to start from 1 if no watershed lines
        segments = segments + 1

    if not layer_name:
        layer_name = f"Watershed_{self.preprocessing}_n{self.n_points}_comp{self.compactness}"

    # Create layer
    layer = Layer(name=layer_name, layer_type="segmentation")
    layer.raster = segments
    layer.transform = transform
    layer.crs = crs
    layer.metadata = {
        "algorithm": "Watershed",
        "preprocessing": self.preprocessing,
        "n_points": self.n_points,
        "compactness": self.compactness,
        "watershed_line": self.watershed_line,
        "num_seeds": actual_seeds,
        "num_segments_actual": len(np.unique(segments)),
        "execution_time_seconds": round(time.time() - start_time, 3),
    }

    if self.watershed_kwargs:
        layer.metadata["kwargs"] = self.watershed_kwargs

    # Create segment objects
    segment_objects = self._create_segment_objects(segments, transform, crs)
    layer.objects = segment_objects

    # Calculate statistics
    bands = [f"band_{i + 1}" for i in range(num_bands)]
    self._calculate_statistics(layer, image_data, bands)

    if layer_manager:
        layer_manager.add_layer(layer)

    return layer

nickyspatial.filters

The filters package provides modules for applying transformations to raster data.

It includes spatial filters (e.g., smoothing) as well as spectral filters (e.g., band math). Main idea is to further manipulate the objects such as merging segments or applying pre-defined rules to filter objects based on their attributes.

Implements spatial operations like smoothing and morphological transformations.

These filters can modify the geometry or arrangement of pixel values to enhance or simplify data for object analysis. The functions here include smoothing boundaries, merging small segments, and selecting segments based on area. These operations are essential for preparing data for object-based image analysis, especially in remote sensing applications. The functions are designed to work with raster data and can be applied to layers created from segmentation algorithms.

merge_small_segments(source_layer, min_size, attribute='area_pixels', layer_manager=None, layer_name=None)

Merge small segments with their largest neighbor.

Parameters:

source_layer : Layer Source layer with segments to merge min_size : float Minimum segment size threshold attribute : str Attribute to use for size comparison layer_manager : LayerManager, optional Layer manager to add the result layer to layer_name : str, optional Name for the result layer

Returns:

result_layer : Layer Layer with merged segments

Source code in nickyspatial/filters/spatial.py
def merge_small_segments(source_layer, min_size, attribute="area_pixels", layer_manager=None, layer_name=None):
    """Merge small segments with their largest neighbor.

    Parameters:
    -----------
    source_layer : Layer
        Source layer with segments to merge
    min_size : float
        Minimum segment size threshold
    attribute : str
        Attribute to use for size comparison
    layer_manager : LayerManager, optional
        Layer manager to add the result layer to
    layer_name : str, optional
        Name for the result layer

    Returns:
    --------
    result_layer : Layer
        Layer with merged segments
    """
    if not layer_name:
        layer_name = f"{source_layer.name}_merged"

    result_layer = Layer(name=layer_name, parent=source_layer, layer_type="filter")
    result_layer.transform = source_layer.transform
    result_layer.crs = source_layer.crs

    result_layer.metadata = {
        "filter_type": "merge_small_segments",
        "min_size": min_size,
        "attribute": attribute,
    }

    objects = source_layer.objects.copy()
    small_segments = objects[objects[attribute] < min_size]

    if len(small_segments) == 0:
        result_layer.objects = objects
        result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

        if layer_manager:
            layer_manager.add_layer(result_layer)

        return result_layer

    for idx, small_segment in small_segments.iterrows():
        if idx not in objects.index:
            continue

        neighbors = objects[objects.index != idx].overlay(
            gpd.GeoDataFrame(geometry=[small_segment.geometry], crs=objects.crs),
            how="intersection",
        )

        if len(neighbors) == 0:
            continue

        largest_neighbor_idx = neighbors[attribute].idxmax()

        largest_neighbor = objects.loc[largest_neighbor_idx]
        merged_geometry = largest_neighbor.geometry.union(small_segment.geometry)

        objects.at[largest_neighbor_idx, "geometry"] = merged_geometry
        objects.at[largest_neighbor_idx, attribute] += small_segment[attribute]

        objects = objects.drop(idx)

    if source_layer.raster is not None:
        segments_raster = source_layer.raster.copy()

        old_to_new = {}
        for _idx, obj in objects.iterrows():
            old_id = obj["segment_id"]
            old_to_new[old_id] = old_id

        for idx, small_segment in small_segments.iterrows():
            if idx not in objects.index:
                old_id = small_segment["segment_id"]

                touching_segments = objects.intersects(small_segment.geometry)
                if any(touching_segments):
                    new_id = objects[touching_segments].iloc[0]["segment_id"]
                    old_to_new[old_id] = new_id

        for old_id, new_id in old_to_new.items():
            if old_id != new_id:
                segments_raster[segments_raster == old_id] = new_id

        result_layer.raster = segments_raster

    result_layer.objects = objects

    if layer_manager:
        layer_manager.add_layer(result_layer)

    return result_layer

select_by_area(source_layer, min_area=None, max_area=None, area_column='area_units', layer_manager=None, layer_name=None)

Select segments based on area.

Parameters:

source_layer : Layer Source layer with segments to filter min_area : float, optional Minimum area threshold max_area : float, optional Maximum area threshold area_column : str Column containing area values layer_manager : LayerManager, optional Layer manager to add the result layer to layer_name : str, optional Name for the result layer

Returns:

result_layer : Layer Layer with filtered segments

Source code in nickyspatial/filters/spatial.py
def select_by_area(
    source_layer,
    min_area=None,
    max_area=None,
    area_column="area_units",
    layer_manager=None,
    layer_name=None,
):
    """Select segments based on area.

    Parameters:
    -----------
    source_layer : Layer
        Source layer with segments to filter
    min_area : float, optional
        Minimum area threshold
    max_area : float, optional
        Maximum area threshold
    area_column : str
        Column containing area values
    layer_manager : LayerManager, optional
        Layer manager to add the result layer to
    layer_name : str, optional
        Name for the result layer

    Returns:
    --------
    result_layer : Layer
        Layer with filtered segments
    """
    if not layer_name:
        layer_name = f"{source_layer.name}_area_filtered"

    result_layer = Layer(name=layer_name, parent=source_layer, layer_type="filter")
    result_layer.transform = source_layer.transform
    result_layer.crs = source_layer.crs

    result_layer.metadata = {
        "filter_type": "select_by_area",
        "min_area": min_area,
        "max_area": max_area,
        "area_column": area_column,
    }

    objects = source_layer.objects.copy()

    if min_area is not None:
        objects = objects[objects[area_column] >= min_area]

    if max_area is not None:
        objects = objects[objects[area_column] <= max_area]

    result_layer.objects = objects

    if source_layer.raster is not None:
        kept_ids = set(objects["segment_id"])

        segments_raster = source_layer.raster.copy()
        mask = np.isin(segments_raster, list(kept_ids))

        segments_raster[~mask] = 0

        result_layer.raster = segments_raster

    if layer_manager:
        layer_manager.add_layer(result_layer)

    return result_layer

smooth_boundaries(source_layer, iterations=1, layer_manager=None, layer_name=None)

Smooth segment boundaries by applying morphological operations.

Parameters:

source_layer : Layer Source layer with segments to smooth iterations : int Number of smoothing iterations to apply layer_manager : LayerManager, optional Layer manager to add the result layer to layer_name : str, optional Name for the result layer

Returns:

result_layer : Layer Layer with smoothed segment boundaries

Source code in nickyspatial/filters/spatial.py
def smooth_boundaries(source_layer, iterations=1, layer_manager=None, layer_name=None):
    """Smooth segment boundaries by applying morphological operations.

    Parameters:
    -----------
    source_layer : Layer
        Source layer with segments to smooth
    iterations : int
        Number of smoothing iterations to apply
    layer_manager : LayerManager, optional
        Layer manager to add the result layer to
    layer_name : str, optional
        Name for the result layer

    Returns:
    --------
    result_layer : Layer
        Layer with smoothed segment boundaries
    """
    if not layer_name:
        layer_name = f"{source_layer.name}_smoothed"

    result_layer = Layer(name=layer_name, parent=source_layer, layer_type="filter")
    result_layer.transform = source_layer.transform
    result_layer.crs = source_layer.crs
    result_layer.raster = source_layer.raster.copy() if source_layer.raster is not None else None

    result_layer.metadata = {
        "filter_type": "smooth_boundaries",
        "iterations": iterations,
    }

    objects = source_layer.objects.copy()

    smoothed_geometries = []
    for geom in objects.geometry:
        smoothed_geom = geom
        for _ in range(iterations):
            buffer_distance = np.sqrt(smoothed_geom.area) * 0.01
            smoothed_geom = smoothed_geom.buffer(-buffer_distance).buffer(buffer_distance * 2)

        if not smoothed_geom.is_valid:
            smoothed_geom = smoothed_geom.buffer(0)

        smoothed_geometries.append(smoothed_geom)

    objects.geometry = smoothed_geometries
    result_layer.objects = objects

    if layer_manager:
        layer_manager.add_layer(result_layer)

    return result_layer

Performs spectral-based manipulations of imagery, including band arithmetic and transformations.

It supports generating new spectral bands or combinations to highlight specific features. It also includes functions for enhancing contrast and applying spectral filters based on mathematical expressions. This module is designed to work with raster . The functions here include contrast enhancement, spectral filtering, and band arithmetic. Not a great fan of these but might be handy sometime

enhance_contrast(source_layer, percentile_min=2, percentile_max=98, layer_manager=None, layer_name=None)

Enhance contrast in source layer raster data.

Parameters:

source_layer : Layer Source layer with raster data percentile_min : float Lower percentile for contrast stretching percentile_max : float Upper percentile for contrast stretching layer_manager : LayerManager, optional Layer manager to add the result layer to layer_name : str, optional Name for the result layer

Returns:

result_layer : Layer Layer with enhanced contrast

Source code in nickyspatial/filters/spectral.py
def enhance_contrast(
    source_layer,
    percentile_min=2,
    percentile_max=98,
    layer_manager=None,
    layer_name=None,
):
    """Enhance contrast in source layer raster data.

    Parameters:
    -----------
    source_layer : Layer
        Source layer with raster data
    percentile_min : float
        Lower percentile for contrast stretching
    percentile_max : float
        Upper percentile for contrast stretching
    layer_manager : LayerManager, optional
        Layer manager to add the result layer to
    layer_name : str, optional
        Name for the result layer

    Returns:
    --------
    result_layer : Layer
        Layer with enhanced contrast
    """
    if source_layer.raster is None:
        raise ValueError("Source layer must have raster data")

    if not layer_name:
        layer_name = f"{source_layer.name}_enhanced"

    result_layer = Layer(name=layer_name, parent=source_layer, layer_type="filter")
    result_layer.transform = source_layer.transform
    result_layer.crs = source_layer.crs
    result_layer.objects = source_layer.objects.copy() if source_layer.objects is not None else None

    result_layer.metadata = {
        "filter_type": "enhance_contrast",
        "percentile_min": percentile_min,
        "percentile_max": percentile_max,
    }

    enhanced_raster = source_layer.raster.copy()

    p_min = np.percentile(enhanced_raster, percentile_min)
    p_max = np.percentile(enhanced_raster, percentile_max)

    enhanced_raster = np.clip(enhanced_raster, p_min, p_max)
    enhanced_raster = (enhanced_raster - p_min) / (p_max - p_min)

    result_layer.raster = enhanced_raster

    if layer_manager:
        layer_manager.add_layer(result_layer)

    return result_layer

spectral_filter(source_layer, expression, layer_manager=None, layer_name=None)

Apply a spectral filter based on a mathematical expression.

Parameters:

source_layer : Layer Source layer with segment statistics expression : str Mathematical expression to apply (e.g., "NDVI > 0.5") layer_manager : LayerManager, optional Layer manager to add the result layer to layer_name : str, optional Name for the result layer

Returns:

result_layer : Layer Layer with filtered segments

Source code in nickyspatial/filters/spectral.py
def spectral_filter(source_layer, expression, layer_manager=None, layer_name=None):
    """Apply a spectral filter based on a mathematical expression.

    Parameters:
    -----------
    source_layer : Layer
        Source layer with segment statistics
    expression : str
        Mathematical expression to apply (e.g., "NDVI > 0.5")
    layer_manager : LayerManager, optional
        Layer manager to add the result layer to
    layer_name : str, optional
        Name for the result layer

    Returns:
    --------
    result_layer : Layer
        Layer with filtered segments
    """
    import numexpr as ne

    if not layer_name:
        layer_name = f"{source_layer.name}_spectral_filtered"

    result_layer = Layer(name=layer_name, parent=source_layer, layer_type="filter")
    result_layer.transform = source_layer.transform
    result_layer.crs = source_layer.crs

    result_layer.metadata = {"filter_type": "spectral_filter", "expression": expression}

    objects = source_layer.objects.copy()

    try:
        local_dict = {col: objects[col].values for col in objects.columns if col != "geometry"}
        mask = ne.evaluate(expression, local_dict=local_dict)
        mask = np.array(mask, dtype=bool)

        filtered_objects = objects.iloc[mask]
        result_layer.objects = filtered_objects

        if source_layer.raster is not None:
            kept_ids = set(filtered_objects["segment_id"])
            segments_raster = source_layer.raster.copy()
            raster_mask = np.isin(segments_raster, list(kept_ids))
            segments_raster[~raster_mask] = 0
            result_layer.raster = segments_raster

    except Exception as e:
        raise ValueError(f"Error applying spectral filter: {str(e)}") from e

    if layer_manager:
        layer_manager.add_layer(result_layer)

    return result_layer

nickyspatial.io

The io package contains modules for reading and writing both raster and vector data.

It abstracts file operations and coordinate system handling to facilitate I/O tasks.

Handles raster input and output operations, including reading and saving multi-band images.

Functions in this module may also provide metadata parsing and coordinate transform tools.

layer_to_raster(layer, output_path, column=None, nodata=0)

Save a layer to a raster file.

Parameters:

layer : Layer Layer to save output_path : str Path to the output raster file column : str, optional Column to rasterize (if saving from vector objects) nodata : int or float, optional No data value

Source code in nickyspatial/io/raster.py
def layer_to_raster(layer, output_path, column=None, nodata=0):
    """Save a layer to a raster file.

    Parameters:
    -----------
    layer : Layer
        Layer to save
    output_path : str
        Path to the output raster file
    column : str, optional
        Column to rasterize (if saving from vector objects)
    nodata : int or float, optional
        No data value
    """
    from rasterio import features

    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    if layer.raster is not None and column is None:
        write_raster(
            output_path,
            layer.raster.reshape(1, *layer.raster.shape),
            layer.transform,
            layer.crs,
            nodata,
        )
        return

    if layer.objects is not None and column is not None:
        if column not in layer.objects.columns:
            raise ValueError(f"Column '{column}' not found in layer objects")

        objects = layer.objects
        col_values = objects[column]
        if np.issubdtype(col_values.dtype, np.number):
            shapes = [(geom, float(val)) for geom, val in zip(objects.geometry, col_values, strict=False)]
        else:
            unique_vals = col_values.unique()
            val_map = {val: idx for idx, val in enumerate(unique_vals)}
            print(f"Mapping categorical values: {val_map}")
            shapes = [(geom, val_map[val]) for geom, val in zip(objects.geometry, col_values, strict=False)]
        if layer.raster is not None:
            if len(layer.raster.shape) == 3:
                height, width = layer.raster.shape[1], layer.raster.shape[2]
            else:
                height, width = layer.raster.shape
            out_shape = (height, width)
        else:
            bounds = objects.total_bounds
            resolution = 10
            if layer.transform:
                resolution = abs(layer.transform.a)
            width = int((bounds[2] - bounds[0]) / resolution)
            height = int((bounds[3] - bounds[1]) / resolution)
            out_shape = (height, width)
            if layer.transform is None:
                layer.transform = from_origin(bounds[0], bounds[3], resolution, resolution)

        output = np.ones(out_shape, dtype=np.float32) * nodata

        features.rasterize(shapes, out=output, transform=layer.transform, fill=nodata)

        write_raster(
            output_path,
            output.reshape(1, *out_shape),
            layer.transform,
            layer.crs,
            nodata,
        )
    else:
        raise ValueError("Layer must have either raster data or objects with a specified column")

read_raster(raster_path)

Read a raster file and return its data, transform, and CRS.

Parameters:

raster_path : str Path to the raster file

Returns:

image_data : numpy.ndarray Array with raster data values transform : affine.Affine Affine transformation for the raster crs : rasterio.crs.CRS Coordinate reference system

Source code in nickyspatial/io/raster.py
def read_raster(raster_path):
    """Read a raster file and return its data, transform, and CRS.

    Parameters:
    -----------
    raster_path : str
        Path to the raster file

    Returns:
    --------
    image_data : numpy.ndarray
        Array with raster data values
    transform : affine.Affine
        Affine transformation for the raster
    crs : rasterio.crs.CRS
        Coordinate reference system
    """
    with rasterio.open(raster_path) as src:
        image_data = src.read()
        transform = src.transform
        crs = src.crs

    return image_data, transform, crs

write_raster(output_path, data, transform, crs, nodata=None)

Write raster data to a file.

Parameters:

output_path : str Path to the output raster file data : numpy.ndarray Array with raster data values transform : affine.Affine Affine transformation for the raster crs : rasterio.crs.CRS Coordinate reference system nodata : int or float, optional No data value

Source code in nickyspatial/io/raster.py
def write_raster(output_path, data, transform, crs, nodata=None):
    """Write raster data to a file.

    Parameters:
    -----------
    output_path : str
        Path to the output raster file
    data : numpy.ndarray
        Array with raster data values
    transform : affine.Affine
        Affine transformation for the raster
    crs : rasterio.crs.CRS
        Coordinate reference system
    nodata : int or float, optional
        No data value
    """
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    if len(data.shape) == 2:
        data = data.reshape(1, *data.shape)

    height, width = data.shape[-2], data.shape[-1]
    count = data.shape[0]

    with rasterio.open(
        output_path,
        "w",
        driver="GTiff",
        height=height,
        width=width,
        count=count,
        dtype=data.dtype,
        crs=crs,
        transform=transform,
        nodata=nodata,
    ) as dst:
        dst.write(data)

Manages vector data I/O, supporting formats like Shapefile and GeoJSON.

This module typically offers utilities for handling attributes, geometries, and coordinate reference systems.

layer_to_vector(layer, output_path)

Save a layer's objects to a vector file.

Parameters:

layer : Layer Layer to save output_path : str Path to the output vector file

Source code in nickyspatial/io/vector.py
def layer_to_vector(layer, output_path):
    """Save a layer's objects to a vector file.

    Parameters:
    -----------
    layer : Layer
        Layer to save
    output_path : str
        Path to the output vector file
    """
    if layer.objects is None:
        raise ValueError("Layer has no vector objects")

    write_vector(layer.objects, output_path)

read_vector(vector_path)

Read a vector file into a GeoDataFrame.

Parameters:

vector_path : str Path to the vector file

Returns:

gdf : geopandas.GeoDataFrame GeoDataFrame with vector data

Source code in nickyspatial/io/vector.py
def read_vector(vector_path):
    """Read a vector file into a GeoDataFrame.

    Parameters:
    -----------
    vector_path : str
        Path to the vector file

    Returns:
    --------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame with vector data
    """
    return gpd.read_file(vector_path)

write_vector(gdf, output_path)

Write a GeoDataFrame to a vector file.

Parameters:

gdf : geopandas.GeoDataFrame GeoDataFrame to write output_path : str Path to the output vector file

Source code in nickyspatial/io/vector.py
def write_vector(gdf, output_path):
    """Write a GeoDataFrame to a vector file.

    Parameters:
    -----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame to write
    output_path : str
        Path to the output vector file
    """
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    file_extension = os.path.splitext(output_path)[1].lower()

    if file_extension == ".shp":
        gdf.to_file(output_path)
    elif file_extension == ".geojson":
        gdf.to_file(output_path, driver="GeoJSON")
    else:
        raise ValueError(f"Unsupported vector format: {file_extension}")

nickyspatial.stats

The stats package includes modules for calculating statistical metrics on objects.

This is to mimic the stats module in ecognition because they will be necessary in order to apply rules later on and extremely essential to work with objects

Basic statistics for layers in NickySpatial.

attach_basic_stats(layer, column, prefix=None)

Attach basic statistics for a column to a layer.

Parameters:

layer : Layer Layer to attach statistics to column : str Column to calculate statistics for prefix : str, optional Prefix for result names

Returns:

stats : dict Dictionary with calculated statistics

Source code in nickyspatial/stats/basic.py
def attach_basic_stats(layer, column, prefix=None):
    """Attach basic statistics for a column to a layer.

    Parameters:
    -----------
    layer : Layer
        Layer to attach statistics to
    column : str
        Column to calculate statistics for
    prefix : str, optional
        Prefix for result names

    Returns:
    --------
    stats : dict
        Dictionary with calculated statistics
    """
    if layer.objects is None or column not in layer.objects.columns:
        raise ValueError(f"Column '{column}' not found in layer objects")

    prefix = f"{prefix}_" if prefix else ""

    values = layer.objects[column]
    stats = {
        f"{prefix}min": values.min(),
        f"{prefix}max": values.max(),
        f"{prefix}mean": values.mean(),
        f"{prefix}median": values.median(),
        f"{prefix}std": values.std(),
        f"{prefix}sum": values.sum(),
        f"{prefix}count": len(values),
    }

    percentiles = [10, 25, 50, 75, 90]
    for p in percentiles:
        stats[f"{prefix}percentile_{p}"] = np.percentile(values, p)

    return stats

attach_class_distribution(layer, class_column='classification')

Calculate the distribution of classes in a layer.

Parameters:

layer : Layer Layer to analyze class_column : str Column containing class values

Returns:

distribution : dict Dictionary with class counts and percentages

Source code in nickyspatial/stats/basic.py
def attach_class_distribution(layer, class_column="classification"):
    """Calculate the distribution of classes in a layer.

    Parameters:
    -----------
    layer : Layer
        Layer to analyze
    class_column : str
        Column containing class values

    Returns:
    --------
    distribution : dict
        Dictionary with class counts and percentages
    """
    if layer.objects is None or class_column not in layer.objects.columns:
        return {}

    class_counts = layer.objects[class_column].value_counts()
    total_count = len(layer.objects)
    class_percentages = (class_counts / total_count * 100).round(2)

    distribution = {
        "counts": class_counts.to_dict(),
        "percentages": class_percentages.to_dict(),
        "total": total_count,
    }

    return distribution

attach_count(layer, class_column='classification', class_value=None)

Count objects in a layer, optionally filtered by class.

Parameters:

layer : Layer Layer to count objects in class_column : str Column containing class values class_value : str, optional Class value to filter by

Returns:

count : int Number of objects

Source code in nickyspatial/stats/basic.py
def attach_count(layer, class_column="classification", class_value=None):
    """Count objects in a layer, optionally filtered by class.

    Parameters:
    -----------
    layer : Layer
        Layer to count objects in
    class_column : str
        Column containing class values
    class_value : str, optional
        Class value to filter by

    Returns:
    --------
    count : int
        Number of objects
    """
    if layer.objects is None:
        return 0

    if class_value is not None and class_column in layer.objects.columns:
        count = layer.objects[layer.objects[class_column] == class_value].shape[0]
    else:
        count = layer.objects.shape[0]

    return count

Spatial statistics for layers in NickySpatial.

attach_area_stats(layer, area_column='area_units', by_class=None)

Calculate area statistics for objects in a layer.

Parameters:

layer : Layer Layer to calculate statistics for area_column : str Column containing area values by_class : str, optional Column to group by (e.g., 'classification')

Returns:

stats : dict Dictionary with area statistics

Source code in nickyspatial/stats/spatial.py
def attach_area_stats(layer, area_column="area_units", by_class=None):
    """Calculate area statistics for objects in a layer.

    Parameters:
    -----------
    layer : Layer
        Layer to calculate statistics for
    area_column : str
        Column containing area values
    by_class : str, optional
        Column to group by (e.g., 'classification')

    Returns:
    --------
    stats : dict
        Dictionary with area statistics
    """
    if layer.objects is None or area_column not in layer.objects.columns:
        return {}

    total_area = layer.objects[area_column].sum()

    if by_class and by_class in layer.objects.columns:
        class_areas = {}
        class_percentages = {}

        for class_value, group in layer.objects.groupby(by_class):
            if class_value is None:
                continue

            class_area = group[area_column].sum()
            class_percentage = (class_area / total_area * 100).round(2)

            class_areas[class_value] = class_area
            class_percentages[class_value] = class_percentage

        stats = {
            "total_area": total_area,
            "class_areas": class_areas,
            "class_percentages": class_percentages,
        }
    else:
        areas = layer.objects[area_column]

        stats = {
            "total_area": total_area,
            "min_area": areas.min(),
            "max_area": areas.max(),
            "mean_area": areas.mean(),
            "median_area": areas.median(),
            "std_area": areas.std(),
        }

    return stats

attach_neighbor_stats(layer)

Calculate neighborhood statistics for objects in a layer.

Parameters:

layer : Layer Layer to calculate statistics for

Returns:

stats : dict Dictionary with neighborhood statistics

Source code in nickyspatial/stats/spatial.py
def attach_neighbor_stats(layer):
    """Calculate neighborhood statistics for objects in a layer.

    Parameters:
    -----------
    layer : Layer
        Layer to calculate statistics for

    Returns:
    --------
    stats : dict
        Dictionary with neighborhood statistics
    """
    if layer.objects is None:
        return {}

    neighbors = {}
    neighbor_counts = []

    for idx, obj in layer.objects.iterrows():
        touching = layer.objects[layer.objects.index != idx].intersects(obj.geometry)
        neighbor_ids = layer.objects[touching].index.tolist()

        neighbors[idx] = neighbor_ids
        neighbor_counts.append(len(neighbor_ids))

    layer.objects["neighbor_count"] = neighbor_counts

    stats = {
        "neighbor_count": {
            "mean": np.mean(neighbor_counts),
            "min": np.min(neighbor_counts),
            "max": np.max(neighbor_counts),
            "std": np.std(neighbor_counts),
        }
    }

    return stats

attach_shape_metrics(layer)

Calculate shape metrics for objects in a layer.

Parameters:

layer : Layer Layer to calculate metrics for

Returns:

metrics : dict Dictionary with shape metrics

Source code in nickyspatial/stats/spatial.py
def attach_shape_metrics(layer):
    """Calculate shape metrics for objects in a layer.

    Parameters:
    -----------
    layer : Layer
        Layer to calculate metrics for

    Returns:
    --------
    metrics : dict
        Dictionary with shape metrics
    """
    if layer.objects is None:
        return {}

    layer.objects["perimeter"] = layer.objects.geometry.length

    if "area_units" not in layer.objects.columns:
        layer.objects["area_units"] = layer.objects.geometry.area

    layer.objects["shape_index"] = (
        (layer.objects["perimeter"] / (2 * np.sqrt(np.pi * layer.objects["area_units"])))
        .replace([np.inf, -np.inf], np.nan)
        .fillna(0)
    )

    layer.objects["compactness"] = (
        (4 * np.pi * layer.objects["area_units"] / (layer.objects["perimeter"] ** 2)).replace([np.inf, -np.inf], np.nan).fillna(0)
    )

    metrics = {
        "shape_index": {
            "mean": layer.objects["shape_index"].mean(),
            "min": layer.objects["shape_index"].min(),
            "max": layer.objects["shape_index"].max(),
            "std": layer.objects["shape_index"].std(),
        },
        "compactness": {
            "mean": layer.objects["compactness"].mean(),
            "min": layer.objects["compactness"].min(),
            "max": layer.objects["compactness"].max(),
            "std": layer.objects["compactness"].std(),
        },
    }

    return metrics

Spectral indices calculation module.

SpectralIndexCalculator

A spectral index calculator that supports custom formulas.

Supports predefined indices from the awesome-spectral-indices catalogue.

Source code in nickyspatial/stats/spectral.py
class SpectralIndexCalculator:
    """A spectral index calculator that supports custom formulas.

    Supports predefined indices from the awesome-spectral-indices catalogue.
    """

    def __init__(self):
        """Initialize the calculator with predefined indices."""
        self.predefined_indices = {
            "NDVI": {
                "formula": "(NIR - RED) / (NIR + RED)",
                "description": "Normalized Difference Vegetation Index",
                "reference": "Rouse et al. 1974",
            },
            "NDWI": {
                "formula": "(GREEN - NIR) / (GREEN + NIR)",
                "description": "Normalized Difference Water Index",
                "reference": "McFeeters 1996",
            },
            "NDBI": {
                "formula": "(SWIR1 - NIR) / (SWIR1 + NIR)",
                "description": "Normalized Difference Built-up Index",
                "reference": "Zha et al. 2003",
            },
            "EVI": {
                "formula": "2.5 * ((NIR - RED) / (NIR + 6 * RED - 7.5 * BLUE + 1))",
                "description": "Enhanced Vegetation Index",
                "reference": "Huete et al. 2002",
            },
            "SAVI": {
                "formula": "((NIR - RED) / (NIR + RED + 0.5)) * 1.5",
                "description": "Soil Adjusted Vegetation Index",
                "reference": "Huete 1988",
            },
            "GNDVI": {
                "formula": "(NIR - GREEN) / (NIR + GREEN)",
                "description": "Green Normalized Difference Vegetation Index",
                "reference": "Gitelson et al. 1996",
            },
            "NBR": {
                "formula": "(NIR - SWIR2) / (NIR + SWIR2)",
                "description": "Normalized Burn Ratio",
                "reference": "Key & Benson 2006",
            },
            "MNDWI": {
                "formula": "(GREEN - SWIR1) / (GREEN + SWIR1)",
                "description": "Modified Normalized Difference Water Index",
                "reference": "Xu 2006",
            },
        }

    def _parse_formula(self, formula: str, bands: Dict[str, str]) -> str:
        """Parse a formula string and replace band names with actual column references."""
        parsed_formula = formula.upper()

        # Sort band names by length (descending) to avoid partial replacements
        sorted_bands = sorted(bands.items(), key=lambda x: len(x[0]), reverse=True)

        for band_name, column_name in sorted_bands:
            pattern = r"\b" + re.escape(band_name.upper()) + r"\b"
            parsed_formula = re.sub(pattern, f"bands['{column_name}']", parsed_formula)

        return parsed_formula

    def _evaluate_formula(self, formula: str, bands_data: Dict[str, np.ndarray]) -> np.ndarray:
        """Evaluate a mathematical formula using band data."""
        safe_dict = {
            "bands": bands_data,
            "np": np,
            "NP": np,
            "__builtins__": {},
            "abs": abs,
            "ABS": abs,
            "min": min,
            "MIN": min,
            "max": max,
            "MAX": max,
            "sqrt": np.sqrt,
            "SQRT": np.sqrt,
            "log": np.log,
            "LOG": np.log,
            "exp": np.exp,
            "EXP": np.exp,
            "sin": np.sin,
            "SIN": np.sin,
            "cos": np.cos,
            "COS": np.cos,
            "tan": np.tan,
            "TAN": np.tan,
        }

        try:
            result = eval(formula, safe_dict)
            if isinstance(result, (int, float)):
                result = np.full(len(next(iter(bands_data.values()))), result)
            return result.astype(float)
        except Exception as err:
            raise ValueError(f"Error evaluating formula '{formula}': {str(err)}") from err

    def _calculate_statistics(self, values: np.ndarray) -> Dict[str, float]:
        """Calculate statistics for an array of values."""
        return {
            "mean": float(np.mean(values)),
            "min": float(np.min(values)),
            "max": float(np.max(values)),
            "std": float(np.std(values)),
            "median": float(np.median(values)),
            "count": len(values),
        }

__init__()

Initialize the calculator with predefined indices.

Source code in nickyspatial/stats/spectral.py
def __init__(self):
    """Initialize the calculator with predefined indices."""
    self.predefined_indices = {
        "NDVI": {
            "formula": "(NIR - RED) / (NIR + RED)",
            "description": "Normalized Difference Vegetation Index",
            "reference": "Rouse et al. 1974",
        },
        "NDWI": {
            "formula": "(GREEN - NIR) / (GREEN + NIR)",
            "description": "Normalized Difference Water Index",
            "reference": "McFeeters 1996",
        },
        "NDBI": {
            "formula": "(SWIR1 - NIR) / (SWIR1 + NIR)",
            "description": "Normalized Difference Built-up Index",
            "reference": "Zha et al. 2003",
        },
        "EVI": {
            "formula": "2.5 * ((NIR - RED) / (NIR + 6 * RED - 7.5 * BLUE + 1))",
            "description": "Enhanced Vegetation Index",
            "reference": "Huete et al. 2002",
        },
        "SAVI": {
            "formula": "((NIR - RED) / (NIR + RED + 0.5)) * 1.5",
            "description": "Soil Adjusted Vegetation Index",
            "reference": "Huete 1988",
        },
        "GNDVI": {
            "formula": "(NIR - GREEN) / (NIR + GREEN)",
            "description": "Green Normalized Difference Vegetation Index",
            "reference": "Gitelson et al. 1996",
        },
        "NBR": {
            "formula": "(NIR - SWIR2) / (NIR + SWIR2)",
            "description": "Normalized Burn Ratio",
            "reference": "Key & Benson 2006",
        },
        "MNDWI": {
            "formula": "(GREEN - SWIR1) / (GREEN + SWIR1)",
            "description": "Modified Normalized Difference Water Index",
            "reference": "Xu 2006",
        },
    }

add_custom_index(name, formula, description='', reference='')

Add a custom index to the global predefined indices.

Parameters:

name : str Name of the index formula : str Mathematical formula description : str, optional Description of the index reference : str, optional Reference/citation for the index

Source code in nickyspatial/stats/spectral.py
def add_custom_index(name, formula, description="", reference=""):
    """Add a custom index to the global predefined indices.

    Parameters:
    -----------
    name : str
        Name of the index
    formula : str
        Mathematical formula
    description : str, optional
        Description of the index
    reference : str, optional
        Reference/citation for the index
    """
    _calculator.predefined_indices[name.upper()] = {"formula": formula, "description": description, "reference": reference}

attach_ndvi(layer, nir_column='NIR_mean', red_column='Red_mean', output_column='NDVI')

Calculate NDVI (Normalized Difference Vegetation Index) for objects in a layer.

Parameters:

layer : Layer Layer to calculate NDVI for nir_column : str Column containing NIR band values red_column : str Column containing Red band values output_column : str Column to store NDVI values

Returns:

ndvi_stats : dict Dictionary with NDVI statistics

Source code in nickyspatial/stats/spectral.py
def attach_ndvi(layer, nir_column="NIR_mean", red_column="Red_mean", output_column="NDVI"):
    """Calculate NDVI (Normalized Difference Vegetation Index) for objects in a layer.

    Parameters:
    -----------
    layer : Layer
        Layer to calculate NDVI for
    nir_column : str
        Column containing NIR band values
    red_column : str
        Column containing Red band values
    output_column : str
        Column to store NDVI values

    Returns:
    --------
    ndvi_stats : dict
        Dictionary with NDVI statistics
    """
    if layer.objects is None or nir_column not in layer.objects.columns or red_column not in layer.objects.columns:
        return {}

    nir = layer.objects[nir_column]
    red = layer.objects[red_column]

    denominator = nir + red
    mask = denominator != 0

    ndvi = np.zeros(len(layer.objects))
    ndvi[mask] = (nir[mask] - red[mask]) / denominator[mask]

    layer.objects[output_column] = ndvi

    ndvi_stats = {
        "mean": ndvi.mean(),
        "min": ndvi.min(),
        "max": ndvi.max(),
        "std": np.std(ndvi),
        "median": np.median(ndvi),
    }

    return ndvi_stats

attach_spectral_index(layer, index_name, formula=None, bands=None, output_column=None)

Calculate a single spectral index using a custom or predefined formula.

This function is designed to work with Layer.attach_function().

Parameters:

layer : Layer Layer object with objects DataFrame index_name : str Name of the index to calculate formula : str, optional Custom formula to use. If None, uses predefined formula for index_name bands : dict, optional Dictionary mapping band names to column names output_column : str, optional Name of output column. If None, uses index_name

Returns:

dict Dictionary with index statistics and metadata

Source code in nickyspatial/stats/spectral.py
def attach_spectral_index(layer, index_name, formula=None, bands=None, output_column=None):
    """Calculate a single spectral index using a custom or predefined formula.

    This function is designed to work with Layer.attach_function().

    Parameters:
    -----------
    layer : Layer
        Layer object with objects DataFrame
    index_name : str
        Name of the index to calculate
    formula : str, optional
        Custom formula to use. If None, uses predefined formula for index_name
    bands : dict, optional
        Dictionary mapping band names to column names
    output_column : str, optional
        Name of output column. If None, uses index_name

    Returns:
    --------
    dict
        Dictionary with index statistics and metadata
    """
    if layer.objects is None:
        return {"error": "Layer has no objects"}

    # Set default bands mapping
    if bands is None:
        bands = {
            "BLUE": "band_1_mean",
            "GREEN": "band_2_mean",
            "RED": "band_3_mean",
            "NIR": "band_4_mean",
            "SWIR1": "band_5_mean",
            "SWIR2": "band_6_mean",
        }

    # Use predefined formula if no custom formula provided
    if formula is None:
        if index_name.upper() in _calculator.predefined_indices:
            formula = _calculator.predefined_indices[index_name.upper()]["formula"]
        else:
            return {"error": f"No predefined formula for '{index_name}' and no custom formula provided"}

    # Set output column name
    if output_column is None:
        output_column = index_name.upper()

    # Check available columns
    available_bands = {}
    for band_name, column_name in bands.items():
        if column_name in layer.objects.columns:
            available_bands[band_name] = column_name

    if not available_bands:
        return {"error": "No required band columns found in layer"}

    try:
        # Parse formula
        parsed_formula = _calculator._parse_formula(formula, available_bands)

        # Prepare band data
        bands_data = {}
        for _band_name, column_name in available_bands.items():
            bands_data[column_name] = layer.objects[column_name].values

        # Calculate index
        index_values = _calculator._evaluate_formula(parsed_formula, bands_data)

        # Add to layer
        layer.objects[output_column] = index_values

        # Calculate statistics
        stats = _calculator._calculate_statistics(index_values)

        # Prepare result
        result = {
            "index_name": index_name,
            "formula": formula,
            "output_column": output_column,
            "bands_used": available_bands,
            "statistics": stats,
        }

        # Add description if predefined index
        if index_name.upper() in _calculator.predefined_indices:
            result["description"] = _calculator.predefined_indices[index_name.upper()]["description"]
            result["reference"] = _calculator.predefined_indices[index_name.upper()]["reference"]

        return result

    except Exception as e:
        return {"error": f"Failed to calculate {index_name}: {str(e)}"}

attach_spectral_indices(layer, bands=None)

Calculate multiple spectral indices for objects in a layer.

Parameters:

layer : Layer Layer to calculate indices for bands : dict, optional Dictionary mapping band names to column names

Returns:

indices : dict Dictionary with calculated indices

Source code in nickyspatial/stats/spectral.py
def attach_spectral_indices(layer, bands=None):
    """Calculate multiple spectral indices for objects in a layer.

    Parameters:
    -----------
    layer : Layer
        Layer to calculate indices for
    bands : dict, optional
        Dictionary mapping band names to column names

    Returns:
    --------
    indices : dict
        Dictionary with calculated indices
    """
    if layer.objects is None:
        return {}

    if bands is None:
        bands = {
            "blue": "Blue_mean",
            "green": "Green_mean",
            "red": "Red_mean",
            "nir": "NIR_mean",
        }

    for _band_name, column in bands.items():
        if column not in layer.objects.columns:
            print(f"Warning: Band column '{column}' not found. Some indices may not be calculated.")

    indices = {}

    if "nir" in bands and "red" in bands:
        if bands["nir"] in layer.objects.columns and bands["red"] in layer.objects.columns:
            ndvi = attach_ndvi(layer, bands["nir"], bands["red"], "NDVI")
            indices["NDVI"] = ndvi

    if "green" in bands and "nir" in bands:
        if bands["green"] in layer.objects.columns and bands["nir"] in layer.objects.columns:
            green = layer.objects[bands["green"]]
            nir = layer.objects[bands["nir"]]

            denominator = green + nir
            mask = denominator != 0

            ndwi = np.zeros(len(layer.objects))
            ndwi[mask] = (green[mask] - nir[mask]) / denominator[mask]

            layer.objects["NDWI"] = ndwi

            indices["NDWI"] = {
                "mean": ndwi.mean(),
                "min": ndwi.min(),
                "max": ndwi.max(),
                "std": np.std(ndwi),
            }

    return indices

get_available_indices()

Get a list of all available predefined spectral indices.

Returns:

dict Dictionary with index names as keys and their metadata as values

Source code in nickyspatial/stats/spectral.py
def get_available_indices():
    """Get a list of all available predefined spectral indices.

    Returns:
    --------
    dict
        Dictionary with index names as keys and their metadata as values
    """
    return _calculator.predefined_indices.copy()

nickyspatial.utils

This modules contains utility functions for handling attributes, geometries, and coordinate reference systems.

Helper functions.

create_sample_data()

Create a synthetic 4-band (B, G, R, NIR) image for testing.

Source code in nickyspatial/utils/helpers.py
5
6
7
def create_sample_data():
    """Create a synthetic 4-band (B, G, R, NIR) image for testing."""
    return None

nickyspatial.viz

Alrighty , let's get this visualization party started!

No matter what you do you need to see it and present it , this is the module for it to contain all code about visualizaing the layers , object rule results etc.

Visualization functions for plotting histograms, statistics, and scatter plots.

plot_histogram(layer, attribute, bins=20, figsize=(10, 6), by_class=None)

Plot a histogram of attribute values.

Parameters:

layer : Layer Layer containing data attribute : str Attribute to plot bins : int Number of bins figsize : tuple Figure size by_class : str, optional Column to group by (e.g., 'classification')

Returns:

fig : matplotlib.figure.Figure Figure object

Source code in nickyspatial/viz/charts.py
def plot_histogram(layer, attribute, bins=20, figsize=(10, 6), by_class=None):
    """Plot a histogram of attribute values.

    Parameters:
    -----------
    layer : Layer
        Layer containing data
    attribute : str
        Attribute to plot
    bins : int
        Number of bins
    figsize : tuple
        Figure size
    by_class : str, optional
        Column to group by (e.g., 'classification')

    Returns:
    --------
    fig : matplotlib.figure.Figure
        Figure object
    """
    if layer.objects is None or attribute not in layer.objects.columns:
        raise ValueError(f"Attribute '{attribute}' not found in layer objects")

    fig, ax = plt.subplots(figsize=figsize)

    if by_class and by_class in layer.objects.columns:
        data = layer.objects[[attribute, by_class]].copy()

        for class_value, group in data.groupby(by_class):
            if class_value is None:
                continue

            sns.histplot(group[attribute], bins=bins, alpha=0.6, label=str(class_value), ax=ax)

        ax.legend(title=by_class)
    else:
        sns.histplot(layer.objects[attribute], bins=bins, ax=ax)

    ax.set_title(f"Histogram of {attribute}")
    ax.set_xlabel(attribute)
    ax.set_ylabel("Count")

    return fig

plot_statistics(layer, stats_dict, figsize=(12, 8), kind='bar', y_log=False)

Plot statistics from a statistics dictionary.

Parameters:

layer : Layer Layer the statistics are calculated for stats_dict : dict Dictionary with statistics (from attach_* functions) figsize : tuple Figure size kind : str Plot type: 'bar', 'line', or 'pie' y_log : bool Whether to use logarithmic scale for y-axis

Returns:

fig : matplotlib.figure.Figure Figure object

Source code in nickyspatial/viz/charts.py
def plot_statistics(layer, stats_dict, figsize=(12, 8), kind="bar", y_log=False):
    """Plot statistics from a statistics dictionary.

    Parameters:
    -----------
    layer : Layer
        Layer the statistics are calculated for
    stats_dict : dict
        Dictionary with statistics (from attach_* functions)
    figsize : tuple
        Figure size
    kind : str
        Plot type: 'bar', 'line', or 'pie'
    y_log : bool
        Whether to use logarithmic scale for y-axis

    Returns:
    --------
    fig : matplotlib.figure.Figure
        Figure object
    """
    flat_stats = {}

    def _flatten_dict(d, prefix=""):
        for key, value in d.items():
            if isinstance(value, dict):
                _flatten_dict(value, f"{prefix}{key}_")
            else:
                flat_stats[f"{prefix}{key}"] = value

    _flatten_dict(stats_dict)

    fig, ax = plt.subplots(figsize=figsize)

    if kind == "pie" and "class_percentages" in stats_dict:
        percentages = stats_dict["class_percentages"]
        values = list(percentages.values())
        labels = list(percentages.keys())

        ax.pie(values, labels=labels, autopct="%1.1f%%", startangle=90, shadow=True)
        ax.axis("equal")
        ax.set_title("Class Distribution")

    elif kind == "pie" and "percentages" in flat_stats:
        percentages = pd.Series(flat_stats).filter(like="percentage")
        values = percentages.values
        labels = [label.replace("_percentage", "") for label in percentages.index]

        ax.pie(values, labels=labels, autopct="%1.1f%%", startangle=90, shadow=True)
        ax.axis("equal")
        ax.set_title("Distribution")

    else:
        stats_df = pd.DataFrame({"Metric": list(flat_stats.keys()), "Value": list(flat_stats.values())})

        if kind != "line":
            stats_df = stats_df.sort_values("Value", ascending=False)

        if kind == "bar":
            sns.barplot(x="Metric", y="Value", data=stats_df, ax=ax)
            ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha="right")
        elif kind == "line":
            sns.lineplot(x="Metric", y="Value", data=stats_df, ax=ax, marker="o")
            ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha="right")

        if y_log:
            ax.set_yscale("log")

        ax.set_title("Statistics Summary")

    plt.tight_layout()
    return fig

plot_training_history(history)

Plot (training and validation) loss and accuracy curves from a Keras(CNN) training history.

This function visualizes the model's performance over epochs by plotting: - Training and validation loss - Training and validation accuracy

Parameters

history : keras.callbacks.History History object returned by model.fit(), containing loss and accuracy values for each epoch.

Returns:

matplotlib.pyplot The pyplot module with the generated figure, allowing further modification or saving.

Source code in nickyspatial/viz/charts.py
def plot_training_history(history):
    """Plot (training and validation) loss and accuracy curves from a Keras(CNN) training history.

    This function visualizes the model's performance over epochs by plotting:
    - Training and validation loss
    - Training and validation accuracy

    Parameters
    ----------
    history : keras.callbacks.History
        History object returned by `model.fit()`, containing loss and accuracy values
        for each epoch.

    Returns:
    -------
    matplotlib.pyplot
        The pyplot module with the generated figure, allowing further modification or saving.
    """
    epoches = np.arange(1, len(history.history.get("loss")) + 1)

    fig, (ax1, ax2) = plt.subplots(1, 2, sharex=True, figsize=(15, 7))

    train_loss = history.history.get("loss")
    val_loss = history.history.get("val_loss")

    train_acc = [x * 100 for x in history.history.get("accuracy")]
    val_acc = [x * 100 for x in history.history.get("val_accuracy")]

    ax1.plot(epoches, train_loss, "b", marker="o", label="Training Loss")
    ax1.plot(epoches, val_loss, "r", marker="o", label="Validation Loss")
    ax1.set_title("Training and Validation Loss", fontsize=16)
    ax1.set_ylabel("Loss", fontsize=14)
    ax1.legend(fontsize=13)
    ax1.tick_params(axis="x", labelsize=14)
    ax1.tick_params(axis="y", labelsize=14)

    ax1.set_ylim(min(train_loss + val_loss) * 0.9, max(train_loss + val_loss) * 1.1)
    ax1.set_yticks(np.linspace(min(train_loss + val_loss), max(train_loss + val_loss), num=5))

    ax2.plot(epoches, train_acc, "b", marker="o", label="Training Accuracy")
    ax2.plot(epoches, val_acc, "r", marker="o", label="Validation Accuracy")
    ax2.set_title("Training and Validation Accuracy", fontsize=16)
    ax2.set_ylabel("Accuracy (%)", fontsize=14)
    ax2.legend(fontsize=13)
    ax2.tick_params(axis="x", labelsize=14)
    ax2.tick_params(axis="y", labelsize=14)

    ax2.set_ylim(min(train_acc + val_acc) * 0.9, max(train_acc + val_acc) * 1.1)
    ax2.set_yticks(np.linspace(min(train_acc + val_acc), max(train_acc + val_acc), num=5))
    return plt

Functions to create maps and visualize layers.

plot_classification(layer, class_field='classification', figsize=(12, 10), legend=True, class_color=None)

Plot classified segments with different colors for each class.

Source code in nickyspatial/viz/maps.py
def plot_classification(layer, class_field="classification", figsize=(12, 10), legend=True, class_color=None):
    """Plot classified segments with different colors for each class."""
    fig, ax = plt.subplots(figsize=figsize)
    if not class_color:
        class_color = {}

    if class_field not in layer.objects.columns:
        raise ValueError(f"Class field '{class_field}' not found in layer objects")

    class_values = [v for v in layer.objects[class_field].unique() if v is not None]

    base_colors = plt.cm.tab20(np.linspace(0, 1, max(len(class_values), 1)))

    colors_list = []
    for idx, class_value in enumerate(class_values):
        if class_color and class_value in list(class_color.keys()):
            color_hex = class_color[class_value]
        else:
            if idx < len(base_colors):
                rgb = base_colors[idx][:3]
                color_hex = "#{:02x}{:02x}{:02x}".format(int(rgb[0] * 255), int(rgb[1] * 255), int(rgb[2] * 255))
            else:
                color_hex = "#{:06x}".format(random.randint(0, 0xFFFFFF))
            class_color[class_value] = color_hex

        rgb_tuple = tuple(int(color_hex[i : i + 2], 16) / 255 for i in (1, 3, 5))
        colors_list.append(rgb_tuple)

    cmap = ListedColormap(colors_list)

    class_map = {value: i for i, value in enumerate(class_values)}
    layer.objects["_class_id"] = layer.objects[class_field].map(class_map)

    layer.objects.plot(
        column="_class_id",
        cmap=cmap,
        ax=ax,
        edgecolor="black",
        linewidth=0.5,
        legend=False,
    )

    if legend and len(class_values) > 0:
        patches = [mpatches.Patch(color=class_color[value], label=value) for value in class_values]
        ax.legend(handles=patches, loc="upper right", title=class_field)

    ax.set_title("Classification Map")

    ax.set_xlabel("X Coordinate")
    ax.set_ylabel("Y Coordinate")

    if "_class_id" in layer.objects.columns:
        layer.objects = layer.objects.drop(columns=["_class_id"])

    return fig

plot_comparison(before_layer, after_layer, attribute=None, class_field=None, figsize=(16, 8), title=None)

Plot before and after views of layers for comparison.

Source code in nickyspatial/viz/maps.py
def plot_comparison(
    before_layer,
    after_layer,
    attribute=None,
    class_field=None,
    figsize=(16, 8),
    title=None,
):
    """Plot before and after views of layers for comparison."""
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=figsize)

    if title:
        fig.suptitle(title)

    if attribute and attribute in before_layer.objects.columns:
        before_layer.objects.plot(column=attribute, ax=ax1, legend=True)
        ax1.set_title(f"Before: {attribute}")
    elif class_field and class_field in before_layer.objects.columns:
        class_values = [v for v in before_layer.objects[class_field].unique() if v is not None]
        num_classes = len(class_values)
        colors = plt.cm.tab20(np.linspace(0, 1, max(num_classes, 1)))
        cmap = ListedColormap(colors)
        class_map = {value: i for i, value in enumerate(class_values)}
        before_layer.objects["_class_id"] = before_layer.objects[class_field].map(class_map)

        before_layer.objects.plot(
            column="_class_id",
            cmap=cmap,
            ax=ax1,
            edgecolor="black",
            linewidth=0.5,
            legend=False,
        )

        patches = [mpatches.Patch(color=colors[i], label=value) for i, value in enumerate(class_values)]
        ax1.legend(handles=patches, loc="upper right", title=class_field)
        ax1.set_title(f"Before: {class_field}")
    else:
        before_layer.objects.plot(ax=ax1)
        ax1.set_title("Before")

    if attribute and attribute in after_layer.objects.columns:
        after_layer.objects.plot(column=attribute, ax=ax2, legend=True)
        ax2.set_title(f"After: {attribute}")
    elif class_field and class_field in after_layer.objects.columns:
        class_values = [v for v in after_layer.objects[class_field].unique() if v is not None]
        num_classes = len(class_values)
        colors = plt.cm.tab20(np.linspace(0, 1, max(num_classes, 1)))
        cmap = ListedColormap(colors)
        class_map = {value: i for i, value in enumerate(class_values)}
        after_layer.objects["_class_id"] = after_layer.objects[class_field].map(class_map)

        after_layer.objects.plot(
            column="_class_id",
            cmap=cmap,
            ax=ax2,
            edgecolor="black",
            linewidth=0.5,
            legend=False,
        )

        patches = [mpatches.Patch(color=colors[i], label=value) for i, value in enumerate(class_values)]
        ax2.legend(handles=patches, loc="upper right", title=class_field)
        ax2.set_title(f"After: {class_field}")
    else:
        after_layer.objects.plot(ax=ax2)
        ax2.set_title("After")

    if "_class_id" in before_layer.objects.columns:
        before_layer.objects = before_layer.objects.drop(columns=["_class_id"])
    if "_class_id" in after_layer.objects.columns:
        after_layer.objects = after_layer.objects.drop(columns=["_class_id"])

    return fig

plot_layer(layer, image_data=None, attribute=None, title=None, rgb_bands=(2, 1, 0), figsize=(12, 10), cmap='viridis', show_boundaries=False)

Plot a layer, optionally with an attribute or image backdrop.

Source code in nickyspatial/viz/maps.py
def plot_layer(
    layer,
    image_data=None,
    attribute=None,
    title=None,
    rgb_bands=(2, 1, 0),
    figsize=(12, 10),
    cmap="viridis",
    show_boundaries=False,
):
    """Plot a layer, optionally with an attribute or image backdrop."""
    fig, ax = plt.subplots(figsize=figsize)

    if title:
        ax.set_title(title)
    elif attribute:
        ax.set_title(f"{attribute} by Segment")
    else:
        ax.set_title("Layer Visualization")

    if image_data is not None:
        num_bands = image_data.shape[0]
        if num_bands >= 3 and max(rgb_bands) < num_bands:
            r = image_data[rgb_bands[0]]
            g = image_data[rgb_bands[1]]
            b = image_data[rgb_bands[2]]

            r_norm = np.clip((r - r.min()) / (r.max() - r.min() + 1e-10), 0, 1)
            g_norm = np.clip((g - g.min()) / (g.max() - g.min() + 1e-10), 0, 1)
            b_norm = np.clip((b - b.min()) / (b.max() - b.min() + 1e-10), 0, 1)

            rgb = np.stack([r_norm, g_norm, b_norm], axis=2)

            ax.imshow(rgb)
        else:
            gray = image_data[0]
            gray_norm = (gray - gray.min()) / (gray.max() - gray.min() + 1e-10)
            ax.imshow(gray_norm, cmap="gray")

    if attribute and attribute in layer.objects.columns:
        layer.objects.plot(
            column=attribute,
            cmap=cmap,
            ax=ax,
            legend=True,
            alpha=0.7 if image_data is not None else 1.0,
        )

    if show_boundaries and layer.raster is not None:
        from skimage.segmentation import mark_boundaries

        if image_data is not None:
            if "num_bands" in locals() and num_bands >= 3:
                base_img = rgb
            else:
                gray = image_data[0]
                gray_norm = (gray - gray.min()) / (gray.max() - gray.min() + 1e-10)
                base_img = np.stack([gray_norm, gray_norm, gray_norm], axis=2)

            bounded = mark_boundaries(base_img, layer.raster, color=(1, 1, 0), mode="thick")

            if attribute is None:
                ax.imshow(bounded)
        else:
            ax.imshow(
                mark_boundaries(
                    np.zeros((layer.raster.shape[0], layer.raster.shape[1], 3)),
                    layer.raster,
                    color=(1, 1, 0),
                    mode="thick",
                )
            )

    ax.grid(alpha=0.3)
    return fig

plot_layer_interactive(layer, image_data=None, figsize=(10, 8))

Interactive plot of a layer with widgets and working click.

Source code in nickyspatial/viz/maps.py
def plot_layer_interactive(layer, image_data=None, figsize=(10, 8)):
    """Interactive plot of a layer with widgets and working click."""
    """
    %matplotlib widget
    plot_layer_interactive(layer=segmentation_layer,image_data=image_data,figsize=(10,8))
    Not supported in google collab
    """
    title_widget = widgets.Text(value="Layer Visualization", description="Title:")

    rgb_band_max = image_data.shape[0] - 1 if image_data is not None else 2

    red_band_widget = widgets.Select(
        options=list(range(rgb_band_max + 1)), value=0 if rgb_band_max >= 2 else 0, description="Red Band:"
    )
    green_band_widget = widgets.Select(
        options=list(range(rgb_band_max + 1)), value=1 if rgb_band_max >= 2 else 0, description="Green Band:"
    )
    blue_band_widget = widgets.Select(
        options=list(range(rgb_band_max + 1)), value=2 if rgb_band_max >= 2 else 0, description="Blue Band:"
    )

    show_boundaries_widget = widgets.Checkbox(value=True, description="Show Boundaries")

    fig, ax = plt.subplots(figsize=figsize)
    out_fig = widgets.Output()

    def onclick(event):
        if event.xdata is None or event.ydata is None:
            return
        x_pix, y_pix = int(event.xdata), int(event.ydata)
        if (0 <= x_pix < layer.raster.shape[1]) and (0 <= y_pix < layer.raster.shape[0]):
            segment_id = layer.raster[y_pix, x_pix]
            msg = f"Clicked at (x={x_pix}, y={y_pix}) → Segment ID: {segment_id}"
            title_widget.value = msg
            ax.set_title(msg)
            fig.canvas.draw_idle()
        else:
            msg = "Clicked outside raster bounds"
            title_widget.value = msg
            ax.set_title(msg)
            fig.canvas.draw_idle()

    fig.canvas.mpl_connect("button_press_event", onclick)

    def update_plot(red_band, green_band, blue_band, show_boundaries):
        ax.clear()

        if image_data is not None:
            num_bands = image_data.shape[0]
            if red_band >= 0 and green_band >= 0 and blue_band >= 0:
                r = image_data[red_band].astype(float)
                g = image_data[green_band].astype(float)
                b = image_data[blue_band].astype(float)

                r_norm = np.clip((r - r.min()) / (r.max() - r.min() + 1e-10), 0, 1)
                g_norm = np.clip((g - g.min()) / (g.max() - g.min() + 1e-10), 0, 1)
                b_norm = np.clip((b - b.min()) / (b.max() - b.min() + 1e-10), 0, 1)

                rgb = np.stack([r_norm, g_norm, b_norm], axis=2)
                ax.imshow(rgb)
            else:
                gray = image_data[0]
                gray_norm = (gray - gray.min()) / (gray.max() - gray.min() + 1e-10)
                ax.imshow(gray_norm, cmap="gray")

        if show_boundaries and layer.raster is not None:
            if image_data is not None:
                if num_bands >= 3:
                    base_img = rgb
                else:
                    gray = image_data[0]
                    gray_norm = (gray - gray.min()) / (gray.max() - gray.min() + 1e-10)
                    base_img = np.stack([gray_norm, gray_norm, gray_norm], axis=2)

                bounded = mark_boundaries(base_img, layer.raster, color=(1, 1, 0), mode="thick")
                ax.imshow(bounded)
            else:
                ax.imshow(
                    mark_boundaries(
                        np.zeros((layer.raster.shape[0], layer.raster.shape[1], 3)),
                        layer.raster,
                        color=(1, 1, 0),
                        mode="thick",
                    )
                )

        ax.grid(alpha=0.3)
        fig.canvas.draw_idle()

    ui = widgets.VBox(
        [
            red_band_widget,
            green_band_widget,
            blue_band_widget,
            show_boundaries_widget,
        ]
    )

    controls = widgets.interactive_output(
        update_plot,
        {
            "red_band": red_band_widget,
            "green_band": green_band_widget,
            "blue_band": blue_band_widget,
            "show_boundaries": show_boundaries_widget,
        },
    )

    display(ui, out_fig, controls)

plot_layer_interactive_plotly(layer, image_data, rgb_bands=(0, 1, 2), show_boundaries=True, figsize=(800, 400))

Display an interactive RGB image with segment boundaries and hoverable segment IDs using Plotly.

Run in google collab as well.

Parameters:

layer : object An object with a .raster attribute representing the labeled segmentation layer (e.g., output from a segmentation algorithm, such as SLIC). image_data : image data to be visualized. rgb_bands : tuple of int, optional Tuple of three integers specifying which bands to use for the RGB composite (default is (0, 1, 2)). show_boundaries : bool, optional Whether to overlay the segment boundaries on the RGB image (default is True). figsize : tuple of int, optional Tuple specifying the width and height of the interactive Plotly figure in pixels (default is (800, 400)).

Returns:

None The function displays the interactive plot directly in the output cell in a Jupyter Notebook.

Notes:

  • Segment boundaries are drawn using skimage.segmentation.mark_boundaries.
  • Hovering over the image displays the segment ID from layer.raster.
Source code in nickyspatial/viz/maps.py
def plot_layer_interactive_plotly(layer, image_data, rgb_bands=(0, 1, 2), show_boundaries=True, figsize=(800, 400)):
    """Display an interactive RGB image with segment boundaries and hoverable segment IDs using Plotly.

    Run in google collab as well.

    Parameters:
    ----------
    layer : object
        An object with a `.raster` attribute representing the labeled segmentation layer
        (e.g., output from a segmentation algorithm, such as SLIC).
    image_data : image data to be visualized.
    rgb_bands : tuple of int, optional
        Tuple of three integers specifying which bands to use for the RGB composite (default is (0, 1, 2)).
    show_boundaries : bool, optional
        Whether to overlay the segment boundaries on the RGB image (default is True).
    figsize : tuple of int, optional
        Tuple specifying the width and height of the interactive Plotly figure in pixels (default is (800, 400)).

    Returns:
    -------
    None
        The function displays the interactive plot directly in the output cell in a Jupyter Notebook.

    Notes:
    -----
    - Segment boundaries are drawn using `skimage.segmentation.mark_boundaries`.
    - Hovering over the image displays the segment ID from `layer.raster`.

    """

    def get_rgb_image(r, g, b):
        r_norm = np.clip((r - r.min()) / (r.max() - r.min() + 1e-10), 0, 1)
        g_norm = np.clip((g - g.min()) / (g.max() - g.min() + 1e-10), 0, 1)
        b_norm = np.clip((b - b.min()) / (b.max() - b.min() + 1e-10), 0, 1)
        return np.stack([r_norm, g_norm, b_norm], axis=2)

    def update_plot(rgb_bands, show_boundaries=True):
        rgb_image = get_rgb_image(image_data[rgb_bands[0]], image_data[rgb_bands[1]], image_data[rgb_bands[2]])

        if show_boundaries:
            rgb_image = mark_boundaries(rgb_image, layer.raster, color=(1, 1, 0), mode="thick")

        fig = go.Figure(data=go.Image(z=(rgb_image * 255).astype(np.uint8)))

        fig.add_trace(
            go.Heatmap(
                z=layer.raster,
                opacity=0,
                hoverinfo="z",
                showscale=False,
                hovertemplate="Segment ID: %{z}<extra></extra>",
                colorscale="gray",
            )
        )

        fig.update_layout(
            title="Hover to see Segment ID", dragmode="pan", margin=dict(l=0, r=0, t=30, b=0), height=figsize[1], width=figsize[0]
        )
        fig.update_xaxes(showticklabels=False)
        fig.update_yaxes(showticklabels=False, scaleanchor="x")

        fig.show()

    update_plot(rgb_bands=rgb_bands, show_boundaries=show_boundaries)

plot_sample(layer, image_data=None, transform=None, rgb_bands=None, class_field='classification', figsize=(8, 6), class_color=None, legend=True)

Plot classified segments on top of RGB or grayscale image data.

Parameters: - layer: Layer object with .objects (GeoDataFrame) - image_data: 3D numpy array (bands, height, width) - transform: Affine transform for the image (needed to compute extent) - red_band, green_band, blue_band: indices for RGB bands

Source code in nickyspatial/viz/maps.py
def plot_sample(
    layer,
    image_data=None,
    transform=None,
    rgb_bands=None,
    class_field="classification",
    figsize=(8, 6),
    class_color=None,
    legend=True,
):
    """Plot classified segments on top of RGB or grayscale image data.

    Parameters:
    - layer: Layer object with .objects (GeoDataFrame)
    - image_data: 3D numpy array (bands, height, width)
    - transform: Affine transform for the image (needed to compute extent)
    - red_band, green_band, blue_band: indices for RGB bands
    """
    fig, ax = plt.subplots(figsize=figsize)

    if image_data is not None:
        num_bands = image_data.shape[0]
        if rgb_bands and num_bands >= 3:
            r = image_data[rgb_bands[0]].astype(float)
            g = image_data[rgb_bands[1]].astype(float)
            b = image_data[rgb_bands[2]].astype(float)

            r_norm = np.clip((r - r.min()) / (r.max() - r.min() + 1e-10), 0, 1)
            g_norm = np.clip((g - g.min()) / (g.max() - g.min() + 1e-10), 0, 1)
            b_norm = np.clip((b - b.min()) / (b.max() - b.min() + 1e-10), 0, 1)

            rgb = np.stack([r_norm, g_norm, b_norm], axis=2)
            if transform:
                from rasterio.plot import plotting_extent

                extent = plotting_extent(image_data[0], transform=transform)
                ax.imshow(rgb, extent=extent)
            else:
                ax.imshow(rgb)
        else:
            gray = image_data[0]
            gray_norm = (gray - gray.min()) / (gray.max() - gray.min() + 1e-10)
            if transform:
                from rasterio.plot import plotting_extent

                extent = plotting_extent(gray, transform=transform)
                ax.imshow(gray_norm, cmap="gray", extent=extent)
            else:
                ax.imshow(gray_norm, cmap="gray")

    gdf = layer.objects.copy()
    if gdf.crs is None:
        raise ValueError("GeoDataFrame has no CRS")

    if not class_color:
        class_color = {}
    if class_field not in gdf.columns:
        raise ValueError(f"Class field '{class_field}' not found")

    class_values = [v for v in gdf[class_field].unique() if v is not None]
    base_colors = plt.cm.tab20(np.linspace(0, 1, max(len(class_values), 1)))
    class_map = {}

    for idx, class_value in enumerate(class_values):
        if class_value in class_color:
            color_hex = class_color[class_value]
        else:
            rgb_val = base_colors[idx % len(base_colors)][:3]
            color_hex = "#{:02x}{:02x}{:02x}".format(int(rgb_val[0] * 255), int(rgb_val[1] * 255), int(rgb_val[2] * 255))
            class_color[class_value] = color_hex
        class_map[class_value] = color_hex

    for class_value in class_values:
        gdf[gdf[class_field] == class_value].plot(ax=ax, facecolor=class_map[class_value], edgecolor="black", linewidth=0.5)

    if legend:
        handles = [mpatches.Patch(color=class_map[val], label=val) for val in class_values]
        ax.legend(handles=handles, loc="upper right", title=class_field)

    ax.set_title("Sample Data Visualization")
    ax.set_axis_off()
    return fig

plot_subplots_classification(layer, class_field='classification', figsize=(12, 10), legend=True, class_color=None, ax=None, title=None)

Plot classified segments with different colors for each class.

layer : figsize: class_clor: ax : ax instance of matplotlib Axes to plot on. title : str, optional Title for the plot. If None, defaults to "Classification Map".

Source code in nickyspatial/viz/maps.py
def plot_subplots_classification(
    layer, class_field="classification", figsize=(12, 10), legend=True, class_color=None, ax=None, title=None
):
    """Plot classified segments with different colors for each class.

    layer :
    figsize:
    class_clor:
    ax : ax instance of matplotlib Axes to plot on.
    title : str, optional
        Title for the plot. If None, defaults to "Classification Map".
    """
    if ax is None:
        fig, ax = plt.subplots(figsize=figsize)
    if not class_color:
        class_color = {}

    if class_field not in layer.objects.columns:
        raise ValueError(f"Class field '{class_field}' not found in layer objects")

    class_values = [v for v in layer.objects[class_field].unique() if v is not None]

    # generate base colormap
    base_colors = plt.cm.tab20(np.linspace(0, 1, max(len(class_values), 1)))

    colors_list = []
    for idx, class_value in enumerate(class_values):
        if class_color and class_value in list(class_color.keys()):
            # reuse stored color
            color_hex = class_color[class_value]
        else:
            # assign new color (from tab20 or random if exceeds)
            if idx < len(base_colors):
                rgb = base_colors[idx][:3]
                color_hex = "#{:02x}{:02x}{:02x}".format(int(rgb[0] * 255), int(rgb[1] * 255), int(rgb[2] * 255))
            else:
                color_hex = "#{:06x}".format(random.randint(0, 0xFFFFFF))
            class_color[class_value] = color_hex

        # convert hex → RGB tuple for ListedColormap
        rgb_tuple = tuple(int(color_hex[i : i + 2], 16) / 255 for i in (1, 3, 5))
        colors_list.append(rgb_tuple)

    # create colormap
    cmap = ListedColormap(colors_list)

    # map class values to indices
    class_map = {value: i for i, value in enumerate(class_values)}
    layer.objects["_class_id"] = layer.objects[class_field].map(class_map)

    layer.objects.plot(
        column="_class_id",
        cmap=cmap,
        ax=ax,
        edgecolor="black",
        linewidth=0.5,
        legend=False,
    )

    if legend and len(class_values) > 0:
        patches = [mpatches.Patch(color=class_color[value], label=value) for value in class_values]
        ax.legend(handles=patches, loc="upper right", title=class_field)

    # ax.set_title(f"Classification by {class_field}")
    if title is None:
        ax.set_title("Classification Map")
    else:
        ax.set_title(title)

    ax.set_xlabel("X Coordinate")
    ax.set_ylabel("Y Coordinate")

    # cleanup temporary column
    if "_class_id" in layer.objects.columns:
        layer.objects = layer.objects.drop(columns=["_class_id"])

    if ax is None:
        return fig
    return ax