Reinforcement Learning

Bandits

#rl#bandits

The Bandit Problem

All visualizations are motivated by the Sutton + Barto book. I found it a good exercise to reproduce them.

These are the simplest types of problems in RL, where there is no state involved. Instead, there are kk actions available with different reward distributions, yet there is no consequence on what state is sampled next.

Fun fact: the name "kk-armed bandit" is an allusion to the slot machine, AKA the one-armed bandit that steals your money.

Formulation

The agent is provided with a set of kk actions. Each round consists of selecting any action ii to get reward Rtpi,t.R_t \sim p_{i, t}. The goal is to maximize the reward over multiple rounds of the game. In the simplest form of the game, the reward distribution pi,t=pip_{i,t} = p_{i} is stationary. Otherwise, we call it unstationary. We'll mostly concern ourselves with the stationary case.

Let's assume pi=N(q(i),1),p_{i} = \mathcal{N}(q_*(i), 1), where q(i)N(0,1).q_*(i) \sim \mathcal{N}(0, 1). We call qq_* the true action-value function, as it describes the true "value" of selecting each action. Clearly, one way to do well at this game is to try and estimate qq_* through interactions with the bandits. Then we can exploit what we know by consistently selecting the one with the highest value. So there are two challenges:

  1. How do we estimate qq_*?
  2. What strategy should we follow to explore different ii?

This foreshadows a key trade-off in RL: how to balance explotation vs. exploration.

Estimating qq_*

Say there is only one bandit, k=1,k=1, and we select it a bunch of times to get the sequence {(A1,R1),(A2,R2),,(An,Rn)}.\{ (A_1, R_1), (A_2, R_2), \dots, (A_n, R_n)\}. The natural way to estimate q(1)q_*(1) is just with the empirical average, Qn+1(1)=1ni=1nRi.Q_{n+1}(1) = \frac{1}{n} \sum_{i=1}^n R_i. The subscript n+1n+1 denotes the estimate QQ of qq_* at iteration n+1.n+1. This clearly has good convergence properties. But, people in RL actually prefer to write this in a different way, namely: Qn+1(1)=Qn+1n(RnQn).Q_{n+1}(1) = Q_n + \frac{1}{n}(R_n - Q_n).

This is pretty clear from induction, e.g.:

Qn+1(1)=n1nQn+1nRn=1n(i=1n1Ri+Rn)=1ni=1nRi.\begin{align} Q_{n+1}(1) &= \frac{n-1}{n} Q_n + \frac{1}{n} R_n \\ &= \frac{1}{n} \left(\sum_{i=1}^{n-1} R_i + R_n\right) \\ &= \frac{1}{n} \sum_{i=1}^n R_i. \end{align}

More generally, we can refer to the 1n\frac{1}{n} as the stepsize α\alpha, where Qn+1(1)=Qn+α(RnQn).Q_{n+1}(1) = Q_n + \alpha(R_n - Q_n). There are some convergence properties on α\alpha for stationary rewards. Namely, we need iαi=\sum_i \alpha_i = \infty and iαi2<,\sum_i \alpha_i^2 < \infty, but this does not tell us to what it converges in the general case.

Simulating the Problem

import numpy as np
class Bandit:
    def __init__(self, k, shift_qstar=0):
        self.q_star = np.random.randn(k) + shift_qstar

        self.k = k
        
    def step(self, action):
        assert action < self.k, "action not in valid range"
        
        reward = self.q_star[action] + np.random.randn()
        return reward

prob = Bandit(10)
prob.q_star.shape
(10,)

Exploration Methods

We can start to talk about the most basic types of policies now.

ϵ\epsilon-greedy

ϵ\epsilon-greedy means that with probability 1ϵ1-\epsilon, we go with action aargmaxi[k]Q(i).a \in \arg\max_{i\in[k]} Q(i). Otherwise, with probability ϵ,\epsilon, we select a random one. Let's try simulating different values of ϵ\epsilon and Q1.Q_1.

