Blog » MLOps » Logging in Reinforcement Learning Frameworks – What You Need to Know

Logging in Reinforcement Learning Frameworks – What You Need to Know

Logging is often a significant issue, as frameworks have different approaches to it. In this article I’ll show you how logging is implemented in popular reinforcement learning frameworks.

In my previous article, I discussed how to make sense of reinforcement learning agents, as well as what and why you should log during training and debugging.

In this tutorial, I will show you how logging works in different frameworks, and how to easily add your own loggers to them (I will use Neptune as an example). Frameworks include Acme, RLlib, SpinningUp, and Stable baselines

I created a GitHub repository with all supplementary materials for this tutorial here.


SEE RELATED ARTICLE
A Complete Guide to Monitoring ML Experiments Live in Neptune


Approaches to logging in Reinforcement Learning  

There are two most common approaches to logging that I call “Store then log” and “Write now” (pun very much intended 🙂). 

Store then log

In this approach, you first store the metrics while running, and after a specified interval, like every 10 000 timesteps, you calculate the aggregate statistics on these metrics and log them

In principle, this means that you can store, for example, episode return across many episodes, and aggregate the results to get the average episode return across stored values. 

This is super useful in stochastic environments, or when your policy is stochastic and you can get a different episode return each time you run the agent. If that’s the case, you have to consider averages to reduce noise in the logs.

This method of logging can be found in the Spinning Up and Stable baselines frameworks.

Spinning Up

In Spinning Up it’s easier to see the “Store then log” approach because the code is very straightforward and so we will use it to depict the approach and then I’ll discuss how to add such a custom logger to both mentioned frameworks.

“Store then log” in the example

Below is the Spinning Up (Epoch)Logger API. I’ve pruned the not so important code from it.

class Logger:
    """
    A Logger tailored for tracking average values over epochs.

    Typical use case: there is some quantity which is calculated many times
    throughout an epoch, and at the end of the epoch, you would like to 
    report the average / std / min / max value of that quantity.
    """


    [...]

    def store(self, **kwargs):

        """
        Save something into the epoch_logger's current state.

        Provide an arbitrary number of keyword arguments with numerical 
        values.
        """

        [...]


    def log_tabular(self, key, val=None, with_min_and_max=False, average_only=False):
        """
        Log a value or possibly the mean/std/min/max values of a diagnostic.

        Args:
            key (string): The name of the diagnostic. If you are logging a
                diagnostic whose state has previously been saved with 
                ``store``, the key here has to match the key you used there.
            val: A value for the diagnostic. If you have previously saved
                values for this key via ``store``, do *not* provide a ``val``
                here.
            with_min_and_max (bool): If true, log min and max values of the 
                diagnostic over the epoch.
            average_only (bool): If true, do not log the standard deviation
                of the diagnostic over the epoch.
        """

        [...]

    def dump_tabular(self):
        """
        Write all of the diagnostics from the current iteration.

        Writes both to stdout, and to the output file.
        """

        [...]

As you can see, the logger has three main methods: store, log_tabular, and dump_tabular.

  • store is called each time the quantity to aggregate and log is calculated. It simply stores the value, like episode return or some learning metrics, for future calculations.
  • log_tabular is called at the end of the epoch to log the statistics of the stored metrics. You can choose to log average and standard deviation (argument average_only=False), or average only (argument average_only=True), or additionally log minimum and maximum values too (argument with_min_and_max=True).
  • dump_tabular finalizes operations and prints the logs in the command line. This is the place to add your custom destination for the calculated statistics e.g. send them to Neptune.

Now let’s look at the code example that uses the “Store then log” approach and above EpochLogger in the Spinning Up VPG implementation (again, slightly modified and pruned for clarity).

def update():
    ...some policy optimization code...
    logger.store(LossPi=pi_l_old, LossV=v_l_old)

# Main loop: collect experience in env and update/log each epoch
for epoch in range(epochs):
    # Gather experience
    for t in range(steps_per_epoch):
        a, ... = ...get action...

        next_o, r, d, _ = env.step(a)
        ep_ret += r
        ep_len += 1

        ...

        if d is True:
            # save EpRet / EpLen if trajectory finished
            logger.store(EpRet=ep_ret, EpLen=ep_len)
            ...

    # Perform VPG update!
    update()

    # Log info about epoch
    logger.log_tabular('Epoch', epoch)
    logger.log_tabular('EpRet', with_min_and_max=True)
    logger.log_tabular('EpLen', average_only=True)
    logger.log_tabular('LossPi', average_only=False)
    logger.log_tabular('LossV', average_only=False)
    logger.dump_tabular()

