vprdb.reduction_methods

 1#  Copyright (c) 2023, Ivan Moskalenko, Anastasiia Kornilova
 2#
 3#  Licensed under the Apache License, Version 2.0 (the "License");
 4#  you may not use this file except in compliance with the License.
 5#  You may obtain a copy of the License at
 6#
 7#      http://www.apache.org/licenses/LICENSE-2.0
 8#
 9#  Unless required by applicable law or agreed to in writing, software
10#  distributed under the License is distributed on an "AS IS" BASIS,
11#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#  See the License for the specific language governing permissions and
13#  limitations under the License.
14from vprdb.reduction_methods.cube_division import CubeDivision
15from vprdb.reduction_methods.distance_vector import DistanceVector
16from vprdb.reduction_methods.dominating_set import DominatingSet
17from vprdb.reduction_methods.every_nth import EveryNth
18
19__all__ = ["CubeDivision", "DistanceVector", "DominatingSet", "EveryNth"]
class CubeDivision(vprdb.reduction_methods.reduction_method.ReductionMethod):
21class CubeDivision(ReductionMethod):
22    """
23    The method divides the cuboid in which the trajectory is located
24    into many small cubes, in each of which only one pose is chosen
25    """
26
27    def __init__(self, cube_size: float):
28        """
29        Constructs CubeDivision reduction method
30        :param cube_size: The size of the cubes into which the cuboid will be divided
31        """
32        self.cube_size = cube_size
33
34    def reduce(self, db: Database) -> Database:
35        traj = np.asarray(db.trajectory)
36        xyz_traj = traj[:, :3, 3]
37        min_over_axes = np.amin(xyz_traj, axis=0)
38        max_over_axes = np.amax(xyz_traj, axis=0)
39        voxel_grid = VoxelGrid(min_over_axes, max_over_axes, self.cube_size)
40
41        # Association of points with cubes
42        cubes = dict()
43        for i, trans_matrix in enumerate(traj):
44            point = trans_matrix[:3, 3]
45            cube_coordinates = voxel_grid.get_voxel_coordinates(point)
46
47            point = np.append(point, i)
48            if cube_coordinates in cubes:
49                cubes[cube_coordinates] = np.append(
50                    cubes[cube_coordinates], [point], axis=0
51                )
52            else:
53                cubes[cube_coordinates] = np.asarray([point])
54
55        # Finding the closest point to the center in each cube
56        res_indices = []
57        for (x, y, z), enum_points in cubes.items():
58            indices = enum_points[:, 3]
59            points = enum_points[:, :3]
60            cube_center = (
61                x + self.cube_size / 2,
62                y + self.cube_size / 2,
63                z + self.cube_size / 2,
64            )
65            distances = np.apply_along_axis(
66                lambda p: np.linalg.norm(p - cube_center), axis=1, arr=points
67            )
68            res_indices.append(int(indices[np.argmin(distances)]))
69        res_indices.sort()
70
71        new_rgb = [db.color_images[i] for i in res_indices]
72        new_point_clouds = [db.point_clouds[i] for i in res_indices]
73        new_traj = [db.trajectory[i] for i in res_indices]
74        return Database(new_rgb, new_point_clouds, new_traj)
75
76    reduce.__doc__ = ReductionMethod.reduce.__doc__

The method divides the cuboid in which the trajectory is located into many small cubes, in each of which only one pose is chosen

CubeDivision(cube_size: float)
27    def __init__(self, cube_size: float):
28        """
29        Constructs CubeDivision reduction method
30        :param cube_size: The size of the cubes into which the cuboid will be divided
31        """
32        self.cube_size = cube_size

Constructs CubeDivision reduction method

Parameters
  • cube_size: The size of the cubes into which the cuboid will be divided