import numpy as np
class EpsilonGreedy:
    def __init__(self, initial_q, epsilon, k):
        self.initial_q = initial_q
        self.epsilon = epsilon

        self.k = k
        self.Q = np.ones(k) * initial_q
        self.N = np.ones(k)

    def sample_act(self, t=None):
        if np.random.random() < self.epsilon:
            return np.random.choice(range(self.k))

        return np.argmax(self.Q)
        
    def update_q(self, r_t, a_t):
        self.Q[a_t] += 1/self.N[a_t] * (r_t - self.Q[a_t])
        self.N[a_t] += 1
class Runner:
    def __init__(self, problem, algo):
        self.problem = problem
        self.algo = algo
    def run(self, n_steps):
        optimal_action = np.argmax(self.problem.q_star)

        optimal = []
        prev = 0
        
        for i in range(n_steps):
            action = self.algo.sample_act(i+1)
            r = self.problem.step(action)
            self.algo.update_q(r, action)
            
            optimal.append(prev + (action == optimal_action))
            prev = optimal[-1]

        pct_optimal = np.array(optimal) / (np.arange(n_steps) + 1)

        return pct_optimal
Plotting Utils
import matplotlib
matplotlib.rc_file("./matplotlibrc")
from matplotlib import pyplot as plt

def plot_multiple(arrays, labels):
    colors = plt.get_cmap("Set1").colors

    for i, (arr, label) in enumerate(zip(arrays, labels)):
        arr = np.array(arr) * 100 
        steps = np.arange(len(arr))
        plt.plot(steps, arr, label=label, color=colors[i])

    plt.xlabel("Step Count")
    plt.ylabel("Optimal Action (%)")

    plt.yticks(np.arange(0, 101, 20))

    max_len = max(len(a) for a in arrays)
    xticks = np.linspace(0, max_len, 6, dtype=int)
    plt.xticks(xticks)

    plt.legend(loc="upper center")
    plt.tight_layout()
    plt.show()
Experiment Setup
def run_experiment(k=10, n_steps=1000, n_problems=2000):
    avg_res_1 = np.zeros(n_steps)
    avg_res_2 = np.zeros(n_steps)
    avg_res_3 = np.zeros(n_steps)

    for _ in range(n_problems):
        problem = Bandit(k)

        config_1 = EpsilonGreedy(0, 0.01, k)      # optimistic, greedy
        config_2 = EpsilonGreedy(0, 0.1, k)    # eps-greedy
        config_3 = EpsilonGreedy(0, 0, k)    # eps-greedy

        res_1 = Runner(problem, config_1).run(n_steps)
        res_2 = Runner(problem, config_2).run(n_steps)
        res_3 = Runner(problem, config_3).run(n_steps)

        avg_res_1 += res_1
        avg_res_2 += res_2
        avg_res_3 += res_3

    avg_res_1 /= n_problems
    avg_res_2 /= n_problems
    avg_res_3 /= n_problems

    return avg_res_1, avg_res_2, avg_res_3
res_1, res_2, res_3 = run_experiment()
plot_multiple([res_1, res_2, res_3], ["ε=0.01", "ε=0.1", "ε=0"])
Output

Upper Confidence Bound

In ϵ\epsilon-greedy, during exploration we select the action completely by random. Upper Confidence Bound (UCB) proposes a slight compromise, where you factor in the number of times aa has been selected before. Intuitively this number should be inversely related to how much you want to explore it. The policy is given by

Atargmaxa{Qt(a)+clogtNt(a)},\begin{align} A_t \in \arg\max_a \left\{Q_t(a) + c\sqrt{\frac{\log t}{N_t(a)}} \right\}, \end{align}

where Nt(a)N_t(a) is the count for how many times aa has been selected by time t.t. Let's compare it to ϵ\epsilon-greedy.