Customizing the logger

The Spinning Up logger lives in the logx.py file and unfortunately, there is no easy way to override it to send the logs to Neptune. Spinning Up in its design to be very straightforward is very hard-coded which impedes the ability to extend it without modifying the original code. That being said, I modified the code for you and you can get it from here. In the link, you’ll also find an example run script.

To use the logger, replace the logx.py file in your Spinning Up code (cloned during installation) with the provided implementation above. Then you can run any existing agent in SpinUp like this:

import gym
import neptune
import tensorflow as tf

from spinup import ppo_tf1 as ppo

env_fn = lambda : gym.make('LunarLander-v2')

ac_kwargs = dict(hidden_sizes=[64,64], activation=tf.nn.relu)

neptune.init(project_qualified_name='<namespace/project_name>')
experiment = neptune.create_experiment()

logger_kwargs = dict(output_dir='path/to/output_dir',
                     exp_name='experiment_name',
                     neptune_experiment=experiment)

ppo(env_fn=env_fn, ac_kwargs=ac_kwargs, steps_per_epoch=5000, epochs=250, logger_kwargs=logger_kwargs)

See more information on the Running Experiments part of the Spinning Up website.

Stable baselines

Stable Baselines is a more advanced framework that is very similar in nature to Spinning Up when it comes to the logging approach, however, in contrast to Spinning Up, it is extendable. Here you’ll find the Neptune logger code and the running example I prepared for you. You can adapt them to your needs.

Let’s now look at how to implement the custom logger output to the Neptune experiment in Stable Baselines.

# logger.py
from stable_baselines import logger


class NeptuneLogger(logger.KVWriter):
    """Stable Baselines Neptune logger."""

    def __init__(self, experiment):
        super().__init__()
        self._experiment = experiment

    def writekvs(self, kvs):
        for k, v in sorted(kvs.items()):
            if hasattr(v, 'dtype'):
                v = float(v)
            self._experiment.log_metric(k, v)

Here you can see how to use our logger. We first import it, then create the Neptune experiment and append our logger to the default logger outputs.

# run_ppo.py
import neptune
import stable_baselines  # This is required!
from stable_baselines import logger

from logger import NeptuneLogger

neptune.init(project_qualified_name='<namespace/project_name>')
experiment = neptune.create_experiment()

logger_ = logger.Logger.CURRENT
logger_.output_formats.append(NeptuneLogger(experiment))

...further training code...

You can find more on training Stable Baselines agents here.

Write now

This is a conceptually simpler approach as you don’t need to store anything. You simply log values as they become available, for example after every episode. You log, or write the values “right now”, hence the name “Write now”. It’s used by Acme and RLlib.

This simple method has its downsides. The main one is high noise in logged values. We can recall here the problem with stochastic environment/policy, but also training statistics, as a loss, suffer from high variance.

It would be a nightmare to look at the printed values in the command line for each episode. But it’s not that big of an issue as long as we use experiment tracking tools like Neptune. 

Neptune will take all these fine-grained values, average them, and present them on interpretable charts together with max and min values (represented as shaded areas as in Figure 1).

RL visualization Neptune
Figure 1. DDPG’s critic loss on the Y-axis and timesteps on the X-axis. The agent was trained on the Humanoid MuJoCo environment.

Filters

If you still want to manually fix problems with high variance in the logs, you can use filters. 

Filters are wrappers on loggers, which apply some transformations on the values before logging them

For Acme, I prepared two of them for you. One for value smoothing (see this series of lectures on exponentially weighted moving averages, or EMA, from Andrew Ng) and one for calculating aggregate statistics like in “Store then log”.

All of this is best described by an example, so let’s jump into Acme.

Acme

As before, I’ll use the easier framework from the two to present the approach. Then we will talk about the implementation of custom loggers and filters.

“Write now” in the example

Here is the “Write now” approach taken from the Acme EnvironmentLoop class (slightly modified for clarity).

def run_episode():
    ...reset any counts and start the environment...
    # Run an episode.
    while not timestep.last():
        # Generate an action from the agent's policy and
        # step the environment.
        action = actor.select_action(timestep.observation)
        timestep = environment.step(action)
        episode_return += timestep.reward

        ...

        if should_update:
            actor.update()

    # Book-keeping.
    episode_steps += 1
    # Collect the results.
    steps_per_second = episode_steps / (time.time() - start_time)
    result = {
        'episode_length': episode_steps,
        'episode_return': episode_return,
        'steps_per_second': steps_per_second,
    }
    logger.write(result)

