View Issue Details

ID: 0002669
Project: OpenFOAM
Category: Bug
View Status: public
Last Update: 2017-10-29 10:47
Reporter: matthias
Assigned To: henry
Priority: normal
Severity: crash
Reproducibility: N/A
Status: resolved
Resolution: fixed
Platform: CentOS 7.3
OS: CentOS 7.3
OS Version: CentOS 7.3
Summary: 0002669: Crash using new collated format
Description
The simulation always crashes when using the new collated I/O format with MPI threading.

The old format works fine, and the collated format with serial I/O (no buffer) also works, but it is slow.


It seems that the first field (i.e. nut) is written out correctly, but the other fields are missing when the error occurs.


Additional Information
OpenFOAM has been installed from scratch using gcc-7.1.0 and OpenMPI-2.1.1.

OpenMPI has been built with "--enable-mpi-thread-multiple" to provide MPI threading support. Another test using OpenMPI-3.0.0rc2 failed as well.

The test case is a simple pipe flow using the pimpleFoam solver.

Tags: No tags attached.

Activities

matthias

2017-08-18 15:31

reporter  

log (265,948 bytes)

MattijsJ

2017-08-27 14:54

reporter   ~0008636

Do you see this behaviour
- on any case?
- on any number of cores?
- on special nodes or all nodes?

Things to try:
- compile a debug version of the code
- switch on debug flag for OFstreamCollator
- ideally get all ranks to output to separate files

OFstreamCollator is the class that handles starting and stopping the write thread. The thread is stopped automatically once the buffer is empty. If your first file is written correctly, maybe the thread is stopped afterwards and there is a problem restarting it.

With the debug flag you will see messages like

OFstreamCollator : Writing <nBytes> bytes to <fName> using comm <comm>

Look out for different behaviour of one of the nodes.
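
For reference, a minimal sketch of the controlDict entries that switch on this debug output and the threaded collated writes (the switch names match the "Overriding DebugSwitches" / "Overriding OptimisationSwitches" lines in the logs below; whether they go in the case's system/controlDict or in $FOAM_ETC/controlDict is up to you):

// Sketch: enable collator/block-data debug output and threaded collated writes
DebugSwitches
{
    OFstreamCollator        1;
    decomposedBlockData     1;
}

OptimisationSwitches
{
    // a non-zero value activates the threaded (collated) write path
    maxThreadFileBufferSize 2e9;
    maxMasterFileBufferSize 2e9;
}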

matthias

2017-08-28 08:51

reporter   ~0008637

Hi Mattijs,

I can confirm that this happens
- on any number of cores
- on arbitrary nodes

I already did some tests and I suspect some kind of timing issue. I have added the decomposedBlockData debug flag for further investigation, and I will add the OFstreamCollator flag too.

I decomposed the case to two processors and got this behaviour. Afterwards, I added "sleep(1)" as the first statement in decomposedBlockData.C and the problem disappeared. After that, I redistributed the case to 144 processors and the crash recurred. Increasing the sleep to 100 seconds fixed the issue immediately.

matthias

2017-08-28 09:04

reporter   ~0008638

decomposedBlockData.C

bool Foam::decomposedBlockData::writeBlocks
(
    const label comm,
    autoPtr<OSstream>& osPtr,
    List<std::streamoff>& start,
    const UList<char>& data,
    const UPstream::commsTypes commsType,
    const bool syncReturnState
)
{
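    // Debug workaround described in the note above: delay every rank at the
    // start of writeBlocks. With this in place the collated write no longer
    // crashes, which points at a timing/synchronisation issue.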
    sleep(100);

...

matthias

2017-08-28 09:43

reporter   ~0008639

Here is the output from the last run, with the OFstreamCollator debug flag enabled:
log-2 (14,423 bytes)   
/*---------------------------------------------------------------------------*\
| =========                 |                                                 |
| \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox           |
|  \\    /   O peration     | Version:  5.x                                   |
|   \\  /    A nd           | Web:      www.OpenFOAM.org                      |
|    \\/     M anipulation  |                                                 |
\*---------------------------------------------------------------------------*/
Build  : 5.x-3fe7aa77e620
Exec   : pimpleFoam -parallel
Date   : Aug 28 2017
Time   : 10:22:23
Host   : "node144.service"
PID    : 13620
I/O    : uncollated
Case   : /beegfs/testcase/IOTestCase
nProcs : 4
Slaves : 
3
(
"node144.service.13621"
"node144.service.13622"
"node144.service.13623"
)

Pstream initialized with:
    floatTransfer      : 0
    nProcsSimpleSum    : 0
    commsType          : nonBlocking
    polling iterations : 0
sigFpe : Enabling floating point exception trapping (FOAM_SIGFPE).
fileModificationChecking : Monitoring run-time modified files using timeStampMaster (fileModificationSkew 10)
allowSystemOperations : Allowing user-supplied system call operations

// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
Create time

Overriding DebugSwitches according to controlDict
    decomposedBlockData 0;

    OFstreamCollator 1;

Overriding OptimisationSwitches according to controlDict
    maxThreadFileBufferSize 2e+09;

    maxMasterFileBufferSize 2e+09;

Overriding fileHandler to collated
I/O    : collated (maxThreadFileBufferSize 2e+09)
         Threading activated since maxThreadFileBufferSize > 0.
         Requires thread support enabled in MPI, otherwise the simulation
         may "hang".  If thread support cannot be enabled, deactivate threading
         by setting maxThreadFileBufferSize to 0 in $FOAM_ETC/controlDict
Create mesh for time = 0


PIMPLE: Operating solver in PISO mode

Reading field p

Reading field U

Reading/calculating face flux field phi

Selecting incompressible transport model Newtonian
Selecting turbulence model type LES
Selecting LES turbulence model dynamicKEqn
Selecting LES delta type cubeRootVol
bounding k, min: 0 max: 2.581693 average: 0.064223725
dynamicKEqnCoeffs
{
    filter          simple;
    Ce              1.048;
}

No MRF models present

No finite volume options present


Starting time loop

Courant Number mean: 0.019271517 max: 0.14347296
deltaT = 1.2e-05
Time = 1.2e-05

PIMPLE: iteration 1
DILUPBiCG:  Solving for Ux, Initial residual = 0.00063933574, Final residual = 1.0133512e-07, No Iterations 1
DILUPBiCG:  Solving for Uy, Initial residual = 0.0039839205, Final residual = 6.2724362e-07, No Iterations 1
DILUPBiCG:  Solving for Uz, Initial residual = 0.0039929386, Final residual = 6.3206595e-07, No Iterations 1
DICPCG:  Solving for p, Initial residual = 0.95020722, Final residual = 0.0083812333, No Iterations 8
time step continuity errors : sum local = 3.4815182e-07, global = 7.948159e-10, cumulative = 7.948159e-10
DICPCG:  Solving for p, Initial residual = 0.0060926763, Final residual = 2.0447134e-06, No Iterations 1001
time step continuity errors : sum local = 1.5374615e-10, global = 6.3334315e-13, cumulative = 7.9544925e-10
DILUPBiCG:  Solving for k, Initial residual = 0.0019602937, Final residual = 3.4806129e-07, No Iterations 1
bounding k, min: -0.0035616505 max: 2.6040647 average: 0.064221013
[2] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" to thread 
[2] OFstreamCollator : Started write thread 
[2] OFstreamCollator : Writing 11562830 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[0] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" to thread 
[1] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" to thread 
[0] OFstreamCollator : Started write thread 
[0] OFstreamCollator : Writing 12268085 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[1] OFstreamCollator : Started write thread 
[1] OFstreamCollator : Writing 12544682 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[3] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" to thread 
[3] OFstreamCollator : Started write thread 
[3] OFstreamCollator : Writing 13028027 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[1] OFstreamCollator : Finished writing 12544682 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[1] OFstreamCollator : Exiting write thread 
[2] OFstreamCollator : Finished writing 11562830 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[2] OFstreamCollator : Exiting write thread 
[3] OFstreamCollator : Finished writing 13028027 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[3] OFstreamCollator : Exiting write thread 
[0] OFstreamCollator : Finished writing 12268085 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[0] OFstreamCollator : Exiting write thread 
[3] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" to thread 
[3] OFstreamCollator : Started write thread 
[3] OFstreamCollator : Writing 43239935 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" using comm 1
[1] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" to thread 
[0] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" to thread 
[1] OFstreamCollator : Started write thread 
[1] OFstreamCollator : Writing 43544183 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" using comm 1
[0] OFstreamCollator : Started write thread 
[0] OFstreamCollator : Writing 43622996 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" using comm 1
[2] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" to thread 
[2] OFstreamCollator : Started write thread 
[2] OFstreamCollator : Writing 43475086 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" using comm 1
[1] OFstreamCollator : Finished writing 43544183 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" using comm 1
[1] OFstreamCollator : Exiting write thread 
[2] OFstreamCollator : Finished writing 43475086 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" using comm 1
[2] OFstreamCollator : Exiting write thread 
[3] OFstreamCollator : Finished writing 43239935 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" using comm 1
[3] OFstreamCollator : Exiting write thread 
[0] OFstreamCollator : Finished writing 43622996 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" using comm 1
[0] OFstreamCollator : Exiting write thread 
[3] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" to thread 
[2] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" to thread 
[3] OFstreamCollator : Started write thread 
[3] OFstreamCollator : Writing 14176288 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" using comm 1
[2] OFstreamCollator : Started write thread 
[2] OFstreamCollator : Writing 14726717 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" using comm 1
[1] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" to thread 
[1] OFstreamCollator : Started write thread 
[1] OFstreamCollator : Writing 14614333 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" using comm 1
[0] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" to thread 
[0] OFstreamCollator : Started write thread 
[0] OFstreamCollator : Writing 14843280 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" using comm 1
[1] OFstreamCollator : Finished writing 14614333 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" using comm 1
[1] OFstreamCollator : Exiting write thread 
[2] OFstreamCollator : Finished writing 14726717 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" using comm 1
[2] OFstreamCollator : Exiting write thread 
[3] OFstreamCollator : Finished writing 14176288 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" using comm 1
[3] OFstreamCollator : Exiting write thread 
[0] OFstreamCollator : Finished writing 14843280 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" using comm 1
[0] OFstreamCollator : Exiting write thread 
[2] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi" to thread 
[2] OFstreamCollator : Started write thread 
[2] OFstreamCollator : Writing 51180546 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi" using comm 1
[1] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi" to thread 
[1] OFstreamCollator : Started write thread 
[1] OFstreamCollator : Writing 52081538 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi" using comm 1
[3] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi" to thread 
[3] OFstreamCollator : Started write thread 
[3] OFstreamCollator : Writing 51714076 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi" using comm 1
[2] #0  Foam::error::printStack(Foam::Ostream&)[0] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi" to thread 
--------------------------------------------------------------------------
A process has executed an operation involving a call to the
"fork()" system call to create a child process.  Open MPI is currently
operating in a condition that could result in memory corruption or
other system errors; your job may hang, crash, or produce silent
data corruption.  The use of fork() (or system() or other calls that
create child processes) is strongly discouraged.

The process that invoked fork was:

  Local host:          [[4098,1],2] (PID 13622)

If you are *absolutely sure* that your application will successfully
and correctly survive a call to fork(), you may disable this warning
by setting the mpi_warn_on_fork MCA parameter to 0.
--------------------------------------------------------------------------
[0] OFstreamCollator : Started write thread 
[0] OFstreamCollator : Writing 52132478 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi" using comm 1
 at ??:?
[2] #1  Foam::sigSegv::sigHandler(int) at ??:?
[2] #2  ? in "/usr/lib64/libc.so.6"
[2] #3  ? at pml_cm.c:?
[2] #4  ompi_mtl_psm_progress at ??:?
[2] #5  opal_progress in "/cluster/mpi/gcc/openmpi/2.1.1/lib/libopen-pal.so.20"
[2] #6  sync_wait_mt in "/cluster/mpi/gcc/openmpi/2.1.1/lib/libopen-pal.so.20"
[2] #7  ? at pml_cm.c:?
[2] #8  MPI_Recv in "/cluster/mpi/gcc/openmpi/2.1.1/lib/libmpi.so.20"
[2] #9  Foam::UIPstream::read(Foam::UPstream::commsTypes, int, char*, long, int, int) at ??:?
[2] #10  void Foam::Pstream::gatherList<int>(Foam::List<Foam::UPstream::commsStruct> const&, Foam::List<int>&, int, int) at ??:?
[2] #11  Foam::decomposedBlockData::writeBlocks(int, Foam::autoPtr<Foam::OSstream>&, Foam::List<long>&, Foam::UList<char> const&, Foam::UPstream::commsTypes, bool) at ??:?
[2] #12  Foam::OFstreamCollator::writeFile(int, Foam::word const&, Foam::fileName const&, Foam::string const&, Foam::IOstream::streamFormat, Foam::IOstream::versionNumber, Foam::IOstream::compressionType, bool) at ??:?
[2] #13  Foam::OFstreamCollator::writeAll(void*) at ??:?
[2] #14  ? in "/usr/lib64/libpthread.so.0"
[2] #15  clone in "/usr/lib64/libc.so.6"
[node144:13622] *** Process received signal ***
[node144:13622] Signal: Segmentation fault (11)
[node144:13622] Associated errno: Unknown error 11085 (11085)
[node144:13622] Signal code:  (0)
[node144:13622] Failing at address: (nil)
[node144:13622] [ 0] /usr/lib64/libc.so.6(+0x35250)[0x2b4d35fc5250]
[node144:13622] [ 1] /usr/lib64/libc.so.6(gsignal+0x37)[0x2b4d35fc51d7]
[node144:13622] [ 2] /usr/lib64/libc.so.6(+0x35250)[0x2b4d35fc5250]
[node144:13622] [ 3] /cluster/mpi/gcc/openmpi/2.1.1/lib/openmpi/mca_pml_cm.so(+0x2405)[0x2b4d490b1405]
[node144:13622] [ 4] /cluster/mpi/gcc/openmpi/2.1.1/lib/openmpi/mca_mtl_psm.so(ompi_mtl_psm_progress+0x75)[0x2b4d496e9d05]
[node144:13622] [ 5] /cluster/mpi/gcc/openmpi/2.1.1/lib/libopen-pal.so.20(opal_progress+0x5c)[0x2b4d3a0e2c8c]
[node144:13622] [ 6] /cluster/mpi/gcc/openmpi/2.1.1/lib/libopen-pal.so.20(sync_wait_mt+0xc5)[0x2b4d3a0e7e35]
[node144:13622] [ 7] /cluster/mpi/gcc/openmpi/2.1.1/lib/openmpi/mca_pml_cm.so(+0x275c)[0x2b4d490b175c]
[node144:13622] [ 8] /cluster/mpi/gcc/openmpi/2.1.1/lib/libmpi.so.20(MPI_Recv+0x175)[0x2b4d3882ff85]
[node144:13622] [ 9] /cluster/engineering/OpenFOAM/OpenFOAM-5.x/platforms/linux64GccDPInt32Opt/lib/openmpi-system/libPstream.so(_ZN4Foam9UIPstream4readENS_8UPstream10commsTypesEiPclii+0x1b5)[0x2b4d363557c5]
[node144:13622] [10] /cluster/engineering/OpenFOAM/OpenFOAM-5.x/platforms/linux64GccDPInt32Opt/lib/libfiniteVolume.so(_ZN4Foam7Pstream10gatherListIiEEvRKNS_4ListINS_8UPstream11commsStructEEERNS2_IT_EEii+0xee)[0x2b4d3227964e]
[node144:13622] [11] /cluster/engineering/OpenFOAM/OpenFOAM-5.x/platforms/linux64GccDPInt32Opt/lib/libOpenFOAM.so(_ZN4Foam19decomposedBlockData11writeBlocksEiRNS_7autoPtrINS_8OSstreamEEERNS_4ListIlEERKNS_5UListIcEENS_8UPstream10commsTypesEb+0x1f9)[0x2b4d34d658e9]
[node144:13622] [12] /cluster/engineering/OpenFOAM/OpenFOAM-5.x/platforms/linux64GccDPInt32Opt/lib/libOpenFOAM.so(_ZN4Foam16OFstreamCollator9writeFileEiRKNS_4wordERKNS_8fileNameERKNS_6stringENS_8IOstream12streamFormatENSA_13versionNumberENSA_15compressionTypeEb+0xb7)[0x2b4d34c9f577]
[node144:13622] [13] /cluster/engineering/OpenFOAM/OpenFOAM-5.x/platforms/linux64GccDPInt32Opt/lib/libOpenFOAM.so(_ZN4Foam16OFstreamCollator8writeAllEPv+0xbb)[0x2b4d34c9f93b]
[node144:13622] [14] /usr/lib64/libpthread.so.0(+0x7dc5)[0x2b4d38acbdc5]
[node144:13622] [15] /usr/lib64/libc.so.6(clone+0x6d)[0x2b4d3608776d]
[node144:13622] *** End of error message ***
--------------------------------------------------------------------------
mpirun noticed that process rank 2 with PID 13622 on node node144 exited on signal 11 (Segmentation fault).
--------------------------------------------------------------------------

MattijsJ

2017-08-28 20:00

reporter   ~0008640

Could you add the debug flag for decomposedBlockData?

I checked OFstreamCollator and the calls are all synchronised. Two possibilities:
- decomposedBlockData::writeHeader (which is called by the master only) triggers parallel communication
- or decomposedBlockData does not use the communicator consistently, so the gatherList fails (see the sketch after this note).

Q: the failure in log-2 is in gatherList (MPI_Recv), while the failure in the original log seems to be in the allToAll from PstreamBuffers. Any idea why they differ?
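
To make the second possibility concrete, here is a hedged sketch of the kind of size gather that writeBlocks performs (this is not the actual OpenFOAM source; gatherBlockSizes is a made-up helper, and the Pstream/UPstream calls are the ones visible in the OpenFOAM 5.x stack traces):

#include "Pstream.H"

namespace Foam
{
    // Gather each rank's block size onto the master of 'comm'.
    void gatherBlockSizes(const label comm, const label myBytes)
    {
        // Every rank must pass the same 'comm' here. If one rank gathers on
        // the collator's sub-communicator while another uses worldComm, the
        // MPI_Recv inside gatherList waits for a message that never arrives,
        // which is consistent with the crash location in the traces.
        List<label> recvSizes(UPstream::nProcs(comm));
        recvSizes[UPstream::myProcNo(comm)] = myBytes;

        Pstream::gatherList
        (
            UPstream::linearCommunication(comm), // comms tree of this communicator
            recvSizes,
            UPstream::msgType(),
            comm
        );
    }
}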

matthias

2017-08-28 20:18

reporter   ~0008641

Sorry, but I have no idea why.

I have set the decomposedBlockData flag. The output contains lines like

decomposedBlockData::writeBlocks: stream:"invalid" data:11562830 commsType:scheduled

What does "invalid" mean in this context?
log-3 (15,232 bytes)   
/*---------------------------------------------------------------------------*\
| =========                 |                                                 |
| \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox           |
|  \\    /   O peration     | Version:  5.x                                   |
|   \\  /    A nd           | Web:      www.OpenFOAM.org                      |
|    \\/     M anipulation  |                                                 |
\*---------------------------------------------------------------------------*/
Build  : 5.x-3fe7aa77e620
Exec   : pimpleFoam -parallel
Date   : Aug 28 2017
Time   : 21:10:50
Host   : "node070.service"
PID    : 8428
I/O    : uncollated
Case   : /beegfs/testcase/IOTestCase
nProcs : 4
Slaves : 
3
(
"node070.service.8429"
"node070.service.8430"
"node070.service.8431"
)

Pstream initialized with:
    floatTransfer      : 0
    nProcsSimpleSum    : 0
    commsType          : nonBlocking
    polling iterations : 0
sigFpe : Enabling floating point exception trapping (FOAM_SIGFPE).
fileModificationChecking : Monitoring run-time modified files using timeStampMaster (fileModificationSkew 10)
allowSystemOperations : Allowing user-supplied system call operations

// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
Create time

Overriding DebugSwitches according to controlDict
    decomposedBlockData 1;

    OFstreamCollator 1;

Overriding OptimisationSwitches according to controlDict
    maxThreadFileBufferSize 2e+09;

    maxMasterFileBufferSize 2e+09;

Overriding fileHandler to collated
I/O    : collated (maxThreadFileBufferSize 2e+09)
         Threading activated since maxThreadFileBufferSize > 0.
         Requires thread support enabled in MPI, otherwise the simulation
         may "hang".  If thread support cannot be enabled, deactivate threading
         by setting maxThreadFileBufferSize to 0 in $FOAM_ETC/controlDict
Create mesh for time = 0

[0] decomposedBlockData::readMasterHeader: stream:"/beegfs/testcase/IOTestCase/processors/constant/polyMesh/points"
[1] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[2] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[3] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[0] decomposedBlockData::readBlocks: stream:"/beegfs/testcase/IOTestCase/processors/constant/polyMesh/points" commsType:nonBlocking
[0] decomposedBlockData::readMasterHeader: stream:"/beegfs/testcase/IOTestCase/processors/constant/polyMesh/faces"
[3] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[1] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[2] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[0] decomposedBlockData::readBlocks: stream:"/beegfs/testcase/IOTestCase/processors/constant/polyMesh/faces" commsType:nonBlocking
[0] decomposedBlockData::readMasterHeader: stream:"/beegfs/testcase/IOTestCase/processors/constant/polyMesh/owner"
[2] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[3] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[1] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[0] decomposedBlockData::readBlocks: stream:"/beegfs/testcase/IOTestCase/processors/constant/polyMesh/owner" commsType:nonBlocking
[0] decomposedBlockData::readMasterHeader: stream:"/beegfs/testcase/IOTestCase/processors/constant/polyMesh/neighbour"
[1] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[2] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[3] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[0] decomposedBlockData::readBlocks: stream:"/beegfs/testcase/IOTestCase/processors/constant/polyMesh/neighbour" commsType:nonBlocking
[0] decomposedBlockData::readMasterHeader: stream:"/beegfs/testcase/IOTestCase/processors/constant/polyMesh/boundary"
[3] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[1] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[2] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[0] decomposedBlockData::readBlocks: stream:"/beegfs/testcase/IOTestCase/processors/constant/polyMesh/boundary" commsType:nonBlocking

PIMPLE: Operating solver in PISO mode

Reading field p

[2] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[3] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[1] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[0] decomposedBlockData::readBlocks: stream:"/beegfs/testcase/IOTestCase/processors/0/p" commsType:nonBlocking
Reading field U

[1] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[2] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[3] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[0] decomposedBlockData::readBlocks: stream:"/beegfs/testcase/IOTestCase/processors/0/U" commsType:nonBlocking
Reading/calculating face flux field phi

[0] decomposedBlockData::readMasterHeader: stream:"/beegfs/testcase/IOTestCase/processors/0/phi"
[3] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[1] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[2] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[0] decomposedBlockData::readBlocks: stream:"/beegfs/testcase/IOTestCase/processors/0/phi" commsType:nonBlocking
Selecting incompressible transport model Newtonian
Selecting turbulence model type LES
Selecting LES turbulence model dynamicKEqn
Selecting LES delta type cubeRootVol
[2] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[3] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[1] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[0] decomposedBlockData::readBlocks: stream:"/beegfs/testcase/IOTestCase/processors/0/nut" commsType:nonBlocking
[1] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[2] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[3] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[0] decomposedBlockData::readBlocks: stream:"/beegfs/testcase/IOTestCase/processors/0/k" commsType:nonBlocking
bounding k, min: 0 max: 2.581693 average: 0.064223725
dynamicKEqnCoeffs
{
    filter          simple;
    Ce              1.048;
}

No MRF models present

No finite volume options present


Starting time loop

Courant Number mean: 0.019271517 max: 0.14347296
deltaT = 1.2e-05
Time = 1.2e-05

PIMPLE: iteration 1
DILUPBiCG:  Solving for Ux, Initial residual = 0.00063933574, Final residual = 1.0133512e-07, No Iterations 1
DILUPBiCG:  Solving for Uy, Initial residual = 0.0039839205, Final residual = 6.2724362e-07, No Iterations 1
DILUPBiCG:  Solving for Uz, Initial residual = 0.0039929386, Final residual = 6.3206595e-07, No Iterations 1
DICPCG:  Solving for p, Initial residual = 0.95020722, Final residual = 0.0083812333, No Iterations 8
time step continuity errors : sum local = 3.4815182e-07, global = 7.948159e-10, cumulative = 7.948159e-10
DICPCG:  Solving for p, Initial residual = 0.0060926763, Final residual = 2.0447134e-06, No Iterations 1001
time step continuity errors : sum local = 1.5374615e-10, global = 6.3334315e-13, cumulative = 7.9544925e-10
DILUPBiCG:  Solving for k, Initial residual = 0.0019602937, Final residual = 3.4806129e-07, No Iterations 1
bounding k, min: -0.0035616505 max: 2.6040647 average: 0.064221013
[2] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" to thread 
[2] OFstreamCollator : Started write thread 
[2] OFstreamCollator : Writing 11562830 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[2] decomposedBlockData::writeBlocks: stream:"invalid" data:11562830 commsType:scheduled
[3] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" to thread 
[3] OFstreamCollator : Started write thread 
[3] OFstreamCollator : Writing 13028027 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[3] decomposedBlockData::writeBlocks: stream:"invalid" data:13028027 commsType:scheduled
[0] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" to thread 
[0] OFstreamCollator : Started write thread 
[0] OFstreamCollator : Writing 12268085 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[0] decomposedBlockData::writeBlocks: stream:"/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" data:12268085 commsType:scheduled
[1] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" to thread 
[1] OFstreamCollator : Started write thread 
[1] OFstreamCollator : Writing 12544682 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[1] decomposedBlockData::writeBlocks: stream:"invalid" data:12544682 commsType:scheduled
[1] Finished master-only writing at:1.5039475e+09 s
[1] OFstreamCollator : Finished writing 12544682 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[1] OFstreamCollator : Exiting write thread 
[2] Finished master-only writing at:1.5039475e+09 s
[2] OFstreamCollator : Finished writing 11562830 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[2] OFstreamCollator : Exiting write thread 
[3] Finished master-only writing at:1.5039475e+09 s
[3] OFstreamCollator : Finished writing 13028027 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[3] OFstreamCollator : Exiting write thread 
[0] Finished master-only writing at:1.5039475e+09 s
[0] OFstreamCollator : Finished writing 12268085 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[0] OFstreamCollator : Exiting write thread 
[3] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" to thread 
[3] OFstreamCollator : Started write thread 
[3] OFstreamCollator : Writing 43239935 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" using comm 1
[3] decomposedBlockData::writeBlocks: stream:"invalid" data:43239935 commsType:scheduled
[0] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" to thread 
[0] OFstreamCollator : Started write thread 
[0] OFstreamCollator : Writing 43622996 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" using comm 1
[0] decomposedBlockData::writeBlocks: stream:"/beegfs/testcase/IOTestCase/processors/1.2e-05/U" data:43622996 commsType:scheduled
[1] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" to thread 
[1] OFstreamCollator : Started write thread 
[1] OFstreamCollator : Writing 43544183 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" using comm 1
[1] decomposedBlockData::writeBlocks: stream:"invalid" data:43544183 commsType:scheduled
[2] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" to thread 
[2] OFstreamCollator : Started write thread 
[2] OFstreamCollator : Writing 43475086 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" using comm 1
[2] decomposedBlockData::writeBlocks: stream:"invalid" data:43475086 commsType:scheduled
[0] #0  Foam::error::printStack(Foam::Ostream&)--------------------------------------------------------------------------
A process has executed an operation involving a call to the
"fork()" system call to create a child process.  Open MPI is currently
operating in a condition that could result in memory corruption or
other system errors; your job may hang, crash, or produce silent
data corruption.  The use of fork() (or system() or other calls that
create child processes) is strongly discouraged.

The process that invoked fork was:

  Local host:          [[33579,1],0] (PID 8428)

If you are *absolutely sure* that your application will successfully
and correctly survive a call to fork(), you may disable this warning
by setting the mpi_warn_on_fork MCA parameter to 0.
--------------------------------------------------------------------------
[node070:08428] *** Process received signal ***
[node070:08428] Signal: Segmentation fault (11)
[node070:08428] Signal code:  (1107740560)
[node070:08428] Failing at address: 0x2ae042302e32
[node070:08428] [ 0] /usr/lib64/libc.so.6(+0x35250)[0x2ae00c230250]
[node070:08428] [ 1] /usr/lib64/libpsm_infinipath.so.1(psmi_mpool_get+0xd)[0x2ae027badf5d]
[node070:08428] [ 2] /usr/lib64/libpsm_infinipath.so.1(psmi_mq_req_alloc+0x2c)[0x2ae027bc17cc]
[node070:08428] [ 3] /usr/lib64/libpsm_infinipath.so.1(psmi_mq_handle_envelope_unexpected+0x72)[0x2ae027bc1d52]
[node070:08428] [ 4] /usr/lib64/libpsm_infinipath.so.1(psmi_am_mq_handler+0x282)[0x2ae027ba3a42]
[node070:08428] [ 5] /usr/lib64/libpsm_infinipath.so.1(+0xaa52)[0x2ae027b99a52]
[node070:08428] [ 6] /usr/lib64/libpsm_infinipath.so.1(+0x13f4c)[0x2ae027ba2f4c]
[node070:08428] [ 7] /usr/lib64/libpsm_infinipath.so.1(psmi_poll_internal+0x67)[0x2ae027bc2f37]
[node070:08428] [ 8] /usr/lib64/libpsm_infinipath.so.1(psm_mq_ipeek+0x8a)[0x2ae027bc176a]
[node070:08428] [ 9] /cluster/mpi/gcc/openmpi/2.1.1/lib/openmpi/mca_mtl_psm.so(ompi_mtl_psm_progress+0x87)[0x2ae02798ad17]
[node070:08428] [10] /cluster/mpi/gcc/openmpi/2.1.1/lib/libopen-pal.so.20(opal_progress+0x5c)[0x2ae01034dc8c]
[node070:08428] [11] /cluster/mpi/gcc/openmpi/2.1.1/lib/libopen-pal.so.20(sync_wait_mt+0xc5)[0x2ae010352e35]
[node070:08428] [12] /cluster/mpi/gcc/openmpi/2.1.1/lib/openmpi/mca_pml_cm.so(+0x275c)[0x2ae02735275c]
[node070:08428] [13] /cluster/mpi/gcc/openmpi/2.1.1/lib/libmpi.so.20(MPI_Recv+0x175)[0x2ae00ea9af85]
[node070:08428] [14] /cluster/engineering/OpenFOAM/OpenFOAM-5.x/platforms/linux64GccDPInt32Opt/lib/openmpi-system/libPstream.so(_ZN4Foam9UIPstream4readENS_8UPstream10commsTypesEiPclii+0x1b5)[0x2ae00c5c07c5]
[node070:08428] [15] /cluster/engineering/OpenFOAM/OpenFOAM-5.x/platforms/linux64GccDPInt32Opt/lib/libOpenFOAM.so(_ZN4Foam19decomposedBlockData11writeBlocksEiRNS_7autoPtrINS_8OSstreamEEERNS_4ListIlEERKNS_5UListIcEENS_8UPstream10commsTypesEb+0x71f)[0x2ae00afd0e0f]
[node070:08428] [16] /cluster/engineering/OpenFOAM/OpenFOAM-5.x/platforms/linux64GccDPInt32Opt/lib/libOpenFOAM.so(_ZN4Foam16OFstreamCollator9writeFileEiRKNS_4wordERKNS_8fileNameERKNS_6stringENS_8IOstream12streamFormatENSA_13versionNumberENSA_15compressionTypeEb+0xb7)[0x2ae00af0a577]
[node070:08428] [17] /cluster/engineering/OpenFOAM/OpenFOAM-5.x/platforms/linux64GccDPInt32Opt/lib/libOpenFOAM.so(_ZN4Foam16OFstreamCollator8writeAllEPv+0xbb)[0x2ae00af0a93b]
[node070:08428] [18] /usr/lib64/libpthread.so.0(+0x7dc5)[0x2ae00ed36dc5]
[node070:08428] [19] /usr/lib64/libc.so.6(clone+0x6d)[0x2ae00c2f276d]
[node070:08428] *** End of error message ***
--------------------------------------------------------------------------
mpirun noticed that process rank 0 with PID 8428 on node node070 exited on signal 11 (Segmentation fault).
--------------------------------------------------------------------------

matthias

2017-08-28 20:44

reporter   ~0008642

BTW, I did another run using the same settings as before.

See log-4.
log-4 (40,681 bytes)   
/*---------------------------------------------------------------------------*\
| =========                 |                                                 |
| \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox           |
|  \\    /   O peration     | Version:  5.x                                   |
|   \\  /    A nd           | Web:      www.OpenFOAM.org                      |
|    \\/     M anipulation  |                                                 |
\*---------------------------------------------------------------------------*/
Build  : 5.x-3fe7aa77e620
Exec   : pimpleFoam -parallel
Date   : Aug 28 2017
Time   : 21:38:06
Host   : "node010.service"
PID    : 20606
I/O    : uncollated
Case   : /beegfs/testcase/IOTestCase
nProcs : 4
Slaves : 
3
(
"node010.service.20607"
"node010.service.20608"
"node010.service.20609"
)

Pstream initialized with:
    floatTransfer      : 0
    nProcsSimpleSum    : 0
    commsType          : nonBlocking
    polling iterations : 0
sigFpe : Enabling floating point exception trapping (FOAM_SIGFPE).
fileModificationChecking : Monitoring run-time modified files using timeStampMaster (fileModificationSkew 10)
allowSystemOperations : Allowing user-supplied system call operations

// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
Create time

Overriding DebugSwitches according to controlDict
    decomposedBlockData 1;

    OFstreamCollator 1;

Overriding OptimisationSwitches according to controlDict
    maxThreadFileBufferSize 2e+09;

    maxMasterFileBufferSize 2e+09;

Overriding fileHandler to collated
I/O    : collated (maxThreadFileBufferSize 2e+09)
         Threading activated since maxThreadFileBufferSize > 0.
         Requires thread support enabled in MPI, otherwise the simulation
         may "hang".  If thread support cannot be enabled, deactivate threading
         by setting maxThreadFileBufferSize to 0 in $FOAM_ETC/controlDict
Create mesh for time = 0

[0] decomposedBlockData::readMasterHeader: stream:"/beegfs/testcase/IOTestCase/processors/constant/polyMesh/points"
[1] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[2] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[3] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[0] decomposedBlockData::readBlocks: stream:"/beegfs/testcase/IOTestCase/processors/constant/polyMesh/points" commsType:nonBlocking
[0] decomposedBlockData::readMasterHeader: stream:"/beegfs/testcase/IOTestCase/processors/constant/polyMesh/faces"
[3] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[1] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[2] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[0] decomposedBlockData::readBlocks: stream:"/beegfs/testcase/IOTestCase/processors/constant/polyMesh/faces" commsType:nonBlocking
[0] decomposedBlockData::readMasterHeader: stream:"/beegfs/testcase/IOTestCase/processors/constant/polyMesh/owner"
[2] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[3] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[1] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[0] decomposedBlockData::readBlocks: stream:"/beegfs/testcase/IOTestCase/processors/constant/polyMesh/owner" commsType:nonBlocking
[0] decomposedBlockData::readMasterHeader: stream:"/beegfs/testcase/IOTestCase/processors/constant/polyMesh/neighbour"
[1] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[2] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[3] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[0] decomposedBlockData::readBlocks: stream:"/beegfs/testcase/IOTestCase/processors/constant/polyMesh/neighbour" commsType:nonBlocking
[0] decomposedBlockData::readMasterHeader: stream:"/beegfs/testcase/IOTestCase/processors/constant/polyMesh/boundary"
[3] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[1] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[2] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[0] decomposedBlockData::readBlocks: stream:"/beegfs/testcase/IOTestCase/processors/constant/polyMesh/boundary" commsType:nonBlocking

PIMPLE: Operating solver in PISO mode

Reading field p

[2] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[3] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[1] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[0] decomposedBlockData::readBlocks: stream:"/beegfs/testcase/IOTestCase/processors/0/p" commsType:nonBlocking
Reading field U

[1] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[2] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[3] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[0] decomposedBlockData::readBlocks: stream:"/beegfs/testcase/IOTestCase/processors/0/U" commsType:nonBlocking
Reading/calculating face flux field phi

[0] decomposedBlockData::readMasterHeader: stream:"/beegfs/testcase/IOTestCase/processors/0/phi"
[3] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[1] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[2] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[0] decomposedBlockData::readBlocks: stream:"/beegfs/testcase/IOTestCase/processors/0/phi" commsType:nonBlocking
Selecting incompressible transport model Newtonian
Selecting turbulence model type LES
Selecting LES turbulence model dynamicKEqn
Selecting LES delta type cubeRootVol
[2] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[3] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[1] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[0] decomposedBlockData::readBlocks: stream:"/beegfs/testcase/IOTestCase/processors/0/nut" commsType:nonBlocking
[1] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[2] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[3] decomposedBlockData::readBlocks: stream:"invalid" commsType:nonBlocking
[0] decomposedBlockData::readBlocks: stream:"/beegfs/testcase/IOTestCase/processors/0/k" commsType:nonBlocking
bounding k, min: 0 max: 2.581693 average: 0.064223725
dynamicKEqnCoeffs
{
    filter          simple;
    Ce              1.048;
}

No MRF models present

No finite volume options present


Starting time loop

Courant Number mean: 0.019271517 max: 0.14347296
deltaT = 1.2e-05
Time = 1.2e-05

PIMPLE: iteration 1
DILUPBiCG:  Solving for Ux, Initial residual = 0.00063933574, Final residual = 1.0133512e-07, No Iterations 1
DILUPBiCG:  Solving for Uy, Initial residual = 0.0039839205, Final residual = 6.2724362e-07, No Iterations 1
DILUPBiCG:  Solving for Uz, Initial residual = 0.0039929386, Final residual = 6.3206595e-07, No Iterations 1
DICPCG:  Solving for p, Initial residual = 0.95020722, Final residual = 0.0083812333, No Iterations 8
time step continuity errors : sum local = 3.4815182e-07, global = 7.948159e-10, cumulative = 7.948159e-10
DICPCG:  Solving for p, Initial residual = 0.0060926763, Final residual = 2.0447134e-06, No Iterations 1001
time step continuity errors : sum local = 1.5374615e-10, global = 6.3334315e-13, cumulative = 7.9544925e-10
DILUPBiCG:  Solving for k, Initial residual = 0.0019602937, Final residual = 3.4806129e-07, No Iterations 1
bounding k, min: -0.0035616505 max: 2.6040647 average: 0.064221013
[2] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" to thread 
[2] OFstreamCollator : Started write thread 
[2] OFstreamCollator : Writing 11562830 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[2] decomposedBlockData::writeBlocks: stream:"invalid" data:11562830 commsType:scheduled
[3] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" to thread 
[3] OFstreamCollator : Started write thread 
[3] OFstreamCollator : Writing 13028027 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[3] decomposedBlockData::writeBlocks: stream:"invalid" data:13028027 commsType:scheduled
[0] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" to thread 
[0] OFstreamCollator : Started write thread 
[0] OFstreamCollator : Writing 12268085 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[0] decomposedBlockData::writeBlocks: stream:"/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" data:12268085 commsType:scheduled
[1] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" to thread 
[1] OFstreamCollator : Started write thread 
[1] OFstreamCollator : Writing 12544682 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[1] decomposedBlockData::writeBlocks: stream:"invalid" data:12544682 commsType:scheduled
[1] Finished master-only writing at:1.5039492e+09 s
[1] OFstreamCollator : Finished writing 12544682 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[1] OFstreamCollator : Exiting write thread 
[2] Finished master-only writing at:1.5039492e+09 s
[2] OFstreamCollator : Finished writing 11562830 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[2] OFstreamCollator : Exiting write thread 
[3] Finished master-only writing at:1.5039492e+09 s
[3] OFstreamCollator : Finished writing 13028027 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[3] OFstreamCollator : Exiting write thread 
[0] Finished master-only writing at:1.5039492e+09 s
[0] OFstreamCollator : Finished writing 12268085 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/nut" using comm 1
[0] OFstreamCollator : Exiting write thread 
[3] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" to thread 
[3] OFstreamCollator : Started write thread 
[3] OFstreamCollator : Writing 43239935 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" using comm 1
[3] decomposedBlockData::writeBlocks: stream:"invalid" data:43239935 commsType:scheduled
[1] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" to thread 
[0] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" to thread 
[1] OFstreamCollator : Started write thread 
[1] OFstreamCollator : Writing 43544183 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" using comm 1
[1] decomposedBlockData::writeBlocks: stream:"invalid" data:43544183 commsType:scheduled
[0] OFstreamCollator : Started write thread 
[0] OFstreamCollator : Writing 43622996 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" using comm 1
[0] decomposedBlockData::writeBlocks: stream:"/beegfs/testcase/IOTestCase/processors/1.2e-05/U" data:43622996 commsType:scheduled
[2] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" to thread 
[2] OFstreamCollator : Started write thread 
[2] OFstreamCollator : Writing 43475086 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" using comm 1
[2] decomposedBlockData::writeBlocks: stream:"invalid" data:43475086 commsType:scheduled
[1] Finished master-only writing at:1.5039492e+09 s
[1] OFstreamCollator : Finished writing 43544183 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" using comm 1
[1] OFstreamCollator : Exiting write thread 
[2] Finished master-only writing at:1.5039492e+09 s
[2] OFstreamCollator : Finished writing 43475086 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" using comm 1
[2] OFstreamCollator : Exiting write thread 
[3] Finished master-only writing at:1.5039492e+09 s
[3] OFstreamCollator : Finished writing 43239935 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" using comm 1
[3] OFstreamCollator : Exiting write thread 
[0] Finished master-only writing at:1.5039492e+09 s
[0] OFstreamCollator : Finished writing 43622996 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/U" using comm 1
[0] OFstreamCollator : Exiting write thread 
[3] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" to thread 
[3] OFstreamCollator : Started write thread 
[3] OFstreamCollator : Writing 14176288 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" using comm 1
[3] decomposedBlockData::writeBlocks: stream:"invalid" data:14176288 commsType:scheduled
[2] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" to thread 
[2] OFstreamCollator : Started write thread 
[2] OFstreamCollator : Writing 14726717 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" using comm 1
[2] decomposedBlockData::writeBlocks: stream:"invalid" data:14726717 commsType:scheduled
[1] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" to thread 
[0] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" to thread 
[1] OFstreamCollator : Started write thread 
[1] OFstreamCollator : Writing 14614333 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" using comm 1
[1] decomposedBlockData::writeBlocks: stream:"invalid" data:14614333 commsType:scheduled
[0] OFstreamCollator : Started write thread 
[0] OFstreamCollator : Writing 14843280 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" using comm 1
[0] decomposedBlockData::writeBlocks: stream:"/beegfs/testcase/IOTestCase/processors/1.2e-05/k" data:14843280 commsType:scheduled
[1] Finished master-only writing at:1.5039492e+09 s
[1] OFstreamCollator : Finished writing 14614333 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" using comm 1
[1] OFstreamCollator : Exiting write thread 
[2] Finished master-only writing at:1.5039492e+09 s
[2] OFstreamCollator : Finished writing 14726717 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" using comm 1
[2] OFstreamCollator : Exiting write thread 
[3] Finished master-only writing at:1.5039492e+09 s
[3] OFstreamCollator : Finished writing 14176288 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" using comm 1
[3] OFstreamCollator : Exiting write thread 
[0] Finished master-only writing at:1.5039492e+09 s
[0] OFstreamCollator : Finished writing 14843280 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/k" using comm 1
[0] OFstreamCollator : Exiting write thread 
[3] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi" to thread 
[3] OFstreamCollator : Started write thread 
[3] OFstreamCollator : Writing 51714076 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi" using comm 1
[3] decomposedBlockData::writeBlocks: stream:"invalid" data:51714076 commsType:scheduled
[2] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi" to thread 
[2] OFstreamCollator : Started write thread 
[2] OFstreamCollator : Writing 51180546 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi" using comm 1
[2] decomposedBlockData::writeBlocks: stream:"invalid" data:51180546 commsType:scheduled
[1] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi" to thread 
[1] OFstreamCollator : Started write thread 
[1] OFstreamCollator : Writing 52081538 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi" using comm 1
[1] decomposedBlockData::writeBlocks: stream:"invalid" data:52081538 commsType:scheduled
[0] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi" to thread 
[0] OFstreamCollator : Started write thread 
[0] OFstreamCollator : Writing 52132478 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi" using comm 1
[0] decomposedBlockData::writeBlocks: stream:"/beegfs/testcase/IOTestCase/processors/1.2e-05/phi" data:52132478 commsType:scheduled
[1] Finished master-only writing at:1.5039492e+09 s
[1] OFstreamCollator : Finished writing 52081538 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi" using comm 1
[1] OFstreamCollator : Exiting write thread 
[2] Finished master-only writing at:1.5039492e+09 s
[2] OFstreamCollator : Finished writing 51180546 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi" using comm 1
[2] OFstreamCollator : Exiting write thread 
[3] Finished master-only writing at:1.5039492e+09 s
[3] OFstreamCollator : Finished writing 51714076 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi" using comm 1
[3] OFstreamCollator : Exiting write thread 
[0] Finished master-only writing at:1.5039492e+09 s
[0] OFstreamCollator : Finished writing 52132478 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi" using comm 1
[0] OFstreamCollator : Exiting write thread 
[3] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi_0" to thread 
[3] OFstreamCollator : Started write thread 
[3] OFstreamCollator : Writing 51713795 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi_0" using comm 1
[3] decomposedBlockData::writeBlocks: stream:"invalid" data:51713795 commsType:scheduled
[2] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi_0" to thread 
[2] OFstreamCollator : Started write thread 
[2] OFstreamCollator : Writing 51181324 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi_0" using comm 1
[2] decomposedBlockData::writeBlocks: stream:"invalid" data:51181324 commsType:scheduled
[1] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi_0" to thread 
[1] OFstreamCollator : Started write thread 
[1] OFstreamCollator : Writing 52079688 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi_0" using comm 1
[1] decomposedBlockData::writeBlocks: stream:"invalid" data:52079688 commsType:scheduled
[0] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi_0" to thread 
[0] OFstreamCollator : Started write thread 
[0] OFstreamCollator : Writing 52132744 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi_0" using comm 1
[0] decomposedBlockData::writeBlocks: stream:"/beegfs/testcase/IOTestCase/processors/1.2e-05/phi_0" data:52132744 commsType:scheduled
[1] Finished master-only writing at:1.5039492e+09 s
[1] OFstreamCollator : Finished writing 52079688 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi_0" using comm 1
[1] OFstreamCollator : Exiting write thread 
[2] Finished master-only writing at:1.5039492e+09 s
[2] OFstreamCollator : Finished writing 51181324 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi_0" using comm 1
[2] OFstreamCollator : Exiting write thread 
[3] Finished master-only writing at:1.5039492e+09 s
[3] OFstreamCollator : Finished writing 51713795 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi_0" using comm 1
[3] OFstreamCollator : Exiting write thread 
[0] Finished master-only writing at:1.5039492e+09 s
[0] OFstreamCollator : Finished writing 52132744 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/phi_0" using comm 1
[0] OFstreamCollator : Exiting write thread 
[3] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/p" to thread 
[3] OFstreamCollator : Started write thread 
[3] OFstreamCollator : Writing 13430838 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/p" using comm 1
[3] decomposedBlockData::writeBlocks: stream:"invalid" data:13430838 commsType:scheduled
[2] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/p" to thread 
[2] OFstreamCollator : Started write thread 
[2] OFstreamCollator : Writing 13258420 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/p" using comm 1
[2] decomposedBlockData::writeBlocks: stream:"invalid" data:13258420 commsType:scheduled
[1] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/p" to thread 
[1] OFstreamCollator : Started write thread 
[1] OFstreamCollator : Writing 13993114 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/p" using comm 1
[1] decomposedBlockData::writeBlocks: stream:"invalid" data:13993114 commsType:scheduled
[0] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/1.2e-05/p" to thread 
[0] OFstreamCollator : Started write thread 
[0] OFstreamCollator : Writing 13970481 bytes to ExecutionTime = "/beegfs/testcase/IOTestCase/processors/1.2e-05/p" using comm 1
85.6 s  ClockTime = 85 s

[0] decomposedBlockData::writeBlocks: stream:"/beegfs/testcase/IOTestCase/processors/1.2e-05/p" data:13970481 commsType:scheduled
[1] Finished master-only writing at:1.5039492e+09 s
[1] OFstreamCollator : Finished writing 13993114 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/p" using comm 1
[1] OFstreamCollator : Exiting write thread 
Courant Number mean: 0.023126426 max: 0.17198731
deltaT = 1.44e-05
Time = 2.64e-05

PIMPLE: iteration 1
[2] Finished master-only writing at:1.5039492e+09 s
[2] OFstreamCollator : Finished writing 13258420 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/p" using comm 1
[2] OFstreamCollator : Exiting write thread 
[3] Finished master-only writing at:1.5039492e+09 s
[3] OFstreamCollator : Finished writing 13430838 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/p" using comm 1
[3] OFstreamCollator : Exiting write thread 
[0] Finished master-only writing at:1.5039492e+09 s
[0] OFstreamCollator : Finished writing 13970481 bytes to "/beegfs/testcase/IOTestCase/processors/1.2e-05/p" using comm 1
[0] OFstreamCollator : Exiting write thread 
DILUPBiCG:  Solving for Ux, Initial residual = 0.0012657738, Final residual = 1.5916653e-07, No Iterations 1
DILUPBiCG:  Solving for Uy, Initial residual = 0.0059614998, Final residual = 7.1450902e-07, No Iterations 1
DILUPBiCG:  Solving for Uz, Initial residual = 0.0059679377, Final residual = 7.4868288e-07, No Iterations 1
DICPCG:  Solving for p, Initial residual = 0.94012796, Final residual = 0.0080740382, No Iterations 9
time step continuity errors : sum local = 3.3602888e-07, global = -8.8072524e-10, cumulative = -8.5275992e-11
DICPCG:  Solving for p, Initial residual = 0.031670651, Final residual = 4.500214e-06, No Iterations 1001
time step continuity errors : sum local = 5.9423136e-11, global = 8.5360696e-14, cumulative = -8.5190631e-11
DILUPBiCG:  Solving for k, Initial residual = 0.0023563998, Final residual = 3.1006644e-07, No Iterations 1
bounding k, min: -0.0098655761 max: 2.6296338 average: 0.064220051
[2] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/nut" to thread 
[2] OFstreamCollator : Started write thread 
[2] OFstreamCollator : Writing 11565375 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/nut" using comm 1
[2] decomposedBlockData::writeBlocks: stream:"invalid" data:11565375 commsType:scheduled
[3] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/nut" to thread 
[3] OFstreamCollator : Started write thread 
[3] OFstreamCollator : Writing 13034909 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/nut" using comm 1
[3] decomposedBlockData::writeBlocks: stream:"invalid" data:13034909 commsType:scheduled
[0] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/nut" to thread 
[0] OFstreamCollator : Started write thread 
[0] OFstreamCollator : Writing 12271859 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/nut" using comm 1
[0] decomposedBlockData::writeBlocks: stream:"/beegfs/testcase/IOTestCase/processors/2.64e-05/nut" data:12271859 commsType:scheduled
[1] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/nut" to thread 
[1] OFstreamCollator : Started write thread 
[1] OFstreamCollator : Writing 12549808 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/nut" using comm 1
[1] decomposedBlockData::writeBlocks: stream:"invalid" data:12549808 commsType:scheduled
[1] Finished master-only writing at:1.5039492e+09 s
[1] OFstreamCollator : Finished writing 12549808 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/nut" using comm 1
[1] OFstreamCollator : Exiting write thread 
[2] Finished master-only writing at:1.5039492e+09 s
[2] OFstreamCollator : Finished writing 11565375 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/nut" using comm 1
[2] OFstreamCollator : Exiting write thread 
[3] Finished master-only writing at:1.5039492e+09 s
[3] OFstreamCollator : Finished writing 13034909 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/nut" using comm 1
[3] OFstreamCollator : Exiting write thread 
[0] Finished master-only writing at:1.5039492e+09 s
[0] OFstreamCollator : Finished writing 12271859 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/nut" using comm 1
[0] OFstreamCollator : Exiting write thread 
[3] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/U_0" to thread 
[3] OFstreamCollator : Started write thread 
[3] OFstreamCollator : Writing 43239935 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/U_0" using comm 1
[3] decomposedBlockData::writeBlocks: stream:"invalid" data:43239935 commsType:scheduled
[1] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/U_0" to thread 
[1] OFstreamCollator : Started write thread 
[1] OFstreamCollator : Writing 43544183 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/U_0" using comm 1
[1] decomposedBlockData::writeBlocks: stream:"invalid" data:43544183 commsType:scheduled
[0] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/U_0" to thread 
[0] OFstreamCollator : Started write thread 
[0] OFstreamCollator : Writing 43622999 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/U_0" using comm 1
[2] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/U_0" to thread 
[0] decomposedBlockData::writeBlocks: stream:"/beegfs/testcase/IOTestCase/processors/2.64e-05/U_0" data:43622999 commsType:scheduled
[2] OFstreamCollator : Started write thread 
[2] OFstreamCollator : Writing 43475086 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/U_0" using comm 1
[2] decomposedBlockData::writeBlocks: stream:"invalid" data:43475086 commsType:scheduled
[1] Finished master-only writing at:1.5039492e+09 s
[1] OFstreamCollator : Finished writing 43544183 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/U_0" using comm 1
[1] OFstreamCollator : Exiting write thread 
[2] Finished master-only writing at:1.5039492e+09 s
[2] OFstreamCollator : Finished writing 43475086 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/U_0" using comm 1
[2] OFstreamCollator : Exiting write thread 
[3] Finished master-only writing at:1.5039492e+09 s
[3] OFstreamCollator : Finished writing 43239935 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/U_0" using comm 1
[3] OFstreamCollator : Exiting write thread 
[0] Finished master-only writing at:1.5039492e+09 s
[0] OFstreamCollator : Finished writing 43622999 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/U_0" using comm 1
[0] OFstreamCollator : Exiting write thread 
[3] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/U" to thread 
[3] OFstreamCollator : Started write thread 
[3] OFstreamCollator : Writing 43239582 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/U" using comm 1
[3] decomposedBlockData::writeBlocks: stream:"invalid" data:43239582 commsType:scheduled
[1] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/U" to thread 
[2] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/U" to thread 
[0] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/U" to thread 
[1] OFstreamCollator : Started write thread 
[1] OFstreamCollator : Writing 43542971 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/U" using comm 1
[1] decomposedBlockData::writeBlocks: stream:"invalid" data:43542971 commsType:scheduled
[2] OFstreamCollator : Started write thread 
[2] OFstreamCollator : Writing 43476820 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/U" using comm 1
[2] decomposedBlockData::writeBlocks: stream:"invalid" data:43476820 commsType:scheduled
[0] OFstreamCollator : Started write thread 
[0] OFstreamCollator : Writing 43622923 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/U" using comm 1
[0] decomposedBlockData::writeBlocks: stream:"/beegfs/testcase/IOTestCase/processors/2.64e-05/U" data:43622923 commsType:scheduled
[1] Finished master-only writing at:1.5039492e+09 s
[1] OFstreamCollator : Finished writing 43542971 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/U" using comm 1
[1] OFstreamCollator : Exiting write thread 
[2] Finished master-only writing at:1.5039492e+09 s
[2] OFstreamCollator : Finished writing 43476820 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/U" using comm 1
[2] OFstreamCollator : Exiting write thread 
[3] Finished master-only writing at:1.5039492e+09 s
[3] OFstreamCollator : Finished writing 43239582 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/U" using comm 1
[3] OFstreamCollator : Exiting write thread 
[0] Finished master-only writing at:1.5039492e+09 s
[0] OFstreamCollator : Finished writing 43622923 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/U" using comm 1
[0] OFstreamCollator : Exiting write thread 
[3] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/k" to thread 
[3] OFstreamCollator : Started write thread 
[3] OFstreamCollator : Writing 14175065 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/k" using comm 1
[3] decomposedBlockData::writeBlocks: stream:"invalid" data:14175065 commsType:scheduled
[2] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/k" to thread 
[2] OFstreamCollator : Started write thread 
[2] OFstreamCollator : Writing 14726359 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/k" using comm 1
[2] decomposedBlockData::writeBlocks: stream:"invalid" data:14726359 commsType:scheduled
[0] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/k" to thread 
[1] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/k" to thread 
[0] OFstreamCollator : Started write thread 
[0] OFstreamCollator : Writing 14843197 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/k" using comm 1
[1] OFstreamCollator : Started write thread 
[1] OFstreamCollator : Writing 14614528 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/k" using comm 1
[1] decomposedBlockData::writeBlocks: stream:"invalid" data:14614528 commsType:scheduled
[0] decomposedBlockData::writeBlocks: stream:"/beegfs/testcase/IOTestCase/processors/2.64e-05/k" data:14843197 commsType:scheduled
[1] Finished master-only writing at:1.5039492e+09 s
[1] OFstreamCollator : Finished writing 14614528 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/k" using comm 1
[1] OFstreamCollator : Exiting write thread 
[2] Finished master-only writing at:1.5039492e+09 s
[2] OFstreamCollator : Finished writing 14726359 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/k" using comm 1
[2] OFstreamCollator : Exiting write thread 
[3] Finished master-only writing at:1.5039492e+09 s
[3] OFstreamCollator : Finished writing 14175065 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/k" using comm 1
[3] OFstreamCollator : Exiting write thread 
[0] Finished master-only writing at:1.5039492e+09 s
[0] OFstreamCollator : Finished writing 14843197 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/k" using comm 1
[0] OFstreamCollator : Exiting write thread 
[3] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/phi" to thread 
[2] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/phi" to thread 
[3] OFstreamCollator : Started write thread 
[3] OFstreamCollator : Writing 51713290 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/phi" using comm 1
[3] decomposedBlockData::writeBlocks: stream:"invalid" data:51713290 commsType:scheduled
[2] OFstreamCollator : Started write thread 
[2] OFstreamCollator : Writing 51180890 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/phi" using comm 1
[2] decomposedBlockData::writeBlocks: stream:"invalid" data:51180890 commsType:scheduled
[1] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/phi" to thread 
[0] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/phi" to thread 
[1] OFstreamCollator : Started write thread 
[1] OFstreamCollator : Writing 52080631 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/phi" using comm 1
[1] decomposedBlockData::writeBlocks: stream:"invalid" data:52080631 commsType:scheduled
[0] OFstreamCollator : Started write thread 
[0] OFstreamCollator : Writing 52133658 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/phi" using comm 1
[0] decomposedBlockData::writeBlocks: stream:"/beegfs/testcase/IOTestCase/processors/2.64e-05/phi" data:52133658 commsType:scheduled
[1] Finished master-only writing at:1.5039492e+09 s
[1] OFstreamCollator : Finished writing 52080631 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/phi" using comm 1
[1] OFstreamCollator : Exiting write thread 
[2] Finished master-only writing at:1.5039492e+09 s
[2] OFstreamCollator : Finished writing 51180890 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/phi" using comm 1
[2] OFstreamCollator : Exiting write thread 
[3] Finished master-only writing at:1.5039492e+09 s
[3] OFstreamCollator : Finished writing 51713290 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/phi" using comm 1
[3] OFstreamCollator : Exiting write thread 
[0] Finished master-only writing at:1.5039492e+09 s
[0] OFstreamCollator : Finished writing 52133658 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/phi" using comm 1
[0] OFstreamCollator : Exiting write thread 
[2] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/phi_0" to thread 
[3] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/phi_0" to thread 
[2] OFstreamCollator : Started write thread 
[2] OFstreamCollator : Writing 51180546 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/phi_0" using comm 1
[2] decomposedBlockData::writeBlocks: stream:"invalid" data:51180546 commsType:scheduled
[3] OFstreamCollator : Started write thread 
[3] OFstreamCollator : Writing 51714076 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/phi_0" using comm 1
[3] decomposedBlockData::writeBlocks: stream:"invalid" data:51714076 commsType:scheduled
[2] #0  Foam::error::printStack(Foam::Ostream&)[1] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/phi_0" to thread 
--------------------------------------------------------------------------
A process has executed an operation involving a call to the
"fork()" system call to create a child process.  Open MPI is currently
operating in a condition that could result in memory corruption or
other system errors; your job may hang, crash, or produce silent
data corruption.  The use of fork() (or system() or other calls that
create child processes) is strongly discouraged.

The process that invoked fork was:

  Local host:          [[57503,1],2] (PID 20608)

If you are *absolutely sure* that your application will successfully
and correctly survive a call to fork(), you may disable this warning
by setting the mpi_warn_on_fork MCA parameter to 0.
--------------------------------------------------------------------------
[1] OFstreamCollator : Started write thread 
[1] OFstreamCollator : Writing 52081538 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/phi_0" using comm 1
[1] decomposedBlockData::writeBlocks: stream:"invalid" data:52081538 commsType:scheduled
[0] OFstreamCollator : relaying write of "/beegfs/testcase/IOTestCase/processors/2.64e-05/phi_0" to thread 
[0] OFstreamCollator : Started write thread 
[0] OFstreamCollator : Writing 52132481 bytes to "/beegfs/testcase/IOTestCase/processors/2.64e-05/phi_0" using comm 1
[0] decomposedBlockData::writeBlocks: stream:"/beegfs/testcase/IOTestCase/processors/2.64e-05/phi_0" data:52132481 commsType:scheduled
 at ??:?
[2] #1  Foam::sigSegv::sigHandler(int) at ??:?
[2] #2  ? in "/usr/lib64/libc.so.6"
[2] #3  ? at pml_cm.c:?
[2] #4  ompi_mtl_psm_progress at ??:?
[2] #5  opal_progress in "/cluster/mpi/gcc/openmpi/2.1.1/lib/libopen-pal.so.20"
[2] #6  sync_wait_mt in "/cluster/mpi/gcc/openmpi/2.1.1/lib/libopen-pal.so.20"
[2] #7  ? at pml_cm.c:?
[2] #8  MPI_Recv in "/cluster/mpi/gcc/openmpi/2.1.1/lib/libmpi.so.20"
[2] #9  Foam::UIPstream::read(Foam::UPstream::commsTypes, int, char*, long, int, int) at ??:?
[2] #10  void Foam::Pstream::gatherList<int>(Foam::List<Foam::UPstream::commsStruct> const&, Foam::List<int>&, int, int) at ??:?
[2] #11  Foam::decomposedBlockData::writeBlocks(int, Foam::autoPtr<Foam::OSstream>&, Foam::List<long>&, Foam::UList<char> const&, Foam::UPstream::commsTypes, bool) at ??:?
[2] #12  Foam::OFstreamCollator::writeFile(int, Foam::word const&, Foam::fileName const&, Foam::string const&, Foam::IOstream::streamFormat, Foam::IOstream::versionNumber, Foam::IOstream::compressionType, bool) at ??:?
[2] #13  Foam::OFstreamCollator::writeAll(void*) at ??:?
[2] #14  ? in "/usr/lib64/libpthread.so.0"
[2] #15  clone in "/usr/lib64/libc.so.6"
[node010:20608] *** Process received signal ***
[node010:20608] Signal: Segmentation fault (11)
[node010:20608] Associated errno: Unknown error 11166 (11166)
[node010:20608] Signal code:  (0)
[node010:20608] Failing at address: (nil)
[node010:20608] [ 0] /usr/lib64/libc.so.6(+0x35250)[0x2b9e1dfe5250]
[node010:20608] [ 1] /usr/lib64/libc.so.6(gsignal+0x37)[0x2b9e1dfe51d7]
[node010:20608] [ 2] /usr/lib64/libc.so.6(+0x35250)[0x2b9e1dfe5250]
[node010:20608] [ 3] /cluster/mpi/gcc/openmpi/2.1.1/lib/openmpi/mca_pml_cm.so(+0x2405)[0x2b9e31132405]
[node010:20608] [ 4] /cluster/mpi/gcc/openmpi/2.1.1/lib/openmpi/mca_mtl_psm.so(ompi_mtl_psm_progress+0x75)[0x2b9e3176ad05]
[node010:20608] [ 5] /cluster/mpi/gcc/openmpi/2.1.1/lib/libopen-pal.so.20(opal_progress+0x5c)[0x2b9e22102c8c]
[node010:20608] [ 6] /cluster/mpi/gcc/openmpi/2.1.1/lib/libopen-pal.so.20(sync_wait_mt+0xc5)[0x2b9e22107e35]
[node010:20608] [ 7] /cluster/mpi/gcc/openmpi/2.1.1/lib/openmpi/mca_pml_cm.so(+0x275c)[0x2b9e3113275c]
[node010:20608] [ 8] /cluster/mpi/gcc/openmpi/2.1.1/lib/libmpi.so.20(MPI_Recv+0x175)[0x2b9e2084ff85]
[node010:20608] [ 9] /cluster/engineering/OpenFOAM/OpenFOAM-5.x/platforms/linux64GccDPInt32Opt/lib/openmpi-system/libPstream.so(_ZN4Foam9UIPstream4readENS_8UPstream10commsTypesEiPclii+0x1b5)[0x2b9e1e3757c5]
[node010:20608] [10] /cluster/engineering/OpenFOAM/OpenFOAM-5.x/platforms/linux64GccDPInt32Opt/lib/libfiniteVolume.so(_ZN4Foam7Pstream10gatherListIiEEvRKNS_4ListINS_8UPstream11commsStructEEERNS2_IT_EEii+0xee)[0x2b9e1a29964e]
[node010:20608] [11] /cluster/engineering/OpenFOAM/OpenFOAM-5.x/platforms/linux64GccDPInt32Opt/lib/libOpenFOAM.so(_ZN4Foam19decomposedBlockData11writeBlocksEiRNS_7autoPtrINS_8OSstreamEEERNS_4ListIlEERKNS_5UListIcEENS_8UPstream10commsTypesEb+0x1f9)[0x2b9e1cd858e9]
[node010:20608] [12] /cluster/engineering/OpenFOAM/OpenFOAM-5.x/platforms/linux64GccDPInt32Opt/lib/libOpenFOAM.so(_ZN4Foam16OFstreamCollator9writeFileEiRKNS_4wordERKNS_8fileNameERKNS_6stringENS_8IOstream12streamFormatENSA_13versionNumberENSA_15compressionTypeEb+0xb7)[0x2b9e1ccbf577]
[node010:20608] [13] /cluster/engineering/OpenFOAM/OpenFOAM-5.x/platforms/linux64GccDPInt32Opt/lib/libOpenFOAM.so(_ZN4Foam16OFstreamCollator8writeAllEPv+0xbb)[0x2b9e1ccbf93b]
[node010:20608] [14] /usr/lib64/libpthread.so.0(+0x7dc5)[0x2b9e20aebdc5]
[node010:20608] [15] /usr/lib64/libc.so.6(clone+0x6d)[0x2b9e1e0a776d]
[node010:20608] *** End of error message ***
--------------------------------------------------------------------------
mpirun noticed that process rank 2 with PID 20608 on node node010 exited on signal 11 (Segmentation fault).
--------------------------------------------------------------------------
log-4 (40,681 bytes)   

MattijsJ

2017-08-29 20:30

reporter   ~0008648

Thanks.

- I split the file into 4 pieces and all processors are doing exactly the same thing.
- Processor 2 seems to be the one that is failing, but only at the second write of phi_0.
- I tried the pitzDaily tutorial on 4 processors with the backward ddt scheme and it runs fine, even under valgrind

Can you replicate the problem on a small mesh (e.g. the pitzDaily case above) and post it?

MattijsJ

2017-08-29 20:55

reporter   ~0008649

Also try a debug build of OpenFOAM (export WM_COMPILE_OPTION=Debug in etc/bashrc) and of OpenMPI (configure --enable-debug). There seems to be a memory access violation.

MattijsJ

2017-09-01 12:57

reporter  

OFstreamCollator.H (4,710 bytes)   
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | Copyright (C) 2017 OpenFOAM Foundation
     \\/     M anipulation  |
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.

    OpenFOAM is free software: you can redistribute it and/or modify it
    under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    for more details.

    You should have received a copy of the GNU General Public License
    along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.

Class
    Foam::OFstreamCollator

Description
    Threaded file writer.

SourceFiles
    OFstreamCollator.C

\*---------------------------------------------------------------------------*/

#ifndef OFstreamCollator_H
#define OFstreamCollator_H

#include "IOstream.H"
#include "List.H"
#include "FIFOStack.H"

// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

namespace Foam
{

/*---------------------------------------------------------------------------*\
                        Class OFstreamCollator Declaration
\*---------------------------------------------------------------------------*/

class OFstreamCollator
{
    // Private class

        class writeData
        {
        public:

            const word typeName_;
            const fileName pathName_;
            const string data_;
            const IOstream::streamFormat format_;
            const IOstream::versionNumber version_;
            const IOstream::compressionType compression_;
            const bool append_;

            writeData
            (
                const word& typeName,
                const fileName& pathName,
                const string& data,
                IOstream::streamFormat format,
                IOstream::versionNumber version,
                IOstream::compressionType compression,
                const bool append
            )
            :
                typeName_(typeName),
                pathName_(pathName),
                data_(data),
                format_(format),
                version_(version),
                compression_(compression),
                append_(append)
            {}
        };


    // Private data

        const off_t maxBufferSize_;

        //pthread_mutex_t mutex_;
        label mutex_;

        //pthread_t thread_;
        label thread_;

        FIFOStack<writeData*> objects_;

        bool threadRunning_;

        //- Communicator to use for all parallel ops
        label comm_;


    // Private Member Functions

        //- Write actual file
        static bool writeFile
        (
            const label comm,
            const word& typeName,
            const fileName& fName,
            const string& data,
            IOstream::streamFormat fmt,
            IOstream::versionNumber ver,
            IOstream::compressionType cmp,
            const bool append
        );

        //- Write all files in stack. Return true (normal exit, all stack
        //  written) or false (signal to stop)
        static bool writeAll(OFstreamCollator&);

        //- Write all files in stack
        static void* writeAll(void *threadarg);


public:

    // Declare name of the class and its debug switch
    TypeName("OFstreamCollator");


    // Constructors

        //- Construct from buffer size. 0 = do not use thread
        OFstreamCollator(const off_t maxBufferSize);


    //- Destructor
    virtual ~OFstreamCollator();


    // Member functions

        //- Write file with contents. Blocks until writethread has space
        //  available (total file sizes < maxBufferSize)
        bool write
        (
            const word& typeName,
            const fileName&,
            const string& data,
            IOstream::streamFormat,
            IOstream::versionNumber,
            IOstream::compressionType,
            const bool append
        );
};


// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

} // End namespace Foam

// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

#endif

// ************************************************************************* //

MattijsJ

2017-09-01 12:57

reporter  

OFstreamCollator.C (9,556 bytes)   
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | Copyright (C) 2017 OpenFOAM Foundation
     \\/     M anipulation  |
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.

    OpenFOAM is free software: you can redistribute it and/or modify it
    under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    for more details.

    You should have received a copy of the GNU General Public License
    along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.

\*---------------------------------------------------------------------------*/

#include "OFstreamCollator.H"
#include "OFstream.H"
#include "OSspecific.H"
#include "IOstreams.H"
#include "Pstream.H"
#include "decomposedBlockData.H"
#include "PstreamReduceOps.H"

// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //

namespace Foam
{
    defineTypeNameAndDebug(OFstreamCollator, 0);
}


// * * * * * * * * * * * * * Private Member Functions  * * * * * * * * * * * //

bool Foam::OFstreamCollator::writeFile
(
    const label comm,
    const word& typeName,
    const fileName& fName,
    const string& s,
    IOstream::streamFormat fmt,
    IOstream::versionNumber ver,
    IOstream::compressionType cmp,
    const bool append
)
{
    if (debug)
    {
        Pout<< "OFstreamCollator : Writing " << s.size()
            << " bytes to " << fName
            << " using comm " << comm << endl;
    }

    //label oldWarn = UPstream::warnComm;
    //UPstream::warnComm = comm;

    autoPtr<OSstream> osPtr;
    if (UPstream::master(comm))
    {
        Foam::mkDir(fName.path());
        osPtr.reset
        (
            new OFstream
            (
                fName,
                fmt,
                ver,
                cmp,
                append
            )
        );

        // We don't have IOobject so cannot use IOobject::writeHeader
        OSstream& os = osPtr();
        decomposedBlockData::writeHeader
        (
            os,
            ver,
            fmt,
            typeName,
            "",
            fName,
            fName.name()
        );
    }

    UList<char> slice(const_cast<char*>(s.data()), label(s.size()));

    // Assuming threaded writing hides any slowness so we might
    // as well use scheduled communication to send the data to
    // the master processor in order.

    List<std::streamoff> start;
    decomposedBlockData::writeBlocks
    (
        comm,
        osPtr,
        start,
        slice,
        UPstream::commsTypes::scheduled,
        false       // do not reduce return state
    );

    if (osPtr.valid() && !osPtr().good())
    {
        FatalIOErrorInFunction(osPtr())
            << "Failed writing to " << fName << exit(FatalIOError);
    }

    if (debug)
    {
        Pout<< "OFstreamCollator : Finished writing " << s.size()
            << " bytes to " << fName
            << " using comm " << comm << endl;
    }

    //UPstream::warnComm = oldWarn;

    return true;
}


bool Foam::OFstreamCollator::writeAll(OFstreamCollator& handler)
{
    // Consume stack. Return true if normal exit (stack empty), false
    // if forcing exit.

    bool ok = true;

    while (true)
    {
        writeData* ptr = nullptr;

        lockMutex(handler.mutex_);
        if (handler.objects_.size())
        {
            ptr = handler.objects_.pop();
        }
        unlockMutex(handler.mutex_);

        if (!ptr)
        {
            break;
        }
        else if (ptr->pathName_.empty())
        {
            // An empty fileName is the signal to exit the thread
            if (debug)
            {
                Pout<< "OFstreamCollator : Detected signalling data" << endl;
            }

            ok = false;
            break;
        }
        else
        {
            bool ok = writeFile
            (
                handler.comm_,
                ptr->typeName_,
                ptr->pathName_,
                ptr->data_,
                ptr->format_,
                ptr->version_,
                ptr->compression_,
                ptr->append_
            );
            if (!ok)
            {
                FatalIOErrorInFunction(ptr->pathName_)
                    << "Failed writing " << ptr->pathName_
                    << exit(FatalIOError);
            }

            delete ptr;
        }
    }

    return ok;
}


void* Foam::OFstreamCollator::writeAll(void *threadarg)
{
    OFstreamCollator& handler = *static_cast<OFstreamCollator*>(threadarg);

    bool keepAlive = true;

    if (keepAlive)
    {
        while (true)
        {
            bool ok = writeAll(handler);

            if (!ok)
            {
                break;
            }

            sleep(1);
        }

    }
    else
    {
        writeAll(handler);
    }


    if (debug)
    {
        Pout<< "OFstreamCollator : Exiting write thread " << endl;
    }

    lockMutex(handler.mutex_);
    handler.threadRunning_ = false;
    unlockMutex(handler.mutex_);

    return nullptr;
}


// * * * * * * * * * * * * * * * * Constructors  * * * * * * * * * * * * * * //

Foam::OFstreamCollator::OFstreamCollator(const off_t maxBufferSize)
:
    maxBufferSize_(maxBufferSize),
    mutex_
    (
        maxBufferSize_ > 0
      ? allocateMutex()
      : -1
    ),
    thread_
    (
        maxBufferSize_ > 0
      ? allocateThread()
      : -1
    ),
    threadRunning_(false),
    comm_
    (
        UPstream::allocateCommunicator
        (
            UPstream::worldComm,
            identity(UPstream::nProcs(UPstream::worldComm))
        )
    )
{}


// * * * * * * * * * * * * * * * * Destructor  * * * * * * * * * * * * * * * //

Foam::OFstreamCollator::~OFstreamCollator()
{
    if (threadRunning_)
    {
        bool keepAlive = true;
        if (keepAlive)
        {
            if (debug)
            {
                Pout<< "~OFstreamCollator : Push signalling data" << endl;
            }

            // Push special data onto write stack that signals the thread
            // to stop

            lockMutex(mutex_);
            objects_.push
            (
                new writeData
                (
                    typeName,
                    fileName::null,
                    string::null,
                    IOstream::BINARY,
                    IOstream::currentVersion,
                    IOstream::UNCOMPRESSED,
                    false
                )
            );
            unlockMutex(mutex_);
        }

        if (debug)
        {
            Pout<< "~OFstreamCollator : Waiting for write thread "
                << thread_ << endl;
        }

        joinThread(thread_);
    }
    if (thread_ != -1)
    {
        freeThread(thread_);
    }
    if (mutex_ != -1)
    {
        freeMutex(mutex_);
    }
    if (comm_ != -1)
    {
        UPstream::freeCommunicator(comm_);
    }
}


// * * * * * * * * * * * * * * * Member Functions  * * * * * * * * * * * * * //

bool Foam::OFstreamCollator::write
(
    const word& typeName,
    const fileName& fName,
    const string& data,
    IOstream::streamFormat fmt,
    IOstream::versionNumber ver,
    IOstream::compressionType cmp,
    const bool append
)
{
    if (maxBufferSize_ > 0)
    {
        while (true)
        {
            // Count files to be written
            off_t totalSize = 0;
            lockMutex(mutex_);
            forAllConstIter(FIFOStack<writeData*>, objects_, iter)
            {
                totalSize += iter()->data_.size();
            }
            unlockMutex(mutex_);

            if
            (
                totalSize == 0
             || (totalSize+off_t(data.size()) < maxBufferSize_)
            )
            {
                break;
            }

            if (debug)
            {
                Pout<< "OFstreamCollator : Waiting for buffer space."
                    << " Currently in use:" << totalSize
                    << " limit:" << maxBufferSize_
                    << endl;
            }

            sleep(5);
        }

        if (debug)
        {
            Pout<< "OFstreamCollator : relaying write of " << fName
                << " to thread " << thread_ << endl;
        }

        // Push the data onto the buffer
        lockMutex(mutex_);
        objects_.push
        (
            new writeData(typeName, fName, data, fmt, ver, cmp, append)
        );
        unlockMutex(mutex_);

        // Start the thread if necessary
        lockMutex(mutex_);
        if (!threadRunning_)
        {
            createThread(thread_, writeAll, this);
            if (debug)
            {
                Pout<< "OFstreamCollator : Started write thread " << thread_
                    << endl;
            }
            threadRunning_ = true;
        }
        unlockMutex(mutex_);

        return true;
    }
    else
    {
        // Immediate writing
        return writeFile(comm_, typeName, fName, data, fmt, ver, cmp, append);
    }
}


// ************************************************************************* //

MattijsJ

2017-09-01 13:01

reporter   ~0008652

1) Is anyone else seeing these problems with the collated file format?

2) I've uploaded a hacked OFstreamCollator (src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator) which keeps the write thread alive. Does this help? You might very occasionally see backtraces from UIPstream::read. These can safely be ignored.

matthias

2017-09-01 13:08

reporter   ~0008653

I had no time to do it in the last two days. I will run the tests this weekend.

matthias

2017-09-01 16:51

reporter   ~0008656

I ran the pitzDaily case from the tutorials on 4 processors, switching to the new collated file format. The case settings are otherwise unmodified.

Debug flags activated.

Same behaviour.
log.pitzdaily (787,027 bytes)

matthias

2017-09-01 17:02

reporter   ~0008657

Second pitzDaily run using your new OFstreamCollator files


Same behaviour.
log.pitzdaily2 (769,409 bytes)

MattijsJ

2017-09-01 17:34

reporter   ~0008658

Switch off runTimeModifiable.

matthias

2017-09-01 18:01

reporter   ~0008660

No effect.

matthias

2017-09-01 18:11

reporter   ~0008662

I did a clean git checkout and built debug versions of OpenFOAM and OpenMPI.

At the moment I see no errors, at least for the pitzDaily case. That's really confusing.

matthias

2017-09-01 18:29

reporter   ~0008663

I repeated the pitzDaily run six times and got the error again. This time I used the debug versions of OpenFOAM and OpenMPI and the unmodified OFstreamCollator files. See log.pitzDaily4
log.pitzdaily3 (766,420 bytes)

matthias

2017-09-01 18:29

reporter   ~0008664

Sorry, log.pitzDaily3.

MattijsJ

2017-09-08 11:22

reporter   ~0008729

I accidentally ran a case with OpenMPI built without thread support enabled and saw messages similar to the ones you are getting. The problems went away once I reconfigured OpenMPI.

- You are running InfiniBand? Are you confident the InfiniBand drivers support multi-threading?
- Does it run fine with Ethernet instead?
- OpenMPI's thread support still seems to be evolving (e.g. https://github.com/open-mpi/ompi/issues/2427, but I don't know how relevant that is).
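
A quick way to rule the MPI library in or out is a standalone check outside OpenFOAM (a minimal sketch; the file name checkThreads.C and the output wording are arbitrary). It reports the thread level the library actually grants:

// checkThreads.C - compile with the mpicxx used for the OpenFOAM build
// and run under mpirun on the same nodes/interconnect as the failing case.
#include <mpi.h>
#include <cstdio>

int main(int argc, char** argv)
{
    int provided = MPI_THREAD_SINGLE;

    // Ask for full multi-thread support; 'provided' returns what we really get
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);

    int rank = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0)
    {
        std::printf
        (
            "requested MPI_THREAD_MULTIPLE (%d), provided %d\n",
            MPI_THREAD_MULTIPLE, provided
        );
    }

    MPI_Finalize();
    return 0;
}

If provided comes back lower than MPI_THREAD_MULTIPLE, the collated writer's background thread cannot safely make MPI calls, which would be consistent with the crash above.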

matthias

2017-09-08 11:37

reporter   ~0008730

Yes, we are using InfiniBand. We also did some tests using Ethernet and shm (local machine), but the error appeared again.

I'm going to try Intel MPI instead of OpenMPI; maybe it works better.

MattijsJ

2017-09-12 12:41

reporter   ~0008758

- We checked that the write thread uses its own communicator
- and that any use of any MPI routine uses that communicator.

- Did you have any luck with Intel MPI?
- Attached is a version of decomposedBlockData.C that uses a different message tag. This should not matter, but just in case.
decomposedBlockData.C (23,793 bytes)   
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | Copyright (C) 2017 OpenFOAM Foundation
     \\/     M anipulation  |
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.

    OpenFOAM is free software: you can redistribute it and/or modify it
    under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    for more details.

    You should have received a copy of the GNU General Public License
    along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.

\*---------------------------------------------------------------------------*/

#include "decomposedBlockData.H"
#include "OPstream.H"
#include "IPstream.H"
#include "PstreamBuffers.H"
#include "OFstream.H"
#include "IFstream.H"
#include "IStringStream.H"
#include "dictionary.H"
#include <sys/time.h>
#include "objectRegistry.H"

// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //

namespace Foam
{
    defineTypeNameAndDebug(decomposedBlockData, 0);
}

// * * * * * * * * * * * * * * * * Constructors  * * * * * * * * * * * * * * //

Foam::decomposedBlockData::decomposedBlockData
(
    const label comm,
    const IOobject& io,
    const UPstream::commsTypes commsType
)
:
    regIOobject(io),
    commsType_(commsType),
    comm_(comm)
{
    // Temporary warning
    if (io.readOpt() == IOobject::MUST_READ_IF_MODIFIED)
    {
        WarningInFunction
            << "decomposedBlockData " << name()
            << " constructed with IOobject::MUST_READ_IF_MODIFIED"
            " but decomposedBlockData does not support automatic rereading."
            << endl;
    }
    if
    (
        (
            io.readOpt() == IOobject::MUST_READ
         || io.readOpt() == IOobject::MUST_READ_IF_MODIFIED
        )
     || (io.readOpt() == IOobject::READ_IF_PRESENT && headerOk())
    )
    {
        read();
    }
}


Foam::decomposedBlockData::decomposedBlockData
(
    const label comm,
    const IOobject& io,
    const UList<char>& list,
    const UPstream::commsTypes commsType
)
:
    regIOobject(io),
    commsType_(commsType),
    comm_(comm)
{
    // Temporary warning
    if (io.readOpt() == IOobject::MUST_READ_IF_MODIFIED)
    {
        WarningInFunction
            << "decomposedBlockData " << name()
            << " constructed with IOobject::MUST_READ_IF_MODIFIED"
            " but decomposedBlockData does not support automatic rereading."
            << endl;
    }

    if
    (
        (
            io.readOpt() == IOobject::MUST_READ
         || io.readOpt() == IOobject::MUST_READ_IF_MODIFIED
        )
     || (io.readOpt() == IOobject::READ_IF_PRESENT && headerOk())
    )
    {
        read();
    }
    else
    {
        List<char>::operator=(list);
    }
}


Foam::decomposedBlockData::decomposedBlockData
(
    const label comm,
    const IOobject& io,
    const Xfer<List<char>>& list,
    const UPstream::commsTypes commsType
)
:
    regIOobject(io),
    commsType_(commsType),
    comm_(comm)
{
    // Temporary warning
    if (io.readOpt() == IOobject::MUST_READ_IF_MODIFIED)
    {
        WarningInFunction
            << "decomposedBlockData " << name()
            << " constructed with IOobject::MUST_READ_IF_MODIFIED"
            " but decomposedBlockData does not support automatic rereading."
            << endl;
    }

    List<char>::transfer(list());

    if
    (
        (
            io.readOpt() == IOobject::MUST_READ
         || io.readOpt() == IOobject::MUST_READ_IF_MODIFIED
        )
     || (io.readOpt() == IOobject::READ_IF_PRESENT && headerOk())
    )
    {
        read();
    }
}


// * * * * * * * * * * * * * * * Destructor  * * * * * * * * * * * * * * * * //

Foam::decomposedBlockData::~decomposedBlockData()
{}


// * * * * * * * * * * * * * * * Members Functions * * * * * * * * * * * * * //

bool Foam::decomposedBlockData::readMasterHeader(IOobject& io, Istream& is)
{
    if (debug)
    {
        Pout<< "decomposedBlockData::readMasterHeader:"
            << " stream:" << is.name() << endl;
    }

    // Master-only reading of header
    is.fatalCheck("read(Istream&)");

    List<char> data(is);
    is.fatalCheck("read(Istream&) : reading entry");
    string buf(data.begin(), data.size());
    IStringStream str(is.name(), buf);

    return io.readHeader(str);
}


void Foam::decomposedBlockData::writeHeader
(
    Ostream& os,
    const IOstream::versionNumber version,
    const IOstream::streamFormat format,
    const word& type,
    const string& note,
    const fileName& location,
    const word& name
)
{
    IOobject::writeBanner(os)
        << "FoamFile\n{\n"
        << "    version     " << version << ";\n"
        << "    format      " << format << ";\n"
        << "    class       " << type << ";\n";
    if (note.size())
    {
        os  << "    note        " << note << ";\n";
    }

    if (location.size())
    {
        os  << "    location    " << location << ";\n";
    }

    os  << "    object      " << name << ";\n"
        << "}" << nl;

    IOobject::writeDivider(os) << nl;
}


Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlock
(
    const label blocki,
    Istream& is,
    IOobject& headerIO
)
{
    if (debug)
    {
        Pout<< "decomposedBlockData::readBlock:"
            << " stream:" << is.name() << " attempt to read block " << blocki
            << endl;
    }

    is.fatalCheck("read(Istream&)");

    List<char> data;
    autoPtr<ISstream> realIsPtr;

    if (blocki == 0)
    {
        is >> data;
        is.fatalCheck("read(Istream&) : reading entry");

        string buf(data.begin(), data.size());
        realIsPtr = new IStringStream(is.name(), buf);

        // Read header
        if (!headerIO.readHeader(realIsPtr()))
        {
            FatalIOErrorInFunction(realIsPtr())
                << "problem while reading header for object "
                << is.name() << exit(FatalIOError);
        }
    }
    else
    {
        // Read master for header
        is >> data;
        is.fatalCheck("read(Istream&) : reading entry");

        IOstream::versionNumber ver(IOstream::currentVersion);
        IOstream::streamFormat fmt;
        {
            string buf(data.begin(), data.size());
            IStringStream headerStream(is.name(), buf);

            // Read header
            if (!headerIO.readHeader(headerStream))
            {
                FatalIOErrorInFunction(headerStream)
                    << "problem while reading header for object "
                    << is.name() << exit(FatalIOError);
            }
            ver = headerStream.version();
            fmt = headerStream.format();
        }

        for (label i = 1; i < blocki+1; i++)
        {
            // Read data, override old data
            is >> data;
            is.fatalCheck("read(Istream&) : reading entry");
        }
        string buf(data.begin(), data.size());
        realIsPtr = new IStringStream(is.name(), buf);

        // Apply master stream settings to realIsPtr
        realIsPtr().format(fmt);
        realIsPtr().version(ver);
    }
    return realIsPtr;
}


bool Foam::decomposedBlockData::readBlocks
(
    const label comm,
    autoPtr<ISstream>& isPtr,
    List<char>& data,
    const UPstream::commsTypes commsType
)
{
    if (debug)
    {
        Pout<< "decomposedBlockData::readBlocks:"
            << " stream:" << (isPtr.valid() ? isPtr().name() : "invalid")
            << " commsType:" << Pstream::commsTypeNames[commsType]
            << " comm:" << comm << endl;
    }

    bool ok = false;

    if (commsType == UPstream::commsTypes::scheduled)
    {
        if (UPstream::master(comm))
        {
            Istream& is = isPtr();
            is.fatalCheck("read(Istream&)");

            // Read master data
            {
                is >> data;
                is.fatalCheck("read(Istream&) : reading entry");
            }

            // Read slave data
            for
            (
                label proci = 1;
                proci < UPstream::nProcs(comm);
                proci++
            )
            {
                List<char> elems(is);
                is.fatalCheck("read(Istream&) : reading entry");

                OPstream os
                (
                    UPstream::commsTypes::scheduled,
                    proci,
                    0,
                    UPstream::msgType(),
                    comm
                );
                os << elems;
            }

            ok = is.good();
        }
        else
        {
            IPstream is
            (
                UPstream::commsTypes::scheduled,
                UPstream::masterNo(),
                0,
                UPstream::msgType(),
                comm
            );
            is >> data;
        }
    }
    else
    {
        PstreamBuffers pBufs
        (
            UPstream::commsTypes::nonBlocking,
            UPstream::msgType(),
            comm
        );

        if (UPstream::master(comm))
        {
            Istream& is = isPtr();
            is.fatalCheck("read(Istream&)");

            // Read master data
            {
                is >> data;
                is.fatalCheck("read(Istream&) : reading entry");
            }

            // Read slave data
            for
            (
                label proci = 1;
                proci < UPstream::nProcs(comm);
                proci++
            )
            {
                List<char> elems(is);
                is.fatalCheck("read(Istream&) : reading entry");

                UOPstream os(proci, pBufs);
                os << elems;
            }
        }

        labelList recvSizes;
        pBufs.finishedSends(recvSizes);

        if (!UPstream::master(comm))
        {
            UIPstream is(UPstream::masterNo(), pBufs);
            is >> data;
        }
    }

    Pstream::scatter(ok, Pstream::msgType(), comm);

    return ok;
}


Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlocks
(
    const label comm,
    const fileName& fName,
    autoPtr<ISstream>& isPtr,
    IOobject& headerIO,
    const UPstream::commsTypes commsType
)
{
    if (debug)
    {
        Pout<< "decomposedBlockData::readBlocks:"
            << " stream:" << (isPtr.valid() ? isPtr().name() : "invalid")
            << " commsType:" << Pstream::commsTypeNames[commsType] << endl;
    }

    bool ok = false;

    List<char> data;
    autoPtr<ISstream> realIsPtr;

    if (commsType == UPstream::commsTypes::scheduled)
    {
        if (UPstream::master(comm))
        {
            Istream& is = isPtr();
            is.fatalCheck("read(Istream&)");

            // Read master data
            {
                is >> data;
                is.fatalCheck("read(Istream&) : reading entry");

                string buf(data.begin(), data.size());
                realIsPtr = new IStringStream(fName, buf);

                // Read header
                if (!headerIO.readHeader(realIsPtr()))
                {
                    FatalIOErrorInFunction(realIsPtr())
                        << "problem while reading header for object "
                        << is.name() << exit(FatalIOError);
                }
            }

            // Read slave data
            for
            (
                label proci = 1;
                proci < UPstream::nProcs(comm);
                proci++
            )
            {
                is >> data;
                is.fatalCheck("read(Istream&) : reading entry");

                OPstream os
                (
                    UPstream::commsTypes::scheduled,
                    proci,
                    0,
                    UPstream::msgType(),
                    comm
                );
                os << data;
            }

            ok = is.good();
        }
        else
        {
            IPstream is
            (
                UPstream::commsTypes::scheduled,
                UPstream::masterNo(),
                0,
                UPstream::msgType(),
                comm
            );
            is >> data;

            string buf(data.begin(), data.size());
            realIsPtr = new IStringStream(fName, buf);
        }
    }
    else
    {
        PstreamBuffers pBufs
        (
            UPstream::commsTypes::nonBlocking,
            UPstream::msgType(),
            comm
        );

        if (UPstream::master(comm))
        {
            Istream& is = isPtr();
            is.fatalCheck("read(Istream&)");

            // Read master data
            {
                is >> data;
                is.fatalCheck("read(Istream&) : reading entry");

                string buf(data.begin(), data.size());
                realIsPtr = new IStringStream(fName, buf);

                // Read header
                if (!headerIO.readHeader(realIsPtr()))
                {
                    FatalIOErrorInFunction(realIsPtr())
                        << "problem while reading header for object "
                        << is.name() << exit(FatalIOError);
                }
            }

            // Read slave data
            for
            (
                label proci = 1;
                proci < UPstream::nProcs(comm);
                proci++
            )
            {
                List<char> elems(is);
                is.fatalCheck("read(Istream&) : reading entry");

                UOPstream os(proci, pBufs);
                os << elems;
            }

            ok = is.good();
        }

        labelList recvSizes;
        pBufs.finishedSends(recvSizes);

        if (!UPstream::master(comm))
        {
            UIPstream is(UPstream::masterNo(), pBufs);
            is >> data;

            string buf(data.begin(), data.size());
            realIsPtr = new IStringStream(fName, buf);
        }
    }

    Pstream::scatter(ok, Pstream::msgType(), comm);

    // version
    string versionString(realIsPtr().version().str());
    Pstream::scatter(versionString,  Pstream::msgType(), comm);
    realIsPtr().version(IStringStream(versionString)());

    // stream
    {
        OStringStream os;
        os << realIsPtr().format();
        string formatString(os.str());
        Pstream::scatter(formatString,  Pstream::msgType(), comm);
        realIsPtr().format(formatString);
    }

    word name(headerIO.name());
    Pstream::scatter(name, Pstream::msgType(), comm);
    headerIO.rename(name);
    Pstream::scatter(headerIO.headerClassName(), Pstream::msgType(), comm);
    Pstream::scatter(headerIO.note(), Pstream::msgType(), comm);
    //Pstream::scatter(headerIO.instance(), Pstream::msgType(), comm);
    //Pstream::scatter(headerIO.local(), Pstream::msgType(), comm);

    return realIsPtr;
}


bool Foam::decomposedBlockData::writeBlocks
(
    const label comm,
    autoPtr<OSstream>& osPtr,
    List<std::streamoff>& start,
    const UList<char>& data,
    const UPstream::commsTypes commsType,
    const bool syncReturnState
)
{
    if (debug)
    {
        Pout<< "decomposedBlockData::writeBlocks:"
            << " stream:" << (osPtr.valid() ? osPtr().name() : "invalid")
            << " data:" << data.size()
            << " commsType:" << Pstream::commsTypeNames[commsType] << endl;
    }

    bool ok = true;

    labelList recvSizes(Pstream::nProcs(comm));
    recvSizes[Pstream::myProcNo(comm)] = data.byteSize();
    Pstream::gatherList(recvSizes, Pstream::msgType()+1, comm);

    if (commsType == UPstream::commsTypes::scheduled)
    {
        if (UPstream::master(comm))
        {
            start.setSize(UPstream::nProcs(comm));

            OSstream& os = osPtr();

            // Write master data
            {
                os << nl << "// Processor" << UPstream::masterNo() << nl;
                start[UPstream::masterNo()] = os.stdStream().tellp();
                os << data;
            }
            // Write slaves
            List<char> elems;
            for (label proci = 1; proci < UPstream::nProcs(comm); proci++)
            {
                elems.setSize(recvSizes[proci]);
                IPstream::read
                (
                    UPstream::commsTypes::scheduled,
                    proci,
                    elems.begin(),
                    elems.size(),
                    Pstream::msgType()+1,
                    comm
                );

                os << nl << nl << "// Processor" << proci << nl;
                start[proci] = os.stdStream().tellp();
                os << elems;
            }

            ok = os.good();
        }
        else
        {
            UOPstream::write
            (
                UPstream::commsTypes::scheduled,
                UPstream::masterNo(),
                data.begin(),
                data.byteSize(),
                Pstream::msgType()+1,
                comm
            );
        }
    }
    else
    {
        if (debug)
        {
            struct timeval tv;
            gettimeofday(&tv, nullptr);
            Pout<< "Starting sending at:"
                << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
                << Foam::endl;
        }


        label startOfRequests = Pstream::nRequests();

        if (!UPstream::master(comm))
        {
            UOPstream::write
            (
                UPstream::commsTypes::nonBlocking,
                UPstream::masterNo(),
                data.begin(),
                data.byteSize(),
                Pstream::msgType(),
                comm
            );
            Pstream::waitRequests(startOfRequests);
        }
        else
        {
            List<List<char>> recvBufs(Pstream::nProcs(comm));
            for (label proci = 1; proci < UPstream::nProcs(comm); proci++)
            {
                recvBufs[proci].setSize(recvSizes[proci]);
                UIPstream::read
                (
                    UPstream::commsTypes::nonBlocking,
                    proci,
                    recvBufs[proci].begin(),
                    recvSizes[proci],
                    Pstream::msgType(),
                    comm
                );
            }

            if (debug)
            {
                struct timeval tv;
                gettimeofday(&tv, nullptr);
                Pout<< "Starting master-only writing at:"
                    << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
                    << Foam::endl;
            }

            start.setSize(UPstream::nProcs(comm));

            OSstream& os = osPtr();

            // Write master data
            {
                os << nl << "// Processor" << UPstream::masterNo() << nl;
                start[UPstream::masterNo()] = os.stdStream().tellp();
                os << data;
            }

            if (debug)
            {
                struct timeval tv;
                gettimeofday(&tv, nullptr);
                Pout<< "Starting slave writing at:"
                    << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
                    << Foam::endl;
            }

            // Write slaves
            for (label proci = 1; proci < UPstream::nProcs(comm); proci++)
            {
                os << nl << nl << "// Processor" << proci << nl;
                start[proci] = os.stdStream().tellp();

                if (Pstream::finishedRequest(startOfRequests+proci-1))
                {
                    os << recvBufs[proci];
                }
            }

            Pstream::resetRequests(startOfRequests);

            ok = os.good();
        }
    }
    if (debug)
    {
        struct timeval tv;
        gettimeofday(&tv, nullptr);
        Pout<< "Finished master-only writing at:"
            << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
            << Foam::endl;
    }

    if (syncReturnState)
    {
        //- Enable to get synchronised error checking. Is the one that keeps
        //  slaves as slow as the master (which does all the writing)
        Pstream::scatter(ok, Pstream::msgType(), comm);
    }

    return ok;
}


bool Foam::decomposedBlockData::read()
{
    autoPtr<ISstream> isPtr;
    fileName objPath(fileHandler().filePath(false, *this, word::null));
    if (UPstream::master(comm_))
    {
        isPtr.reset(new IFstream(objPath));
        IOobject::readHeader(isPtr());
    }

    List<char>& data = *this;
    return readBlocks(comm_, isPtr, data, commsType_);
}


bool Foam::decomposedBlockData::writeData(Ostream& os) const
{
    const List<char>& data = *this;

    string str
    (
        reinterpret_cast<const char*>(data.cbegin()),
        data.byteSize()
    );

    IOobject io(*this);
    if (Pstream::master())
    {
        IStringStream is(name(), str);
        io.readHeader(is);
    }

    // Scatter header information

    // version
    string versionString(os.version().str());
    Pstream::scatter(versionString);

    // stream
    string formatString;
    {
        OStringStream os;
        os << os.format();
        formatString  = os.str();
        Pstream::scatter(formatString);
    }

    //word masterName(name());
    //Pstream::scatter(masterName);

    Pstream::scatter(io.headerClassName());
    Pstream::scatter(io.note());
    //Pstream::scatter(io.instance(), Pstream::msgType(), comm);
    //Pstream::scatter(io.local(), Pstream::msgType(), comm);

    fileName masterLocation(instance()/db().dbDir()/local());
    Pstream::scatter(masterLocation);

    if (!Pstream::master())
    {
        writeHeader
        (
            os,
            IOstream::versionNumber(IStringStream(versionString)()),
            IOstream::formatEnum(formatString),
            io.headerClassName(),
            io.note(),
            masterLocation,
            name()
        );
    }

    os.writeQuoted(str, false);

    if (!Pstream::master())
    {
        IOobject::writeEndDivider(os);
    }

    return os.good();
}


bool Foam::decomposedBlockData::writeObject
(
    IOstream::streamFormat fmt,
    IOstream::versionNumber ver,
    IOstream::compressionType cmp,
    const bool valid
) const
{
    autoPtr<OSstream> osPtr;
    if (UPstream::master(comm_))
    {
        // Note: always write binary. These are strings so readable
        //       anyway. They have already been tokenised on the sending side.
        osPtr.reset(new OFstream(objectPath(), IOstream::BINARY, ver, cmp));
        IOobject::writeHeader(osPtr());
    }
    List<std::streamoff> start;
    return writeBlocks(comm_, osPtr, start, *this, commsType_);
}


Foam::label Foam::decomposedBlockData::numBlocks(const fileName& fName)
{
    label nBlocks = 0;

    IFstream is(fName);
    is.fatalCheck("decomposedBlockData::numBlocks(const fileName&)");

    if (!is.good())
    {
        return nBlocks;
    }

    // Skip header
    token firstToken(is);

    if
    (
        is.good()
     && firstToken.isWord()
     && firstToken.wordToken() == "FoamFile"
    )
    {
        dictionary headerDict(is);
        is.version(headerDict.lookup("version"));
        is.format(headerDict.lookup("format"));
    }

    List<char> data;
    while (is.good())
    {
        token sizeToken(is);
        if (!sizeToken.isLabel())
        {
            return nBlocks;
        }
        is.putBack(sizeToken);

        is >> data;
        nBlocks++;
    }

    return nBlocks;
}


// ************************************************************************* //
decomposedBlockData.C (23,793 bytes)   

MattijsJ

2017-09-15 14:23

reporter   ~0008780

I've been running all day on a 5M cell case on 8 processors (2 nodes; connected via gigabit).

- case is the simpleFoam/pitzDaily. Scaled up to have 5M cells.
- running with openmpi-2.1.1 (not sure what gcc version was used for compiling mpi)
- running with purgeWrite 5;
- tried binary and compressed ascii
- tried writeInterval 10 (thread stops & starts) and writeInterval 1 (thread never stops)

The only problem I can see is if purgeWrite is set so small that it tries to delete time directories that are still being written. Is there anything special about your case? Can you post it?
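
For reference, the write-related controlDict entries being varied in this test look roughly like the sketch below (the OptimisationSwitches part uses the standard collated-I/O keywords and is an assumption here, not copied from the actual case):

writeInterval       10;       // also tried 1 (write thread never stops)
purgeWrite          5;
writeFormat         binary;   // also tried ascii with writeCompression on;

OptimisationSwitches
{
    fileHandler                 collated;
    maxThreadFileBufferSize     2e9;  // non-zero enables threaded writing (needs MPI threading support)
}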

matthias

2017-09-20 12:25

reporter   ~0008783

Do you have a mail address? I can't post the link here.

matthias

2017-09-20 12:29

reporter   ~0008784

BTW, IntelMPI is also not working

MattijsJ

2017-09-20 20:58

reporter   ~0008787

email is a gmail address

mattijs dot janssens

MattijsJ

2017-10-10 17:42

reporter  

nonBlocking.patch (24,105 bytes)   
diff --git a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C
index 7afbd2fb8..ea6fd79a7 100644
--- a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C
+++ b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C
@@ -31,8 +31,10 @@ License
 #include "IFstream.H"
 #include "IStringStream.H"
 #include "dictionary.H"
-#include <sys/time.h>
 #include "objectRegistry.H"
+#include "SubList.H"
+#include "labelPair.H"
+#include "masterUncollatedFileOperation.H"
 
 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
 
@@ -592,6 +594,7 @@ bool Foam::decomposedBlockData::writeBlocks
     autoPtr<OSstream>& osPtr,
     List<std::streamoff>& start,
     const UList<char>& data,
+    const labelUList& recvSizes,
     const UPstream::commsTypes commsType,
     const bool syncReturnState
 )
@@ -604,17 +607,16 @@ bool Foam::decomposedBlockData::writeBlocks
             << " commsType:" << Pstream::commsTypeNames[commsType] << endl;
     }
 
-    bool ok = true;
+    const label nProcs = UPstream::nProcs(comm);
+
 
-    labelList recvSizes(Pstream::nProcs(comm));
-    recvSizes[Pstream::myProcNo(comm)] = data.byteSize();
-    Pstream::gatherList(recvSizes, Pstream::msgType(), comm);
+    bool ok = true;
 
     if (commsType == UPstream::commsTypes::scheduled)
     {
         if (UPstream::master(comm))
         {
-            start.setSize(UPstream::nProcs(comm));
+            start.setSize(nProcs);
 
             OSstream& os = osPtr();
 
@@ -626,7 +628,7 @@ bool Foam::decomposedBlockData::writeBlocks
             }
             // Write slaves
             List<char> elems;
-            for (label proci = 1; proci < UPstream::nProcs(comm); proci++)
+            for (label proci = 1; proci < nProcs; proci++)
             {
                 elems.setSize(recvSizes[proci]);
                 IPstream::read
@@ -661,102 +663,147 @@ bool Foam::decomposedBlockData::writeBlocks
     }
     else
     {
-        if (debug)
+        // Write master data
+        if (UPstream::master(comm))
         {
-            struct timeval tv;
-            gettimeofday(&tv, nullptr);
-            Pout<< "Starting sending at:"
-                << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
-                << Foam::endl;
+            start.setSize(nProcs);
+
+            OSstream& os = osPtr();
+
+            os << nl << "// Processor" << UPstream::masterNo() << nl;
+            start[UPstream::masterNo()] = os.stdStream().tellp();
+            os << data;
         }
 
 
-        label startOfRequests = Pstream::nRequests();
+        // Find out how many processor can be received into
+        // masterUncollatedFileOperation::maxMasterFileBufferSize
 
-        if (!UPstream::master(comm))
-        {
-            UOPstream::write
-            (
-                UPstream::commsTypes::nonBlocking,
-                UPstream::masterNo(),
-                data.begin(),
-                data.byteSize(),
-                Pstream::msgType(),
-                comm
-            );
-            Pstream::waitRequests(startOfRequests);
-        }
-        else
+        // Starting slave processor and number of processors
+        labelPair startAndSize(1, nProcs-1);
+
+        while (startAndSize[1] > 0)
         {
-            List<List<char>> recvBufs(Pstream::nProcs(comm));
-            for (label proci = 1; proci < UPstream::nProcs(comm); proci++)
+            labelPair masterData(startAndSize);
+            if (UPstream::master(comm))
             {
-                recvBufs[proci].setSize(recvSizes[proci]);
-                UIPstream::read
+                label totalSize = 0;
+                label proci = masterData[0];
+                while
                 (
-                    UPstream::commsTypes::nonBlocking,
-                    proci,
-                    recvBufs[proci].begin(),
-                    recvSizes[proci],
-                    Pstream::msgType(),
-                    comm
-                );
+                    proci < nProcs
+                 && (
+                        totalSize+recvSizes[proci]
+                      < fileOperations::masterUncollatedFileOperation::
+                            maxMasterFileBufferSize
+                    )
+                )
+                {
+                    totalSize += recvSizes[proci];
+                    proci++;
+                }
+
+                masterData[1] = proci-masterData[0];
             }
 
-            if (debug)
+
+            // Scatter masterData
+            UPstream::scatter
+            (
+                reinterpret_cast<const char*>(masterData.cdata()),
+                List<int>(nProcs, sizeof(masterData)),
+                List<int>(nProcs, 0),
+                reinterpret_cast<char*>(startAndSize.data()),
+                sizeof(startAndSize),
+                comm
+            );
+
+            if (startAndSize[1] == 0)
             {
-                struct timeval tv;
-                gettimeofday(&tv, nullptr);
-                Pout<< "Starting master-only writing at:"
-                    << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
-                    << Foam::endl;
+                break;
             }
 
-            start.setSize(UPstream::nProcs(comm));
 
-            OSstream& os = osPtr();
-
-            // Write master data
+            // Calculate master data
+            List<int> sliceSizes;
+            List<int> sliceOffsets;
+            List<char> recvData;
+            if (UPstream::master(comm))
             {
-                os << nl << "// Processor" << UPstream::masterNo() << nl;
-                start[UPstream::masterNo()] = os.stdStream().tellp();
-                os << data;
+                sliceSizes.setSize(nProcs, 0);
+                sliceOffsets.setSize(nProcs, 0);
+
+                int totalSize = 0;
+                for
+                (
+                    label proci = startAndSize[0];
+                    proci < startAndSize[0]+startAndSize[1];
+                    proci++
+                )
+                {
+                    sliceSizes[proci] = int(recvSizes[proci]);
+                    sliceOffsets[proci] = totalSize;
+                    totalSize += sliceSizes[proci];
+                }
+                recvData.setSize(totalSize);
             }
 
-            if (debug)
+            int nSend = 0;
+            if
+            (
+               !UPstream::master(comm)
+             && (UPstream::myProcNo(comm) >= startAndSize[0])
+             && (UPstream::myProcNo(comm) < startAndSize[0]+startAndSize[1])
+            )
             {
-                struct timeval tv;
-                gettimeofday(&tv, nullptr);
-                Pout<< "Starting slave writing at:"
-                    << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
-                    << Foam::endl;
+                nSend = data.byteSize();
             }
 
-            // Write slaves
-            for (label proci = 1; proci < UPstream::nProcs(comm); proci++)
+
+            UPstream::gather
+            (
+                data.begin(),
+                nSend,
+
+                recvData.begin(),
+                sliceSizes,
+                sliceOffsets,
+                comm
+            );
+
+            if (UPstream::master(comm))
             {
-                os << nl << nl << "// Processor" << proci << nl;
-                start[proci] = os.stdStream().tellp();
+                OSstream& os = osPtr();
 
-                if (Pstream::finishedRequest(startOfRequests+proci-1))
+                // Write slaves
+                for
+                (
+                    label proci = startAndSize[0];
+                    proci < startAndSize[0]+startAndSize[1];
+                    proci++
+                )
                 {
-                    os << recvBufs[proci];
+                    os << nl << nl << "// Processor" << proci << nl;
+                    start[proci] = os.stdStream().tellp();
+
+                    os <<
+                        SubList<char>
+                        (
+                            recvData,
+                            sliceSizes[proci],
+                            sliceOffsets[proci]
+                        );
                 }
             }
 
-            Pstream::resetRequests(startOfRequests);
+            startAndSize[0] += startAndSize[1];
+        }
 
-            ok = os.good();
+        if (UPstream::master(comm))
+        {
+            ok = osPtr().good();
         }
     }
-    if (debug)
-    {
-        struct timeval tv;
-        gettimeofday(&tv, nullptr);
-        Pout<< "Finished master-only writing at:"
-            << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
-            << Foam::endl;
-    }
 
     if (syncReturnState)
     {
@@ -868,8 +915,34 @@ bool Foam::decomposedBlockData::writeObject
         osPtr.reset(new OFstream(objectPath(), IOstream::BINARY, ver, cmp));
         IOobject::writeHeader(osPtr());
     }
+
+    const label nProcs = Pstream::nProcs(comm_);
+
+    labelList recvSizes(nProcs);
+    {
+        char* data = reinterpret_cast<char*>(recvSizes.begin());
+
+        labelList recvOffsets(nProcs);
+        forAll(recvOffsets, proci)
+        {
+            recvOffsets[proci] =
+                reinterpret_cast<char*>(&recvSizes[proci])
+              - data;
+        }
+        label sz = this->byteSize();
+        UPstream::gather
+        (
+            reinterpret_cast<char*>(&sz),
+            sizeof(label),
+            data,
+            List<int>(recvSizes, sizeof(label)),
+            recvOffsets,
+            comm_
+        );
+    }
+
     List<std::streamoff> start;
-    return writeBlocks(comm_, osPtr, start, *this, commsType_);
+    return writeBlocks(comm_, osPtr, start, *this, recvSizes, commsType_);
 }
 
 
diff --git a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.H b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.H
index dc4e3d70f..c151c3abe 100644
--- a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.H
+++ b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.H
@@ -177,6 +177,7 @@ public:
             autoPtr<OSstream>& osPtr,
             List<std::streamoff>& start,
             const UList<char>&,
+            const labelUList& recvSizes,
             const UPstream::commsTypes,
             const bool syncReturnState = true
         );
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H
index fab2dd7ff..d3c4e2da6 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H
@@ -499,6 +499,47 @@ public:
             labelUList& recvData,
             const label communicator = 0
         );
+
+        //- Exchange data with all processors (in the communicator)
+        //  sendSizes, sendOffsets give (per processor) the slice of
+        //  sendData to send, similarly recvSizes, recvOffsets give the slice
+        //  of recvData to receive
+        static void allToAll
+        (
+            const char* sendData,
+            const UList<int>& sendSizes,
+            const UList<int>& sendOffsets,
+
+            char* recvData,
+            const UList<int>& recvSizes,
+            const UList<int>& recvOffsets,
+
+            const label communicator = 0
+        );
+
+        //- Receive data from all processors on the master
+        static void gather
+        (
+            const char* sendData,
+            int sendSize,
+
+            char* recvData,
+            const UList<int>& recvSizes,
+            const UList<int>& recvOffsets,
+            const label communicator = 0
+        );
+
+        //- Send data to all processors from the root of the communicator
+        static void scatter
+        (
+            const char* sendData,
+            const UList<int>& sendSizes,
+            const UList<int>& sendOffsets,
+
+            char* recvData,
+            int recvSize,
+            const label communicator = 0
+        );
 };
 
 
diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C
index b5c8ab494..4aeb79196 100644
--- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C
+++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C
@@ -92,9 +92,34 @@ bool Foam::OFstreamCollator::writeFile
 
     UList<char> slice(const_cast<char*>(s.data()), label(s.size()));
 
-    // Assuming threaded writing hides any slowness so we might
-    // as well use scheduled communication to send the data to
-    // the master processor in order.
+    // Determine sizes to receive
+    labelList recvSizes(Pstream::nProcs(comm));
+    {
+        char* data = reinterpret_cast<char*>(recvSizes.begin());
+
+        List<int> recvOffsets(recvSizes.size());
+        forAll(recvOffsets, proci)
+        {
+            recvOffsets[proci] =
+                reinterpret_cast<char*>(&recvSizes[proci])
+              - data;
+        }
+        label sz = slice.byteSize();
+        UPstream::gather
+        (
+            reinterpret_cast<char*>(&sz),
+            sizeof(sz),
+            data,
+            List<int>(recvSizes.size(), sizeof(label)),
+            recvOffsets,
+            comm
+        );
+    }
+
+    // Assuming threaded writing hides any slowness so we
+    // can use scheduled communication to send the data to
+    // the master processor in order. However can be unstable so
+    // default is non-blocking.
 
     List<std::streamoff> start;
     decomposedBlockData::writeBlocks
@@ -103,7 +128,8 @@ bool Foam::OFstreamCollator::writeFile
         osPtr,
         start,
         slice,
-        UPstream::commsTypes::scheduled,
+        recvSizes,
+        UPstream::commsTypes::nonBlocking,  //scheduled,
         false       // do not reduce return state
     );
 
@@ -115,8 +141,17 @@ bool Foam::OFstreamCollator::writeFile
 
     if (debug)
     {
-        Pout<< "OFstreamCollator : Finished writing " << s.size()
-            << " bytes to " << fName
+        Pout<< "OFstreamCollator : Finished writing " << s.size() << " bytes";
+        if (UPstream::master(comm))
+        {
+            label sum = 0;
+            forAll(recvSizes, i)
+            {
+                sum += recvSizes[i];
+            }
+            Pout<< " (overall " << sum << ")";
+        }
+        Pout<< " to " << fName
             << " using comm " << comm << endl;
     }
 
@@ -133,14 +168,11 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg)
     {
         writeData* ptr = nullptr;
 
-        //pthread_mutex_lock(&handler.mutex_);
         lockMutex(handler.mutex_);
-
         if (handler.objects_.size())
         {
             ptr = handler.objects_.pop();
         }
-        //pthread_mutex_unlock(&handler.mutex_);
         unlockMutex(handler.mutex_);
 
         if (!ptr)
@@ -177,10 +209,8 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg)
         Pout<< "OFstreamCollator : Exiting write thread " << endl;
     }
 
-    //pthread_mutex_lock(&handler.mutex_);
     lockMutex(handler.mutex_);
     handler.threadRunning_ = false;
-    //pthread_mutex_unlock(&handler.mutex_);
     unlockMutex(handler.mutex_);
 
     return nullptr;
@@ -192,7 +222,6 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg)
 Foam::OFstreamCollator::OFstreamCollator(const off_t maxBufferSize)
 :
     maxBufferSize_(maxBufferSize),
-    //mutex_(PTHREAD_MUTEX_INITIALIZER),
     mutex_
     (
         maxBufferSize_ > 0
@@ -228,7 +257,6 @@ Foam::OFstreamCollator::~OFstreamCollator()
             Pout<< "~OFstreamCollator : Waiting for write thread" << endl;
         }
 
-        //pthread_join(thread_, nullptr);
         joinThread(thread_);
     }
     if (thread_ != -1)
@@ -265,13 +293,12 @@ bool Foam::OFstreamCollator::write
         {
             // Count files to be written
             off_t totalSize = 0;
-            //pthread_mutex_lock(&mutex_);
+
             lockMutex(mutex_);
             forAllConstIter(FIFOStack<writeData*>, objects_, iter)
             {
                 totalSize += iter()->data_.size();
             }
-            //pthread_mutex_unlock(&mutex_);
             unlockMutex(mutex_);
 
             if
@@ -285,10 +312,13 @@ bool Foam::OFstreamCollator::write
 
             if (debug)
             {
+                lockMutex(mutex_);
                 Pout<< "OFstreamCollator : Waiting for buffer space."
                     << " Currently in use:" << totalSize
                     << " limit:" << maxBufferSize_
+                    << " files:" << objects_.size()
                     << endl;
+                unlockMutex(mutex_);
             }
 
             sleep(5);
@@ -299,16 +329,14 @@ bool Foam::OFstreamCollator::write
             Pout<< "OFstreamCollator : relaying write of " << fName
                 << " to thread " << endl;
         }
-        //pthread_mutex_lock(&mutex_);
+
         lockMutex(mutex_);
         objects_.push
         (
             new writeData(typeName, fName, data, fmt, ver, cmp, append)
         );
-        //pthread_mutex_unlock(&mutex_);
         unlockMutex(mutex_);
 
-        //pthread_mutex_lock(&mutex_);
         lockMutex(mutex_);
         if (!threadRunning_)
         {
@@ -319,7 +347,6 @@ bool Foam::OFstreamCollator::write
             }
             threadRunning_ = true;
         }
-        //pthread_mutex_unlock(&mutex_);
         unlockMutex(mutex_);
 
         return true;
diff --git a/src/Pstream/dummy/UPstream.C b/src/Pstream/dummy/UPstream.C
index 124e22a0c..dd5ac01b3 100644
--- a/src/Pstream/dummy/UPstream.C
+++ b/src/Pstream/dummy/UPstream.C
@@ -92,6 +92,36 @@ void Foam::UPstream::allToAll
 }
 
 
+void Foam::UPstream::gather
+(
+    const char* sendData,
+    int sendSize,
+
+    char* recvData,
+    const UList<int>& recvSizes,
+    const UList<int>& recvOffsets,
+    const label communicator
+)
+{
+    memmove(recvData, sendData, sendSize);
+}
+
+
+void Foam::UPstream::scatter
+(
+    const char* sendData,
+    const UList<int>& sendSizes,
+    const UList<int>& sendOffsets,
+
+    char* recvData,
+    int recvSize,
+    const label communicator
+)
+{
+    memmove(recvData, sendData, recvSize);
+}
+
+
 void Foam::UPstream::allocatePstreamCommunicator
 (
     const label,
diff --git a/src/Pstream/mpi/UPstream.C b/src/Pstream/mpi/UPstream.C
index 317eaf260..b36ca35ca 100644
--- a/src/Pstream/mpi/UPstream.C
+++ b/src/Pstream/mpi/UPstream.C
@@ -2,7 +2,7 @@
   =========                 |
   \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
    \\    /   O peration     |
-    \\  /    A nd           | Copyright (C) 2011-2016 OpenFOAM Foundation
+    \\  /    A nd           | Copyright (C) 2011-2017 OpenFOAM Foundation
      \\/     M anipulation  |
 -------------------------------------------------------------------------------
 License
@@ -362,6 +362,196 @@ void Foam::UPstream::allToAll
 }
 
 
+void Foam::UPstream::allToAll
+(
+    const char* sendData,
+    const UList<int>& sendSizes,
+    const UList<int>& sendOffsets,
+
+    char* recvData,
+    const UList<int>& recvSizes,
+    const UList<int>& recvOffsets,
+
+    const label communicator
+)
+{
+    label np = nProcs(communicator);
+
+    if
+    (
+        sendSizes.size() != np
+     || sendOffsets.size() != np
+     || recvSizes.size() != np
+     || recvOffsets.size() != np
+    )
+    {
+        FatalErrorInFunction
+            << "Size of sendSize " << sendSizes.size()
+            << ", sendOffsets " << sendOffsets.size()
+            << ", recvSizes " << recvSizes.size()
+            << " or recvOffsets " << recvOffsets.size()
+            << " is not equal to the number of processors in the domain "
+            << np
+            << Foam::abort(FatalError);
+    }
+
+    if (!UPstream::parRun())
+    {
+        if (recvSizes[0] != sendSizes[0])
+        {
+            FatalErrorInFunction
+                << "Bytes to send " << sendSizes[0]
+                << " does not equal bytes to receive " << recvSizes[0]
+                << Foam::abort(FatalError);
+        }
+        memmove(recvData, &sendData[sendOffsets[0]], recvSizes[0]);
+    }
+    else
+    {
+        if
+        (
+            MPI_Alltoallv
+            (
+                sendData,
+                sendSizes.begin(),
+                sendOffsets.begin(),
+                MPI_BYTE,
+                recvData,
+                recvSizes.begin(),
+                recvOffsets.begin(),
+                MPI_BYTE,
+                PstreamGlobals::MPICommunicators_[communicator]
+            )
+        )
+        {
+            FatalErrorInFunction
+                << "MPI_Alltoallv failed for sendSizes " << sendSizes
+                << " recvSizes " << recvSizes
+                << " communicator " << communicator
+                << Foam::abort(FatalError);
+        }
+    }
+}
+
+
+void Foam::UPstream::gather
+(
+    const char* sendData,
+    int sendSize,
+
+    char* recvData,
+    const UList<int>& recvSizes,
+    const UList<int>& recvOffsets,
+    const label communicator
+)
+{
+    label np = nProcs(communicator);
+
+    if
+    (
+        UPstream::master(communicator)
+     && (recvSizes.size() != np || recvOffsets.size() != np)
+    )
+    {
+        FatalErrorInFunction
+            << "Size of recvSizes " << recvSizes.size()
+            << " or recvOffsets " << recvOffsets.size()
+            << " is not equal to the number of processors in the domain "
+            << np
+            << Foam::abort(FatalError);
+    }
+
+    if (!UPstream::parRun())
+    {
+        memmove(recvData, sendData, sendSize);
+    }
+    else
+    {
+        if
+        (
+            MPI_Gatherv
+            (
+                sendData,
+                sendSize,
+                MPI_BYTE,
+                recvData,
+                recvSizes.begin(),
+                recvOffsets.begin(),
+                MPI_BYTE,
+                0,
+                MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
+            )
+        )
+        {
+            FatalErrorInFunction
+                << "MPI_Gatherv failed for sendSize " << sendSize
+                << " recvSizes " << recvSizes
+                << " communicator " << communicator
+                << Foam::abort(FatalError);
+        }
+    }
+}
+
+
+void Foam::UPstream::scatter
+(
+    const char* sendData,
+    const UList<int>& sendSizes,
+    const UList<int>& sendOffsets,
+
+    char* recvData,
+    int recvSize,
+    const label communicator
+)
+{
+    label np = nProcs(communicator);
+
+    if
+    (
+        UPstream::master(communicator)
+     && (sendSizes.size() != np || sendOffsets.size() != np)
+    )
+    {
+        FatalErrorInFunction
+            << "Size of sendSizes " << sendSizes.size()
+            << " or sendOffsets " << sendOffsets.size()
+            << " is not equal to the number of processors in the domain "
+            << np
+            << Foam::abort(FatalError);
+    }
+
+    if (!UPstream::parRun())
+    {
+        memmove(recvData, sendData, recvSize);
+    }
+    else
+    {
+        if
+        (
+            MPI_Scatterv
+            (
+                sendData,
+                sendSizes.begin(),
+                sendOffsets.begin(),
+                MPI_BYTE,
+                recvData,
+                recvSize,
+                MPI_BYTE,
+                0,
+                MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
+            )
+        )
+        {
+            FatalErrorInFunction
+                << "MPI_Scatterv failed for sendSizes " << sendSizes
+                << " sendOffsets " << sendOffsets
+                << " communicator " << communicator
+                << Foam::abort(FatalError);
+        }
+    }
+}
+
+
 void Foam::UPstream::allocatePstreamCommunicator
 (
     const label parentIndex,
nonBlocking.patch (24,105 bytes)   

MattijsJ

2017-10-10 17:44

reporter   ~0008854

Could you try the uploaded patch (against a recent dev version)? It switches the communication for the file collation to non-blocking.

It needs a bit more checking for the corner cases but runs fine for me on your case.

matthias

2017-10-12 09:11

reporter   ~0008858

I tried the patch but the solver crashed again. Are you sure that these modifications are working?

Could you decompose the case to 144 cores and test again locally? I frequently see crashes when the number of cores is raised above 100.
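
For reference, a minimal decomposeParDict sketch for such a run (the decomposition method is an assumption here; use whatever the original case specifies):

numberOfSubdomains  144;
method              scotch;   // assumed; any method giving the 144-core layout will do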

MattijsJ

2017-10-20 16:43

reporter  

masterCollation.patch (38,880 bytes)   
diff --git a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C
index 7afbd2fb8..78bb27c06 100644
--- a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C
+++ b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C
@@ -31,8 +31,10 @@ License
 #include "IFstream.H"
 #include "IStringStream.H"
 #include "dictionary.H"
-#include <sys/time.h>
 #include "objectRegistry.H"
+#include "SubList.H"
+#include "labelPair.H"
+#include "masterUncollatedFileOperation.H"
 
 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
 
@@ -586,12 +588,114 @@ Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlocks
 }
 
 
+void Foam::decomposedBlockData::gather
+(
+    const label comm,
+    const label data,
+    labelList& datas
+)
+{
+    const label nProcs = UPstream::nProcs(comm);
+    datas.setSize(nProcs);
+
+    char* data0Ptr = reinterpret_cast<char*>(datas.begin());
+
+    labelList recvOffsets;
+    labelList recvSizes;
+    if (UPstream::master())
+    {
+        recvOffsets.setSize(nProcs);
+        forAll(recvOffsets, proci)
+        {
+            recvOffsets[proci] =
+                reinterpret_cast<char*>(&datas[proci])
+              - data0Ptr;
+        }
+        recvSizes.setSize(nProcs, sizeof(label));
+    }
+
+    UPstream::gather
+    (
+        reinterpret_cast<const char*>(&data),
+        sizeof(label),
+        data0Ptr,
+        recvSizes,
+        recvOffsets,
+        comm
+    );
+}
+
+
+void Foam::decomposedBlockData::gatherSlaveData
+(
+    const label comm,
+    const UList<char>& data,
+    const labelUList& recvSizes,
+
+    const label startProc,
+    const label nProcs,
+
+    List<int>& sliceOffsets,
+    List<char>& recvData
+)
+{
+    // Calculate master data
+    List<int> sliceSizes;
+    if (UPstream::master(comm))
+    {
+        const label numProcs = UPstream::nProcs(comm);
+
+        sliceSizes.setSize(numProcs, 0);
+        sliceOffsets.setSize(numProcs+1, 0);
+
+        int totalSize = 0;
+        label proci = startProc;
+        for (label i = 0; i < nProcs; i++)
+        {
+            sliceSizes[proci] = int(recvSizes[proci]);
+            sliceOffsets[proci] = totalSize;
+            totalSize += sliceSizes[proci];
+            proci++;
+        }
+        sliceOffsets[proci] = totalSize;
+        recvData.setSize(totalSize);
+    }
+
+    int nSend = 0;
+    if
+    (
+       !UPstream::master(comm)
+     && (UPstream::myProcNo(comm) >= startProc)
+     && (UPstream::myProcNo(comm) < startProc+nProcs)
+    )
+    {
+        nSend = data.byteSize();
+    }
+
+    UPstream::gather
+    (
+        data.begin(),
+        nSend,
+
+        recvData.begin(),
+        sliceSizes,
+        sliceOffsets,
+        comm
+    );
+}
+
+
 bool Foam::decomposedBlockData::writeBlocks
 (
     const label comm,
     autoPtr<OSstream>& osPtr,
     List<std::streamoff>& start,
     const UList<char>& data,
+
+    const labelUList& recvSizes,
+    const bool haveSlaveData,
+    const List<char>& slaveData,
+
     const UPstream::commsTypes commsType,
     const bool syncReturnState
 )
@@ -601,20 +705,56 @@ bool Foam::decomposedBlockData::writeBlocks
         Pout<< "decomposedBlockData::writeBlocks:"
             << " stream:" << (osPtr.valid() ? osPtr().name() : "invalid")
             << " data:" << data.size()
+            << " haveSlaveData:" << haveSlaveData
+            << " (master only) slaveData:" << slaveData.size()
             << " commsType:" << Pstream::commsTypeNames[commsType] << endl;
     }
 
+    const label nProcs = UPstream::nProcs(comm);
+
+
     bool ok = true;
 
-    labelList recvSizes(Pstream::nProcs(comm));
-    recvSizes[Pstream::myProcNo(comm)] = data.byteSize();
-    Pstream::gatherList(recvSizes, Pstream::msgType(), comm);
+    if (haveSlaveData)
+    {
+        // Already have gathered the slave data. communicator only used to
+        // check who is the master
 
-    if (commsType == UPstream::commsTypes::scheduled)
+        if (UPstream::master(comm))
+        {
+            OSstream& os = osPtr();
+
+            start.setSize(nProcs);
+
+            // Write master data
+            {
+                os << nl << "// Processor" << UPstream::masterNo() << nl;
+                start[UPstream::masterNo()] = os.stdStream().tellp();
+                os << data;
+            }
+
+            // Write slaves
+
+            label slaveOffset = 0;
+
+            for (label proci = 1; proci < nProcs; proci++)
+            {
+                os << nl << nl << "// Processor" << proci << nl;
+                start[proci] = os.stdStream().tellp();
+
+                os << SubList<char>(slaveData, recvSizes[proci], slaveOffset);
+
+                slaveOffset += recvSizes[proci];
+            }
+
+            ok = os.good();
+        }
+    }
+    else if (commsType == UPstream::commsTypes::scheduled)
     {
         if (UPstream::master(comm))
         {
-            start.setSize(UPstream::nProcs(comm));
+            start.setSize(nProcs);
 
             OSstream& os = osPtr();
 
@@ -626,7 +766,7 @@ bool Foam::decomposedBlockData::writeBlocks
             }
             // Write slaves
             List<char> elems;
-            for (label proci = 1; proci < UPstream::nProcs(comm); proci++)
+            for (label proci = 1; proci < nProcs; proci++)
             {
                 elems.setSize(recvSizes[proci]);
                 IPstream::read
@@ -661,102 +801,116 @@ bool Foam::decomposedBlockData::writeBlocks
     }
     else
     {
-        if (debug)
+        // Write master data
+        if (UPstream::master(comm))
         {
-            struct timeval tv;
-            gettimeofday(&tv, nullptr);
-            Pout<< "Starting sending at:"
-                << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
-                << Foam::endl;
+            start.setSize(nProcs);
+
+            OSstream& os = osPtr();
+
+            os << nl << "// Processor" << UPstream::masterNo() << nl;
+            start[UPstream::masterNo()] = os.stdStream().tellp();
+            os << data;
         }
 
 
-        label startOfRequests = Pstream::nRequests();
+        // Find out how many processor can be received into
+        // maxMasterFileBufferSize
 
-        if (!UPstream::master(comm))
-        {
-            UOPstream::write
-            (
-                UPstream::commsTypes::nonBlocking,
-                UPstream::masterNo(),
-                data.begin(),
-                data.byteSize(),
-                Pstream::msgType(),
-                comm
-            );
-            Pstream::waitRequests(startOfRequests);
-        }
-        else
+        // Starting slave processor and number of processors
+        labelPair startAndSize(1, nProcs-1);
+
+        while (startAndSize[1] > 0)
         {
-            List<List<char>> recvBufs(Pstream::nProcs(comm));
-            for (label proci = 1; proci < UPstream::nProcs(comm); proci++)
+            labelPair masterData(startAndSize);
+            if (UPstream::master(comm))
             {
-                recvBufs[proci].setSize(recvSizes[proci]);
-                UIPstream::read
+                label totalSize = 0;
+                label proci = masterData[0];
+                while
                 (
-                    UPstream::commsTypes::nonBlocking,
-                    proci,
-                    recvBufs[proci].begin(),
-                    recvSizes[proci],
-                    Pstream::msgType(),
-                    comm
-                );
-            }
+                    proci < nProcs
+                 && (
+                        totalSize+recvSizes[proci]
+                      < fileOperations::masterUncollatedFileOperation::
+                            maxMasterFileBufferSize
+                    )
+                )
+                {
+                    totalSize += recvSizes[proci];
+                    proci++;
+                }
 
-            if (debug)
-            {
-                struct timeval tv;
-                gettimeofday(&tv, nullptr);
-                Pout<< "Starting master-only writing at:"
-                    << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
-                    << Foam::endl;
+                masterData[1] = proci-masterData[0];
             }
 
-            start.setSize(UPstream::nProcs(comm));
 
-            OSstream& os = osPtr();
+            // Scatter masterData
+            UPstream::scatter
+            (
+                reinterpret_cast<const char*>(masterData.cdata()),
+                List<int>(nProcs, sizeof(masterData)),
+                List<int>(nProcs, 0),
+                reinterpret_cast<char*>(startAndSize.data()),
+                sizeof(startAndSize),
+                comm
+            );
 
-            // Write master data
+            if (startAndSize[1] == 0)
             {
-                os << nl << "// Processor" << UPstream::masterNo() << nl;
-                start[UPstream::masterNo()] = os.stdStream().tellp();
-                os << data;
+                break;
             }
 
-            if (debug)
-            {
-                struct timeval tv;
-                gettimeofday(&tv, nullptr);
-                Pout<< "Starting slave writing at:"
-                    << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
-                    << Foam::endl;
-            }
 
-            // Write slaves
-            for (label proci = 1; proci < UPstream::nProcs(comm); proci++)
+            // Gather data from (a slice of) the slaves
+            List<int> sliceOffsets;
+            List<char> recvData;
+            gatherSlaveData
+            (
+                comm,
+                data,
+                recvSizes,
+
+                startAndSize[0],    // startProc,
+                startAndSize[1],    // nProcs,
+
+                sliceOffsets,
+                recvData
+            );
+
+            if (UPstream::master(comm))
             {
-                os << nl << nl << "// Processor" << proci << nl;
-                start[proci] = os.stdStream().tellp();
+                OSstream& os = osPtr();
 
-                if (Pstream::finishedRequest(startOfRequests+proci-1))
+                // Write slaves
+                for
+                (
+                    label proci = startAndSize[0];
+                    proci < startAndSize[0]+startAndSize[1];
+                    proci++
+                )
                 {
-                    os << recvBufs[proci];
+                    os << nl << nl << "// Processor" << proci << nl;
+                    start[proci] = os.stdStream().tellp();
+
+                    os <<
+                        SubList<char>
+                        (
+                            recvData,
+                            sliceOffsets[proci+1]-sliceOffsets[proci],
+                            sliceOffsets[proci]
+                        );
                 }
             }
 
-            Pstream::resetRequests(startOfRequests);
+            startAndSize[0] += startAndSize[1];
+        }
 
-            ok = os.good();
+        if (UPstream::master(comm))
+        {
+            ok = osPtr().good();
         }
     }
-    if (debug)
-    {
-        struct timeval tv;
-        gettimeofday(&tv, nullptr);
-        Pout<< "Finished master-only writing at:"
-            << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
-            << Foam::endl;
-    }
 
     if (syncReturnState)
     {
@@ -868,8 +1022,23 @@ bool Foam::decomposedBlockData::writeObject
         osPtr.reset(new OFstream(objectPath(), IOstream::BINARY, ver, cmp));
         IOobject::writeHeader(osPtr());
     }
+
+    labelList recvSizes;
+    gather(comm_, this->byteSize(), recvSizes);
+
     List<std::streamoff> start;
-    return writeBlocks(comm_, osPtr, start, *this, commsType_);
+    List<char> slaveData;           // dummy already received slave data
+    return writeBlocks
+    (
+        comm_,
+        osPtr,
+        start,
+        *this,
+        recvSizes,
+        false,                      // don't have slave data
+        slaveData,
+        commsType_
+    );
 }
 
 
diff --git a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.H b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.H
index dc4e3d70f..1def38f9e 100644
--- a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.H
+++ b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.H
@@ -169,6 +169,32 @@ public:
             const UPstream::commsTypes commsType
         );
 
+        //- Helper: gather single label. Note: using native Pstream.
+        //  datas sized with num procs but undefined contents on
+        //  slaves
+        static void gather
+        (
+            const label comm,
+            const label data,
+            labelList& datas
+        );
+
+        //- Helper: gather data from (subset of) slaves. Returns
+        //  recvData : received data
+        //  recvOffsets : offset in data. recvOffsets is nProcs+1
+        static void gatherSlaveData
+        (
+            const label comm,
+            const UList<char>& data,
+            const labelUList& recvSizes,
+
+            const label startProc,
+            const label nProcs,
+
+            List<int>& recvOffsets,
+            List<char>& recvData
+        );
+
         //- Write *this. Ostream only valid on master. Returns starts of
         //  processor blocks
         static bool writeBlocks
@@ -177,6 +203,12 @@ public:
             autoPtr<OSstream>& osPtr,
             List<std::streamoff>& start,
             const UList<char>&,
+
+            const labelUList& recvSizes,
+
+            const bool haveSlaveData,       // does master have slaveData
+            const List<char>& slaveData,    // optional slave data (on master)
+
             const UPstream::commsTypes,
             const bool syncReturnState = true
         );
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H
index fab2dd7ff..d3c4e2da6 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H
@@ -499,6 +499,47 @@ public:
             labelUList& recvData,
             const label communicator = 0
         );
+
+        //- Exchange data with all processors (in the communicator)
+        //  sendSizes, sendOffsets give (per processor) the slice of
+        //  sendData to send, similarly recvSizes, recvOffsets give the slice
+        //  of recvData to receive
+        static void allToAll
+        (
+            const char* sendData,
+            const UList<int>& sendSizes,
+            const UList<int>& sendOffsets,
+
+            char* recvData,
+            const UList<int>& recvSizes,
+            const UList<int>& recvOffsets,
+
+            const label communicator = 0
+        );
+
+        //- Receive data from all processors on the master
+        static void gather
+        (
+            const char* sendData,
+            int sendSize,
+
+            char* recvData,
+            const UList<int>& recvSizes,
+            const UList<int>& recvOffsets,
+            const label communicator = 0
+        );
+
+        //- Send data to all processors from the root of the communicator
+        static void scatter
+        (
+            const char* sendData,
+            const UList<int>& sendSizes,
+            const UList<int>& sendOffsets,
+
+            char* recvData,
+            int recvSize,
+            const label communicator = 0
+        );
 };
 
 
diff --git a/src/OpenFOAM/db/dictionary/functionEntries/functionEntry/functionEntry.C b/src/OpenFOAM/db/dictionary/functionEntries/functionEntry/functionEntry.C
index 8b1585ea7..5837bd577 100644
--- a/src/OpenFOAM/db/dictionary/functionEntries/functionEntry/functionEntry.C
+++ b/src/OpenFOAM/db/dictionary/functionEntries/functionEntry/functionEntry.C
@@ -2,7 +2,7 @@
   =========                 |
   \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
    \\    /   O peration     |
-    \\  /    A nd           | Copyright (C) 2011-2017 OpenFOAM Foundation
+    \\  /    A nd           | Copyright (C) 2011-2016 OpenFOAM Foundation
      \\/     M anipulation  |
 -------------------------------------------------------------------------------
 License
@@ -172,8 +172,6 @@ void Foam::functionEntry::write(Ostream& os) const
     {
         os.write(s[i]);
     }
-
-    os << endl;
 }
 
 
diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C
index b5c8ab494..a6825993b 100644
--- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C
+++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C
@@ -25,11 +25,7 @@ License
 
 #include "OFstreamCollator.H"
 #include "OFstream.H"
-#include "OSspecific.H"
-#include "IOstreams.H"
-#include "Pstream.H"
 #include "decomposedBlockData.H"
-#include "PstreamReduceOps.H"
 
 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
 
@@ -46,7 +42,10 @@ bool Foam::OFstreamCollator::writeFile
     const label comm,
     const word& typeName,
     const fileName& fName,
-    const string& s,
+    const string& masterData,
+    const labelUList& recvSizes,
+    const bool haveSlaveData,           // does master have slaveData
+    const UList<char>& slaveData,       // on master: slave data
     IOstream::streamFormat fmt,
     IOstream::versionNumber ver,
     IOstream::compressionType cmp,
@@ -55,7 +54,7 @@ bool Foam::OFstreamCollator::writeFile
 {
     if (debug)
     {
-        Pout<< "OFstreamCollator : Writing " << s.size()
+        Pout<< "OFstreamCollator : Writing " << masterData.size()
             << " bytes to " << fName
             << " using comm " << comm << endl;
     }
@@ -90,11 +89,17 @@ bool Foam::OFstreamCollator::writeFile
         );
     }
 
-    UList<char> slice(const_cast<char*>(s.data()), label(s.size()));
 
-    // Assuming threaded writing hides any slowness so we might
-    // as well use scheduled communication to send the data to
-    // the master processor in order.
+    UList<char> slice
+    (
+        const_cast<char*>(masterData.data()),
+        label(masterData.size())
+    );
+
+    // Assuming threaded writing hides any slowness so we
+    // can use scheduled communication to send the data to
+    // the master processor in order. However can be unstable
+    // for some mpi so default is non-blocking.
 
     List<std::streamoff> start;
     decomposedBlockData::writeBlocks
@@ -103,7 +108,10 @@ bool Foam::OFstreamCollator::writeFile
         osPtr,
         start,
         slice,
-        UPstream::commsTypes::scheduled,
+        recvSizes,
+        haveSlaveData,
+        slaveData,
+        UPstream::commsTypes::nonBlocking,  //scheduled,
         false       // do not reduce return state
     );
 
@@ -115,8 +123,18 @@ bool Foam::OFstreamCollator::writeFile
 
     if (debug)
     {
-        Pout<< "OFstreamCollator : Finished writing " << s.size()
-            << " bytes to " << fName
+        Pout<< "OFstreamCollator : Finished writing " << masterData.size()
+            << " bytes";
+        if (UPstream::master(comm))
+        {
+            off_t sum = 0;
+            forAll(recvSizes, i)
+            {
+                sum += recvSizes[i];
+            }
+            Pout<< " (overall " << sum << ")";
+        }
+        Pout<< " to " << fName
             << " using comm " << comm << endl;
     }
 
@@ -133,14 +151,11 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg)
     {
         writeData* ptr = nullptr;
 
-        //pthread_mutex_lock(&handler.mutex_);
         lockMutex(handler.mutex_);
-
         if (handler.objects_.size())
         {
             ptr = handler.objects_.pop();
         }
-        //pthread_mutex_unlock(&handler.mutex_);
         unlockMutex(handler.mutex_);
 
         if (!ptr)
@@ -151,10 +166,14 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg)
         {
             bool ok = writeFile
             (
-                handler.comm_,
+                ptr->comm_,
                 ptr->typeName_,
                 ptr->pathName_,
                 ptr->data_,
+                ptr->sizes_,
+                ptr->haveSlaveData_,
+                ptr->slaveData_,
+
                 ptr->format_,
                 ptr->version_,
                 ptr->compression_,
@@ -177,22 +196,54 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg)
         Pout<< "OFstreamCollator : Exiting write thread " << endl;
     }
 
-    //pthread_mutex_lock(&handler.mutex_);
     lockMutex(handler.mutex_);
     handler.threadRunning_ = false;
-    //pthread_mutex_unlock(&handler.mutex_);
     unlockMutex(handler.mutex_);
 
     return nullptr;
 }
 
 
+void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const
+{
+    while (true)
+    {
+        // Count files to be written
+        off_t totalSize = 0;
+
+        lockMutex(mutex_);
+        forAllConstIter(FIFOStack<writeData*>, objects_, iter)
+        {
+            totalSize += iter()->size();
+        }
+        unlockMutex(mutex_);
+
+        if (totalSize == 0 || (totalSize+wantedSize) <= maxBufferSize_)
+        {
+            break;
+        }
+
+        if (debug)
+        {
+            lockMutex(mutex_);
+            Pout<< "OFstreamCollator : Waiting for buffer space."
+                << " Currently in use:" << totalSize
+                << " limit:" << maxBufferSize_
+                << " files:" << objects_.size()
+                << endl;
+            unlockMutex(mutex_);
+        }
+
+        sleep(5);
+    }
+}
+
+
 // * * * * * * * * * * * * * * * * Constructors  * * * * * * * * * * * * * * //
 
 Foam::OFstreamCollator::OFstreamCollator(const off_t maxBufferSize)
 :
     maxBufferSize_(maxBufferSize),
-    //mutex_(PTHREAD_MUTEX_INITIALIZER),
     mutex_
     (
         maxBufferSize_ > 0
@@ -228,7 +279,6 @@ Foam::OFstreamCollator::~OFstreamCollator()
             Pout<< "~OFstreamCollator : Waiting for write thread" << endl;
         }
 
-        //pthread_join(thread_, nullptr);
         joinThread(thread_);
     }
     if (thread_ != -1)
@@ -259,56 +309,153 @@ bool Foam::OFstreamCollator::write
     const bool append
 )
 {
-    if (maxBufferSize_ > 0)
+    // Determine (on master) sizes to receive. Note: do NOT use thread
+    // communicator
+    labelList recvSizes;
+    decomposedBlockData::gather(Pstream::worldComm, data.size(), recvSizes);
+    off_t totalSize = 0;
+    label maxLocalSize = 0;
     {
-        while (true)
+        for (label proci = 0; proci < recvSizes.size(); proci++)
         {
-            // Count files to be written
-            off_t totalSize = 0;
-            //pthread_mutex_lock(&mutex_);
-            lockMutex(mutex_);
-            forAllConstIter(FIFOStack<writeData*>, objects_, iter)
-            {
-                totalSize += iter()->data_.size();
-            }
-            //pthread_mutex_unlock(&mutex_);
-            unlockMutex(mutex_);
+            totalSize += recvSizes[proci];
+            maxLocalSize = max(maxLocalSize, recvSizes[proci]);
+        }
+        Pstream::scatter(totalSize, Pstream::msgType(), Pstream::worldComm);
+        Pstream::scatter(maxLocalSize, Pstream::msgType(), Pstream::worldComm);
+    }
+
+    if (maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
+    {
+        if (debug)
+        {
+            Pout<< "OFstreamCollator : non-thread gather and write of " << fName
+                << " using worldComm" << endl;
+        }
+        // Direct collating and writing (so master blocks until all written!)
+        const List<char> dummySlaveData;
+        return writeFile
+        (
+            UPstream::worldComm,
+            typeName,
+            fName,
+            data,
+            recvSizes,
+            false,              // no slave data provided yet
+            dummySlaveData,
+            fmt,
+            ver,
+            cmp,
+            append
+        );
+    }
+    else if (totalSize <= maxBufferSize_)
+    {
+        // Total size can be stored locally so receive all data now and only
+        // do the writing in the thread
 
-            if
+        if (debug)
+        {
+            Pout<< "OFstreamCollator : non-thread gather; thread write of "
+                << fName << endl;
+        }
+
+        if (Pstream::master())
+        {
+            waitForBufferSpace(totalSize);
+        }
+
+        // Allocate local buffer for all collated data
+        autoPtr<writeData> fileAndDataPtr
+        (
+            new writeData
             (
-                totalSize == 0
-             || (totalSize+off_t(data.size()) < maxBufferSize_)
+                comm_,      // Note: comm not actually used anymore
+                typeName,
+                fName,
+                data,
+                recvSizes,
+                true,       // have slave data (collected below)
+                fmt,
+                ver,
+                cmp,
+                append
             )
-            {
-                break;
-            }
+        );
+        writeData& fileAndData = fileAndDataPtr();
+
+        // Gather the slave data and insert into fileAndData
+        UList<char> slice(const_cast<char*>(data.data()), label(data.size()));
+        List<int> slaveOffsets;
+        decomposedBlockData::gatherSlaveData
+        (
+            Pstream::worldComm,         // Note: using simulation thread
+            slice,
+            recvSizes,
+
+            1,                          // startProc,
+            Pstream::nProcs()-1,        // n procs
+
+            slaveOffsets,
+            fileAndData.slaveData_
+        );
+
+        // Append to thread buffer
+        lockMutex(mutex_);
+        objects_.push(fileAndDataPtr.ptr());
+        unlockMutex(mutex_);
 
+        // Start thread if not running
+        lockMutex(mutex_);
+        if (!threadRunning_)
+        {
+            createThread(thread_, writeAll, this);
             if (debug)
             {
-                Pout<< "OFstreamCollator : Waiting for buffer space."
-                    << " Currently in use:" << totalSize
-                    << " limit:" << maxBufferSize_
-                    << endl;
+                Pout<< "OFstreamCollator : Started write thread "
+                    << thread_ << endl;
             }
-
-            sleep(5);
+            threadRunning_ = true;
         }
+        unlockMutex(mutex_);
 
+        return true;
+    }
+    else
+    {
         if (debug)
         {
-            Pout<< "OFstreamCollator : relaying write of " << fName
-                << " to thread " << endl;
+            Pout<< "OFstreamCollator : thread gather and write of " << fName
+                << " in thread " << thread_
+                << " using communicator " << comm_ << endl;
         }
-        //pthread_mutex_lock(&mutex_);
+
+        if (Pstream::master())
+        {
+            waitForBufferSpace(data.size());
+        }
+
         lockMutex(mutex_);
+        // Push all file info on buffer. Note that no slave data provided
+        // so it will trigger communication inside the thread
         objects_.push
         (
-            new writeData(typeName, fName, data, fmt, ver, cmp, append)
+            new writeData
+            (
+                comm_,
+                typeName,
+                fName,
+                data,
+                recvSizes,
+                false,          // Have no slave data; collect in thread
+                fmt,
+                ver,
+                cmp,
+                append
+            )
         );
-        //pthread_mutex_unlock(&mutex_);
         unlockMutex(mutex_);
 
-        //pthread_mutex_lock(&mutex_);
         lockMutex(mutex_);
         if (!threadRunning_)
         {
@@ -319,16 +466,10 @@ bool Foam::OFstreamCollator::write
             }
             threadRunning_ = true;
         }
-        //pthread_mutex_unlock(&mutex_);
         unlockMutex(mutex_);
 
         return true;
     }
-    else
-    {
-        // Immediate writing
-        return writeFile(comm_, typeName, fName, data, fmt, ver, cmp, append);
-    }
 }
 
 
diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H
index 17904fc38..bf4eeb571 100644
--- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H
+++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H
@@ -27,6 +27,22 @@ Class
 Description
     Threaded file writer.
 
+    Collects all data from all processors and writes as single
+    'decomposedBlockData' file. The operation is determined by the
+    buffer size (maxThreadFileBufferSize setting):
+    - local size of data is larger than buffer: receive and write processor
+    by processor (i.e. 'scheduled'). Does not use a thread, no file size
+    limit.
+    - total size of data is larger than buffer (but local is not):
+    thread does all the collecting and writing of the processors. No file
+    size limit.
+    - total size of data is less than buffer:
+    collecting is done locally; the thread only does the writing
+    (since the data has already been collected)
+
+
+Operation determine
+
 SourceFiles
     OFstreamCollator.C
 
@@ -36,7 +52,7 @@ SourceFiles
 #define OFstreamCollator_H
 
 #include "IOstream.H"
-#include "List.H"
+#include "labelList.H"
 #include "FIFOStack.H"
 
 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@@ -56,9 +72,15 @@ class OFstreamCollator
         {
         public:
 
+            const label comm_;
             const word typeName_;
             const fileName pathName_;
             const string data_;
+            const labelList sizes_;
+
+            const bool haveSlaveData_;
+            List<char> slaveData_;
+
             const IOstream::streamFormat format_;
             const IOstream::versionNumber version_;
             const IOstream::compressionType compression_;
@@ -66,23 +88,36 @@ class OFstreamCollator
 
             writeData
             (
+                const label comm,
                 const word& typeName,
                 const fileName& pathName,
                 const string& data,
+                const labelList& sizes,
+                const bool haveSlaveData,
                 IOstream::streamFormat format,
                 IOstream::versionNumber version,
                 IOstream::compressionType compression,
                 const bool append
             )
             :
+                comm_(comm),
                 typeName_(typeName),
                 pathName_(pathName),
                 data_(data),
+                sizes_(sizes),
+                haveSlaveData_(haveSlaveData),
+                slaveData_(0),
                 format_(format),
                 version_(version),
                 compression_(compression),
                 append_(append)
             {}
+
+            //- (approximate) size of master + any optional slave data
+            off_t size() const
+            {
+                return data_.size() + slaveData_.size();
+            }
         };
 
 
@@ -112,7 +147,10 @@ class OFstreamCollator
             const label comm,
             const word& typeName,
             const fileName& fName,
-            const string& data,
+            const string& masterData,
+            const labelUList& recvSizes,
+            const bool haveSlaveData,       // (does master) have slave data
+            const UList<char>& slaveData,   // (on master) all slave data
             IOstream::streamFormat fmt,
             IOstream::versionNumber ver,
             IOstream::compressionType cmp,
@@ -122,6 +160,10 @@ class OFstreamCollator
         //- Write all files in stack
         static void* writeAll(void *threadarg);
 
+        //- Wait for total size of objects_ (master + optional slave data)
+        //  to be wantedSize less than overall maxBufferSize.
+        void waitForBufferSpace(const off_t wantedSize) const;
+
 
 public:
 
diff --git a/src/Pstream/dummy/UPstream.C b/src/Pstream/dummy/UPstream.C
index 124e22a0c..dd5ac01b3 100644
--- a/src/Pstream/dummy/UPstream.C
+++ b/src/Pstream/dummy/UPstream.C
@@ -92,6 +92,36 @@ void Foam::UPstream::allToAll
 }
 
 
+void Foam::UPstream::gather
+(
+    const char* sendData,
+    int sendSize,
+
+    char* recvData,
+    const UList<int>& recvSizes,
+    const UList<int>& recvOffsets,
+    const label communicator
+)
+{
+    memmove(recvData, sendData, sendSize);
+}
+
+
+void Foam::UPstream::scatter
+(
+    const char* sendData,
+    const UList<int>& sendSizes,
+    const UList<int>& sendOffsets,
+
+    char* recvData,
+    int recvSize,
+    const label communicator
+)
+{
+    memmove(recvData, sendData, recvSize);
+}
+
+
 void Foam::UPstream::allocatePstreamCommunicator
 (
     const label,
diff --git a/src/Pstream/mpi/UPstream.C b/src/Pstream/mpi/UPstream.C
index 317eaf260..6554f7b00 100644
--- a/src/Pstream/mpi/UPstream.C
+++ b/src/Pstream/mpi/UPstream.C
@@ -2,7 +2,7 @@
   =========                 |
   \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
    \\    /   O peration     |
-    \\  /    A nd           | Copyright (C) 2011-2016 OpenFOAM Foundation
+    \\  /    A nd           | Copyright (C) 2011-2017 OpenFOAM Foundation
      \\/     M anipulation  |
 -------------------------------------------------------------------------------
 License
@@ -362,6 +362,199 @@ void Foam::UPstream::allToAll
 }
 
 
+void Foam::UPstream::allToAll
+(
+    const char* sendData,
+    const UList<int>& sendSizes,
+    const UList<int>& sendOffsets,
+
+    char* recvData,
+    const UList<int>& recvSizes,
+    const UList<int>& recvOffsets,
+
+    const label communicator
+)
+{
+    label np = nProcs(communicator);
+
+    if
+    (
+        sendSizes.size() != np
+     || sendOffsets.size() != np
+     || recvSizes.size() != np
+     || recvOffsets.size() != np
+    )
+    {
+        FatalErrorInFunction
+            << "Size of sendSize " << sendSizes.size()
+            << ", sendOffsets " << sendOffsets.size()
+            << ", recvSizes " << recvSizes.size()
+            << " or recvOffsets " << recvOffsets.size()
+            << " is not equal to the number of processors in the domain "
+            << np
+            << Foam::abort(FatalError);
+    }
+
+    if (!UPstream::parRun())
+    {
+        if (recvSizes[0] != sendSizes[0])
+        {
+            FatalErrorInFunction
+                << "Bytes to send " << sendSizes[0]
+                << " does not equal bytes to receive " << recvSizes[0]
+                << Foam::abort(FatalError);
+        }
+        memmove(recvData, &sendData[sendOffsets[0]], recvSizes[0]);
+    }
+    else
+    {
+        if
+        (
+            MPI_Alltoallv
+            (
+                sendData,
+                sendSizes.begin(),
+                sendOffsets.begin(),
+                MPI_BYTE,
+                recvData,
+                recvSizes.begin(),
+                recvOffsets.begin(),
+                MPI_BYTE,
+                PstreamGlobals::MPICommunicators_[communicator]
+            )
+        )
+        {
+            FatalErrorInFunction
+                << "MPI_Alltoallv failed for sendSizes " << sendSizes
+                << " recvSizes " << recvSizes
+                << " communicator " << communicator
+                << Foam::abort(FatalError);
+        }
+    }
+}
+
+
+void Foam::UPstream::gather
+(
+    const char* sendData,
+    int sendSize,
+
+    char* recvData,
+    const UList<int>& recvSizes,
+    const UList<int>& recvOffsets,
+    const label communicator
+)
+{
+    label np = nProcs(communicator);
+
+    if
+    (
+        UPstream::master(communicator)
+     && (recvSizes.size() != np || recvOffsets.size() < np)
+    )
+    {
+        // Note: allow recvOffsets to be e.g. 1 larger than np so we
+        // can easily loop over the result
+
+        FatalErrorInFunction
+            << "Size of recvSizes " << recvSizes.size()
+            << " or recvOffsets " << recvOffsets.size()
+            << " is not equal to the number of processors in the domain "
+            << np
+            << Foam::abort(FatalError);
+    }
+
+    if (!UPstream::parRun())
+    {
+        memmove(recvData, sendData, sendSize);
+    }
+    else
+    {
+        if
+        (
+            MPI_Gatherv
+            (
+                sendData,
+                sendSize,
+                MPI_BYTE,
+                recvData,
+                recvSizes.begin(),
+                recvOffsets.begin(),
+                MPI_BYTE,
+                0,
+                MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
+            )
+        )
+        {
+            FatalErrorInFunction
+                << "MPI_Gatherv failed for sendSize " << sendSize
+                << " recvSizes " << recvSizes
+                << " communicator " << communicator
+                << Foam::abort(FatalError);
+        }
+    }
+}
+
+
+void Foam::UPstream::scatter
+(
+    const char* sendData,
+    const UList<int>& sendSizes,
+    const UList<int>& sendOffsets,
+
+    char* recvData,
+    int recvSize,
+    const label communicator
+)
+{
+    label np = nProcs(communicator);
+
+    if
+    (
+        UPstream::master(communicator)
+     && (sendSizes.size() != np || sendOffsets.size() != np)
+    )
+    {
+        FatalErrorInFunction
+            << "Size of sendSizes " << sendSizes.size()
+            << " or sendOffsets " << sendOffsets.size()
+            << " is not equal to the number of processors in the domain "
+            << np
+            << Foam::abort(FatalError);
+    }
+
+    if (!UPstream::parRun())
+    {
+        memmove(recvData, sendData, recvSize);
+    }
+    else
+    {
+        if
+        (
+            MPI_Scatterv
+            (
+                sendData,
+                sendSizes.begin(),
+                sendOffsets.begin(),
+                MPI_BYTE,
+                recvData,
+                recvSize,
+                MPI_BYTE,
+                0,
+                MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
+            )
+        )
+        {
+            FatalErrorInFunction
+                << "MPI_Scatterv failed for sendSizes " << sendSizes
+                << " sendOffsets " << sendOffsets
+                << " communicator " << communicator
+                << Foam::abort(FatalError);
+        }
+    }
+}
+
+
 void Foam::UPstream::allocatePstreamCommunicator
 (
     const label parentIndex,
masterCollation.patch (38,880 bytes)   

MattijsJ

2017-10-20 16:45

reporter   ~0008915

Dear Matthias,

the new patch (masterCollation.patch) changes the behaviour to preferentially collect all data in the simulation thread, so the write thread no longer has to do any parallel communication.
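
For illustration, here is a small standalone C++ sketch of that split. It is not OpenFOAM code, just the pattern under simplified assumptions: a plain std::queue stands in for the FIFOStack of writeData objects, and the gather step is reduced to filling a buffer. The simulation thread assembles the complete block, the writer thread only pops finished buffers and writes them, so it never needs to call MPI itself:

// Standalone sketch only (not OpenFOAM code): the main "simulation" thread
// assembles the complete buffer (in the real code this is where the gather
// of the slave data happens), the writer thread does nothing but file I/O.
#include <condition_variable>
#include <fstream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

struct WriteJob
{
    std::string fileName;
    std::vector<char> data;   // master + all slave data, already collected
};

std::queue<WriteJob> jobs;
std::mutex jobsMutex;
std::condition_variable jobsCv;
bool finished = false;

// Writer thread: pops ready-made buffers and writes them; no communication
void writeAll()
{
    for (;;)
    {
        std::unique_lock<std::mutex> lock(jobsMutex);
        jobsCv.wait(lock, [] { return finished || !jobs.empty(); });
        if (jobs.empty())
        {
            return;   // finished and nothing left to write
        }

        WriteJob job = std::move(jobs.front());
        jobs.pop();
        lock.unlock();

        std::ofstream os(job.fileName, std::ios::binary);
        os.write(job.data.data(), job.data.size());
    }
}

int main()
{
    std::thread writer(writeAll);

    // "Simulation" thread: collect/collate everything first ...
    WriteJob job{"nut.dat", std::vector<char>(1024, 'x')};

    // ... then only hand the finished buffer over to the writer
    {
        std::lock_guard<std::mutex> lock(jobsMutex);
        jobs.push(std::move(job));
    }
    jobsCv.notify_one();

    {
        std::lock_guard<std::mutex> lock(jobsMutex);
        finished = true;
    }
    jobsCv.notify_all();

    writer.join();
    return 0;
}

The point of this arrangement is that all MPI traffic stays on the thread that initialised MPI, so the common case where the total data fits within maxThreadFileBufferSize no longer relies on thread-safe MPI at write time.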

henry

2017-10-27 17:15

manager   ~0008949

Resolved by commit ea85635b2d451d8ae1c6420db13a76e31d655d5f

matthias

2017-10-27 17:37

reporter   ~0008950

Please can you commit the patch to version 5.x too? That would be great, thanks.

henry

2017-10-28 09:25

manager   ~0008951

commit 16416c79ecb71d923a0007618d1b7231c6cea05a

matthias

2017-10-29 02:05

reporter   ~0008952

Dear Henry,

there is still a minor bug in the OF-5.x version which prevents compilation of the Pstream lib.

Please can you modify src/Pstream/mpi/UPstream.C (line 95) and change

setParRun(numprocs);

to

setParRun(numprocs, provided_thread_support == MPI_THREAD_MULTIPLE);


As the interface of the static setParRun function has changed in the patch, all calls of that function need to be changed too.
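
For anyone applying this by hand, a minimal sketch of the intended call site follows. Only the setParRun call itself is taken from the lines above; the surrounding MPI_Init_thread context is an assumption about how the initialisation is structured and may differ from the actual file:

// Assumed context around line 95 of src/Pstream/mpi/UPstream.C (sketch only)
int provided_thread_support;
MPI_Init_thread
(
    &argc,
    &argv,
    MPI_THREAD_MULTIPLE,   // requested so the write thread may also call MPI
    &provided_thread_support
);

int numprocs;
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);

// Pass on whether the MPI library actually granted MPI_THREAD_MULTIPLE
setParRun(numprocs, provided_thread_support == MPI_THREAD_MULTIPLE);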

matthias

2017-10-29 02:43

reporter   ~0008953

In UPstream.C, the new gather and scatter functions are missing too. UPstream.C should be the same as in the dev version.
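
For reference, a hedged caller-side sketch of how the master is expected to drive the new gather. This fragment is illustrative only and not taken from the patch; localData, recvSizes (an int list of per-processor byte counts) and comm are assumed to be defined, and the extra trailing offset entry follows the comment in the patch that recvOffsets may be one larger than the number of processors:

// Build byte offsets from the per-processor sizes, then receive everything
// into one contiguous buffer on the master.
const label np = UPstream::nProcs(comm);

List<int> recvOffsets(np + 1);           // one extra entry to simplify looping
recvOffsets[0] = 0;
for (label proci = 0; proci < np; proci++)
{
    recvOffsets[proci + 1] = recvOffsets[proci] + recvSizes[proci];
}

List<char> allData(UPstream::master(comm) ? recvOffsets[np] : 0);

UPstream::gather
(
    localData.data(),        // this processor's bytes
    localData.size(),
    allData.data(),          // only significant on the master
    recvSizes,
    recvOffsets,
    comm
);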

matthias

2017-10-29 02:50

reporter   ~0008954

Actually, all modifications in the Pstream lib (dummy and mpi) seem to be missing in OF-5.x. Please compare with the dev version.

henry

2017-10-29 08:40

manager   ~0008955

Sorry about the omission; try commit 197d9d3bf20ac0641a8fe4f57be6a6325ca834b1

matthias

2017-10-29 10:39

reporter   ~0008956

Thank you for the last commit; it's working now.

Issue History

Date Modified Username Field Change
2017-08-18 15:31 matthias New Issue
2017-08-18 15:31 matthias File Added: log
2017-08-27 14:54 MattijsJ Note Added: 0008636
2017-08-28 08:51 matthias Note Added: 0008637
2017-08-28 09:04 matthias Note Added: 0008638
2017-08-28 09:43 matthias File Added: log-2
2017-08-28 09:43 matthias Note Added: 0008639
2017-08-28 20:00 MattijsJ Note Added: 0008640
2017-08-28 20:18 matthias File Added: log-3
2017-08-28 20:18 matthias Note Added: 0008641
2017-08-28 20:44 matthias File Added: log-4
2017-08-28 20:44 matthias Note Added: 0008642
2017-08-29 20:30 MattijsJ Note Added: 0008648
2017-08-29 20:55 MattijsJ Note Added: 0008649
2017-09-01 12:57 MattijsJ File Added: OFstreamCollator.H
2017-09-01 12:57 MattijsJ File Added: OFstreamCollator.C
2017-09-01 13:01 MattijsJ Note Added: 0008652
2017-09-01 13:08 matthias Note Added: 0008653
2017-09-01 16:51 matthias File Added: log.pitzdaily
2017-09-01 16:51 matthias Note Added: 0008656
2017-09-01 17:02 matthias File Added: log.pitzdaily2
2017-09-01 17:02 matthias Note Added: 0008657
2017-09-01 17:34 MattijsJ Note Added: 0008658
2017-09-01 18:01 matthias Note Added: 0008660
2017-09-01 18:11 matthias Note Added: 0008662
2017-09-01 18:29 matthias File Added: log.pitzdaily3
2017-09-01 18:29 matthias Note Added: 0008663
2017-09-01 18:29 matthias Note Added: 0008664
2017-09-08 11:22 MattijsJ Note Added: 0008729
2017-09-08 11:37 matthias Note Added: 0008730
2017-09-12 12:41 MattijsJ File Added: decomposedBlockData.C
2017-09-12 12:41 MattijsJ Note Added: 0008758
2017-09-15 14:23 MattijsJ Note Added: 0008780
2017-09-20 12:25 matthias Note Added: 0008783
2017-09-20 12:29 matthias Note Added: 0008784
2017-09-20 20:58 MattijsJ Note Added: 0008787
2017-10-10 17:42 MattijsJ File Added: nonBlocking.patch
2017-10-10 17:44 MattijsJ Note Added: 0008854
2017-10-12 09:11 matthias Note Added: 0008858
2017-10-20 16:43 MattijsJ File Added: masterCollation.patch
2017-10-20 16:45 MattijsJ Note Added: 0008915
2017-10-27 17:15 henry Assigned To => henry
2017-10-27 17:15 henry Status new => resolved
2017-10-27 17:15 henry Resolution open => fixed
2017-10-27 17:15 henry Fixed in Version => dev
2017-10-27 17:15 henry Note Added: 0008949
2017-10-27 17:37 matthias Status resolved => feedback
2017-10-27 17:37 matthias Resolution fixed => reopened
2017-10-27 17:37 matthias Note Added: 0008950
2017-10-28 09:25 henry Status feedback => resolved
2017-10-28 09:25 henry Resolution reopened => fixed
2017-10-28 09:25 henry Fixed in Version dev => 5.x
2017-10-28 09:25 henry Note Added: 0008951
2017-10-29 02:05 matthias Status resolved => feedback
2017-10-29 02:05 matthias Resolution fixed => reopened
2017-10-29 02:05 matthias Note Added: 0008952
2017-10-29 02:43 matthias Note Added: 0008953
2017-10-29 02:43 matthias Status feedback => assigned
2017-10-29 02:50 matthias Note Added: 0008954
2017-10-29 08:40 henry Note Added: 0008955
2017-10-29 10:39 matthias Note Added: 0008956
2017-10-29 10:47 henry Status assigned => resolved
2017-10-29 10:47 henry Resolution reopened => fixed