Source code for deepforest.utilities

"""Utilities model"""
import json
import os
import urllib
import warnings

import geopandas as gpd
import numpy as np
import pandas as pd
import rasterio
import shapely
import xmltodict
import yaml
from tqdm import tqdm

from deepforest import _ROOT


def read_config(config_path):
    """Read a YAML configuration file.

    Args:
        config_path: Path to the YAML config file on disk.

    Returns:
        The parsed document, as produced by ``yaml.load`` with
        ``yaml.FullLoader``.

    Raises:
        FileNotFoundError: If the file cannot be opened or parsed. The
            underlying exception is attached as ``__cause__`` so the real
            reason (missing file, permissions, malformed YAML) is not lost.
    """
    try:
        with open(config_path, 'r') as f:
            config = yaml.load(f, Loader=yaml.FullLoader)
    except Exception as e:
        # Any failure is reported as FileNotFoundError so existing callers
        # that catch that type keep working; chain the cause explicitly.
        raise FileNotFoundError("There is no config at {}, yields {}".format(
            config_path, e)) from e

    return config
class DownloadProgressBar(tqdm):
    """tqdm progress bar whose ``update_to`` method can be passed as a
    ``reporthook`` to ``urllib.request.urlretrieve``."""

    def update_to(self, b=1, bsize=1, tsize=None):
        """Move the bar to the position given by a block count and size.

        Args:
            b: Number of blocks counted so far.
            bsize: Size of one block.
            tsize: Total size; when not None it is stored as ``self.total``.

        Returns:
            None
        """
        if tsize is not None:
            self.total = tsize

        # tqdm.update expects an increment, so subtract the current
        # position from the absolute position.
        position = b * bsize
        self.update(position - self.n)
def use_release(save_dir=os.path.join(_ROOT, "data/"), prebuilt_model="NEON"):
    """Check for, or download, the latest model release from GitHub.

    The URL of the last downloaded release is recorded in
    ``current_release.csv`` inside ``save_dir``. The model is downloaded
    again only when that record differs from the latest release.

    Args:
        save_dir: Directory to save the model file and the release record,
            default to "data" in deepforest repo.
        prebuilt_model: Base name of the model file. The local model will be
            called ``<prebuilt_model>.pt`` on disk.

    Returns:
        release_url, output_path (str): the ``html_url`` of the latest
        release and the path to the downloaded model.
    """
    # ``import urllib`` by itself does not guarantee that the ``request``
    # submodule has been loaded, so import it explicitly here.
    import urllib.request

    # Find the latest GitHub release of the DeepForest-pytorch repo
    request = urllib.request.Request(
        'https://api.github.com/repos/Weecology/DeepForest-pytorch/releases/latest',
        headers={'Accept': 'application/vnd.github.v3+json'},
    )
    # Close the HTTP response once the payload has been read
    with urllib.request.urlopen(request) as response:
        _json = json.loads(response.read())

    # The first asset of the release is taken as the model file
    asset = _json['assets'][0]
    url = asset['browser_download_url']

    # Naming based on pre-built model
    output_path = os.path.join(save_dir, prebuilt_model + ".pt")

    # Build the record path with os.path.join so that a save_dir without a
    # trailing separator still places the file inside save_dir.
    release_path = os.path.join(save_dir, "current_release.csv")

    # Check the release tagged locally. A missing or unreadable record just
    # means "nothing downloaded yet"; KeyboardInterrupt/SystemExit are no
    # longer swallowed.
    try:
        release_txt = pd.read_csv(release_path)
    except Exception:
        release_txt = pd.DataFrame({"current_release": [None]})

    # Download the current release if it doesn't exist locally
    if not release_txt.current_release[0] == _json["html_url"]:
        print("Downloading model from DeepForest release {}, see {} for details".format(
            _json["tag_name"], _json["html_url"]))

        with DownloadProgressBar(unit='B',
                                 unit_scale=True,
                                 miniters=1,
                                 desc=url.split('/')[-1]) as t:
            urllib.request.urlretrieve(url,
                                       filename=output_path,
                                       reporthook=t.update_to)

        print("Model was downloaded and saved to {}".format(output_path))

        # record the release locally
        release_txt = pd.DataFrame({"current_release": [_json["html_url"]]})
        release_txt.to_csv(release_path)
    else:
        print("Model from DeepForest release {} was already downloaded. "
              "Loading model from file.".format(_json["html_url"]))

    return _json["html_url"], output_path
