Skip to content

isaacimi.ros_manager

Classes:

Name Description
RosManager

A static utility container for managing ROS nodes in the simulation.

RosManager

A static utility container for managing ROS nodes in the simulation.

Note: One multi-threaded executor is used to execute all ROS nodes in the simulation.

Warning: the RosManager is not instantiable

Methods:

Name Description
ensure_node

Return the ROS node corresponding to the specified robot name in the simulation.

shutdown

Removes all ROS nodes being managed.

spin_once

Calls the spin_once() method on the multi-threaded executor.

Source code in src/isaacimi/ros_manager.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class RosManager:
    """A static utility container for managing ROS nodes in the simulation.

    Note: One multi-threaded executor is used to execute all ROS nodes in the simulation.

    Warning: the RosManager is not instantiable
    """
    _nodes = {}
    _init = False
    _executor = None
    _context = None

    def __new__(cls, *args, **kwargs):
        raise TypeError("Cannot instantiate RosManager")

    @classmethod
    def ensure_node(cls, robot_name: str) -> Node:
        """Return the ROS node corresponding to the specified robot name in the simulation.

        If a ROS node does not exist for the specified robot, instantiate one and add it to the registry. Otherwise, return the existing node from the registry.

        Args:
            robot_name (str): the name of the robot the ROS node belongs to

        Returns:
            Node: the ROS node instance
        """
        if not cls._init:
            cls._context = Context()
            rclpy.init(context=cls._context)
            cls._executor = MultiThreadedExecutor(context=cls._context)
            cls._init = True
        if robot_name not in cls._nodes:
            node = Node(
                node_name = "isaacsim_node",
                namespace=f"/{robot_name}",
                cli_args=["--remap", "/tf:=tf", "--remap", "/tf_static:=tf_static"],
                context = cls._context
            )
            node.set_parameters([
                Parameter('use_sim_time', Parameter.Type.BOOL, True)
            ])
            cls._executor.add_node(node)
            cls._nodes[robot_name] = node
        return cls._nodes[robot_name]

    @classmethod
    def spin_once(cls) -> None:
        """Calls the `spin_once()` method on the multi-threaded executor.
        """
        if cls._init:
            cls._executor.spin_once(timeout_sec=0)

    @classmethod
    def shutdown(cls) -> None:
        """Removes all ROS nodes being managed.
        """
        if cls._init:
            for node in cls._nodes.values():
                cls._executor.remove_node(node)
                node.destroy_node()
            cls._nodes.clear()
            cls._executor.shutdown(timeout_sec=None)
            rclpy.shutdown(context=cls._context)
            cls._context.destroy()
            cls._executor = None
            cls._context = None
            cls._init = False

ensure_node classmethod

ensure_node(robot_name: str) -> Node

Return the ROS node corresponding to the specified robot name in the simulation.

If a ROS node does not exist for the specified robot, instantiate one and add it to the registry. Otherwise, return the existing node from the registry.

Parameters:

Name Type Description Default

robot_name

str

the name of the robot the ROS node belongs to

required

Returns:

Name Type Description
Node Node

the ROS node instance

Source code in src/isaacimi/ros_manager.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@classmethod
def ensure_node(cls, robot_name: str) -> Node:
    """Return the ROS node corresponding to the specified robot name in the simulation.

    If a ROS node does not exist for the specified robot, instantiate one and add it to the registry. Otherwise, return the existing node from the registry.

    Args:
        robot_name (str): the name of the robot the ROS node belongs to

    Returns:
        Node: the ROS node instance
    """
    if not cls._init:
        cls._context = Context()
        rclpy.init(context=cls._context)
        cls._executor = MultiThreadedExecutor(context=cls._context)
        cls._init = True
    if robot_name not in cls._nodes:
        node = Node(
            node_name = "isaacsim_node",
            namespace=f"/{robot_name}",
            cli_args=["--remap", "/tf:=tf", "--remap", "/tf_static:=tf_static"],
            context = cls._context
        )
        node.set_parameters([
            Parameter('use_sim_time', Parameter.Type.BOOL, True)
        ])
        cls._executor.add_node(node)
        cls._nodes[robot_name] = node
    return cls._nodes[robot_name]

shutdown classmethod

shutdown() -> None

Removes all ROS nodes being managed.

Source code in src/isaacimi/ros_manager.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@classmethod
def shutdown(cls) -> None:
    """Removes all ROS nodes being managed.
    """
    if cls._init:
        for node in cls._nodes.values():
            cls._executor.remove_node(node)
            node.destroy_node()
        cls._nodes.clear()
        cls._executor.shutdown(timeout_sec=None)
        rclpy.shutdown(context=cls._context)
        cls._context.destroy()
        cls._executor = None
        cls._context = None
        cls._init = False

spin_once classmethod

spin_once() -> None

Calls the spin_once() method on the multi-threaded executor.

Source code in src/isaacimi/ros_manager.py
53
54
55
56
57
58
@classmethod
def spin_once(cls) -> None:
    """Calls the `spin_once()` method on the multi-threaded executor.
    """
    if cls._init:
        cls._executor.spin_once(timeout_sec=0)