class UCB(EpsilonGreedy):
    def __init__(self, c, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.c = c
        self.counts = np.zeros(self.k)

    def sample_act(self, t):
        unexplored = np.where(self.counts == 0)
        if unexplored[0].shape[0] > 0:
            return unexplored[0][0]
        act = np.argmax(self.Q + self.c * np.sqrt(np.log(t) / self.counts))
        return act

    def update_q(self, r_t, a_t):
        super().update_q(r_t, a_t)

        self.counts[a_t] += 1
        
Experiment Setup
def run_experiment(k=10, n_steps=1000, n_problems=2000):
    avg_res_1 = np.zeros(n_steps)
    avg_res_2 = np.zeros(n_steps)

    for _ in range(n_problems):
        problem = Bandit(k)

        config_1 = UCB(2, 0, None, k)
        config_2 = EpsilonGreedy(0, 0.1, k)

        res_1 = Runner(problem, config_1).run(n_steps)
        res_2 = Runner(problem, config_2).run(n_steps)

        avg_res_1 += res_1
        avg_res_2 += res_2

    avg_res_1 /= n_problems
    avg_res_2 /= n_problems

    return avg_res_1, avg_res_2
res_1, res_2 = run_experiment()
plot_multiple([res_1, res_2], ["UCB w/ c=2", "ε=0.1"])
Output

Gradient Bandits

Observe that, if we start with some ground truth qq_*, and we form q=q+C,q_*' = q_* + C, the behavior of ϵ\epsilon-greedy and UCB will change, since we must make some choice for Q1Q_1 and c.c. But, all that really matters when selecting an optimal action is the relative order between two actions. So, we can instead enode some notion of "energy" for each action that represents relative preference. More concretely, let H(a)H(a) denote the energy of action aa. We can initialize H=0,H = 0, and follow the model

π(a)=expH(a)iexpH(i).\begin{align} \pi(a) = \frac{\exp^{H(a)}}{\sum_i \exp^{H(i)}}. \end{align}

Given (action, reward) At,Rt,A_t, R_t, we'd update with

Ht+1(a)=Ht(At)+α(RtRtˉ)(1(a=At)πt(a)).\begin{align} H_{t+1}(a) = H_t(A_t) + \alpha(R_t - \bar{R_t})(\mathbf{1}(a=A_t)-\pi_t(a)). \end{align}

The intuition is that this is the form of a single-sample approximation to the update Ht+1(a)Ht+Ht(a)E[Rt].H_{t+1}(a) \leftarrow H_{t} + \nabla_{H_t(a)} \mathbb{E}[R_t]. The choice of Rtˉ\bar{R_t} is arbitrary.

import torch.nn.functional as F
import torch

class GradientBandit:
    def __init__(self, alpha, k, use_baseline):
        self.alpha = alpha
        self.k = k
        self.H = torch.zeros(k)
        self.r_bar = np.zeros(k)
        self.counts = np.ones(k)
        self.use_baseline = use_baseline

    def sample_act(self, t):
        dist = F.softmax(self.H, dim=0).numpy()
        return np.random.choice(self.k, p=dist)
        
    def update_q(self, r_t, a_t):
        dist = F.softmax(self.H, dim=0).numpy()

        self.r_bar[a_t] += 1/self.counts[a_t] * (r_t - self.r_bar[a_t])
        self.counts[a_t] += 1

        if self.use_baseline:
            baseline = self.r_bar
        else:
            baseline = 0

        self.H += self.alpha * (r_t - baseline) * ((np.arange(self.k) == a_t).astype(float) - dist)        
Experiment Setup
from joblib import Parallel, delayed
from tqdm import tqdm
import numpy as np

def one_experiment(k, n_steps):
    problem = Bandit(k, shift_qstar=4)

    config_1 = GradientBandit(0.1, k, True)
    config_2 = GradientBandit(0.1, k, False)

    res_1 = Runner(problem, config_1).run(n_steps)
    res_2 = Runner(problem, config_2).run(n_steps)

    return res_1, res_2

def run_experiment(k=10, n_steps=1000, n_problems=2000, n_jobs=-1):
    results = Parallel(n_jobs=n_jobs)(
        delayed(one_experiment)(k, n_steps)
        for _ in tqdm(range(n_problems))
    )

    res_1 = np.mean([r[0] for r in results], axis=0)
    res_2 = np.mean([r[1] for r in results], axis=0)
    return res_1, res_2
res_1, res_2 = run_experiment()
plot_multiple([res_1, res_2], ["ε=0.1 + baseline", "ε=0.1 + no baseline"])
100%|███████████████| 2000/2000 [00:12<00:00, 162.07it/s]
Output

Things will become more interesting next when we start considering states as well.