| | from collections import deque |
| | import cv2 |
| | cv2.ocl.setUseOpenCL(False) |
| | from .atari_wrappers import WarpFrame, ClipRewardEnv, FrameStack, ScaledFloatFrame |
| | from .wrappers import TimeLimit |
| | import numpy as np |
| | import gym |
| |
|
| |
|
class StochasticFrameSkip(gym.Wrapper):
    """
    Repeat each action for `n` frames, with "sticky" actions: on the first
    substep of a new action, with probability `stickprob` the previous
    action is kept for one extra substep before the new one takes over.
    Rewards are summed over the skipped frames.
    """
    def __init__(self, env, n, stickprob):
        gym.Wrapper.__init__(self, env)
        self.n = n
        self.stickprob = stickprob
        # Action currently being repeated; None right after a reset.
        self.curac = None
        self.rng = np.random.RandomState()
        # Some envs accept want_render=False to skip rendering work.
        self.supports_want_render = hasattr(env, "supports_want_render")

    def reset(self, **kwargs):
        self.curac = None
        return self.env.reset(**kwargs)

    def step(self, ac):
        done = False
        total_reward = 0
        for substep in range(self.n):
            if self.curac is None:
                # First action after reset is never sticky.
                self.curac = ac
            elif substep == 0:
                # First substep: keep the old action with prob. stickprob.
                if self.rng.rand() > self.stickprob:
                    self.curac = ac
            elif substep == 1:
                # Second substep: the new action always takes effect.
                self.curac = ac
            # Only bother rendering on the final substep, if supported.
            if self.supports_want_render and substep < self.n - 1:
                ob, rew, done, info = self.env.step(self.curac, want_render=False)
            else:
                ob, rew, done, info = self.env.step(self.curac)
            total_reward += rew
            if done:
                break
        return ob, total_reward, done, info

    def seed(self, s):
        self.rng.seed(s)
| |
|
class PartialFrameStack(gym.Wrapper):
    def __init__(self, env, k, channel=1):
        """
        Stack one channel (channel keyword) from the k-1 previous frames,
        keeping the most recent frame in full. Output shape is
        (H, W, C + k - 1) where (H, W, C) is the wrapped env's shape.
        """
        gym.Wrapper.__init__(self, env)
        shp = env.observation_space.shape
        self.channel = channel
        self.observation_space = gym.spaces.Box(low=0, high=255,
            shape=(shp[0], shp[1], shp[2] + k - 1),
            dtype=env.observation_space.dtype)
        self.k = k
        # Ring buffer of the last k full observations.
        self.frames = deque([], maxlen=k)
        # NOTE: removed a dead duplicate assignment of `shp` that followed
        # the deque initialization in the original code.

    def reset(self):
        ob = self.env.reset()
        # The selected channel must exist in the observation.
        assert ob.shape[2] > self.channel
        # Fill the buffer with copies of the first frame.
        for _ in range(self.k):
            self.frames.append(ob)
        return self._get_ob()

    def step(self, ac):
        ob, reward, done, info = self.env.step(ac)
        self.frames.append(ob)
        return self._get_ob(), reward, done, info

    def _get_ob(self):
        assert len(self.frames) == self.k
        # Older frames contribute a single channel; the newest frame is whole.
        return np.concatenate([frame if i == self.k - 1 else frame[:, :, self.channel:self.channel + 1]
                               for (i, frame) in enumerate(self.frames)], axis=2)