cube_size
def reduce(self, db: vprdb.core.database.Database) -> vprdb.core.database.Database:
34    def reduce(self, db: Database) -> Database:
35        traj = np.asarray(db.trajectory)
36        xyz_traj = traj[:, :3, 3]
37        min_over_axes = np.amin(xyz_traj, axis=0)
38        max_over_axes = np.amax(xyz_traj, axis=0)
39        voxel_grid = VoxelGrid(min_over_axes, max_over_axes, self.cube_size)
40
41        # Association of points with cubes
42        cubes = dict()
43        for i, trans_matrix in enumerate(traj):
44            point = trans_matrix[:3, 3]
45            cube_coordinates = voxel_grid.get_voxel_coordinates(point)
46
47            point = np.append(point, i)
48            if cube_coordinates in cubes:
49                cubes[cube_coordinates] = np.append(
50                    cubes[cube_coordinates], [point], axis=0
51                )
52            else:
53                cubes[cube_coordinates] = np.asarray([point])
54
55        # Finding the closest point to the center in each cube
56        res_indices = []
57        for (x, y, z), enum_points in cubes.items():
58            indices = enum_points[:, 3]
59            points = enum_points[:, :3]
60            cube_center = (
61                x + self.cube_size / 2,
62                y + self.cube_size / 2,
63                z + self.cube_size / 2,
64            )
65            distances = np.apply_along_axis(
66                lambda p: np.linalg.norm(p - cube_center), axis=1, arr=points
67            )
68            res_indices.append(int(indices[np.argmin(distances)]))
69        res_indices.sort()
70
71        new_rgb = [db.color_images[i] for i in res_indices]
72        new_point_clouds = [db.point_clouds[i] for i in res_indices]
73        new_traj = [db.trajectory[i] for i in res_indices]
74        return Database(new_rgb, new_point_clouds, new_traj)

Method for reducing the Database

Parameters
  • db: Database for reduction
Returns

Reduced database

class DistanceVector(vprdb.reduction_methods.reduction_method.ReductionMethod):
21class DistanceVector(ReductionMethod):
22    """
23    The method calculates the distance between neighboring poses.
24    Then using the calculated distances, it calculates the distance from the current pose to the last pose taken.
25    If the distance is greater than the threshold value, the current pose is added to the final database.
26    """
27
28    def __init__(self, distance_threshold: float):
29        """
30        Constructs DistanceVector reduction method
31        :param distance_threshold: Threshold value for distance
32        """
33        self.distance_threshold = distance_threshold
34
35    def reduce(self, db: Database) -> Database:
36        traj = np.asarray(db.trajectory)
37        new_traj = [traj[0]]
38        new_color = [db.color_images[0]]
39        new_point_clouds = [db.point_clouds[0]]
40        first_points = traj[:-1, :3, 3]
41        last_points = traj[1:, :3, 3]
42        distances = np.linalg.norm(last_points - first_points, axis=1)
43        partial_distance = 0
44        for i, cur_distance in enumerate(distances):
45            partial_distance += cur_distance
46            if partial_distance > self.distance_threshold:
47                new_traj.append(traj[i + 1])
48                new_color.append(db.color_images[i + 1])
49                new_point_clouds.append(db.point_clouds[i + 1])
50                partial_distance = 0
51
52        return Database(new_color, new_point_clouds, new_traj)
53
54    reduce.__doc__ = ReductionMethod.reduce.__doc__

The method calculates the distance between neighboring poses. Then using the calculated distances, it calculates the distance from the current pose to the last pose taken. If the distance is greater than the threshold value, the current pose is added to the final database.

DistanceVector(distance_threshold: float)
28    def __init__(self, distance_threshold: float):
29        """
30        Constructs DistanceVector reduction method
31        :param distance_threshold: Threshold value for distance
32        """
33        self.distance_threshold = distance_threshold

Constructs DistanceVector reduction method

Parameters
  • distance_threshold: Threshold value for distance
distance_threshold
def reduce(self, db: vprdb.core.database.Database) -> vprdb.core.database.Database:
35    def reduce(self, db: Database) -> Database:
36        traj = np.asarray(db.trajectory)
37        new_traj = [traj[0]]
38        new_color = [db.color_images[0]]
39        new_point_clouds = [db.point_clouds[0]]
40        first_points = traj[:-1, :3, 3]
41        last_points = traj[1:, :3, 3]
42        distances = np.linalg.norm(last_points - first_points, axis=1)
43        partial_distance = 0
44        for i, cur_distance in enumerate(distances):
45            partial_distance += cur_distance
46            if partial_distance > self.distance_threshold:
47                new_traj.append(traj[i + 1])
48                new_color.append(db.color_images[i + 1])
49                new_point_clouds.append(db.point_clouds[i + 1])
50                partial_distance = 0
51
52        return Database(new_color, new_point_clouds, new_traj)

