Class StreamingDataLoaderBase<T, TInput, TOutput>
Abstract base class for streaming data loaders that process data on-demand.
public abstract class StreamingDataLoaderBase<T, TInput, TOutput> : DataLoaderBase<T>, IStreamingDataLoader<T, TInput, TOutput>, IDataLoader<T>, IResettable, ICountable
Type Parameters
T: The numeric type used for calculations, typically float or double.
TInput: The input data type for each sample.
TOutput: The output/label data type for each sample.
Inheritance
- StreamingDataLoaderBase<T, TInput, TOutput>
Implements
- IStreamingDataLoader<T, TInput, TOutput>
- IDataLoader<T>
Remarks
StreamingDataLoaderBase provides the foundation for data loaders that read data on-demand rather than loading everything into memory. This is essential for:
- Large datasets that don't fit in RAM
- Real-time data streams
- Memory-efficient training pipelines
For Beginners: When working with huge datasets (millions of images, terabytes of text), you can't load everything into memory at once. This base class handles the complexity of streaming data efficiently while you focus on implementing the actual data reading logic.
Constructors
StreamingDataLoaderBase(int, int, int)
Initializes a new instance of the StreamingDataLoaderBase class.
protected StreamingDataLoaderBase(int batchSize, int prefetchCount = 2, int numWorkers = 4)
Parameters
batchSize (int): Number of samples per batch.
prefetchCount (int): Number of batches to prefetch for improved throughput. Default is 2.
numWorkers (int): Number of parallel workers for sample loading. Default is 4.
Exceptions
- ArgumentOutOfRangeException
Thrown when batchSize is not positive.
Properties
NumWorkers
Gets the number of parallel workers for sample loading.
public int NumWorkers { get; }
Property Value
- int
PrefetchCount
Gets the number of batches to prefetch for improved throughput.
public int PrefetchCount { get; }
Property Value
- int
SampleCount
Gets the total number of samples in the dataset.
public abstract int SampleCount { get; }
Property Value
- int
Remarks
This may be known upfront (e.g., from file metadata) or estimated. For truly streaming sources where the count is unknown, this may return -1.
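Because SampleCount may be -1, any value derived from it (progress bars, steps per epoch) has to handle the unknown case. The following is a minimal sketch of one way to do that; BatchMath and BatchesPerEpoch are hypothetical names used for illustration and are not part of the library.

```csharp
using System;

// Hypothetical helper, not part of the library.
public static class BatchMath
{
    // Returns the number of batches in one pass over the data, or null when
    // the sample count is unknown (signalled by a negative value such as -1).
    public static int? BatchesPerEpoch(int sampleCount, int batchSize, bool dropLast)
    {
        if (batchSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(batchSize));
        if (sampleCount < 0)
            return null; // unknown size: iterate until the source is exhausted

        return dropLast
            ? sampleCount / batchSize                     // floor: skip the short batch
            : (sampleCount + batchSize - 1) / batchSize;  // ceiling: keep the short batch
    }
}
```

Returning a nullable value forces callers to decide explicitly what to do when the size is unknown, rather than doing arithmetic on -1.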
TotalCount
Gets the total number of samples in the dataset.
public override int TotalCount { get; }
Property Value
- int
Methods
AggregateSamples(IList<(TInput Input, TOutput Output)>)
Aggregates multiple samples into a batch.
protected virtual (TInput[] Inputs, TOutput[] Outputs) AggregateSamples(IList<(TInput Input, TOutput Output)> samples)
Parameters
samples (IList<(TInput Input, TOutput Output)>)
Returns
- (TInput[] Inputs, TOutput[] Outputs)
Remarks
Override this method if you need custom batching logic (e.g., padding sequences to the same length, or stacking tensors along a new dimension).
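As an illustration of the padding case mentioned above, here is a minimal sketch of the logic such an override might contain, assuming TInput is int[] (a token sequence) and TOutput is int (a class label). PaddingCollate, PadToLongest, and padValue are hypothetical names; in a real subclass the body would live inside the AggregateSamples override.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the library.
public static class PaddingCollate
{
    // Right-pads every input sequence to the length of the longest one in the
    // batch, so all rows have the same length.
    public static (int[][] Inputs, int[] Outputs) PadToLongest(
        IList<(int[] Input, int Output)> samples, int padValue = 0)
    {
        int maxLength = 0;
        foreach (var sample in samples)
            maxLength = Math.Max(maxLength, sample.Input.Length);

        var inputs = new int[samples.Count][];
        var outputs = new int[samples.Count];
        for (int i = 0; i < samples.Count; i++)
        {
            var padded = new int[maxLength];
            Array.Fill(padded, padValue);
            // Copy the original values over the start of the padded row.
            Array.Copy(samples[i].Input, padded, samples[i].Input.Length);
            inputs[i] = padded;
            outputs[i] = samples[i].Output;
        }
        return (inputs, outputs);
    }
}
```

Padding to the longest sequence in each batch, rather than to a global maximum, keeps batches as small as the data allows.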
GetBatches(bool, bool, int?)
Iterates through the dataset in batches synchronously.
public virtual IEnumerable<(TInput[] Inputs, TOutput[] Outputs)> GetBatches(bool shuffle = true, bool dropLast = false, int? seed = null)
Parameters
shuffle (bool): Whether to shuffle the data before batching.
dropLast (bool): Whether to drop the last incomplete batch.
seed (int?): Optional random seed for reproducibility.
Returns
- IEnumerable<(TInput[] Inputs, TOutput[] Outputs)>
An enumerable of batches, each containing arrays of inputs and outputs.
Remarks
For Beginners: Use this when you want simple, synchronous iteration. Each iteration gives you a batch of inputs and their corresponding outputs.
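To make the effect of dropLast concrete, the sketch below splits an index order into batches the way a loader of this kind plausibly does. It is an illustration under that assumption, not the library's implementation; IndexBatching and Batch are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the library.
public static class IndexBatching
{
    // Splits an index order into consecutive batches. The final batch may be
    // shorter than batchSize; it is skipped when dropLast is true.
    // Because this is an iterator, argument validation runs on first enumeration.
    public static IEnumerable<int[]> Batch(int[] indices, int batchSize, bool dropLast)
    {
        if (batchSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(batchSize));

        for (int start = 0; start < indices.Length; start += batchSize)
        {
            int length = Math.Min(batchSize, indices.Length - start);
            if (length < batchSize && dropLast)
                yield break; // incomplete trailing batch is discarded

            var batch = new int[length];
            Array.Copy(indices, start, batch, 0, length);
            yield return batch;
        }
    }
}
```

With 10 indices and a batch size of 4, this yields batches of 4, 4, and 2 samples, or only the two full batches when dropLast is true.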
GetBatchesAsync(bool, bool, int?, CancellationToken)
Iterates through the dataset in batches asynchronously with prefetching.
public virtual IAsyncEnumerable<(TInput[] Inputs, TOutput[] Outputs)> GetBatchesAsync(bool shuffle = true, bool dropLast = false, int? seed = null, CancellationToken cancellationToken = default)
Parameters
shuffle (bool): Whether to shuffle the data before batching.
dropLast (bool): Whether to drop the last incomplete batch.
seed (int?): Optional random seed for reproducibility.
cancellationToken (CancellationToken): Token to cancel the iteration.
Returns
- IAsyncEnumerable<(TInput[] Inputs, TOutput[] Outputs)>
An async enumerable of batches, each containing arrays of inputs and outputs.
Remarks
For Beginners: This is the recommended method for training. It uses async/await to overlap data loading with model training, keeping your GPU busy while the next batch is being prepared.
Example:
await foreach (var batch in loader.GetBatchesAsync(shuffle: true))
{
    await model.TrainOnBatchAsync(batch.Inputs, batch.Outputs);
}
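The overlap of loading and training described above can be pictured as a bounded producer/consumer queue. The following is a minimal sketch of that general technique using System.Threading.Channels; it is not the library's actual implementation, and PrefetchSketch, Prefetch, and produceBatch are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper, not part of the library.
public static class PrefetchSketch
{
    // Produces batches on a background task and buffers up to prefetchCount of
    // them, so the consumer can train on one batch while the next is loading.
    public static async IAsyncEnumerable<TBatch> Prefetch<TBatch>(
        int batchCount,
        int prefetchCount,
        Func<int, CancellationToken, Task<TBatch>> produceBatch,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var channel = Channel.CreateBounded<TBatch>(Math.Max(1, prefetchCount));
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        var token = cts.Token;

        var producer = Task.Run(async () =>
        {
            try
            {
                for (int i = 0; i < batchCount; i++)
                {
                    var produced = await produceBatch(i, token);
                    // Waits here while the buffer is full (back-pressure).
                    await channel.Writer.WriteAsync(produced, token);
                }
                channel.Writer.TryComplete();
            }
            catch (Exception ex)
            {
                channel.Writer.TryComplete(ex); // surface failures to the reader
            }
        });

        try
        {
            await foreach (var batch in channel.Reader.ReadAllAsync(token))
                yield return batch;
        }
        finally
        {
            cts.Cancel();   // stop the producer if the consumer exits early
            await producer; // the producer handles its own exceptions
        }
    }
}
```

The bounded capacity is what keeps memory use predictable: the producer can run at most prefetchCount batches ahead of the consumer.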
GetShuffledIndices(bool, int?)
Gets indices for iteration, optionally shuffled.
protected int[] GetShuffledIndices(bool shuffle, int? seed)
Parameters
shuffle (bool)
seed (int?)
Returns
- int[]
An array of indices in the desired order.
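A common way to produce such an index order is a seeded Fisher-Yates shuffle. The sketch below shows that technique; whether GetShuffledIndices uses exactly this algorithm is an assumption, and IndexOrder and Create are hypothetical names.

```csharp
using System;

// Hypothetical helper, not part of the library.
public static class IndexOrder
{
    // Returns 0..count-1, either in order or shuffled with Fisher-Yates.
    // Passing the same seed yields the same permutation on the same runtime.
    public static int[] Create(int count, bool shuffle, int? seed)
    {
        var indices = new int[count];
        for (int i = 0; i < count; i++)
            indices[i] = i;

        if (!shuffle)
            return indices;

        var rng = seed.HasValue ? new Random(seed.Value) : new Random();
        for (int i = count - 1; i > 0; i--)
        {
            int j = rng.Next(i + 1); // 0 <= j <= i
            (indices[i], indices[j]) = (indices[j], indices[i]);
        }
        return indices;
    }
}
```

Shuffling indices rather than the samples themselves means no data has to be loaded or moved to randomize the order.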
LoadDataCoreAsync(CancellationToken)
Core data loading implementation to be provided by derived classes.
protected override Task LoadDataCoreAsync(CancellationToken cancellationToken)
Parameters
cancellationToken (CancellationToken): Cancellation token for async operation.
Returns
- Task
A task that completes when loading is finished.
Remarks
Derived classes must implement this to perform actual data loading:
- Load from files, databases, or remote sources
- Parse and validate data format
- Store in appropriate internal structures
ReadSampleAsync(int, CancellationToken)
Reads a single sample by index.
protected abstract Task<(TInput Input, TOutput Output)> ReadSampleAsync(int index, CancellationToken cancellationToken = default)
Parameters
index (int): The index of the sample to read.
cancellationToken (CancellationToken): Cancellation token.
Returns
- Task<(TInput Input, TOutput Output)>
Remarks
Derived classes must implement this to read a single sample from the data source. This method is called by the batching infrastructure to build batches.
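The sketch below shows what the body of a ReadSampleAsync implementation might look like, assuming TInput is double[] and TOutput is int, with each sample stored as a comma-separated line whose first field is the label. CsvLineSampleReader and this row layout are hypothetical; the class is a standalone stand-in so the example compiles on its own, whereas a real loader would derive from StreamingDataLoaderBase and declare the method as protected override.

```csharp
using System;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in, not a real subclass of StreamingDataLoaderBase.
public sealed class CsvLineSampleReader
{
    private readonly string[] _lines;

    public CsvLineSampleReader(string[] lines) => _lines = lines;

    public int SampleCount => _lines.Length;

    // Same shape as ReadSampleAsync: one index in, one (input, output) pair out.
    public Task<(double[] Input, int Output)> ReadSampleAsync(
        int index, CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        if (index < 0 || index >= _lines.Length)
            throw new ArgumentOutOfRangeException(nameof(index));

        // Assumed row layout: label first, then the feature values.
        string[] parts = _lines[index].Split(',');
        int label = int.Parse(parts[0], CultureInfo.InvariantCulture);
        var features = new double[parts.Length - 1];
        for (int i = 1; i < parts.Length; i++)
            features[i - 1] = double.Parse(parts[i], CultureInfo.InvariantCulture);

        return Task.FromResult((features, label));
    }
}
```

A loader backed by files or a network source would replace the in-memory array with an awaited read and pass the cancellation token through to it.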
UnloadDataCore()
Core data unloading implementation to be provided by derived classes.
protected override void UnloadDataCore()
Remarks
Derived classes should implement this to release resources:
- Clear internal data structures
- Release file handles or connections
- Allow garbage collection of loaded data