| |
|
class Downsample(gym.ObservationWrapper):
    def __init__(self, env, ratio):
        """
        Downsample images by a factor of ratio
        """
        gym.ObservationWrapper.__init__(self, env)
        oldh, oldw, oldc = env.observation_space.shape
        self.observation_space = gym.spaces.Box(
            low=0, high=255,
            shape=(oldh // ratio, oldw // ratio, oldc),
            dtype=np.uint8)

    def observation(self, frame):
        height, width, _ = self.observation_space.shape
        resized = cv2.resize(frame, (width, height), interpolation=cv2.INTER_AREA)
        # cv2.resize drops a trailing singleton channel axis; restore it.
        if resized.ndim == 2:
            resized = resized[:, :, None]
        return resized
| |
|
class Rgb2gray(gym.ObservationWrapper):
    def __init__(self, env):
        """
        Convert RGB observations to single-channel grayscale.
        """
        gym.ObservationWrapper.__init__(self, env)
        oldh, oldw, _oldc = env.observation_space.shape
        self.observation_space = gym.spaces.Box(
            low=0, high=255, shape=(oldh, oldw, 1), dtype=np.uint8)

    def observation(self, frame):
        gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        # Re-add the channel axis dropped by cvtColor.
        return gray[:, :, None]
| |
|
| |
|
class MovieRecord(gym.Wrapper):
    """
    Record a movie (via the unwrapped env's movie_path attribute) for
    every k-th episode, starting with the first.
    """
    def __init__(self, env, savedir, k):
        gym.Wrapper.__init__(self, env)
        self.savedir = savedir
        self.k = k
        self.epcount = 0

    def reset(self):
        base = self.env.unwrapped
        if self.epcount % self.k == 0:
            # Record this episode into savedir.
            base.movie_path = self.savedir
        else:
            # Disable recording for this episode.
            base.movie_path = None
            base.movie = None
        self.epcount += 1
        return self.env.reset()
| |
|
class AppendTimeout(gym.Wrapper):
    """
    Append the fraction of the episode time limit that remains to each
    observation under the key 'value_estimation_timeout'.

    If the wrapped env's observation space is a Dict, the key is added to
    it in place; otherwise observations are wrapped into a dict with keys
    'original' and 'value_estimation_timeout'.
    """
    def __init__(self, env):
        gym.Wrapper.__init__(self, env)
        self.action_space = env.action_space
        self.timeout_space = gym.spaces.Box(low=np.array([0.0]), high=np.array([1.0]), dtype=np.float32)
        self.original_os = env.observation_space
        if isinstance(self.original_os, gym.spaces.Dict):
            import copy
            # Deep-copy so the wrapped env's space is not mutated.
            ordered_dict = copy.deepcopy(self.original_os.spaces)
            ordered_dict['value_estimation_timeout'] = self.timeout_space
            self.observation_space = gym.spaces.Dict(ordered_dict)
            self.dict_mode = True
        else:
            self.observation_space = gym.spaces.Dict({
                'original': self.original_os,
                'value_estimation_timeout': self.timeout_space
            })
            self.dict_mode = False
        self.ac_count = None
        # Walk down the wrapper stack until we find the wrapper (e.g.
        # TimeLimit) that defines the episode step cap. Raises
        # AttributeError if no wrapper in the stack has it.
        while not hasattr(env, "_max_episode_steps"):
            env = env.env
        self.timeout = env._max_episode_steps

    def step(self, ac):
        self.ac_count += 1
        ob, rew, done, info = self.env.step(ac)
        return self._process(ob), rew, done, info

    def reset(self):
        self.ac_count = 0
        return self._process(self.env.reset())

    def _process(self, ob):
        # Fraction of the time budget still remaining (1.0 at reset).
        fracmissing = 1 - self.ac_count / self.timeout
        if self.dict_mode:
            ob['value_estimation_timeout'] = fracmissing
            # Bug fix: the original returned None here, so step()/reset()
            # handed callers None instead of the annotated observation.
            return ob
        else:
            return {'original': ob, 'value_estimation_timeout': fracmissing}
| |
|
class StartDoingRandomActionsWrapper(gym.Wrapper):
    """
    After each reset, take a random number (up to max_random_steps) of
    random actions before handing control back to the agent.

    Warning: can eat info dicts, not good if you depend on them
    """
    def __init__(self, env, max_random_steps, on_startup=True, every_episode=False):
        gym.Wrapper.__init__(self, env)
        self.on_startup = on_startup
        self.every_episode = every_episode
        self.random_steps = max_random_steps
        self.last_obs = None
        if on_startup:
            self.some_random_steps()

    def some_random_steps(self):
        # Reset, then take a random number of random actions, resetting
        # again whenever an episode happens to finish mid-burst.
        self.last_obs = self.env.reset()
        num_steps = np.random.randint(self.random_steps)
        for _ in range(num_steps):
            self.last_obs, _, done, _ = self.env.step(self.env.action_space.sample())
            if done:
                self.last_obs = self.env.reset()

    def reset(self):
        # The random-step burst already produced a fresh observation.
        return self.last_obs

    def step(self, a):
        self.last_obs, rew, done, info = self.env.step(a)
        if done:
            self.last_obs = self.env.reset()
            if self.every_episode:
                self.some_random_steps()
        return self.last_obs, rew, done, info
| |
|
def make_retro(*, game, state=None, max_episode_steps=4500, **kwargs):
    """
    Create a gym-retro env with sticky-action frame skip and an optional
    episode time limit. `state` defaults to the game's default save state.
    """
    import retro
    chosen_state = retro.State.DEFAULT if state is None else state
    env = retro.make(game, chosen_state, **kwargs)
    env = StochasticFrameSkip(env, n=4, stickprob=0.25)
    if max_episode_steps is not None:
        env = TimeLimit(env, max_episode_steps=max_episode_steps)
    return env
| |
|
def wrap_deepmind_retro(env, scale=True, frame_stack=4):
    """
    Configure environment for retro games, using config similar to DeepMind-style Atari in wrap_deepmind
    """
    env = ClipRewardEnv(WarpFrame(env))
    if frame_stack > 1:
        env = FrameStack(env, frame_stack)
    return ScaledFloatFrame(env) if scale else env
| |
|
class SonicDiscretizer(gym.ActionWrapper):
    """
    Wrap a gym-retro environment and make it use discrete
    actions for the Sonic game.
    """
    def __init__(self, env):
        super(SonicDiscretizer, self).__init__(env)
        # Genesis controller button layout expected by gym-retro.
        buttons = ["B", "A", "MODE", "START", "UP", "DOWN", "LEFT", "RIGHT", "C", "Y", "X", "Z"]
        # Button combos sufficient for playing Sonic.
        actions = [['LEFT'], ['RIGHT'], ['LEFT', 'DOWN'], ['RIGHT', 'DOWN'], ['DOWN'],
                   ['DOWN', 'B'], ['B']]
        self._actions = []
        for action in actions:
            # Derive the mask length from the button list instead of the
            # magic constant 12, so the two can't drift apart.
            arr = np.zeros(len(buttons), dtype=bool)
            for button in action:
                arr[buttons.index(button)] = True
            self._actions.append(arr)
        self.action_space = gym.spaces.Discrete(len(self._actions))

    def action(self, a):
        # Return a copy so callers cannot mutate the cached mask.
        return self._actions[a].copy()
| |
|
class RewardScaler(gym.RewardWrapper):
    """
    Multiply rewards by a constant factor to bring them to a reasonable
    scale for PPO. This is incredibly important and affects performance
    drastically.
    """
    def __init__(self, env, scale=0.01):
        super(RewardScaler, self).__init__(env)
        self.scale = scale

    def reward(self, reward):
        return self.scale * reward
| |
|
class AllowBacktracking(gym.Wrapper):
    """
    Use deltas in max(X) as the reward, rather than deltas
    in X. This way, agents are not discouraged too heavily
    from exploring backwards if there is no way to advance
    head-on in the level.
    """
    def __init__(self, env):
        super(AllowBacktracking, self).__init__(env)
        self._cur_x = 0
        self._max_x = 0

    def reset(self, **kwargs):
        self._cur_x = 0
        self._max_x = 0
        return self.env.reset(**kwargs)

    def step(self, action):
        obs, rew, done, info = self.env.step(action)
        # Track cumulative progress; reward only new high-water marks.
        self._cur_x += rew
        progress = self._cur_x - self._max_x
        self._max_x = max(self._max_x, self._cur_x)
        return obs, max(0, progress), done, info
| |
|