Method for reducing the Database

Parameters
  • db: Database for reduction
Returns

Reduced database

class DominatingSet(vprdb.reduction_methods.reduction_method.ReductionMethod):
21class DominatingSet(ReductionMethod):
22    """
23    Method constructs a graph where the images are vertices that
24    are connected if the IoU is greater than the
25    given threshold. Then it finds a minimal subset of graph vertices so
26    that every vertex is either in the set or is connected to a
27    vertex from it.
28    """
29
30    def __init__(self, threshold: float = 0.3, voxel_size: float = 0.3):
31        """
32        Constructs DominatingSet reduction method
33        :param threshold: Threshold value for IoU
34        :param voxel_size: The value indicating which IoU value will be enough
35        to consider the point clouds as overlapping
36        """
37        self.threshold = threshold
38        self.voxel_size = voxel_size
39
40    def reduce(self, db: Database) -> Database:
41        min_bounds, max_bounds = db.bounds
42        voxel_grid = VoxelGrid(min_bounds, max_bounds, self.voxel_size)
43        # Determine which frames cover a particular voxel
44        voxel_to_frames_dict = dict()
45        # Frame size is the number of voxels it covers
46        frames_sizes = []
47        for i, pose in enumerate(db.trajectory):
48            pcd = db.point_clouds[i].point_cloud.transform(pose)
49            down_sampled_pcd = voxel_grid.voxel_down_sample(pcd)
50            points = down_sampled_pcd.points
51            frames_sizes.append(len(points))
52            for point in points:
53                voxel_index = voxel_grid.get_voxel_index(point)
54                if voxel_index in voxel_to_frames_dict:
55                    voxel_to_frames_dict[voxel_index].append(i)
56                else:
57                    voxel_to_frames_dict[voxel_index] = [i]
58
59        intersections = dict()
60        for covering_frames in voxel_to_frames_dict.values():
61            for i, frame_1 in enumerate(covering_frames):
62                for frame_2 in covering_frames[i + 1 :]:
63                    intersection = tuple(sorted((frame_1, frame_2)))
64                    if intersection in intersections:
65                        intersections[intersection] += 1
66                    else:
67                        intersections[intersection] = 1
68
69        IoUs = dict.fromkeys(intersections.keys())
70        for (frame_1, frame_2), intersection in intersections.items():
71            IoUs[(frame_1, frame_2)] = intersection / (
72                frames_sizes[frame_1] + frames_sizes[frame_2] - intersection
73            )
74
75        G = nx.Graph()
76        G.add_nodes_from(range(len(db)))
77        for (fr1, fr2), IoU in IoUs.items():
78            if IoU > self.threshold:
79                G.add_edge(fr1, fr2)
80        result_indices = list(nx.dominating_set(G))
81        result_indices.sort()
82        new_rgb = [db.color_images[i] for i in result_indices]
83        new_point_clouds = [db.point_clouds[i] for i in result_indices]
84        new_traj = [db.trajectory[i] for i in result_indices]
85        return Database(new_rgb, new_point_clouds, new_traj)
86
87    reduce.__doc__ = ReductionMethod.reduce.__doc__

Method constructs a graph where the images are vertices that are connected if the IoU is greater than the given threshold. Then it finds a minimal subset of graph vertices so that every vertex is either in the set or is connected to a vertex from it.

DominatingSet(threshold: float = 0.3, voxel_size: float = 0.3)
30    def __init__(self, threshold: float = 0.3, voxel_size: float = 0.3):
31        """
32        Constructs DominatingSet reduction method
33        :param threshold: Threshold value for IoU
34        :param voxel_size: The value indicating which IoU value will be enough
35        to consider the point clouds as overlapping
36        """
37        self.threshold = threshold
38        self.voxel_size = voxel_size