def xml_to_annotations(xml_path):
    """Load annotations from an xml file (e.g. RectLabel editor) and convert
    them into a retinanet-style annotations table.

    Args:
        xml_path (str): Path to the annotations xml, formatted by RectLabel

    Returns:
        Annotations (pandas dataframe): one row per object with columns
        image_path, xmin, ymin, xmax, ymax, label
    """
    # parse the whole document into nested dicts
    with open(xml_path) as handle:
        doc = xmltodict.parse(handle.read())

    # grab xml objects
    try:
        tile_xml = doc["annotation"]["object"]
    except Exception as e:
        raise Exception("error {} for path {} with doc annotation{}".format(
            e, xml_path, doc["annotation"]))

    # A single object is parsed as a dict and several as a list of dicts;
    # treat both uniformly.
    trees = tile_xml if isinstance(tile_xml, list) else [tile_xml]

    corner_keys = ("xmin", "xmax", "ymin", "ymax")
    raw = {key: [] for key in corner_keys}
    label = []
    for tree in trees:
        for key in corner_keys:
            raw[key].append(tree["bndbox"][key])
        label.append(tree['name'])

    rgb_name = os.path.basename(doc["annotation"]["filename"])

    # set dtypes, check for floats and round
    rounded = {
        key: [round_with_floats(value) for value in raw[key]]
        for key in corner_keys
    }

    annotations = pd.DataFrame({
        "image_path": rgb_name,
        "xmin": rounded["xmin"],
        "ymin": rounded["ymin"],
        "xmax": rounded["xmax"],
        "ymax": rounded["ymax"],
        "label": label
    })

    return annotations
def shapefile_to_annotations(shapefile, rgb, savedir="."):
    """Convert a shapefile of annotations into an annotations table for
    DeepForest training and evaluation.

    Args:
        shapefile: Path to a shapefile on disk. If a label column is present,
            it will be used, else all labels are assumed to be "Tree"
        rgb: Path to the RGB image on disk
        savedir: Not used by this function; kept so existing callers that
            pass it keep working.

    Returns:
        results: a pandas dataframe with columns image_path, xmin, ymin,
        xmax, ymax, label, in image (pixel) coordinates. Boxes that collapse
        to zero width or height after truncation to whole pixels are dropped.
    """
    # Read shapefile
    gdf = gpd.read_file(shapefile)

    # get coordinates
    df = gdf.geometry.bounds

    # raster bounds and per-axis pixel size. Each axis is scaled by its own
    # resolution so rasters with non-square pixels are handled too.
    with rasterio.open(rgb) as src:
        raster_bounds = src.bounds
        x_res, y_res = src.res
    left = raster_bounds.left
    top = raster_bounds.top

    # Transform project coordinates to image coordinates
    # (astype(int) truncates to whole pixels)
    df["tile_xmin"] = ((df.minx - left) / x_res).astype(int)
    df["tile_xmax"] = ((df.maxx - left) / x_res).astype(int)

    # Projected y grows upwards, but the origin of an image is top left, so
    # rows are measured down from the top edge of the raster.
    df["tile_ymax"] = ((top - df.miny) / y_res).astype(int)
    df["tile_ymin"] = ((top - df.maxy) / y_res).astype(int)

    # Add labels if they exist
    if "label" in gdf.columns:
        df["label"] = gdf["label"]
    else:
        df["label"] = "Tree"

    # add filename
    df["image_path"] = os.path.basename(rgb)

    # select columns
    result = df[[
        "image_path", "tile_xmin", "tile_ymin", "tile_xmax", "tile_ymax", "label"
    ]]
    result = result.rename(columns={
        "tile_xmin": "xmin",
        "tile_ymin": "ymin",
        "tile_xmax": "xmax",
        "tile_ymax": "ymax"
    })

    # ensure no zero area polygons due to rounding to pixel size
    result = result[~(result.xmin == result.xmax)]
    result = result[~(result.ymin == result.ymax)]

    return result