As you can see, the logger is a simple object. After each episode, you write the results to it. A result is a dictionary with keys that are log names and values that are the metrics. Nothing complicated.

Customizing the logger

Let’s see the custom Neptune logger code and the filters I prepared for you. The logger and the filters are available here alongside the example run script.

Logger

# logger.py
import collections
import re

import acme.utils.loggers as acme_loggers
import acme.utils.tree_utils as acme_tree
import numpy as np


class NeptuneLogger(acme_loggers.base.Logger):
    """Neptune logger for Acme.
    Args:
        prefix (string): The string used to prefix data keys in a name of a log.
          Can be None in which case no prefix is used.
        index_name (string): The data key which value to use as a log index.
          Can be None in which case no index is used.
    """

    def __init__(self, experiment, prefix=None, index_name=None):
        super()
        self._experiment = experiment
        self._prefix = prefix
        self._index_name = index_name or ''

    def write(self, values):
        """Send `values` to Neptune."""
        values = acme_loggers.to_numpy(values)
        index = values.pop(self._index_name, None)
        for key, value in values.items():
            prefixed_key = f'{self._prefix}/{key}' if self._prefix else key
            if index:
                self._experiment.log_metric(prefixed_key, index, value)
            else:
                self._experiment.log_metric(prefixed_key, value)

It’s very simple, you have to implement only one method write as shown for the Neptune logger.

Now let’s jump to the filters.

Smoothing filter

# logger.py
class SmoothingFilter(acme_loggers.base.Logger):
    """Logger which writes to another logger, smoothing matching data.
    Args:
        to (Logger): An object to which the current object will forward the
          original data and its results when `write` is called.
        smoothing_regex (string): A regex of data keys which should be smoothed.
        smoothing_coeff (float): A desired smoothing strength between 0 and 1.
    Note:
        For example values of regex = 'return' and coeff = 0.99 will calculate
        the running average of all data which contain 'return' in their key.
        It's calculated according to: average = 0.99 * average + 0.01 * value.
        Warm-up period of length 10 is also applied (see the comment in code).
    """

    def __init__(self, to, smoothing_regex, smoothing_coeff):
        super()
        self._to = to
        self._smoothing_regex = smoothing_regex
        self._smoothing_coeff = smoothing_coeff

        self._previous_values = collections.defaultdict(float)
        self._smoothing_coeffs = collections.defaultdict(float)

    def write(self, values):
        """Smooths matching data and forwards it with the original data."""
        values_ = dict(values)

        for key, value in values.items():
            if re.search(self._smoothing_regex, key) is not None:
                smoothed_key = f'{key}_smoothed_{self._smoothing_coeff}'
                prev_value = self._previous_values[smoothed_key]
                prev_smoothing_coeff = self._smoothing_coeffs[smoothed_key]

                # This implements warm-up period of length "10". That is
                # the smoothing coefficient start with 0 and is annealed to
                # the desired value.
                # This results in better estimates of smoothed value at the
                # beginning, which might be useful for short experiments.
                new_smoothing_coeff = (prev_smoothing_coeff * 0.9 +
                                       self._smoothing_coeff * 0.1)
                smoothed_value = (value * (1 - prev_smoothing_coeff) +
                                  prev_value * prev_smoothing_coeff)

                self._previous_values[smoothed_key] = smoothed_value
                self._smoothing_coeffs[smoothed_key] = new_smoothing_coeff
                values_[smoothed_key] = smoothed_value

        self._to.write(values_)

You pass a regex to the smoothing filter and it will smooth the values of logs which keys match the regexp. You can also control the smoothing strength between 0 and 1 with the smoothing_coeff argument.

For example values of regex = ‘return’ and coeff = 0.99 will calculate the running average of all data which contain ‘return’ in their keys. It’s calculated according to: average = 0.99 * average + 0.01 * value. A warm-up period of length 10 is also applied (see the comment in code). It helps with inaccurate early values described in the linked lectures on EMA.

Note that one regex can easily match multiple different metric names with the “or” operator (read more here).

Aggregate filter

