MPI_Send Takes Huge Part of Virtual Memory

This is expected behaviour from almost any MPI implementation that runs over InfiniBand. The IB RDMA mechanisms require that data buffers be registered, i.e. they are first locked into a fixed position in physical memory and then the driver tells the InfiniBand HCA how to map virtual addresses to physical memory. Registering memory for use by the IB HCA is a very complex and hence very slow process, which is why most MPI implementations never unregister memory that was once registered, in the hope that the same memory will later be used as a data source or target again. If the registered memory was heap memory, it is never returned to the operating system, and that is why your data segment only grows in size.

Reuse send and receive buffers as much as possible. Keep in mind that communication over InfiniBand incurs a high memory overhead. Most people don't really think about this and it is usually poorly documented, but InfiniBand uses a lot of special data structures (queues) which are allocated in the memory of the process, and those queues grow significantly with the number of processes. In some fully connected cases the amount of queue memory can be so large that no memory is actually left for the application.

There are certain parameters that control the IB queues used by Intel MPI. The most important in your case is I_MPI_DAPL_BUFFER_NUM, which controls the amount of preallocated and preregistered memory. Its default value is 16, so you might want to decrease it. Be aware of possible performance implications though. You can also try to use dynamic preallocated buffer sizes by setting I_MPI_DAPL_BUFFER_ENLARGEMENT to 1. With this option enabled, Intel MPI initially registers small buffers and grows them later if needed. Note also that Intel MPI opens connections lazily, which is why you see the huge increase in used memory only after the call to MPI_Send.

If you are not using the DAPL transport, e.g. you are using the ofa transport instead, there is not much that you can do. You can enable XRC queues by setting I_MPI_OFA_USE_XRC to 1. This should decrease the memory used somewhat. Enabling dynamic queue pair creation by setting I_MPI_OFA_DYNAMIC_QPS to 1 might also decrease memory usage if the communication graph of your program is not fully connected (a fully connected program is one in which each rank talks to all other ranks).

Increase of virtual memory without increase of VmSize

(The possible solution to your problem is the last paragraph)

Memory allocation on most modern operating systems with virtual memory is a two-phase process. First, a portion of the virtual address space of the process is reserved and the virtual memory size of the process (VmSize) increases accordingly. This creates entries in the so-called process page table. Pages are initially not associated with physical memory frames, i.e. no physical memory is actually used. Whenever some part of this allocated portion is actually read from or written to, a page fault occurs and the operating system installs (maps) a free page from the physical memory. This increases the resident set size of the process (VmRSS). When some other process needs memory, the OS might store the content of some infrequently used page (the definition of "infrequently used page" is highly implementation-dependent) to some persistent storage (a hard drive in most cases, or generally the swap device) and then unmap it. This process decreases the RSS but leaves VmSize intact. If this page is later accessed, a page fault occurs again and the page is brought back. The virtual memory size only decreases when virtual memory allocations are freed. Note that VmSize also counts memory-mapped files (i.e. the executable file and all shared libraries it links to, or other explicitly mapped files) and shared memory blocks.

There are two generic types of memory in a process - statically allocated memory and heap memory. The statically allocated memory keeps all constants and global/static variables. It is part of the data segment, whose size is shown by the VmData metric. The data segment also hosts part of the program heap, where dynamic memory is allocated. The data segment is contiguous, i.e. it starts at a certain location and grows upwards towards the stack (which starts at a very high address and then grows downwards). The problem with the heap in the data segment is that it is managed by a special heap allocator that takes care of subdividing the contiguous data segment into smaller memory chunks. On the other hand, in Linux dynamic memory can also be allocated by directly mapping virtual memory. This is usually done only for large allocations in order to conserve memory, since it only allows memory to be allocated in multiples of the page size (usually 4 KiB).

The stack is also an important source of heavy memory usage, especially if big arrays are allocated in the automatic (stack) storage. The stack starts near the very top of the usable virtual address space and grows downwards. In some cases it could reach the top of the data segment or the end of some other virtual allocation. Bad things happen then. The stack size is accounted for in the VmStack metric and also in VmSize.

One can summarise it as follows:

  • VmSize accounts for all virtual memory allocations (file mappings, shared memory, heap memory, whatever memory) and grows almost every time new memory is allocated. Almost, because if the new heap memory allocation is made in the place of a freed old allocation in the data segment, no new virtual memory is allocated. It decreases whenever virtual allocations are freed. VmPeak tracks the maximum value of VmSize - it can only increase over time.
  • VmRSS grows as memory is being accessed and decreases as memory is paged out to the swap device.
  • VmData grows as the data segment part of the heap is being utilised. It almost never shrinks as current heap allocators keep the freed memory in case future allocations need it.
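
The difference between reserving memory and actually touching it can be observed directly. The following is a minimal sketch, assuming a Linux system with /proc mounted; the helper names status_kib and demo_two_phase are made up for this illustration. It reserves an anonymous mapping, then writes to every page, and records VmSize and VmRSS after each step:

```c
#define _DEFAULT_SOURCE
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <sys/mman.h>

/* Return the value (in KiB) of a field such as "VmSize" or "VmRSS"
 * from /proc/self/status, or -1 if it cannot be read (Linux only). */
long status_kib(const char *field)
{
    assert(field);
    FILE *f = fopen("/proc/self/status", "r");
    if (!f)
        return -1;

    char line[256];
    long value = -1;
    size_t n = strlen(field);
    while (fgets(line, sizeof line, f)) {
        /* Lines look like "VmSize:    123456 kB". */
        if (strncmp(line, field, n) == 0 && line[n] == ':') {
            if (sscanf(line + n + 1, "%ld", &value) != 1)
                value = -1;
            break;
        }
    }
    fclose(f);
    return value;
}

/* Record VmSize/VmRSS (KiB) at three points: [0] before anything,
 * [1] after reserving `bytes`, [2] after touching every page.
 * Returns 0 on success, -1 if the mapping could not be created. */
int demo_two_phase(size_t bytes, long vmsize[3], long vmrss[3])
{
    vmsize[0] = status_kib("VmSize");
    vmrss[0]  = status_kib("VmRSS");

    /* Phase 1: reserve address space only; no physical pages yet. */
    char *p = mmap(NULL, bytes, PROT_READ | PROT_WRITE,
                   MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    if (p == MAP_FAILED)
        return -1;
    vmsize[1] = status_kib("VmSize");
    vmrss[1]  = status_kib("VmRSS");

    /* Phase 2: writing to the pages triggers page faults and maps frames. */
    memset(p, 1, bytes);
    vmsize[2] = status_kib("VmSize");
    vmrss[2]  = status_kib("VmRSS");

    munmap(p, bytes);
    return 0;
}
```

With a request of a few tens of MiB, one would expect VmSize to jump by roughly the requested amount right after mmap, while VmRSS follows only after the memset. The exact numbers vary from system to system.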

If you are running on a cluster with InfiniBand or other RDMA-based fabrics, another kind of memory comes into play - the locked (registered) memory (VmLck). This is memory which is not allowed to be paged out. How it grows and shrinks depends on the MPI implementation. Some never unregister an already registered block (the technical details about why are too complex to be described here), others do so in order to play better with the virtual memory manager.

In your case you say that you are running into a virtual memory size limit. This could mean that this limit is set too low or that you are running into an OS-imposed limit. First, Linux (and most Unixes) has the means to impose artificial restrictions through the ulimit mechanism. Running ulimit -v in the shell tells you what the limit on the virtual memory size is in KiB. You can set the limit using ulimit -v <value in KiB>. This only applies to processes spawned by the current shell and to their children, grandchildren and so on. You need to instruct mpiexec (or mpirun) to propagate this value to all other processes if they are to be launched on remote nodes. If you are running your program under the control of some workload manager like LSF, Sun/Oracle Grid Engine, Torque/PBS, etc., there are job parameters which control the virtual memory size limit. And last but not least, 32-bit processes are usually restricted to 2 GiB of usable virtual memory.
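
To check which limit actually reached a remotely launched process, the limit can also be queried from inside the program. This is a minimal sketch, assuming a POSIX system where RLIMIT_AS is the limit behind ulimit -v (as it is on Linux); vm_limit and print_limit are illustrative names:

```c
#include <assert.h>
#include <stdio.h>
#include <sys/resource.h>

/* Fetch the soft and hard limits on the virtual address space of the
 * calling process, in bytes. Returns 0 on success, -1 on failure. */
int vm_limit(rlim_t *soft, rlim_t *hard)
{
    assert(soft && hard);
    struct rlimit rl;
    if (getrlimit(RLIMIT_AS, &rl) != 0)
        return -1;
    *soft = rl.rlim_cur;
    *hard = rl.rlim_max;
    return 0;
}

/* Print one limit in KiB, the unit that ulimit -v uses. */
void print_limit(const char *label, rlim_t value)
{
    if (value == RLIM_INFINITY)
        printf("%s: unlimited\n", label);
    else
        printf("%s: %llu KiB\n", label, (unsigned long long)(value / 1024));
}
```

Calling vm_limit early in each rank (for example right after MPI_Init) and printing both values shows whether the launcher propagated the setting to the remote nodes.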

What exactly happens when we use mpi_send/receive functions?

Quite a bit of magic happens behind the scenes.


First, there's the unexpected message queue. When the sender calls MPI_Send before the receiver has called MPI_Recv, MPI doesn't know where in the receiver's memory the message is going. Two things can happen at this point. If the message is short, it is copied to a temporary buffer at the receiver. When the receiver calls MPI_Recv it first checks if a matching message has already arrived, and if it has, copies the data to the final destination. If not, the information about the target buffer is stored in the queue so the MPI_Recv can be matched when the message arrives. It is possible to examine the unexpected queue with MPI_Probe.

If the message is longer than some threshold, copying it would take too long. Instead, the sender and the receiver do a handshake with a rendezvous protocol of some sort to make sure the target is ready to receive the message before it is sent out. This is especially important with a high-speed network like InfiniBand.


If the communicating ranks are on the same machine, usually the data transfer happens through shared memory. Because MPI ranks are independent processes, they do not have access to each other's memory. Instead, the MPI processes on the same node set up a shared memory region and use it to transfer messages. So sending a message involves copying the data twice: the sender copies it into the shared buffer, and the receiver copies it out into its own address space. There exists an exception to this. If the MPI library is configured to use a kernel module like KNEM, the message can be copied directly to the destination in the OS kernel. However, such a copy incurs a penalty of a system call. Copying through the kernel is usually only worth it for large messages. Specialized HPC operating systems like Catamount can change these rules.
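
The double copy can be modelled without MPI at all. The sketch below is only an illustration of the idea, not how any particular MPI library implements it: a forked child plays the sender, the parent plays the receiver, and waitpid stands in for the real synchronisation. The function name transfer_via_shared is made up, and a POSIX system with anonymous shared mappings is assumed:

```c
#define _DEFAULT_SOURCE
#include <assert.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <unistd.h>

/* Move `n` bytes from `src` (read by a child "sender" process) to `dst`
 * (written by the parent "receiver") through a shared staging buffer.
 * Returns 0 on success, -1 on failure. */
int transfer_via_shared(const void *src, void *dst, size_t n)
{
    assert(src && dst && n > 0);

    /* Region that stays visible to both processes after fork(). */
    void *shared = mmap(NULL, n, PROT_READ | PROT_WRITE,
                        MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (shared == MAP_FAILED)
        return -1;

    pid_t pid = fork();
    if (pid < 0) {
        munmap(shared, n);
        return -1;
    }
    if (pid == 0) {
        /* Sender: copy 1, private memory -> shared buffer. */
        memcpy(shared, src, n);
        _exit(0);
    }

    /* Receiver: wait until the sender has finished writing. */
    int status;
    if (waitpid(pid, &status, 0) < 0 ||
        !WIFEXITED(status) || WEXITSTATUS(status) != 0) {
        munmap(shared, n);
        return -1;
    }

    /* Copy 2, shared buffer -> the receiver's private memory. */
    memcpy(dst, shared, n);
    munmap(shared, n);
    return 0;
}
```

Every byte crosses memory twice here, which is the cost that kernel-assisted single-copy mechanisms try to avoid for large messages.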


Collective operations can be implemented either in terms of send/receive, or can have a completely separate optimized implementation. It is common to have implementations of several algorithms for a collective operation. The MPI library decides at runtime which one to use for best performance depending on the size of the messages and the communicator.


A good MPI implementation will try very hard to transfer a derived datatype without creating extra copies. It will figure out which regions of memory within a datatype are contiguous and copy them individually. However, in some cases MPI will fall back to using MPI_Pack behind the scenes to make the message contiguous, and then transfer and unpack it.

Sending typedef struct containing void* by creating MPI derived datatype.


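// Sender side.
int x = 100;
void *snd = &x;   // must point to valid memory before it is passed to MPI
MPI_Send(snd, 4, MPI_BYTE, 1, 0, MPI_COMM_WORLD);

// Receiver side.
int y;
void *rcv = &y;   // must point to at least 4 bytes of writable memory
MPI_Recv(rcv, 4, MPI_BYTE, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);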

I don't understand why the previous example works correctly but not the following.

It works (of course, snd and rcv have to be assigned meaningful memory locations as values), because MPI_Send and MPI_Recv take the address of the data location and both snd and rcv are pointers, i.e. their values are such addresses. For example, the MPI_Send line is not sending the value of the pointer itself but rather 4 bytes starting from the location that snd is pointing to. The same is true about the call to MPI_Recv and the usage of rcv. In order to send the value of the pointer rather than the value it is pointing to, you would have to use:

MPI_Send(&snd, sizeof(void *), MPI_BYTE, ...);

This would send sizeof(void *) bytes, starting from the address where the value of the pointer is stored. This would make very little sense except in some very special cases.

Why doesn't your second example work? MPI is not a magician and it cannot recognise that part of the memory contains a pointer to another memory block and follow that pointer. That is, when you construct a structured datatype, there is no way to tell MPI that the first element of the structure is actually a pointer and make it read the data that this pointer points to. In other words, you must perform explicit data marshalling - construct an intermediate buffer that contains a copy of the memory region pointed to by data.data. Besides, your data structure contains no information on the length of the memory region that data points to.

Please note something very important. All MPI datatypes have something called a type map. A type map is a list of tuples of the form (basic_type, offset), where basic_type is a primitive language type, e.g. char, int, double, etc., and offset is an offset relative to the beginning of the buffer. (The list of basic types alone, without the offsets, is called the type signature.) One peculiar feature of MPI is that offsets can also be negative, which means that the argument to MPI_Send (or to MPI_Recv, or to any other communication function) might actually point to the middle of the memory area that serves as the data source. When sending data, MPI traverses the type map and takes one element of type basic_type from the corresponding offset, relative to the supplied data buffer address. The built-in MPI datatypes have type maps of only one entry with an offset of 0, e.g.:

MPI_INT    -> (int, 0)
MPI_FLOAT  -> (float, 0)
MPI_DOUBLE -> (double, 0)

NO datatype exists in MPI that can make it dereference a pointer and take the value that it points to instead of the pointer value itself.

offsets[0] = 0;
oldType[0] = MPI_BYTE;
blockCount[0] = 1;

MPI_Type_get_extent(MPI_INT, &lb, &extent);  // lb and extent are MPI_Aint; replaces MPI_Type_extent, which was removed in MPI-3.0

offsets[1] = 4 * extent;
oldType[1] = MPI_INT;
blockCount[1] = 1;

MPI_Type_create_struct(2, blockCount, offsets, oldType, &structType);

This code creates an MPI datatype that has the following type map (assuming int is 4 bytes):

{(byte, 0), (int, 16)}

When supplied as the type argument to MPI_Send, it would instruct the MPI library to take one byte from the beginning of the data buffer and then to take the integer value, located at 16 bytes past the beginning of the data buffer. In total the message would be 5 bytes long, although the span of the buffer area would be 20 bytes.
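
The traversal described above can be modelled in a few lines of plain C. This is a simplified sketch, not what an MPI library actually does internally (real implementations also handle data conversion and optimise contiguous runs); the typemap_entry type and gather_by_typemap function are hypothetical names used only for illustration:

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* One entry of a simplified type map: the size of the basic type and
 * its byte offset relative to the buffer address passed by the user. */
typedef struct {
    size_t    size;    /* sizeof the basic type */
    ptrdiff_t offset;  /* may be negative, as in MPI */
} typemap_entry;

/* Walk the type map and copy each element found at `buf + offset` into
 * the contiguous output `msg`. Returns the number of bytes written,
 * i.e. the size of the resulting message. */
size_t gather_by_typemap(const void *buf, const typemap_entry *map,
                         size_t entries, void *msg)
{
    assert(buf && map && msg);
    size_t written = 0;
    for (size_t i = 0; i < entries; i++) {
        memcpy((char *)msg + written,
               (const char *)buf + map[i].offset,
               map[i].size);
        written += map[i].size;
    }
    return written;
}
```

For the type map {(byte, 0), (int, 16)} and a 4-byte int, the function reads from a region spanning 20 bytes and produces a 5-byte message, matching the numbers given above. A pointer stored in the buffer would simply be copied as bytes; nothing in the type map can ask for it to be followed.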

offsets[0] = offsetof(data, data);
oldType[0] = MPI_CHAR;
blockCount[0] = sizeof(void *);

offsets[1] = offsetof(data, tag);
oldType[1] = MPI_INT;
blockCount[1] = 1;

MPI_Type_create_struct(2, blockCount, offsets, oldType, &structType);

This code, taken from the answer of Greg Inozemtsev, creates a datatype with the following type map (assuming a 32-bit machine with 32-bit wide pointers and no padding):

{(char, 0), (char, 1), (char, 2), (char, 3), (int, 4)}

The number of (char, x) entries is equal to sizeof(void *) (4 by assumption). If used as a datatype, this would take 4 bytes from the beginning of the buffer (i.e. the value of the pointer, the address, not the actual int it is pointing to!) and then it would take an integer from 4 bytes after the beginning, i.e. the value of the tag field in the structure. Once again, you would be sending the address stored in the pointer and not the data that this pointer points to.

The difference between MPI_CHAR and MPI_BYTE is that no type conversion is applied to data of type MPI_BYTE. This is only relevant when running MPI codes in heterogeneous environments. With MPI_CHAR the library might perform data conversion, e.g. convert each character between the ASCII and EBCDIC character sets. Using MPI_CHAR in this case is erroneous, but sending pointers in a heterogeneous environment is even more erroneous, so no worries ;)

In the light of all this, if I were you, I would consider the solution that suszterpatt has proposed.


For the explicit data marshalling, there are two possible scenarios:

Scenario 1. Each data item pointed to by data.data is of constant size. In this case you can construct a structure datatype in the following way:

typedef struct {
    int tag;
    char data[];
} data_flat;

// Put the tag at the beginning
offsets[0] = offsetof(data_flat, tag);
oldType[0] = MPI_INT;
blockCount[0] = 1;

offsets[1] = offsetof(data_flat, data);
oldType[1] = MPI_BYTE;
blockCount[1] = size of the data;

MPI_Type_create_struct(2, blockCount, offsets, oldType, &structType);
MPI_Type_commit(&structType);

Then use it like this:

// --- Sender ---

// Make a temporary buffer to hold the data
size_t total_size = offsetof(data_flat, data) + size of the data;
data_flat *temp = malloc(total_size);

// Copy data structure content into the temporary flat structure
temp->tag = data.tag;
memcpy(temp->data, data.data, size of the data);

// Send the temporary structure
MPI_Send(temp, 1, structType, ...);

// Free the temporary structure
free(temp);

Instead of freeing the temporary storage, you could also reuse it for other instances of the data structure (since by assumption they all point to data of the same size). The receiver would be:

// --- Receiver ---

// Make a temporary buffer to hold the data
size_t total_size = offsetof(data_flat, data) + size of the data;
data_flat *temp = malloc(total_size);

// Receive into the temporary structure
MPI_Recv(temp, 1, structType, ...);

// Copy the temporary flat structure into a data structure
data.tag = temp->tag;
data.data = temp->data;
// Do not free the temporary structure as it contains the actual data

Scenario 2. Each data item might be of different size. This one is much more involved and hard to do in a portable way. If speed is not your greatest concern, then you might send the data in two distinct messages for maximum portability. MPI guarantees that order is preserved for messages sent with the same envelope (source, destination, tag, communicator).


You could also implement what suszterpatt proposed in the following way (given that your tags fit into the allowed range):

// --- Send a structure ---
MPI_Send(data.data, size of data, MPI_BYTE, dest, data.tag, MPI_COMM_WORLD);

// --- Receive a structure ---
MPI_Status status;
int msg_size;   // MPI_Get_count expects an int
// Peek for a message, allocate big enough buffer
MPI_Probe(source, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
MPI_Get_count(&status, MPI_BYTE, &msg_size);
uint8_t *buffer = malloc(msg_size);
// Receive exactly the message that was probed
MPI_Recv(buffer, msg_size, MPI_BYTE, status.MPI_SOURCE, status.MPI_TAG,
         MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// Fill in a data structure
data.tag = status.MPI_TAG;
data.data = buffer;

MPI-3 Shared Memory for Array Struct

Using shared memory with MPI-3 is relatively simple.

First, you allocate the shared memory window using MPI_Win_allocate_shared:

MPI_Win win;
MPI_Aint size;
void *baseptr;

if (rank == 0)
{
   size = 2 * ARRAY_LEN * sizeof(T);
   MPI_Win_allocate_shared(size, sizeof(T), MPI_INFO_NULL,
                           MPI_COMM_WORLD, &baseptr, &win);
}
else
{
   int disp_unit;
   MPI_Win_allocate_shared(0, sizeof(T), MPI_INFO_NULL,
                           MPI_COMM_WORLD, &baseptr, &win);
   MPI_Win_shared_query(win, 0, &size, &disp_unit, &baseptr);
}
a_old.data = baseptr;
a_old.length = ARRAY_LEN;
a_new.data = a_old.data + ARRAY_LEN;
a_new.length = ARRAY_LEN;

Here, only rank 0 allocates memory. It doesn't really matter which process allocates it as it is shared. It is even possible to have each process allocate a portion of the memory, but since by default the allocation is contiguous, both methods are equivalent. MPI_Win_shared_query is then used by all other processes to find out the location in their virtual address space of the beginning of the shared memory block. That address might vary among the ranks and therefore one should not pass around absolute pointers.

You can now simply load from and store into a_old.data and a_new.data. As the ranks in your case work on disjoint sets of memory locations, you don't really need to lock the window. Use window locks to implement e.g. protected initialisation of a_old or other operations that require synchronisation. You might also need to explicitly tell the compiler not to reorder the code and to emit a memory fence in order to have all outstanding load/store operations finished before e.g. you call MPI_Barrier().

The a_old = a_new code suggests copying one array onto the other. Instead, you could simply swap the data pointers and, if necessary, the length fields. Since only the data of the array is in the shared memory block, swapping the pointers is a local operation, i.e. no synchronisation is needed. Assuming that both arrays are of equal length:

T *temp;
temp = a_old.data;
a_old.data = a_new.data;
a_new.data = temp;

You still need a barrier to make sure that all other processes have finished processing before continuing further.

At the very end, simply free the window:

MPI_Win_free(&win);

A complete example (in C) follows:

#include <stdio.h>
#include <mpi.h>

#define ARRAY_LEN 1000

int main (void)
{
   MPI_Init(NULL, NULL);

   int rank, nproc;
   MPI_Comm_rank(MPI_COMM_WORLD, &rank);
   MPI_Comm_size(MPI_COMM_WORLD, &nproc);

   MPI_Win win;
   MPI_Aint size;
   void *baseptr;

   if (rank == 0)
   {
      // Rank 0 provides the whole array; the elements are ints
      size = ARRAY_LEN * sizeof(int);
      MPI_Win_allocate_shared(size, sizeof(int), MPI_INFO_NULL,
                              MPI_COMM_WORLD, &baseptr, &win);
   }
   else
   {
      int disp_unit;
      MPI_Win_allocate_shared(0, sizeof(int), MPI_INFO_NULL,
                              MPI_COMM_WORLD, &baseptr, &win);
      // Find where rank 0's block is mapped in this process
      MPI_Win_shared_query(win, 0, &size, &disp_unit, &baseptr);
   }

   printf("Rank %d, baseptr = %p\n", rank, baseptr);

   // Each rank writes its own rank number into every nproc-th element
   int *arr = baseptr;
   for (int i = rank; i < ARRAY_LEN; i += nproc)
      arr[i] = rank;

   MPI_Barrier(MPI_COMM_WORLD);

   if (rank == 0)
   {
      for (int i = 0; i < 10; i++)
         printf("%4d", arr[i]);
      printf("\n");
   }

   MPI_Win_free(&win);

   MPI_Finalize();
   return 0;
}

Disclaimer: Take this with a grain of salt. My understanding of MPI's RMA is still quite weak.

Segmentation fault during MPI_FINALIZE() in Fortran

One should always use the Fortran 90 MPI interface if available instead of the old FORTRAN 77 interface. That is, you should always

USE mpi

instead of

INCLUDE 'mpif.h'

The difference between the two is that the Fortran 90 interface puts all MPI subroutines in a module and thus explicit interfaces are being generated. This allows the compiler to do argument checking in calls and signal an error if you e.g. omit an argument.

In Fortran's calling convention all arguments are passed by address, irrespective of their type. This allows the compiler to generate proper calls to functions and subroutines without requiring prototypes as in C. But it also means that one can freely pass an INTEGER argument where an array of REAL is expected, or pass fewer or more arguments than expected, and virtually all FORTRAN 77 compilers will happily compile such code. There are external tools, usually called linters after the C tool lint, that parse the whole source tree and can pinpoint such errors and many others that the compiler would not care to find. One such tool that does static code analysis for Fortran is flint. Fortran 90 added interfaces in order to compensate for this error-prone nature of Fortran.

Calling a Fortran subroutine with fewer arguments than expected can have many different ill effects depending on the architecture, but in most cases it will result in a crash, especially if the omitted argument is an output one. The called subroutine doesn't know that fewer arguments are being passed - it just looks where the argument's address should be and takes whatever address it finds there. As ierr is an output argument, a write to that address would occur. There is a good chance that the address does not correspond to mapped memory, and a hefty segmentation fault would be delivered by the OS. Even if the address points somewhere in the user's allocated memory, the result could be an overwrite of an important value in some control structure. And even if that doesn't happen, there are calling conventions in which the callee cleans up the stack frame - in this case the stack pointer would be incorrectly incremented and the return address would be completely different from the right one, which would almost certainly lead to a jump to non-executable (or even non-mapped) memory and again to a segmentation fault.


