data.dataloader

async dataloader

AsyncDataLoader

class ding.utils.data.dataloader.AsyncDataLoader(data_source: Union[Callable, dict], batch_size: int, device: str, chunk_size: Optional[int] = None, collate_fn: Optional[Callable] = None, num_workers: int = 0)
Overview:

An asynchronous dataloader.

Interface:

__init__, __iter__, __next__, close

__init__(data_source: Union[Callable, dict], batch_size: int, device: str, chunk_size: Optional[int] = None, collate_fn: Optional[Callable] = None, num_workers: int = 0) -> None
Overview:

Initialize the dataloader with the given parameters. If data_source is a dict, data is processed only in get_data_thread and put into async_train_queue. If data_source is a Callable, data is produced by executing functions, in one of two modes depending on num_workers:

  • num_workers == 0 or 1: Only the main worker processes the data and puts it into async_train_queue.

  • num_workers > 1: The main worker divides a job into several pieces and pushes each piece into job_queue; the slave workers then take jobs from job_queue and execute them; finally, they push the processed data into async_train_queue.

As the last step, if device contains “cuda”, data in async_train_queue is transferred to cuda_queue for the user to access.
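The num_workers > 1 flow described above can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: threads stand in for the worker processes, and data_source, worker_loop, and the None sentinel are hypothetical names chosen for the example. It also assumes that a Callable data source returns a list of zero-argument jobs.

```python
import queue
import threading


def data_source(batch_size):
    # Hypothetical Callable source: returns `batch_size` zero-argument jobs.
    return [lambda i=i: i * i for i in range(batch_size)]


def worker_loop(job_queue, async_train_queue):
    # Worker: take one chunk of jobs, execute it, push the processed data.
    while True:
        chunk = job_queue.get()
        if chunk is None:  # sentinel (assumption): no more work
            break
        async_train_queue.put([job() for job in chunk])


batch_size, chunk_size, num_workers = 8, 4, 2
job_queue = queue.Queue()
async_train_queue = queue.Queue()

workers = [
    threading.Thread(target=worker_loop, args=(job_queue, async_train_queue))
    for _ in range(num_workers)
]
for w in workers:
    w.start()

# Main worker: divide one batch of jobs into chunks and push each chunk.
jobs = data_source(batch_size)
for start in range(0, batch_size, chunk_size):
    job_queue.put(jobs[start:start + chunk_size])
for _ in workers:
    job_queue.put(None)
for w in workers:
    w.join()

# Collect the processed chunks; arrival order depends on scheduling, so sort.
processed = []
while not async_train_queue.empty():
    processed.extend(async_train_queue.get())
processed.sort()
```

Because the chunks are processed concurrently, they may arrive in any order, which is why the sketch sorts the results before using them.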

Arguments:
  • data_source (Union[Callable, dict]): The data source, e.g. a function to be executed (Callable), a replay buffer’s real data (dict), etc.

  • batch_size (int): Batch size.

  • device (str): The device on which data is made available; if it contains “cuda”, data is served from cuda_queue.

  • chunk_size (Optional[int]): The size of one chunk of a batch. It should divide batch_size exactly, and it only takes effect when there is more than one worker.

  • collate_fn (Optional[Callable]): The function used to collate the samples of a batch into each data field.

  • num_workers (int): Number of extra workers. 0 or 1 means there is only the main worker and no extra ones, i.e. multiprocessing is disabled. More than 1 means multiple workers, implemented with multiprocessing, process the data separately.
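To make the chunk_size and collate_fn arguments more concrete, here is a small sketch. Both check_chunk_size and collate_by_field are hypothetical helpers written for this example, not part of the library; a real collate function would typically stack tensors rather than build plain lists.

```python
def check_chunk_size(batch_size, chunk_size):
    # chunk_size should divide batch_size exactly; return the chunk count.
    if batch_size % chunk_size != 0:
        raise ValueError(
            f"chunk_size {chunk_size} does not divide batch_size {batch_size}"
        )
    return batch_size // chunk_size


def collate_by_field(samples):
    # Turn a list of per-sample dicts into one dict of per-field lists.
    return {key: [sample[key] for sample in samples] for key in samples[0]}


num_chunks = check_chunk_size(batch_size=8, chunk_size=4)

samples = [{"obs": i, "reward": 0.5 * i} for i in range(4)]
batch = collate_by_field(samples)
```

After collation, each field of the batch holds one entry per sample, so batch["obs"] has the same length as the input list.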

__iter__() -> Iterable
Overview:

Return the iterable self as an iterator.

Returns:
  • self (Iterable): Self as an iterator.

__next__() -> Any
Overview:

Return the next data in the iterator. If CUDA is used, get it from self.cuda_queue; otherwise, get it from self.async_train_queue.

Returns:
  • data (torch.Tensor): Next data in the dataloader iterator.
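The iterator protocol described by __iter__ and __next__ can be illustrated with a small stand-in class. QueueIterator is a hypothetical name, and the sketch only mirrors the queue selection described above; it does not start any background thread or move data to a GPU.

```python
import queue


class QueueIterator:
    # Hypothetical stand-in that only mirrors the queue selection logic.

    def __init__(self, device):
        self.use_cuda = "cuda" in device
        self.async_train_queue = queue.Queue()
        self.cuda_queue = queue.Queue()

    def __iter__(self):
        # The object is its own iterator.
        return self

    def __next__(self):
        # Read from cuda_queue when CUDA is used, else from async_train_queue.
        source = self.cuda_queue if self.use_cuda else self.async_train_queue
        return source.get(timeout=1)


loader = QueueIterator(device="cpu")
loader.async_train_queue.put("batch-0")
first = next(loader)
```

Since __next__ reads from a queue, the call blocks until a batch is available; the timeout here is an assumption added so the sketch cannot hang.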

close() -> None
Overview:

Delete this dataloader. First, end_flag is set to True, which signals the different processes/threads to clear and close all data queues; then all processes are terminated and joined.
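The shutdown order described for close can be sketched as follows. TinyLoader is a hypothetical class using a single producer thread instead of processes, and the idempotence check in close is an assumption of this example rather than documented behavior.

```python
import queue
import threading


class TinyLoader:
    # Hypothetical loader with one producer thread and one bounded queue.

    def __init__(self):
        self.end_flag = False
        self.async_train_queue = queue.Queue(maxsize=2)
        self.thread = threading.Thread(target=self._produce, daemon=True)
        self.thread.start()

    def _produce(self):
        # Keep producing until end_flag is raised; never block indefinitely.
        while not self.end_flag:
            try:
                self.async_train_queue.put("batch", timeout=0.05)
            except queue.Full:
                pass

    def close(self):
        if self.end_flag:  # assumption: closing twice is a no-op
            return
        self.end_flag = True          # 1. signal the producer to stop
        self.thread.join(timeout=5)   # 2. wait for it to finish
        while not self.async_train_queue.empty():
            self.async_train_queue.get_nowait()  # 3. clear leftover data


tiny = TinyLoader()
tiny.close()
tiny.close()
```

Setting the flag before joining matters: a producer that blocks forever on a full queue would never see the flag, which is why the put call uses a short timeout.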