# logger.py
class AggregateFilter(acme_loggers.base.Logger):
    """Logger which writes to another logger, aggregating matching data.
    Args:
        to (Logger): An object to which the current object will forward the
          aggregated data when `dump` is called.
        aggregate_regex (string): A regex of data keys which should be
          aggregated.
    Note:
        For not matched keys the last value will be forwarded.
    """

    def __init__(self, to, aggregate_regex):
        super()
        self._to = to
        self._aggregate_regex = aggregate_regex

        self._cache = []

    def write(self, values):
        self._cache.append(values)

    def dump(self):
        """Calculates statistics and forwards them to the target logger."""
        results = {}

        stacked_cache = acme_tree.stack_sequence_fields(self._cache)
        for key, values in stacked_cache.items():
            if re.search(self._aggregate_regex, key) is not None:
                results.update({
                    f'{key}_mean': np.mean(values),
                    f'{key}_std': np.std(values),
                    f'{key}_median': np.median(values),
                    f'{key}_max': np.max(values),
                    f'{key}_min': np.min(values),
                })
            else:
                results[key] = values[-1]

        self._to.write(results)
        self._cache.clear()

This one pretty much emulates the behavior of the “Store then log“ approach. The method write doesn’t log the values right away but stores them. Then you have one more method dump that will calculate aggregate statistics like average, std. dev., min, and max on the stored data, and write them to the logger that the filter wraps.

See this and the previous filters’ code for details. Moreover, here you will find a full example of how to run the Acme training and evaluation with our custom loggers and filters.

How to do it all together?

You use filters by wrapping the loggers. See the excerpt below.

# run_d4pg.py
import acme.utils.loggers as acme_loggers
import logger as neptune_loggers

def make_logger(experiment,
                prefix=None,
                time_delta=1.0,
                aggregate_regex=None,
                smoothing_regex=None,
                smoothing_coeff=0.99):
    """Creates an aggregate of Neptune and Terminal loggers with some filters.

    Args:
        experiment (NeptuneExperiment): Neptune experiment to log to.
        prefix (string): The logger name (used also as NeptuneLogger prefix).
        time_delta (float): Time (in seconds) between logging events.
        aggregate_regex (string): A regex of data keys which should be
          aggregated. If None, then no aggregation.
        smoothing_regex (string): A regex of data keys which should be smoothed.
          If None, then no smoothing.
        smoothing_coeff (float between 0 and 1): A desired smoothing strength.
    """
    neptune_logger = neptune_loggers.NeptuneLogger(
        experiment, prefix, index_name='epoch')
    terminal_logger = acme_loggers.terminal.TerminalLogger(prefix)
    logger = acme_loggers.aggregators.Dispatcher(
        [neptune_logger, terminal_logger])

    if smoothing_regex:
        logger = neptune_loggers.SmoothingFilter(
            logger, smoothing_regex, smoothing_coeff)

    logger = acme_loggers.filters.NoneFilter(logger)
    logger = acme_loggers.filters.TimeFilter(logger, time_delta)

    if aggregate_regex:
        logger = neptune_loggers.AggregateFilter(
            logger, aggregate_regex)

    return logger

Then, you use this method to create the logger for the agent, the training loop, and the evaluation loop. Below I include the code snippet for that, the full running example is here.

# run_d4pg.py
[...]

  # Construct the agent.
  agent = d4pg.D4PG(
      environment_spec=environment_spec,
      policy_network=agent_networks['policy'],
      critic_network=agent_networks['critic'],
      observation_network=agent_networks['observation'],
      sigma=1.0,  # pytype: disable=wrong-arg-types
      logger=make_logger(experiment, prefix='learner'),
  )

  # Create the environment loop used for training.
  train_loop = acme.EnvironmentLoop(
      environment,
      agent,
      label='train_loop',
      logger=make_logger(experiment,
                         prefix='train',
                         smoothing_regex='return')
  )

[...]

  # Create the evaluation actor and loop.
  eval_actor = actors.FeedForwardActor(policy_network=eval_policy)
  eval_env = make_environment()
  eval_logger = make_logger(experiment,
                            prefix='eval',
                            aggregate_regex='return')
  eval_loop = acme.EnvironmentLoop(
      eval_env,
      eval_actor,
      label='eval_loop',
      logger=eval_logger,
  )

[...]

You’ll find more information about running Acme in the repo with examples, here.

RLlib

RLlib, like in the case of Stable Baselines, is a more advanced framework that uses the same logging strategy as Acme. I provide you with the code that allows you to add the Neptune logging to RLlib. Here you’ll find the logger code and the example run script. Let’s see the logger first.

# logger.py
import neptune
import numpy as np
from ray import tune


