# Source code for jaclearn.rl.algo.math

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : math.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 02/17/2018
#
# This file is part of Jacinle.
# Distributed under terms of the MIT license.

import threading

import scipy
import scipy.signal
import numpy as np


def discount_cumsum(x, gamma):
    """Discounted cumulative sum of a 1-d array along axis 0.

    Computes ``y[t] = sum_{k >= t} gamma**(k - t) * x[k]``.

    From https://github.com/rll/rllab/blob/master/rllab/misc/special.py

    Args:
        x: input sequence (array-like).
        gamma: discount factor.

    Returns:
        Array of the same length with discounted cumulative sums.
    """
    # See https://docs.scipy.org/doc/scipy/reference/tutorial/signal.html#difference-equation-filtering
    # lfilter with b=[1], a=[1, -gamma] realizes y[t] = x[t] + gamma * y[t-1];
    # running it over the reversed input and reversing the output turns the
    # forward recursion into the desired backward one:
    #   y[t] = x[t] + gamma * y[t+1].
    b, a = [1], [1, float(-gamma)]
    return scipy.signal.lfilter(b, a, x[::-1], axis=0)[::-1]
def discount_return(x, discount):
    """Total discounted return of a 1-d reward array.

    Computes ``sum_t discount**t * x[t]``.

    From https://github.com/rll/rllab/blob/master/rllab/misc/special.py

    Args:
        x: 1-d array of per-step rewards.
        discount: discount factor.

    Returns:
        Scalar discounted return.
    """
    weights = discount ** np.arange(len(x))
    return np.sum(x * weights)
def normalize_advantage(adv):
    """Standardize advantage estimates to zero mean and unit variance.

    Args:
        adv: array of advantage estimates.

    Returns:
        The normalized advantages.
    """
    std = adv.std()
    # Add a small epsilon so a constant advantage vector (std == 0) does not
    # produce NaN/inf from a zero division.
    return (adv - adv.mean()) / (std + 1e-8)
class ObservationNormalizer(object):
    """Normalize observations with an incrementally-updated mean and variance.

    Each call updates running first/second moments with the new observation
    and returns the observation standardized by the running statistics,
    clipped to ``[-10, 10]``.
    """
    # Note: the docstring above was originally placed *after* ``_eps``, where
    # it is a plain expression statement, not the class docstring.

    # Small constant added to the variance before the square root, so the
    # standard deviation is never exactly zero.
    _eps = 1e-6

    def __init__(self, filter_mean=True):
        """Initialize running statistics.

        Args:
            filter_mean: if True, subtract the running mean before dividing
                by the running std; otherwise only divide by the std.
        """
        self.m1 = 0       # running mean
        self.v = 0        # running (biased) variance
        self.std = 0      # running standard deviation
        self.n = 0.       # number of observations seen so far
        self.filter_mean = filter_mean
        # Serialize concurrent updates of the running statistics.
        self.lock = threading.Lock()

    def __call__(self, o):
        """Thread-safe wrapper around :meth:`normalize`."""
        with self.lock:
            return self.normalize(o)

    def normalize(self, o):
        """Update the running statistics with ``o`` and return it normalized.

        Not thread-safe by itself; use ``__call__`` for locked access.
        """
        # Incremental update: new mean is a convex combination of the old
        # mean and the new observation. The variance update uses the already
        # updated mean (a simple approximation, kept as in the original).
        self.m1 = self.m1 * (self.n / (self.n + 1)) + o * 1 / (1 + self.n)
        self.v = self.v * (self.n / (self.n + 1)) + (o - self.m1) ** 2 * 1 / (1 + self.n)
        self.std = (self.v + self._eps) ** .5  # std
        self.n += 1
        if self.filter_mean:
            o1 = (o - self.m1) / self.std
        else:
            o1 = o / self.std
        # Clip to [-10, 10]. The original mask arithmetic
        #   (o1 > 10) * 10 + (o1 < -10) * (-10) + (o1 < 10) * (o1 > -10) * o1
        # returned 0 for values exactly equal to +/-10; np.clip handles the
        # boundary correctly.
        return np.clip(o1, -10, 10)
class LinearValueRegressor(object):
    """Ridge-regularized linear baseline for value prediction.

    Fits the value as a linear function of the flattened state and
    polynomial features of the (scaled) time step, by solving the
    L2-regularized normal equations.
    """

    _name = 'linear_value_regressor'
    # Regression weights; None until fit() has been called.
    coeffs = None

    def _features(self, states, steps):
        """Build the design matrix from states and scaled time steps."""
        o = states.astype('float32').reshape(states.shape[0], -1)
        s = steps.reshape(steps.shape[0], -1) / 100.
        # NOTE(review): ``s ** 2`` appears twice below. rllab's original
        # linear baseline uses [o, o**2, s, s**2, s**3, 1], so the duplicate
        # looks like a typo -- kept as-is to preserve behavior; confirm
        # before changing (it would invalidate previously fit coeffs).
        return np.concatenate([o, s ** 2, s, s ** 2, np.ones((states.shape[0], 1))], axis=1)

    def fit(self, states, steps, returns):
        """Fit the coefficients by ridge-regularized least squares.

        Args:
            states: array of shape (N, ...) of observations.
            steps: array of shape (N,) of time-step indices.
            returns: array of shape (N,) of empirical returns (targets).
        """
        featmat = self._features(states, steps)
        n_col = featmat.shape[1]
        lamb = 2.0  # L2 regularization strength
        # rcond=None selects the machine-precision-based cutoff and avoids
        # NumPy's FutureWarning about the changed lstsq default.
        self.coeffs = np.linalg.lstsq(
            featmat.T.dot(featmat) + lamb * np.identity(n_col),
            featmat.T.dot(returns),
            rcond=None,
        )[0]

    def predict(self, states, steps):
        """Predict values; returns zeros if the regressor is not fit yet."""
        if self.coeffs is None:
            return np.zeros(states.shape[0])
        return self._features(states, steps).dot(self.coeffs)

    def register_snapshot_parts(self, env):
        """Register coefficient dump/load hooks with ``env``'s snapshot system."""
        env.add_snapshot_part(self._name, self._dump_params, self._load_params)

    def _dump_params(self):
        # Snapshot hook: serialize the learned coefficients.
        return self.coeffs

    def _load_params(self, coeffs):
        # Snapshot hook: restore previously learned coefficients.
        self.coeffs = coeffs
def compute_gae(rewards, values, next_val, gamma, lambda_):
    """Generalized Advantage Estimation (GAE) over one trajectory.

    Runs the backward recursion
    ``A[t] = delta[t] + gamma * lambda_ * A[t+1]`` with
    ``delta[t] = r[t] + gamma * V[t+1] - V[t]``, bootstrapping the final
    step with ``next_val``.

    Args:
        rewards: per-step rewards, length T.
        values: per-step value estimates, length T.
        next_val: value estimate for the state after the last step.
        gamma: discount factor.
        lambda_: GAE exponential-weighting factor.

    Returns:
        float32 array of length T with the advantage estimates.
    """
    assert len(rewards) == len(values)
    horizon = len(rewards)
    advantages = np.empty((horizon, ), dtype='float32')
    # Last step bootstraps with the value of the successor state.
    running = rewards[horizon - 1] + gamma * next_val - values[horizon - 1]
    advantages[horizon - 1] = running
    # Walk backwards, accumulating the lambda-discounted TD residuals.
    for t in reversed(range(horizon - 1)):
        running = rewards[t] + gamma * values[t + 1] - values[t] + gamma * lambda_ * running
        advantages[t] = running
    return advantages