Class CommunicationManager
- Namespace
- AiDotNet.DistributedTraining
- Assembly
- AiDotNet.dll
Central manager for distributed communication operations.
public static class CommunicationManager
- Inheritance
- object → CommunicationManager
Remarks
⚠️ WARNING - Static Mutable State: This class uses static mutable state for managing communication backends. This design choice has important implications for concurrent usage:
- Only ONE backend can be active per process
- Unit tests using this class CANNOT run in parallel
- Multiple training sessions in the same process share the same backend
See the detailed thread-safety notes below for proper usage patterns.
Provides a static API for collective communication in distributed training scenarios.
For Beginners: This is your main entry point for distributed training communication. It's a "wrapper" that makes it easy to communicate between different processes/GPUs without worrying about the underlying implementation details.
Example usage:
// Initialize communication (do this once at startup)
CommunicationManager.Initialize(new InMemoryCommunicationBackend<double>());
// Get your process ID and total number of processes
int myRank = CommunicationManager.GetRank<double>();
int totalProcesses = CommunicationManager.GetWorldSize<double>();
// Sum gradients across all processes
Vector<double> gradients = ...; // Your local gradients
CommunicationManager.AllReduce(gradients, ReductionOperation.Sum);
// Now 'gradients' contains the sum from all processes
// Clean up when done
CommunicationManager.Shutdown();
IMPORTANT - Thread Safety and Testing Limitations: This class uses STATIC MUTABLE STATE, which has the following implications:
1. SINGLE GLOBAL INSTANCE: Only ONE backend can be active per process at a time. Multiple training sessions in the same process will share the same backend instance.
2. PARALLEL TEST EXECUTION: Tests that use this class CANNOT run in parallel. Use [Collection] attributes in xUnit or similar mechanisms to enforce sequential execution.
3. TEST ISOLATION: Always call Shutdown() in test cleanup to reset state. For better isolation in tests, use InMemoryCommunicationBackend with unique environment IDs and inject the backend directly instead of using this static manager.
4. CONCURRENT INITIALIZATION: Initialize() is protected by locks against concurrent calls, but a call made after the manager is already initialized will throw.
Recommended Test Pattern:
// Option 1: Use environment isolation (recommended for parallel tests)
var backend = new InMemoryCommunicationBackend<double>(rank: 0, worldSize: 4, environmentId: "test-123");
// Use the backend directly; don't call CommunicationManager.Initialize()

// Option 2: Sequential tests with proper cleanup
[Collection("DistributedTraining")] // Force sequential execution
public class MyDistributedTests
{
    [Fact]
    public void MyTest()
    {
        try
        {
            CommunicationManager.Initialize(...);
            // Test code
        }
        finally
        {
            CommunicationManager.Shutdown(); // CRITICAL: Always clean up
        }
    }
}
Properties
IsInitialized
Gets whether the communication manager has been initialized.
public static bool IsInitialized { get; }
Property Value
- bool
Methods
AllGather<T>(Vector<T>)
Gathers data from all processes and returns the concatenated result.
public static Vector<T> AllGather<T>(Vector<T> sendData)
Parameters
- sendData (Vector<T>): The local data to contribute
Returns
- Vector<T>
The concatenated data from all processes
Type Parameters
- T: The numeric type
Remarks
For Beginners: This collects data from all processes and combines it into one big array. Everyone gets the full combined result. Useful when you need to see all the pieces together (like reconstructing full parameters from shards).
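For example, a minimal sketch of reconstructing sharded parameters (vector creation elided, as in the class-level example; rank ordering of the concatenation is an assumption):
// Each rank holds one shard of the full parameter vector
Vector<double> myShard = ...; // this rank's local shard
Vector<double> fullParameters = CommunicationManager.AllGather(myShard);
// fullParameters now contains every rank's shard concatenated together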
Exceptions
- InvalidOperationException
Thrown if not initialized
- ArgumentNullException
Thrown if sendData is null
AllReduce<T>(Vector<T>, ReductionOperation)
Performs an AllReduce operation - combines data from all processes and distributes the result to all processes.
public static void AllReduce<T>(Vector<T> data, ReductionOperation operation)
Parameters
- data (Vector<T>): The data to reduce (will be modified to contain the result)
- operation (ReductionOperation): How to combine the data (Sum, Average, Max, etc.)
Type Parameters
- T: The numeric type
Remarks
For Beginners: This is the key operation for distributed training. It combines values from all processes (like adding gradients from all GPUs) and gives everyone the result. After this, everyone has the same combined data.
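A minimal sketch of gradient averaging (vector creation elided):
Vector<double> gradients = ...; // gradients computed on this rank
CommunicationManager.AllReduce(gradients, ReductionOperation.Average);
// 'gradients' is modified in place; every rank now holds the same averaged values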
Exceptions
- InvalidOperationException
Thrown if not initialized
- ArgumentNullException
Thrown if data is null
Barrier<T>()
Blocks until all processes reach this synchronization point.
public static void Barrier<T>()
Type Parameters
- T: The numeric type
Remarks
For Beginners: This is a "wait for everyone" checkpoint. All processes must reach this point before any can continue. Useful for making sure everyone is ready before starting the next step.
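A minimal sketch (SaveLocalCheckpoint is a hypothetical helper, not part of this API):
SaveLocalCheckpoint(); // hypothetical: write this rank's checkpoint to disk
CommunicationManager.Barrier<double>(); // wait until every rank reaches this point
// From here on, all ranks' checkpoints are known to exist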
Exceptions
- InvalidOperationException
Thrown if not initialized
Broadcast<T>(Vector<T>, int)
Broadcasts data from one process (root) to all other processes.
public static Vector<T> Broadcast<T>(Vector<T> data, int root = 0)
Parameters
- data (Vector<T>): The data to broadcast (only meaningful on root)
- root (int): Which process is broadcasting (default: 0)
Returns
- Vector<T>
The broadcast data
Type Parameters
- T: The numeric type
Remarks
For Beginners: One process (the root) sends data to everyone else. Everyone ends up with the same data. Useful for distributing initial parameters or settings.
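A minimal sketch of distributing initial parameters (vector creation elided):
Vector<double> parameters = ...; // only meaningful on rank 0
parameters = CommunicationManager.Broadcast(parameters, root: 0);
// Every rank now holds rank 0's parameters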
Exceptions
- InvalidOperationException
Thrown if not initialized
GetRank<T>()
Gets the rank (ID) of the current process.
public static int GetRank<T>()
Returns
- int
The rank of the current process (0-based index)
Type Parameters
- T: The numeric type
Remarks
For Beginners: This tells you which process you are. If you're running on 4 GPUs, one will be rank 0, one will be rank 1, etc.
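A minimal sketch of a rank-based work split (totalBatches is a hypothetical count; see also GetWorldSize<T>() below):
int rank = CommunicationManager.GetRank<double>();
int worldSize = CommunicationManager.GetWorldSize<double>();
// Each rank processes every worldSize-th batch, starting at its own rank
for (int batch = rank; batch < totalBatches; batch += worldSize)
{
    // Process batch...
}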
Exceptions
- InvalidOperationException
Thrown if not initialized
GetWorldSize<T>()
Gets the total number of processes in the distributed group.
public static int GetWorldSize<T>()
Returns
- int
The total number of processes
Type Parameters
- T: The numeric type
Remarks
For Beginners: This tells you how many processes (or GPUs) are working together total.
Exceptions
- InvalidOperationException
Thrown if not initialized
Initialize<T>(ICommunicationBackend<T>)
Initializes the communication manager with the specified backend.
public static void Initialize<T>(ICommunicationBackend<T> backend)
Parameters
- backend (ICommunicationBackend<T>): The communication backend to use
Type Parameters
- T: The numeric type (float or double)
Remarks
This must be called before any other operations.
For Beginners: This sets up the communication system. You need to provide a "backend" which is the actual implementation that does the communication. For testing, use InMemoryCommunicationBackend. For real distributed training, you would use an MPI backend.
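A minimal sketch of a test setup, using the InMemoryCommunicationBackend constructor shown in the class remarks above (the environmentId value is illustrative):
var backend = new InMemoryCommunicationBackend<double>(rank: 0, worldSize: 4, environmentId: "session-1");
CommunicationManager.Initialize(backend);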
Exceptions
- ArgumentNullException
Thrown if backend is null
- InvalidOperationException
Thrown if already initialized
ReduceScatter<T>(Vector<T>, ReductionOperation)
Performs a reduce-scatter operation - combines data and distributes chunks.
public static Vector<T> ReduceScatter<T>(Vector<T> data, ReductionOperation operation)
Parameters
- data (Vector<T>): The data to reduce and scatter
- operation (ReductionOperation): How to combine the data
Returns
- Vector<T>
The reduced chunk for this process
Type Parameters
- T: The numeric type
Remarks
For Beginners: This is a combined operation that's more efficient than doing AllReduce followed by Scatter. It reduces the data and immediately gives each process only their chunk of the result.
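A minimal sketch of sharded gradient reduction (vector creation elided; the chunk-per-rank layout follows the description above):
Vector<double> gradients = ...; // full-size gradients on every rank
Vector<double> myChunk = CommunicationManager.ReduceScatter(gradients, ReductionOperation.Sum);
// myChunk holds the summed values for this rank's portion only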
Exceptions
- InvalidOperationException
Thrown if not initialized
Scatter<T>(Vector<T>, int)
Scatters different chunks of data from root to each process.
public static Vector<T> Scatter<T>(Vector<T> sendData, int root = 0)
Parameters
- sendData (Vector<T>): The data to scatter (only used on root)
- root (int): Which process is scattering (default: 0)
Returns
- Vector<T>
The chunk received by this process
Type Parameters
- T: The numeric type
Remarks
For Beginners: The root process splits data into chunks and gives each process a different chunk. This is how we distribute work across processes.
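A minimal sketch of distributing data chunks from rank 0 (vector creation elided):
Vector<double> allData = ...; // only used on rank 0
Vector<double> myChunk = CommunicationManager.Scatter(allData, root: 0);
// Each rank receives a different chunk of allData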
Exceptions
- InvalidOperationException
Thrown if not initialized
Shutdown()
Shuts down the communication manager and releases all resources.
public static void Shutdown()
Remarks
Should be called when distributed training is complete.
For Beginners: This is cleanup - call it when you're done with distributed training to free up resources and properly close connections.
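In production code, the same try/finally pattern recommended for tests applies (backend creation elided):
try
{
    CommunicationManager.Initialize(backend);
    // ... run distributed training ...
}
finally
{
    CommunicationManager.Shutdown(); // always release communication resources
}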