
viberl.utils.vector_env

Vector environment utilities for parallel sampling.

Classes:

| Name | Description |
| --- | --- |
| VectorEnvSampler | Vector environment sampler for parallel trajectory collection. |

Functions:

| Name | Description |
| --- | --- |
| create_vector_sampler | Create a vector environment sampler. |

VectorEnvSampler

```python
VectorEnvSampler(
    env_fn: callable, num_envs: int, agent: Agent, max_steps: int = 1000, device: str = 'cpu'
)
```

Vector environment sampler for parallel trajectory collection.

Initialize vector environment sampler.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| env_fn | callable | Function that creates a single environment | required |
| num_envs | int | Number of parallel environments | required |
| agent | Agent | RL agent to use for action selection | required |
| max_steps | int | Maximum steps per episode | 1000 |
| device | str | Device for tensor operations | 'cpu' |
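For orientation, a minimal instantiation sketch. It assumes a Gymnasium environment factory; the `RandomAgent` stub below is hypothetical and only mimics the `Agent` interface the sampler relies on, namely `act(obs)` returning an object with an `action` attribute and, optionally, a `log_prob`:

```python
from types import SimpleNamespace

import gymnasium as gym


def make_env():
    # Factory: called once per parallel sub-environment
    return gym.make('CartPole-v1')


class RandomAgent:
    """Hypothetical stand-in for a viberl Agent."""

    def __init__(self, action_space):
        self.action_space = action_space

    def act(self, obs):
        # Return an object exposing .action (and .log_prob), as the
        # sampler expects from Agent.act()
        return SimpleNamespace(action=self.action_space.sample(), log_prob=None)


agent = RandomAgent(gym.make('CartPole-v1').action_space)
sampler = VectorEnvSampler(env_fn=make_env, num_envs=4, agent=agent)
```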

Methods:

| Name | Description |
| --- | --- |
| reset | Reset all environments. |
| collect_batch_trajectories | Collect a batch of trajectories using parallel environments. |
| collect_trajectory_batch | Collect a batch of trajectories using parallel sampling. |
| close | Close the vector environment. |
| `__enter__` | Context manager entry. |
| `__exit__` | Context manager exit. |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| num_envs | | |
| agent | | |
| max_steps | | |
| device | | |
| env | | |
| active_trajectories | list[list[Transition]] | |
| active_episode_rewards | | |
| completed_trajectories | list[tuple[Trajectory, float]] | |
Source code in viberl/utils/vector_env.py

```python
def __init__(
    self,
    env_fn: callable,
    num_envs: int,
    agent: Agent,
    max_steps: int = 1000,
    device: str = 'cpu',
) -> None:
    """Initialize vector environment sampler.

    Args:
        env_fn: Function that creates a single environment
        num_envs: Number of parallel environments
        agent: RL agent to use for action selection
        max_steps: Maximum steps per episode
        device: Device for tensor operations
    """
    self.num_envs = num_envs
    self.agent = agent
    self.max_steps = max_steps
    self.device = torch.device(device)

    # Create vector environment
    try:
        from gymnasium.vector import AsyncVectorEnv

        self.env = AsyncVectorEnv([env_fn for _ in range(num_envs)])
        logger.info(f'Created AsyncVectorEnv with {num_envs} environments')
    except ImportError:
        logger.warning('AsyncVectorEnv not available, using SyncVectorEnv')
        from gymnasium.vector import SyncVectorEnv

        self.env = SyncVectorEnv([env_fn for _ in range(num_envs)])

    # Initialize tracking variables
    self.active_trajectories: list[list[Transition]] = [[] for _ in range(num_envs)]
    self.active_episode_rewards = np.zeros(num_envs)
    self.completed_trajectories: list[tuple[Trajectory, float]] = []
```

num_envs instance-attribute

num_envs = num_envs

agent instance-attribute

agent = agent

max_steps instance-attribute

max_steps = max_steps

device instance-attribute

device = torch.device(device)

env instance-attribute

env = AsyncVectorEnv([env_fn for _ in range(num_envs)])

active_trajectories instance-attribute

active_trajectories: list[list[Transition]] = [[] for _ in range(num_envs)]

active_episode_rewards instance-attribute

active_episode_rewards = np.zeros(num_envs)

completed_trajectories instance-attribute

completed_trajectories: list[tuple[Trajectory, float]] = []

reset

```python
reset() -> ndarray
```

Reset all environments.

Returns:

| Type | Description |
| --- | --- |
| ndarray | Initial observations from all environments |

Source code in viberl/utils/vector_env.py

```python
def reset(self) -> np.ndarray:
    """Reset all environments.

    Returns:
        Initial observations from all environments
    """
    observations, _ = self.env.reset()
    return observations
```
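Because the environments are vectorized, the observations come back stacked along a leading batch axis. Continuing the hypothetical CartPole sampler from above:

```python
obs = sampler.reset()
print(obs.shape)  # (4, 4): (num_envs, obs_dim) for CartPole-v1
```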

collect_batch_trajectories

```python
collect_batch_trajectories(
    num_trajectories: int, render: bool = False
) -> list[tuple[Trajectory, float]]
```

Collect a batch of trajectories using parallel environments.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| num_trajectories | int | Number of trajectories to collect | required |
| render | bool | Whether to render the environments | False |

Returns:

| Type | Description |
| --- | --- |
| list[tuple[Trajectory, float]] | List of (trajectory, episode_reward) tuples |

Source code in viberl/utils/vector_env.py

```python
def collect_batch_trajectories(
    self, num_trajectories: int, render: bool = False
) -> list[tuple[Trajectory, float]]:
    """Collect a batch of trajectories using parallel environments.

    Args:
        num_trajectories: Number of trajectories to collect
        render: Whether to render the environments

    Returns:
        List of (trajectory, episode_reward) tuples
    """
    completed_trajectories = []
    collected_count = 0

    # Reset environments if needed
    observations, _ = self.env.reset()
    observations = self._preprocess_observations(observations)

    # Initialize tracking for active episodes
    active_trajectories = [[] for _ in range(self.num_envs)]
    active_rewards = np.zeros(self.num_envs)
    active_masks = np.ones(self.num_envs, dtype=bool)  # Track active environments

    while collected_count < num_trajectories:
        # Select actions for all active environments
        valid_observations = observations[active_masks]

        if len(valid_observations) > 0:
            # Query the agent once per observation and keep the returned
            # Action objects so the stored log_prob matches the action
            # that is actually executed
            action_objs = [self.agent.act(obs) for obs in valid_observations]
            actions = [action_obj.action for action_obj in action_objs]

            # Step environments
            next_observations, rewards, terminations, truncations, infos = self.env.step(
                np.array(actions)
            )
            next_observations = self._preprocess_observations(next_observations)

            # Process results for active environments
            action_idx = 0
            for env_idx in range(self.num_envs):
                if not active_masks[env_idx]:
                    continue

                # Create transition
                transition = Transition(
                    state=observations[env_idx],
                    action=Action(
                        action=actions[action_idx],
                        log_prob=getattr(action_objs[action_idx], 'log_prob', None),
                    ),
                    reward=float(rewards[env_idx]),
                    next_state=next_observations[env_idx],
                    done=bool(terminations[env_idx] or truncations[env_idx]),
                    info=infos
                    if isinstance(infos, dict)
                    else (infos[env_idx] if isinstance(infos, list | tuple) else {}),
                )

                active_trajectories[env_idx].append(transition)
                active_rewards[env_idx] += rewards[env_idx]
                action_idx += 1

                # Check if episode is done
                if terminations[env_idx] or truncations[env_idx]:
                    # Complete trajectory
                    trajectory = Trajectory.from_transitions(active_trajectories[env_idx])
                    completed_trajectories.append((trajectory, active_rewards[env_idx]))

                    # Start tracking a fresh episode. Gymnasium vector
                    # environments auto-reset finished sub-environments, so
                    # next_observations[env_idx] already holds the new
                    # episode's initial observation; calling self.env.reset()
                    # here would reset every environment and corrupt the
                    # other active episodes.
                    active_trajectories[env_idx] = []
                    active_rewards[env_idx] = 0.0
                    collected_count += 1

            # Update observations for next step
            observations = next_observations

            # Render if requested
            if render:
                self.env.render()

    return completed_trajectories[:num_trajectories]
```
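A short usage sketch with the hypothetical sampler from above; the second element of each tuple is the undiscounted return accumulated over that episode:

```python
import numpy as np

results = sampler.collect_batch_trajectories(num_trajectories=8)
rewards = [episode_reward for _, episode_reward in results]
print(f'{len(results)} trajectories, mean reward {np.mean(rewards):.1f}')
```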

collect_trajectory_batch

```python
collect_trajectory_batch(batch_size: int, render: bool = False) -> list[tuple[Trajectory, float]]
```

Collect a batch of trajectories using parallel sampling.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| batch_size | int | Number of trajectories to collect | required |
| render | bool | Whether to render environments | False |

Returns:

| Type | Description |
| --- | --- |
| list[tuple[Trajectory, float]] | List of (trajectory, episode_reward) tuples |

Source code in viberl/utils/vector_env.py

```python
def collect_trajectory_batch(
    self, batch_size: int, render: bool = False
) -> list[tuple[Trajectory, float]]:
    """Collect a batch of trajectories using parallel sampling.

    Args:
        batch_size: Number of trajectories to collect
        render: Whether to render environments

    Returns:
        List of (trajectory, episode_reward) tuples
    """
    return self.collect_batch_trajectories(batch_size, render=render)
```

close

```python
close() -> None
```

Close the vector environment.

Source code in viberl/utils/vector_env.py

```python
def close(self) -> None:
    """Close the vector environment."""
    self.env.close()
```

`__enter__`

```python
__enter__() -> Self
```

Context manager entry.

Source code in viberl/utils/vector_env.py

```python
def __enter__(self) -> Self:
    """Context manager entry."""
    return self
```

`__exit__`

```python
__exit__(exc_type: object, exc_val: object, exc_tb: object) -> None
```

Context manager exit.

Source code in viberl/utils/vector_env.py

```python
def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> None:
    """Context manager exit."""
    self.close()
```
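Together, `__enter__` and `__exit__` let the sampler be used in a `with` block so `close()` runs even if collection raises. A minimal sketch, reusing the hypothetical `make_env` and `agent` from above:

```python
with VectorEnvSampler(env_fn=make_env, num_envs=4, agent=agent) as sampler:
    batch = sampler.collect_trajectory_batch(batch_size=16)
# The vector environment is closed here, even if collection raised
```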

create_vector_sampler

```python
create_vector_sampler(
    env_fn: callable, num_envs: int, agent: Agent, max_steps: int = 1000, device: str = 'cpu'
) -> VectorEnvSampler
```

Create a vector environment sampler.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| env_fn | callable | Function that creates a single environment | required |
| num_envs | int | Number of parallel environments | required |
| agent | Agent | RL agent to use for action selection | required |
| max_steps | int | Maximum steps per episode | 1000 |
| device | str | Device for tensor operations | 'cpu' |

Returns:

| Type | Description |
| --- | --- |
| VectorEnvSampler | VectorEnvSampler instance |

Source code in viberl/utils/vector_env.py

```python
def create_vector_sampler(
    env_fn: callable,
    num_envs: int,
    agent: Agent,
    max_steps: int = 1000,
    device: str = 'cpu',
) -> VectorEnvSampler:
    """Create a vector environment sampler.

    Args:
        env_fn: Function that creates a single environment
        num_envs: Number of parallel environments
        agent: RL agent to use for action selection
        max_steps: Maximum steps per episode
        device: Device for tensor operations

    Returns:
        VectorEnvSampler instance
    """
    return VectorEnvSampler(
        env_fn=env_fn,
        num_envs=num_envs,
        agent=agent,
        max_steps=max_steps,
        device=device,
    )
```
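An end-to-end sketch using the factory function, again assuming the hypothetical `make_env` and `RandomAgent` defined earlier:

```python
import gymnasium as gym
import numpy as np

agent = RandomAgent(gym.make('CartPole-v1').action_space)

with create_vector_sampler(make_env, num_envs=8, agent=agent) as sampler:
    batch = sampler.collect_trajectory_batch(batch_size=32)
    print(f'mean episode reward: {np.mean([r for _, r in batch]):.1f}')
```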