class NeptuneLogger(tune.logger.Logger):
    """RLlib Neptune logger.
    Example usage:
    ```
    import ray
    from ray import tune
    ray.init()
    tune.run(
        "PPO",
        stop={"episode_reward_mean": 200},
        config={
            "env": "CartPole-v0",
            "num_gpus": 0,
            "num_workers": 1,
            "lr": tune.grid_search([0.01, 0.001, 0.0001]),
            "logger_config": {"neptune_project_name": '<user name>/sandbox'},
        },
        loggers=tune.logger.DEFAULT_LOGGERS + (NeptuneLogger,),
    )
    ```
    """

    def _init(self):
        logger_config = self.config.get('logger_config')
        neptune.init(logger_config.get('neptune_project_name'))
        self.neptune_experiment = neptune.create_experiment(
            name=str(self.trial),  # Gets the trial name.
            params=self.config,
        )

    @staticmethod
    def dict_multiple_get(dict_, indices):
        """Access the nested value."""
        if not isinstance(indices, list):
            indices = [indices]

        value = dict_
        index = None
        for index in indices:
            try:
                value = value[index]
            except KeyError:
                print("Skipping", indices)
                return {}

        if isinstance(value, dict):
            return value
        else:
            return {index: value}

    def on_result(self, result):
        list_to_traverse = [
            [],
            ['custom_metrics'],
            ['evaluation'],
            ['info', 'num_steps_trained'],
            ['info', 'learner'],
            ['info', 'exploration_infos', 0],
            ['info', 'exploration_infos', 1],
            ['info', 'learner', "default_policy"]
        ]

        for indices in list_to_traverse:
            res_ = self.dict_multiple_get(result, indices)
            prefix = '/'.join([str(idx) for idx in indices])
            for key, value in res_.items():
                prefixed_key = '/'.join([prefix, key])
                if isinstance(value, float) or isinstance(value, int):
                    self.neptune_experiment.log_metric(
                        prefixed_key, value)
                elif (isinstance(value, np.ndarray) or
                      isinstance(value, np.number)):
                    self.neptune_experiment.log_metric(
                        prefixed_key, float(value))
                # Otherwise ignore

    def close(self):
        neptune.stop()

The example Neptune logger above might seem scary, but it really isn’t. All it does is accessing the logs listed in list_to_traverse from the nested result dictionary. Along the way, it adds the prefix to each value that is built from the keys in the original nested data structure. Then it takes care of the values data type and voilà! Feel free to copy this code and adapt it to your logger requirements.

Below you can see how to add your custom logger to the RLlib training.

# run_ppo_grid.py
import ray
from ray import tune

from logger import NeptuneLogger

ray.init()
tune.run(
    "PPO",
    stop={"episode_reward_mean": 200},
    config={
        "env": "CartPole-v0",
        "num_gpus": 0,
        "num_workers": 1,
        "lr": tune.grid_search([0.01, 0.001, 0.0001]),
        "logger_config": {"neptune_project_name": "<namespace/project_name>"},
    },
    loggers=tune.logger.DEFAULT_LOGGERS + (NeptuneLogger,),
)

Conclusion

There you have it, two approaches to logging and four frameworks. Here you will find all the code with the example run scripts. 

Have fun playing with them, and I wish you luck with your RL experiments!


READ NEXT

How to Structure, Organize, Track and Manage Reinforcement Learning (RL) Projects

7 mins read | Vladimir Lyashenko | Posted December 23, 2020

Structuring and managing machine learning projects can be a tricky thing.

When you dive into a project, you may quickly realize that you’re drowning in an ocean of Python scripts, data, algorithms, functions, updates, and so on. At some point, you just lose track of your experiments, and can’t even say which script or update led to the best result.

So, structuring your project and keeping track of experiments is a crucial part of success.

From this point of view, working on an ML project might be challenging in general, but some fields are more complicated than others. Reinforcement Learning (RL) is one of the complicated ones.

This article is dedicated to structuring and managing RL projects. I’ll try to be as precise as possible and provide a comprehensive step-by-step guide and some useful tips.

We’ll cover:

  • General tips – project directory structure, Cookiecutter, keeping track of experiments using Neptune, proper evaluation
  • Defining a problem as an RL problem – Reinforcement Learning, Supervised Learning, optimization problem, maximization and minimization
  • Picking an RL environment – OpenAI Gym
  • Picking an RL library and algorithm – RL_Coach, Tensorforce, Stable Baselines, RL_Coach guidelines
  • Testing the performance of the agent
  • Preparing for publishing – README, requirements, readable code, visualizations

Let’s jump in.

Continue reading ->
RL examples

10 Real-Life Applications of Reinforcement Learning

Read more
RL agents

How to Make Sense of the Reinforcement Learning Agents? What and Why I Log During Training and Debug

Read more
RL tutorials

Best Reinforcement Learning Tutorials, Examples, Projects, and Courses

Read more
RL projects structure

How to Structure, Organize, Track and Manage Reinforcement Learning (RL) Projects

Read more