How to Use PER (Prioritized Experience Replay)
Guideline
Prioritized sampling is an important mechanism in some algorithms, for example Rainbow. It includes:
Sampling according to priority instead of traditional uniform sampling.
Calculating an importance sampling (IS) weight for each sample and using it as that sample's loss weight.
Updating the priorities in the buffer after each training iteration.
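The first two steps above can be sketched in plain Python. This follows the proportional formulation commonly used for PER, P(i) = p_i^alpha / sum_k p_k^alpha and w_i = (N * P(i))^(-beta); it is an illustrative sketch, not DI-engine's buffer implementation. The helper names `sampling_probs` and `is_weights` are hypothetical, and normalizing by the largest weight in the sampled batch is an assumed simplification.

```python
import random


def sampling_probs(priorities, alpha):
    """P(i) = p_i^alpha / sum_k p_k^alpha; alpha=0 gives uniform sampling."""
    scaled = [p ** alpha for p in priorities]
    total = sum(scaled)
    return [s / total for s in scaled]


def is_weights(probs, indices, beta):
    """w_i = (N * P(i))^(-beta), divided by the largest weight in the batch.

    Assumption: normalization is done per batch, which keeps weights in (0, 1].
    """
    n = len(probs)
    raw = [(n * probs[i]) ** (-beta) for i in indices]
    max_w = max(raw)
    return [w / max_w for w in raw]


priorities = [1.0, 2.0, 4.0, 1.0]
probs = sampling_probs(priorities, alpha=1.0)  # [0.125, 0.25, 0.5, 0.125]

rng = random.Random(0)
indices = rng.choices(range(len(priorities)), weights=probs, k=3)
weights = is_weights(probs, indices, beta=1.0)
```

Samples with a high priority are drawn more often but receive a smaller weight, which is what compensates for the non-uniform sampling.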
Procedures
Enable the priority mechanism and set the corresponding hyperparameters in the config
`priority` must be set to `True`. `priority_IS_weight` controls whether the IS weight is used to correct the sampling bias. Setting it to `True` is recommended, but it is optional. `alpha`, `beta`, and `anneal_step` are hyperparameters of the priority mechanism.

    policy=dict(
        ...,
        priority=True,
        priority_IS_weight=True,
        ...,
        other=dict(
            replay_buffer=dict(
                ...,
                # How much priority is used.
                alpha=0.6,
                # How much correction is used.
                beta=0.4,
                # Beta annealing. Sample step count.
                anneal_step=0,
            )
        ),
    )
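To illustrate what annealing `beta` over a number of sample steps could look like, here is a minimal sketch of a linear schedule that moves `beta` toward 1.0. The function `anneal_beta` is hypothetical. Both the linear schedule and the reading that `anneal_step=0` disables annealing are assumptions made for this example, not behavior confirmed by the text above.

```python
def anneal_beta(beta, initial_beta, anneal_step):
    """Move beta one step toward 1.0, reaching it after `anneal_step` calls."""
    if anneal_step == 0:
        # Assumption: a step count of 0 means beta stays fixed.
        return beta
    increment = (1.0 - initial_beta) / anneal_step
    return min(1.0, beta + increment)


beta = 0.4
history = []
for _ in range(5):  # one call per sample step
    beta = anneal_beta(beta, initial_beta=0.4, anneal_step=4)
    history.append(beta)
```

With these values `beta` reaches 1.0 on the fourth sample step and is clamped there afterwards, so the correction becomes stronger as training proceeds.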
Use the importance sampling weight as the loss weight
`PrioritizedBuffer` samples data with probabilities proportional to their priorities. It also adds a key-value pair `IS` to the data dict. `IS` is the "Importance Sampling Weight", which is used to correct the bias that prioritized sampling introduces into the optimization process. If `priority_IS_weight` is `True`, the loss of each sampled item is multiplied by its corresponding weight.

    import torch.nn.functional as F

    # Tensor shapes: output (B, ), target (B, )
    # Without IS
    loss = F.mse_loss(output, target)
    # With IS (recommended): weight the per-sample loss before averaging
    loss = (F.mse_loss(output, target, reduction='none') * data['IS']).mean()
    # DI-engine td error (data['weight'] = data['IS'], assigned in policy._forward_learn method)
Update the priority in the buffer
Since priority is a by-product of the error calculation, you can obtain the new priorities directly in the `policy._forward_learn` method and add them to the returned dict as a key-value pair. Make sure that the key is `"priority"` and the value is a `list` of length `batch_size`.

    data_n = q_nstep_td_data(
        q_value, target_q_value, data['action'], target_q_action, reward, data['done'], data['weight']
    )
    loss, td_error_per_sample = q_nstep_td_error(data_n, self._gamma, nstep=self._nstep)
    return {
        'total_loss': loss.item(),
        # One absolute TD error per sample, used as the new priority
        'priority': td_error_per_sample.abs().tolist(),
    }
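As a rough picture of what happens to the returned list on the buffer side, the sketch below writes each new priority back to the entry it was sampled from. `ToyPriorityBuffer` and its `update` method are hypothetical stand-ins, not DI-engine classes. Adding a small `eps` so that zero-error samples can still be drawn is a common PER convention that is assumed here.

```python
class ToyPriorityBuffer:
    """Hypothetical stand-in for a prioritized buffer (not DI-engine's class)."""

    def __init__(self, eps=1e-6):
        self.priorities = {}     # buffer index -> priority
        self.max_priority = 1.0  # largest priority seen so far
        self.eps = eps

    def update(self, indices, new_priorities):
        """Overwrite the priority of each sampled entry."""
        if len(indices) != len(new_priorities):
            raise ValueError("need exactly one priority per sampled index")
        for idx, prio in zip(indices, new_priorities):
            value = abs(prio) + self.eps
            self.priorities[idx] = value
            self.max_priority = max(self.max_priority, value)


buffer = ToyPriorityBuffer()
sampled_indices = [3, 7, 1]  # indices of the batch that was just trained on
learn_output = {'total_loss': 0.5, 'priority': [0.2, 1.5, 0.0]}
buffer.update(sampled_indices, learn_output['priority'])
```

The length check mirrors the requirement that the `"priority"` list contains exactly one value per sample in the batch.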
Others
Calculate initial priority in collectors
Usually, the priority of a sample is initialized when it is inserted into the replay buffer, using either a default value or the maximum priority seen so far. DI-engine also supports calculating and initializing the priority in the collector:
The `policy._forward_collect` method also calculates the priority and returns it as a key-value pair.
The `policy._process_transition` method puts `model_output['priority']` into the returned data as its initial priority.

    from collections import namedtuple
    from typing import Any

    def _process_transition(self, obs: Any, model_output: dict, timestep: namedtuple) -> dict:
        transition = {
            'obs': obs,
            'next_obs': timestep.obs,
            'action': model_output['action'],
            'priority': model_output['priority'],  # add this one
            'reward': timestep.reward,
            'done': timestep.done,
        }
        return transition
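The three initialization options mentioned above (a priority supplied by the collector, the maximum priority seen so far, or a default value) could be combined at insertion time roughly as follows. The function `initial_priority` and its order of precedence are assumptions made for illustration only.

```python
def initial_priority(transition, max_priority=None, default=1.0):
    """Pick the priority a newly inserted transition starts with."""
    if 'priority' in transition:
        # Priority computed in the collector and attached to the transition.
        return transition['priority']
    if max_priority is not None:
        # Largest priority seen so far, so new data is sampled at least once soon.
        return max_priority
    return default


with_priority = {'obs': 0, 'action': 1, 'priority': 0.8}
without_priority = {'obs': 0, 'action': 1}

p_collector = initial_priority(with_priority, max_priority=2.0)
p_max = initial_priority(without_priority, max_priority=2.0)
p_default = initial_priority(without_priority)
```

Here `p_collector` is 0.8, `p_max` is 2.0, and `p_default` is 1.0.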
Different exploration strategies
In Ape-X, different collectors can use different exploration strategies (e.g., different epsilon values for different collectors). DI-engine also supports this mechanism. In the serial pipeline, you need to implement your own main entry function to control when exploration strategies change, and override the `policy._forward_collect` method so that it receives the control arguments and executes the corresponding strategy. In the parallel entry, you should set different parameters in the commander for different collectors.
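As one concrete example of per-collector exploration, the Ape-X paper assigns collector i out of N the value epsilon_i = epsilon^(1 + alpha * i / (N - 1)), with epsilon = 0.4 and alpha = 7. The sketch below computes such a schedule. The function `apex_epsilons` is hypothetical, and how each value is passed to a collector (for instance as an argument to an overridden `_forward_collect`) is left to your own entry function.

```python
def apex_epsilons(num_collectors, base_eps=0.4, alpha=7.0):
    """Return one epsilon per collector, from most to least exploratory."""
    if num_collectors == 1:
        # Avoid dividing by zero when there is only one collector.
        return [base_eps]
    return [
        base_eps ** (1 + alpha * i / (num_collectors - 1))
        for i in range(num_collectors)
    ]


epsilons = apex_epsilons(4)
```

The first collector keeps the base epsilon of 0.4, and the last one uses 0.4 ** 8, which is about 0.00066, so the collectors cover a wide range from exploratory to nearly greedy behavior.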