
Class DataPipeline<T>

Namespace
AiDotNet.Data.Pipeline
Assembly
AiDotNet.dll

Provides TensorFlow-style data pipeline operations for transforming and processing data.

public class DataPipeline<T> : IEnumerable<T>, IEnumerable

Type Parameters

T

The type of data in the pipeline.

Inheritance
object
DataPipeline<T>
Implements
IEnumerable<T>
IEnumerable

Remarks

DataPipeline provides a fluent API for building data processing pipelines, similar to TensorFlow's tf.data API. Operations are lazily evaluated, so no work is done until the pipeline is enumerated, and each operation returns a new pipeline so that calls can be chained.

For Beginners: Data pipelines let you build complex data processing workflows step by step. Each operation transforms the data in some way:

var pipeline = DataPipeline.From(dataLoader)
    .Map(x => Normalize(x))           // Transform each sample
    .Filter(x => IsValid(x))           // Keep only valid samples
    .Cache()                           // Cache in memory
    .Shuffle(1000)                     // Shuffle with buffer
    .Batch(32)                         // Create batches
    .Prefetch(2);                      // Prefetch batches

foreach (var batch in pipeline)
{
    model.Train(batch);
}

Constructors

DataPipeline(Func<IEnumerable<T>>)

Initializes a new instance of the DataPipeline class.

public DataPipeline(Func<IEnumerable<T>> sourceFactory)

Parameters

sourceFactory Func<IEnumerable<T>>

Factory function that creates the source enumerable.
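
As an illustration, a pipeline can be built directly from a factory delegate. This is a minimal sketch that assumes the AiDotNet package is referenced and relies only on the constructor signature shown above; the integer range is sample data, not part of the library.

```csharp
using System;
using System.Linq;
using AiDotNet.Data.Pipeline;

// The factory delegate produces the source sequence.
// Enumerable.Range is only illustrative sample data.
var pipeline = new DataPipeline<int>(() => Enumerable.Range(0, 5));

// DataPipeline<T> implements IEnumerable<T>, so foreach works directly.
foreach (var value in pipeline)
{
    Console.WriteLine(value);
}
```

Passing a factory rather than a concrete sequence lets the source be recreated for each pass, which matters for sources that cannot be enumerated twice.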

Methods

Batch(int, bool)

Groups elements into batches.

public DataPipeline<T[]> Batch(int batchSize, bool dropRemainder = false)

Parameters

batchSize int

Number of elements per batch.

dropRemainder bool

Whether to drop the final batch when it contains fewer than batchSize elements. Default is false.

Returns

DataPipeline<T[]>

A new DataPipeline of batched elements.

Remarks

For Beginners: Batch groups individual samples together for more efficient processing. A batch size of 32 means 32 samples are processed together.
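
The effect of dropRemainder can be seen with a source whose length is not a multiple of the batch size. The sketch below assumes the AiDotNet package is referenced and uses only the signatures documented on this page; the ten-integer source is illustrative.

```csharp
using System;
using System.Linq;
using AiDotNet.Data.Pipeline;

// Ten illustrative samples; a batch size of 4 leaves a remainder of two.
var samples = DataPipeline<int>.From(Enumerable.Range(1, 10));

// Default behavior: the trailing partial batch is kept.
DataPipeline<int[]> keepAll = samples.Batch(4);

// dropRemainder: true discards the trailing partial batch.
DataPipeline<int[]> fullOnly = samples.Batch(4, dropRemainder: true);

foreach (int[] batch in keepAll)
{
    Console.WriteLine($"kept: [{string.Join(", ", batch)}]");
}

foreach (int[] batch in fullOnly)
{
    Console.WriteLine($"full: [{string.Join(", ", batch)}]");
}
```

Going by the parameter descriptions, the first loop should end with a two-element batch, while the second should yield only the two full batches. Dropping the remainder is useful when a model expects a fixed batch dimension.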

Cache()

Caches elements in memory for faster subsequent iterations.

public DataPipeline<T> Cache()

Returns

DataPipeline<T>

A new DataPipeline with cached elements.

Remarks

Cache stores all elements in memory after the first iteration. Subsequent iterations read from the cache instead of recomputing.

For Beginners: Caching is useful when you iterate through the same data multiple times (like training for many epochs) and the data fits in memory. The first epoch loads/computes data, and subsequent epochs read from RAM.
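
One way to observe the cache is to count how often an upstream step runs across several passes. This sketch assumes the AiDotNet package is referenced; ExpensiveLoad is a hypothetical stand-in for slow I/O or preprocessing.

```csharp
using System;
using System.Linq;
using AiDotNet.Data.Pipeline;

int loads = 0;

// Hypothetical expensive step; it counts how many times it is invoked.
int ExpensiveLoad(int id)
{
    loads++;
    return id * id;
}

var cached = DataPipeline<int>.From(Enumerable.Range(0, 100))
    .Map(x => ExpensiveLoad(x))
    .Cache();

// Three "epochs" over the same data.
for (int epoch = 0; epoch < 3; epoch++)
{
    long sum = 0;
    foreach (var value in cached)
    {
        sum += value;
    }
    Console.WriteLine($"epoch {epoch}: sum = {sum}, loads so far = {loads}");
}
```

If Cache behaves as described above, the load counter should stop growing after the first epoch. Placing Cache after the expensive steps and before Shuffle keeps the costly work cached while still allowing a different order each epoch.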

Concat(DataPipeline<T>)

Concatenates another pipeline to this one.

public DataPipeline<T> Concat(DataPipeline<T> other)

Parameters

other DataPipeline<T>

The pipeline to concatenate.

Returns

DataPipeline<T>

A new DataPipeline with concatenated elements.
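
A short sketch of Concat, assuming the AiDotNet package is referenced; the string sources are illustrative.

```csharp
using System;
using System.Linq;
using AiDotNet.Data.Pipeline;

var first = DataPipeline<string>.From(new[] { "a", "b" });
var second = DataPipeline<string>.From(new[] { "c", "d" });

// The instance method documented above is chosen over LINQ's Concat
// extension, so the result is still a DataPipeline<string>.
DataPipeline<string> combined = first.Concat(second);

Console.WriteLine(string.Join(", ", combined));
```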

Filter(Func<T, bool>)

Filters elements based on a predicate.

public DataPipeline<T> Filter(Func<T, bool> predicate)

Parameters

predicate Func<T, bool>

The filter predicate.

Returns

DataPipeline<T>

A new DataPipeline with filtered elements.

Remarks

For Beginners: Filter keeps only the elements that match a condition. For example, it can remove corrupted samples or samples below a quality threshold.
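
The quality-threshold case might look like the following sketch. It assumes the AiDotNet package is referenced; the tuple shape, file names, and MinQuality value are made up for illustration.

```csharp
using System;
using System.Linq;
using AiDotNet.Data.Pipeline;

const double MinQuality = 0.5; // Hypothetical threshold.

// Illustrative samples: a file name paired with a quality score.
var samples = new (string Path, double Quality)[]
{
    ("a.png", 0.9),
    ("b.png", 0.2),
    ("c.png", 0.7),
};

var usable = DataPipeline<(string Path, double Quality)>.From(samples)
    .Filter(s => s.Quality >= MinQuality);

foreach (var sample in usable)
{
    Console.WriteLine($"{sample.Path} ({sample.Quality})");
}
```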

Flatten<TElement>(Func<T, IEnumerable<TElement>>)

Flattens the pipeline by extracting an enumerable from each element and emitting its items individually.

public DataPipeline<TElement> Flatten<TElement>(Func<T, IEnumerable<TElement>> selector)

Parameters

selector Func<T, IEnumerable<TElement>>

Function to extract the enumerable from each element.

Returns

DataPipeline<TElement>

A flattened DataPipeline.

Type Parameters

TElement

The element type within the enumerables.
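
For instance, a pipeline of documents can be flattened into a pipeline of words. This sketch assumes the AiDotNet package is referenced; the sample strings are illustrative.

```csharp
using System;
using System.Linq;
using AiDotNet.Data.Pipeline;

var documents = DataPipeline<string>.From(new[] { "the quick fox", "lazy dog" });

// Each document yields several words; Flatten emits them one at a time.
// TElement is inferred as string from the selector's return type.
DataPipeline<string> words = documents.Flatten(doc => doc.Split(' '));

foreach (var word in words)
{
    Console.WriteLine(word);
}
```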

ForEach(Action<T>)

Applies a side effect to each element without modifying it.

public DataPipeline<T> ForEach(Action<T> action)

Parameters

action Action<T>

The action to perform.

Returns

DataPipeline<T>

A new DataPipeline that performs the action.

Remarks

For Beginners: ForEach is useful for logging, debugging, or collecting statistics without changing the data.
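
A sketch of collecting a simple statistic with ForEach, assuming the AiDotNet package is referenced. The counter and sample values are illustrative.

```csharp
using System;
using AiDotNet.Data.Pipeline;

int seen = 0;

var counted = DataPipeline<double>.From(new[] { 0.5, 1.5, 2.5 })
    .ForEach(_ => seen++)   // Side effect only; elements pass through unchanged.
    .Map(x => x * 2);

foreach (var value in counted)
{
    Console.WriteLine(value);
}

Console.WriteLine($"samples seen: {seen}");
```

Because pipeline operations are lazily evaluated, the action runs as elements flow through during enumeration, not when ForEach is called.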

From(IBatchIterable<T>)

Creates a new DataPipeline from a batch iterable.

public static DataPipeline<T> From(IBatchIterable<T> source)

Parameters

source IBatchIterable<T>

The batch iterable source.

Returns

DataPipeline<T>

A new DataPipeline.

From(IEnumerable<T>)

Creates a new DataPipeline from an enumerable source.

public static DataPipeline<T> From(IEnumerable<T> source)

Parameters

source IEnumerable<T>

The source enumerable.

Returns

DataPipeline<T>

A new DataPipeline.

GetEnumerator()

Returns an enumerator that iterates through the elements of the pipeline.

public IEnumerator<T> GetEnumerator()

Returns

IEnumerator<T>

An enumerator that can be used to iterate through the collection.

Interleave(params DataPipeline<T>[])

Interleaves multiple pipelines.

public DataPipeline<T> Interleave(params DataPipeline<T>[] pipelines)

Parameters

pipelines DataPipeline<T>[]

The pipelines to interleave with.

Returns

DataPipeline<T>

A new DataPipeline with interleaved elements.

Remarks

For Beginners: Interleave mixes elements from multiple sources. This is useful when you have data from different domains and want to mix them during training.
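
Mixing sources from different domains could be sketched as follows, assuming the AiDotNet package is referenced. The source names are illustrative, and the exact output order is not specified on this page, so the example only prints whatever order the library produces.

```csharp
using System;
using System.Linq;
using AiDotNet.Data.Pipeline;

var images = DataPipeline<string>.From(new[] { "img-0", "img-1", "img-2" });
var text = DataPipeline<string>.From(new[] { "txt-0", "txt-1", "txt-2" });
var audio = DataPipeline<string>.From(new[] { "aud-0", "aud-1", "aud-2" });

// params lets any number of additional pipelines be passed.
DataPipeline<string> mixed = images.Interleave(text, audio);

Console.WriteLine(string.Join(", ", mixed));
```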

MapAsync<TResult>(Func<T, CancellationToken, Task<TResult>>, int)

Applies an async transformation function to each element.

public AsyncDataPipeline<TResult> MapAsync<TResult>(Func<T, CancellationToken, Task<TResult>> selector, int maxConcurrency = 4)

Parameters

selector Func<T, CancellationToken, Task<TResult>>

The async transformation function.

maxConcurrency int

Maximum concurrent operations. Default is 4.

Returns

AsyncDataPipeline<TResult>

A new AsyncDataPipeline with transformed elements.

Type Parameters

TResult

The type of the transformed elements.
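
A sketch of an I/O-bound transformation with bounded concurrency, assuming the AiDotNet package is referenced. LoadAsync is a hypothetical helper. How the resulting AsyncDataPipeline is consumed is not documented on this page, so the example stops at construction.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using AiDotNet.Data.Pipeline;

// Hypothetical stand-in for an I/O-bound step such as reading a file.
static async Task<string> LoadAsync(int id, CancellationToken ct)
{
    await Task.Delay(10, ct);
    return $"sample-{id}";
}

var ids = DataPipeline<int>.From(new[] { 1, 2, 3, 4, 5, 6, 7, 8 });

// maxConcurrency caps the number of concurrent operations at two here,
// instead of the default of four.
var loaded = ids.MapAsync((id, ct) => LoadAsync(id, ct), maxConcurrency: 2);
```

Forwarding the CancellationToken into the awaited work lets a cancelled pipeline stop in-flight operations promptly.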

Map<TResult>(Func<T, TResult>)

Applies a transformation function to each element.

public DataPipeline<TResult> Map<TResult>(Func<T, TResult> selector)

Parameters

selector Func<T, TResult>

The transformation function.

Returns

DataPipeline<TResult>

A new DataPipeline with transformed elements.

Type Parameters

TResult

The type of the transformed elements.

Remarks

For Beginners: Map transforms each element. For example, it can normalize pixel values or convert data types.
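
The pixel-normalization case might be sketched like this, assuming the AiDotNet package is referenced. The byte values are illustrative.

```csharp
using System;
using System.Linq;
using AiDotNet.Data.Pipeline;

var pixels = DataPipeline<byte>.From(new byte[] { 0, 128, 255 });

// Map can change the element type: byte in, float out.
DataPipeline<float> normalized = pixels.Map(p => p / 255f);

foreach (var value in normalized)
{
    Console.WriteLine(value);
}
```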

Prefetch(int)

Prefetches elements in the background for improved performance.

public AsyncDataPipeline<T> Prefetch(int bufferSize = 2)

Parameters

bufferSize int

Number of elements to prefetch. Default is 2.

Returns

AsyncDataPipeline<T>

An async DataPipeline with prefetching.

Remarks

Prefetch prepares the next elements while the current ones are being processed. This hides data loading latency.

For Beginners: Prefetching is like having someone prepare the next ingredients while you're cooking. It keeps the pipeline flowing smoothly without waiting for data.
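
Prefetch is typically the last step, so that whole batches are prepared ahead of the consumer. The sketch below assumes the AiDotNet package is referenced. The consumption API of AsyncDataPipeline is not covered on this page, so only construction is shown.

```csharp
using System.Linq;
using AiDotNet.Data.Pipeline;

var prefetched = DataPipeline<int>.From(Enumerable.Range(0, 1000))
    .Batch(32)
    // After Batch, each "element" is a whole batch, so up to four
    // batches are buffered ahead of the consumer.
    .Prefetch(bufferSize: 4);
```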

Repeat(int?)

Repeats the pipeline indefinitely or a specified number of times.

public DataPipeline<T> Repeat(int? count = null)

Parameters

count int?

Number of times to repeat. If null (the default), the pipeline repeats indefinitely.

Returns

DataPipeline<T>

A new DataPipeline that repeats.

Remarks

For Beginners: Repeat allows iterating through the data multiple times. With count=null, it repeats forever (useful with Take to limit total samples).
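
The combination of an infinite Repeat with Take could look like the sketch below, which assumes the AiDotNet package is referenced. Whether count is the total number of passes (as in tf.data) is an assumption, so no output is claimed for the finite case.

```csharp
using System;
using System.Linq;
using AiDotNet.Data.Pipeline;

var data = DataPipeline<int>.From(new[] { 1, 2, 3 });

// Finite repeat with an explicit count.
DataPipeline<int> twice = data.Repeat(2);

// Infinite repeat, bounded by Take so that enumeration terminates.
DataPipeline<int> firstSeven = data.Repeat().Take(7);

Console.WriteLine(string.Join(", ", twice));
Console.WriteLine(string.Join(", ", firstSeven));
```

An infinite pipeline must always be bounded before it is fully enumerated; otherwise a foreach loop over it never ends.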

Shuffle(int, int?)

Shuffles elements using a buffer.

public DataPipeline<T> Shuffle(int bufferSize, int? seed = null)

Parameters

bufferSize int

Size of the shuffle buffer.

seed int?

Optional random seed for reproducibility.

Returns

DataPipeline<T>

A new DataPipeline with shuffled elements.

Remarks

Shuffle maintains a buffer of elements and randomly samples from it. Larger buffers provide more randomness but use more memory.

For Beginners: Shuffling data helps the model learn better by preventing it from memorizing the order. A shuffle buffer of 1000 means up to 1000 samples are held in memory at a time, and each output is drawn at random from them.
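
To make the buffer idea concrete, here is a standalone sketch of a buffered shuffle using only the .NET base class library. It is an illustration of the general technique, not AiDotNet's implementation; BufferedShuffle is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustrative buffered shuffle; not taken from the AiDotNet source.
static IEnumerable<T> BufferedShuffle<T>(IEnumerable<T> source, int bufferSize, int? seed = null)
{
    if (bufferSize <= 0) throw new ArgumentOutOfRangeException(nameof(bufferSize));

    var rng = seed.HasValue ? new Random(seed.Value) : new Random();
    var buffer = new List<T>(bufferSize);

    foreach (var item in source)
    {
        if (buffer.Count < bufferSize)
        {
            buffer.Add(item); // Fill the buffer before emitting anything.
            continue;
        }

        // Buffer is full: emit a random slot and refill it with the new item.
        int index = rng.Next(buffer.Count);
        yield return buffer[index];
        buffer[index] = item;
    }

    // Source exhausted: drain the remaining items in random order.
    while (buffer.Count > 0)
    {
        int index = rng.Next(buffer.Count);
        yield return buffer[index];
        buffer[index] = buffer[buffer.Count - 1];
        buffer.RemoveAt(buffer.Count - 1);
    }
}

var shuffled = BufferedShuffle(Enumerable.Range(0, 10), bufferSize: 4, seed: 42).ToList();
Console.WriteLine(string.Join(", ", shuffled));
```

In this sketch the first emitted element always comes from the first bufferSize inputs, which shows why a small buffer only shuffles locally and a buffer at least as large as the dataset is needed for a uniform shuffle.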

Skip(int)

Skips the first N elements.

public DataPipeline<T> Skip(int count)

Parameters

count int

Number of elements to skip.

Returns

DataPipeline<T>

A new DataPipeline that omits the first count elements.

Take(int)

Takes only the first N elements.

public DataPipeline<T> Take(int count)

Parameters

count int

Number of elements to take.

Returns

DataPipeline<T>

A new DataPipeline containing at most the first count elements.
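
Skip and Take together give a simple way to split one source into two parts. The sketch assumes the AiDotNet package is referenced; the split sizes and the integer source are illustrative.

```csharp
using System;
using System.Linq;
using AiDotNet.Data.Pipeline;

var all = DataPipeline<int>.From(Enumerable.Range(0, 10));

// First two elements form a hold-out set; the rest are used for training.
// The instance methods documented here take precedence over LINQ's
// Skip/Take extensions, so both results are DataPipeline<int>.
DataPipeline<int> validation = all.Take(2);
DataPipeline<int> training = all.Skip(2);

Console.WriteLine($"validation: {string.Join(", ", validation)}");
Console.WriteLine($"training:   {string.Join(", ", training)}");
```

Such a split is only stable if it is applied before any unseeded Shuffle; otherwise the two parts may overlap between passes.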

Zip<TOther>(DataPipeline<TOther>)

Zips this pipeline with another to create pairs.

public DataPipeline<(T, TOther)> Zip<TOther>(DataPipeline<TOther> other)

Parameters

other DataPipeline<TOther>

The pipeline to pair with this one.

Returns

DataPipeline<(T, TOther)>

A new DataPipeline of tuples.

Type Parameters

TOther

The element type of the other pipeline.
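
A common use of Zip is pairing features with labels. The sketch below assumes the AiDotNet package is referenced; the feature vectors and labels are illustrative.

```csharp
using System;
using System.Linq;
using AiDotNet.Data.Pipeline;

var features = DataPipeline<double[]>.From(new[]
{
    new[] { 1.0, 2.0 },
    new[] { 3.0, 4.0 },
});
var labels = DataPipeline<int>.From(new[] { 0, 1 });

// Each element of the result is a (features, label) tuple.
DataPipeline<(double[], int)> pairs = features.Zip(labels);

foreach (var (x, y) in pairs)
{
    Console.WriteLine($"[{string.Join(", ", x)}] -> {y}");
}
```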