def round_with_floats(x):
    """Convert a coordinate to int, rounding it if it is not an integer string.

    Args:
        x: Coordinate value, typically a string such as "12" or "12.6".

    Returns:
        int: ``int(x)`` when that conversion succeeds, otherwise
        ``float(x)`` rounded with ``numpy.round`` and converted to int.

    Warns:
        UserWarning: When ``int(x)`` fails and the value has to be rounded.
    """
    try:
        result = int(x)
    except (ValueError, TypeError):
        # Only conversion failures trigger the rounding fallback; things
        # like KeyboardInterrupt must propagate untouched.
        warnings.warn(
            "Annotations file contained non-integer coordinates. "
            "These coordinates were rounded to nearest int. "
            "All coordinates must correspond to pixels in the image coordinate system. "
            "If you are attempting to use projected data, "
            "first convert it into image coordinates see FAQ for suggestions.")
        result = int(np.round(float(x)))

    return result
def check_file(df):
    """Validate that an annotations table has the required columns.

    Args:
        df: Table exposing a ``columns`` attribute (e.g. a pandas dataframe).

    Returns:
        The same ``df`` object, unchanged.

    Raises:
        IOError: If any of image_path, xmin, xmax, ymin, ymax or label is
            missing from ``df.columns``.
    """
    required_columns = ("image_path", "xmin", "xmax", "ymin", "ymax", "label")

    for column in required_columns:
        if column not in df.columns:
            raise IOError(
                "Input file has incorrect column names, the following columns must exist 'image_path','xmin','ymin','xmax','ymax','label'."
            )

    return df
def check_image(image):
    """Check that an image is three channel, channel last format.

    Args:
        image: numpy array (anything exposing a ``shape`` tuple).

    Returns:
        None

    Raises:
        ValueError: If the array has fewer than three dimensions or its
            third dimension is not of size 3.
    """
    shape = image.shape
    # Test the number of dimensions first so a 2D (e.g. grayscale) array
    # produces the descriptive ValueError instead of an IndexError.
    if len(shape) < 3 or not shape[2] == 3:
        raise ValueError("image is expected have three channels, channel last format, found image with shape {}".format(image.shape))
def project_boxes(df, root_dir, transform=True):
    """Convert boxes from image coordinates to geographic coordinates.

    Note that this assumes df is just a single plot being passed to this
    function.

    Args:
        df: a pandas type dataframe with columns image_path, xmin, ymin,
            xmax, ymax. image_path is the path relative to root_dir. The
            coordinate and geometry columns are assigned on this frame.
        root_dir: directory of images
        transform: If true, convert from image to geographic coordinates

    Returns:
        A geopandas GeoDataFrame with a box ``geometry`` column and the crs
        of the raster.

    Raises:
        ValueError: If df contains more than one distinct image_path.
    """
    plot_names = df.image_path.unique()
    if len(plot_names) > 1:
        raise ValueError(
            "This function projects a single plots worth of data. Multiple plot names found {}"
            .format(plot_names))
    plot_name = plot_names[0]

    # Read the georeferencing of the plot's raster
    rgb_path = "{}/{}".format(root_dir, plot_name)
    with rasterio.open(rgb_path) as dataset:
        extent = dataset.bounds
        x_res, y_res = dataset.res
        crs = dataset.crs

    if transform:
        # Image origin is the top left corner, not the bottom left: columns
        # are offset from the left edge, rows are measured down from the top.
        for column in ("xmin", "xmax"):
            df[column] = (df[column].astype(float) * x_res) + extent.left
        for column in ("ymin", "ymax"):
            df[column] = extent.top - (df[column].astype(float) * y_res)

    def _row_to_box(row):
        return shapely.geometry.box(row.xmin, row.ymin, row.xmax, row.ymax)

    # combine columns into a shapely box per row
    df['geometry'] = df.apply(_row_to_box, axis=1)
    df = gpd.GeoDataFrame(df, geometry='geometry')
    df.crs = crs

    return df
def collate_fn(batch):
    """Transpose a batch of samples.

    Args:
        batch: Iterable of samples, each itself an iterable
            (e.g. ``[(a1, b1), (a2, b2)]``).

    Returns:
        tuple: One tuple per sample position, e.g. ``((a1, a2), (b1, b2))``.
        An empty batch gives an empty tuple.
    """
    transposed = zip(*batch)
    return tuple(transposed)