Class DataPipeline<T>
Provides TensorFlow-style data pipeline operations for transforming and processing data.
public class DataPipeline<T> : IEnumerable<T>, IEnumerable
Type Parameters
T: The type of data in the pipeline.
Inheritance
object → DataPipeline<T>
Implements
IEnumerable<T>, IEnumerable
Remarks
DataPipeline provides a fluent API for building data processing pipelines similar to TensorFlow's tf.data API. Operations are lazily evaluated and can be chained together.
For Beginners: Data pipelines let you build complex data processing workflows step by step. Each operation transforms the data in some way:
var pipeline = DataPipeline.From(dataLoader)
    .Map(x => Normalize(x))     // Transform each sample
    .Filter(x => IsValid(x))    // Keep only valid samples
    .Cache()                    // Cache in memory
    .Shuffle(1000)              // Shuffle with buffer
    .Batch(32)                  // Create batches
    .Prefetch(2);               // Prefetch batches

foreach (var batch in pipeline)
{
    model.Train(batch);
}
Constructors
DataPipeline(Func<IEnumerable<T>>)
Initializes a new instance of the DataPipeline class.
public DataPipeline(Func<IEnumerable<T>> sourceFactory)
Parameters
sourceFactory (Func<IEnumerable<T>>): Factory function that creates the source enumerable.
Methods
Batch(int, bool)
Groups elements into batches.
public DataPipeline<T[]> Batch(int batchSize, bool dropRemainder = false)
Parameters
batchSize (int): Number of elements per batch.
dropRemainder (bool): Whether to drop the last batch when it contains fewer than batchSize elements. Default is false.
Returns
- DataPipeline<T[]>
A new DataPipeline of batched elements.
Remarks
For Beginners: Batch groups individual samples together for more efficient processing. A batch size of 32 means 32 samples are processed together.
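The effect of dropRemainder is easiest to see on a source whose length is not a multiple of the batch size. The following is a minimal standalone sketch of the batching semantics described above, not the library's implementation; BatchSketch is a hypothetical helper written only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of DataPipeline): groups a sequence into arrays of batchSize.
static IEnumerable<T[]> BatchSketch<T>(IEnumerable<T> source, int batchSize, bool dropRemainder = false)
{
    if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));

    var buffer = new List<T>(batchSize);
    foreach (var item in source)
    {
        buffer.Add(item);
        if (buffer.Count == batchSize)
        {
            yield return buffer.ToArray(); // emit a full batch
            buffer.Clear();
        }
    }

    // The trailing partial batch is emitted only when dropRemainder is false.
    if (buffer.Count > 0 && !dropRemainder)
        yield return buffer.ToArray();
}

// Seven samples with a batch size of three: two full batches plus one leftover sample.
var keepAll = BatchSketch(Enumerable.Range(1, 7), 3).ToList();
var dropLast = BatchSketch(Enumerable.Range(1, 7), 3, dropRemainder: true).ToList();

Console.WriteLine($"keep remainder: {keepAll.Count} batches, last has {keepAll.Last().Length} element(s)");
Console.WriteLine($"drop remainder: {dropLast.Count} batches");
```

Dropping the remainder is the usual choice when downstream code expects every batch to have exactly the same size.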
Cache()
Caches elements in memory for faster subsequent iterations.
public DataPipeline<T> Cache()
Returns
- DataPipeline<T>
A new DataPipeline with cached elements.
Remarks
Cache stores all elements in memory after the first iteration. Subsequent iterations read from the cache instead of recomputing.
For Beginners: Caching is useful when you iterate through the same data multiple times (like training for many epochs) and the data fits in memory. The first epoch loads/computes data, and subsequent epochs read from RAM.
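The "compute once, then read from RAM" behavior can be illustrated without the library. This is a minimal sketch under stated assumptions: LoadSamples and CachedSamples are hypothetical local functions, and the real Cache() may differ in details such as how it treats a first pass that is abandoned halfway.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

int loads = 0;

// Stand-in for an expensive source (disk reads, decoding, and so on).
IEnumerable<int> LoadSamples()
{
    for (int i = 1; i <= 3; i++)
    {
        loads++;              // counts how often the expensive work actually runs
        yield return i * 10;
    }
}

var cache = new List<int>();
bool cacheComplete = false;

// Hypothetical caching wrapper: fills the cache on the first full pass, replays it afterwards.
IEnumerable<int> CachedSamples()
{
    if (cacheComplete)
    {
        foreach (var x in cache) yield return x;
        yield break;
    }

    cache.Clear();            // discard a partial cache left by an abandoned pass
    foreach (var x in LoadSamples())
    {
        cache.Add(x);
        yield return x;
    }
    cacheComplete = true;     // only reached when the pass ran to the end
}

for (int epoch = 0; epoch < 3; epoch++)
{
    int sum = CachedSamples().Sum();
    Console.WriteLine($"epoch {epoch}: sum={sum}, loads so far={loads}");
}
```

After three epochs the counter still reads 3, because only the first epoch touched the source.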
Concat(DataPipeline<T>)
Concatenates another pipeline to this one.
public DataPipeline<T> Concat(DataPipeline<T> other)
Parameters
other (DataPipeline<T>): The pipeline to concatenate.
Returns
- DataPipeline<T>
A new DataPipeline with concatenated elements.
Filter(Func<T, bool>)
Filters elements based on a predicate.
public DataPipeline<T> Filter(Func<T, bool> predicate)
Parameters
predicate (Func<T, bool>): Condition that an element must satisfy to be kept.
Returns
- DataPipeline<T>
A new DataPipeline with filtered elements.
Remarks
For Beginners: Filter keeps only elements that match a condition. For example, removing corrupted samples or samples below a quality threshold.
Flatten<TElement>(Func<T, IEnumerable<TElement>>)
Flattens a pipeline by extracting enumerables from each element.
public DataPipeline<TElement> Flatten<TElement>(Func<T, IEnumerable<TElement>> selector)
Parameters
selector (Func<T, IEnumerable<TElement>>): Function to extract the enumerable from each element.
Returns
- DataPipeline<TElement>
A flattened DataPipeline.
Type Parameters
TElement: The element type within the enumerables.
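The signature of Flatten matches the shape of LINQ's SelectMany, so its result can be pictured with plain LINQ. This sketch assumes Flatten has the same element ordering as SelectMany, which the reference does not state explicitly; the sample strings are made up.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Each source element (a sentence) yields several inner elements (its words).
var sentences = new[] { "the quick fox", "lazy dog" };

// SelectMany extracts the inner enumerable from each element and concatenates the results.
List<string> words = sentences
    .SelectMany(sentence => sentence.Split(' '))
    .ToList();

Console.WriteLine(string.Join(", ", words)); // the, quick, fox, lazy, dog
```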
ForEach(Action<T>)
Applies a side effect to each element without modifying it.
public DataPipeline<T> ForEach(Action<T> action)
Parameters
action (Action<T>): The action to perform.
Returns
- DataPipeline<T>
A new DataPipeline that performs the action.
Remarks
For Beginners: ForEach is useful for logging, debugging, or collecting statistics without changing the data.
From(IBatchIterable<T>)
Creates a new DataPipeline from a batch iterable.
public static DataPipeline<T> From(IBatchIterable<T> source)
Parameters
source (IBatchIterable<T>): The batch iterable source.
Returns
- DataPipeline<T>
A new DataPipeline.
From(IEnumerable<T>)
Creates a new DataPipeline from an enumerable source.
public static DataPipeline<T> From(IEnumerable<T> source)
Parameters
source (IEnumerable<T>): The source enumerable.
Returns
- DataPipeline<T>
A new DataPipeline.
GetEnumerator()
Returns an enumerator that iterates through the collection.
public IEnumerator<T> GetEnumerator()
Returns
- IEnumerator<T>
An enumerator that can be used to iterate through the collection.
Interleave(params DataPipeline<T>[])
Interleaves multiple pipelines.
public DataPipeline<T> Interleave(params DataPipeline<T>[] pipelines)
Parameters
pipelines (DataPipeline<T>[]): The pipelines to interleave with.
Returns
- DataPipeline<T>
A new DataPipeline with interleaved elements.
Remarks
For Beginners: Interleave mixes elements from multiple sources. This is useful when you have data from different domains and want to mix them during training.
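The reference does not say in which order Interleave draws from its sources. One common policy is round-robin, sketched below as an assumption rather than a description of the library; RoundRobin is a hypothetical helper that takes one element from each source in turn and drops a source once it is exhausted.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of DataPipeline): round-robin interleaving of several sequences.
static IEnumerable<T> RoundRobin<T>(params IEnumerable<T>[] sources)
{
    var enumerators = sources.Select(s => s.GetEnumerator()).ToList();
    try
    {
        while (enumerators.Count > 0)
        {
            for (int i = 0; i < enumerators.Count; )
            {
                if (enumerators[i].MoveNext())
                {
                    yield return enumerators[i].Current;
                    i++;                        // move on to the next source
                }
                else
                {
                    enumerators[i].Dispose();   // source exhausted: remove it from the rotation
                    enumerators.RemoveAt(i);
                }
            }
        }
    }
    finally
    {
        // Runs on early exit too, so remaining enumerators are always released.
        foreach (var e in enumerators) e.Dispose();
    }
}

var mixed = RoundRobin<int>(new[] { 1, 2, 3 }, new[] { 10, 20 }, new[] { 100 }).ToList();
Console.WriteLine(string.Join(", ", mixed)); // 1, 10, 100, 2, 20, 3
```

With sources of unequal length, the tail of the output comes only from the longest source, which matters if the goal is an even mix across domains.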
MapAsync<TResult>(Func<T, CancellationToken, Task<TResult>>, int)
Applies an async transformation function to each element.
public AsyncDataPipeline<TResult> MapAsync<TResult>(Func<T, CancellationToken, Task<TResult>> selector, int maxConcurrency = 4)
Parameters
selector (Func<T, CancellationToken, Task<TResult>>): The async transformation function.
maxConcurrency (int): Maximum concurrent operations. Default is 4.
Returns
- AsyncDataPipeline<TResult>
A new AsyncDataPipeline with transformed elements.
Type Parameters
TResult: The type of the transformed elements.
Map<TResult>(Func<T, TResult>)
Applies a transformation function to each element.
public DataPipeline<TResult> Map<TResult>(Func<T, TResult> selector)
Parameters
selector (Func<T, TResult>): The transformation function.
Returns
- DataPipeline<TResult>
A new DataPipeline with transformed elements.
Type Parameters
TResult: The type of the transformed elements.
Remarks
For Beginners: Map transforms each element. For example, normalizing pixel values or converting data types.
Prefetch(int)
Prefetches elements in the background for improved performance.
public AsyncDataPipeline<T> Prefetch(int bufferSize = 2)
Parameters
bufferSize (int): Number of elements to prefetch. Default is 2.
Returns
- AsyncDataPipeline<T>
An async DataPipeline with prefetching.
Remarks
Prefetch prepares the next elements while the current ones are being processed. This hides data loading latency.
For Beginners: Prefetching is like having someone prepare the next ingredients while you're cooking. It keeps the pipeline flowing smoothly without waiting for data.
Repeat(int?)
Repeats the pipeline indefinitely or a specified number of times.
public DataPipeline<T> Repeat(int? count = null)
Parameters
count (int?): Number of times to repeat. Pass null (the default) to repeat indefinitely.
Returns
- DataPipeline<T>
A new DataPipeline that repeats.
Remarks
For Beginners: Repeat allows iterating through the data multiple times. With count=null, it repeats forever (useful with Take to limit total samples).
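Combining an infinite repeat with a limit can be sketched with plain iterators. RepeatSketch below is a hypothetical helper, not the library's code; it takes a factory so that each pass re-creates the source, in the same spirit as the sourceFactory constructor parameter.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of DataPipeline): replays a source count times, or forever when count is null.
static IEnumerable<T> RepeatSketch<T>(Func<IEnumerable<T>> sourceFactory, int? count = null)
{
    int pass = 0;
    while (!count.HasValue || pass < count.Value)
    {
        foreach (var item in sourceFactory())   // a fresh enumeration on every pass
            yield return item;

        if (count.HasValue) pass++;
    }
}

// Infinite repeat, bounded by Take: the sequence is lazy, so only 7 elements are produced.
var bounded = RepeatSketch<int>(() => new[] { 1, 2, 3 }).Take(7).ToList();
Console.WriteLine(string.Join(", ", bounded)); // 1, 2, 3, 1, 2, 3, 1

// Finite repeat: two full passes.
var twice = RepeatSketch<int>(() => new[] { 1, 2, 3 }, 2).ToList();
Console.WriteLine(string.Join(", ", twice));   // 1, 2, 3, 1, 2, 3
```

In this sketch an infinite repeat over an empty source would loop forever without yielding anything, so the limit should sit on a pipeline that is known to produce elements.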
Shuffle(int, int?)
Shuffles elements using a buffer.
public DataPipeline<T> Shuffle(int bufferSize, int? seed = null)
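Parameters
bufferSize (int): Number of elements held in the shuffle buffer.
seed (int?): Optional seed for the random number generator. Default is null.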
Returns
- DataPipeline<T>
A new DataPipeline with shuffled elements.
Remarks
Shuffle maintains a buffer of elements and randomly samples from it. Larger buffers provide more randomness but use more memory.
For Beginners: Shuffling data helps the model learn better by preventing it from memorizing the order. A shuffle buffer of 1000 means 1000 samples are kept in memory and randomly selected from.
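A buffer-based shuffle of the kind described above can be sketched as follows. This is an illustration in the style of tf.data's shuffle, not the library's implementation; ShuffleSketch is a hypothetical helper, and the way the real method uses seed is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of DataPipeline): streaming shuffle with a bounded buffer.
static IEnumerable<T> ShuffleSketch<T>(IEnumerable<T> source, int bufferSize, int? seed = null)
{
    if (bufferSize <= 0) throw new ArgumentOutOfRangeException(nameof(bufferSize));

    var rng = seed.HasValue ? new Random(seed.Value) : new Random();
    var buffer = new List<T>(bufferSize);

    foreach (var item in source)
    {
        if (buffer.Count < bufferSize)
        {
            buffer.Add(item);           // still filling the buffer
            continue;
        }

        int slot = rng.Next(buffer.Count);
        yield return buffer[slot];      // emit a random buffered element...
        buffer[slot] = item;            // ...and put the incoming one in its place
    }

    // Source exhausted: drain what is left in random order.
    while (buffer.Count > 0)
    {
        int slot = rng.Next(buffer.Count);
        yield return buffer[slot];
        buffer[slot] = buffer[buffer.Count - 1];
        buffer.RemoveAt(buffer.Count - 1);
    }
}

var data = Enumerable.Range(0, 20).ToList();
var shuffledA = ShuffleSketch(data, 5, seed: 42).ToList();
var shuffledB = ShuffleSketch(data, 5, seed: 42).ToList();
var bufferOfOne = ShuffleSketch(data, 1, seed: 42).ToList();

Console.WriteLine(string.Join(" ", shuffledA));
Console.WriteLine($"same seed, same order: {shuffledA.SequenceEqual(shuffledB)}");
Console.WriteLine($"buffer of 1 keeps the original order: {bufferOfOne.SequenceEqual(data)}");
```

In this scheme an element can move earlier in the stream by fewer than bufferSize positions, which is why a small buffer over sorted data leaves the output only locally shuffled.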
Skip(int)
Skips the first N elements.
public DataPipeline<T> Skip(int count)
Parameters
count (int): Number of elements to skip.
Returns
- DataPipeline<T>
A new DataPipeline that omits the first count elements.
Take(int)
Takes only the first N elements.
public DataPipeline<T> Take(int count)
Parameters
count (int): Number of elements to take.
Returns
- DataPipeline<T>
A new DataPipeline containing at most the first count elements.
Zip<TOther>(DataPipeline<TOther>)
Zips this pipeline with another to create pairs.
public DataPipeline<(T, TOther)> Zip<TOther>(DataPipeline<TOther> other)
Parameters
other (DataPipeline<TOther>): The other pipeline.
Returns
- DataPipeline<(T, TOther)>
A new DataPipeline of tuples.
Type Parameters
TOther: The element type of the other pipeline.
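Pairing two sequences into tuples can be pictured with LINQ's Zip. This is only an analogy: LINQ stops at the end of the shorter sequence, and the reference does not say whether DataPipeline's Zip does the same. The file names and labels are made-up sample data.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sample data: one label per image, in the same order.
var images = new[] { "img0.png", "img1.png", "img2.png" };
var labels = new[] { 0, 1, 1 };

// Pair the i-th image with the i-th label as a value tuple.
var pairs = images
    .Zip(labels, (image, label) => (Image: image, Label: label))
    .ToList();

foreach (var pair in pairs)
    Console.WriteLine($"{pair.Image} -> {pair.Label}");
```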