Constructs DominatingSet reduction method

Parameters
  • threshold: Threshold value for IoU
  • voxel_size: The value indicating which IoU value will be enough to consider the point clouds as overlapping
threshold
voxel_size
def reduce(self, db: vprdb.core.database.Database) -> vprdb.core.database.Database:
40    def reduce(self, db: Database) -> Database:
41        min_bounds, max_bounds = db.bounds
42        voxel_grid = VoxelGrid(min_bounds, max_bounds, self.voxel_size)
43        # Determine which frames cover a particular voxel
44        voxel_to_frames_dict = dict()
45        # Frame size is the number of voxels it covers
46        frames_sizes = []
47        for i, pose in enumerate(db.trajectory):
48            pcd = db.point_clouds[i].point_cloud.transform(pose)
49            down_sampled_pcd = voxel_grid.voxel_down_sample(pcd)
50            points = down_sampled_pcd.points
51            frames_sizes.append(len(points))
52            for point in points:
53                voxel_index = voxel_grid.get_voxel_index(point)
54                if voxel_index in voxel_to_frames_dict:
55                    voxel_to_frames_dict[voxel_index].append(i)
56                else:
57                    voxel_to_frames_dict[voxel_index] = [i]
58
59        intersections = dict()
60        for covering_frames in voxel_to_frames_dict.values():
61            for i, frame_1 in enumerate(covering_frames):
62                for frame_2 in covering_frames[i + 1 :]:
63                    intersection = tuple(sorted((frame_1, frame_2)))
64                    if intersection in intersections:
65                        intersections[intersection] += 1
66                    else:
67                        intersections[intersection] = 1
68
69        IoUs = dict.fromkeys(intersections.keys())
70        for (frame_1, frame_2), intersection in intersections.items():
71            IoUs[(frame_1, frame_2)] = intersection / (
72                frames_sizes[frame_1] + frames_sizes[frame_2] - intersection
73            )
74
75        G = nx.Graph()
76        G.add_nodes_from(range(len(db)))
77        for (fr1, fr2), IoU in IoUs.items():
78            if IoU > self.threshold:
79                G.add_edge(fr1, fr2)
80        result_indices = list(nx.dominating_set(G))
81        result_indices.sort()
82        new_rgb = [db.color_images[i] for i in result_indices]
83        new_point_clouds = [db.point_clouds[i] for i in result_indices]
84        new_traj = [db.trajectory[i] for i in result_indices]
85        return Database(new_rgb, new_point_clouds, new_traj)

Method for reducing the Database

Parameters
  • db: Database for reduction
Returns

Reduced database

class EveryNth(vprdb.reduction_methods.reduction_method.ReductionMethod):
19class EveryNth(ReductionMethod):
20    """The method returns every N-th item from DB"""
21
22    def __init__(self, n: int):
23        """
24        Constructs EveryNth reduction method
25        :param n: Step of method
26        """
27        self.n = n
28
29    def reduce(self, db: Database) -> Database:
30        new_traj = db.trajectory[:: self.n]
31        new_color = db.color_images[:: self.n]
32        new_point_clouds = db.point_clouds[:: self.n]
33        return Database(new_color, new_point_clouds, new_traj)
34
35    reduce.__doc__ = ReductionMethod.reduce.__doc__

The method returns every N-th item from DB

EveryNth(n: int)
22    def __init__(self, n: int):
23        """
24        Constructs EveryNth reduction method
25        :param n: Step of method
26        """
27        self.n = n

Constructs EveryNth reduction method

Parameters
  • n: Step of method
n
def reduce(self, db: vprdb.core.database.Database) -> vprdb.core.database.Database:
29    def reduce(self, db: Database) -> Database:
30        new_traj = db.trajectory[:: self.n]
31        new_color = db.color_images[:: self.n]
32        new_point_clouds = db.point_clouds[:: self.n]
33        return Database(new_color, new_point_clouds, new_traj)

Method for reducing the Database

Parameters
  • db: Database for reduction
Returns

Reduced database