View Issue Details

IDProjectCategoryView StatusLast Update
0002744OpenFOAMBugpublic2017-11-17 12:14
Reportersushpa Assigned Tohenry  
PrioritynormalSeveritycrashReproducibilitysometimes
Status resolvedResolutionfixed 
PlatformLinux64OSCentOSOS Version7.4
Product Versiondev 
Fixed in Versiondev 
Summary0002744: Cannot reconstruct a decomposed collated case
DescriptionreconstructPar fails when run on a decomposed case which uses the collated file format.

OF-dev ea85635b2d451d8ae1c6420db13a76e31d655d5f.
Steps To ReproduceTutorial case coldEngineFoam/freePiston:

$ decomposePar
$ mpirun -np 4 coldEngineFoam -parallel
$ reconstructPar

In system/controlDict:

OptimisationSwitches
{
    fileHandler collated;
// maxThreadFileBufferSize 0;
}

The behaviour is sometimes affected by whether or not threading is enabled.
Additional InformationReconstructing fields for mesh region0

Time = 0.001

--> FOAM FATAL IO ERROR:
incorrect first token, expected <int> or '(', found on line 22 an error

file: /cluster/work/lav/apps/OpenFOAM/OpenFOAM-lav/tutorials/combustion/coldEngineFoam/freePiston/processors/0.001/polyMesh/points at line 22.

    From function Foam::Istream &Foam::operator>>(Foam::Istream &, Foam::List<T> &) [with T = char]
    in file lnInclude/ListIO.C at line 148.

FOAM exiting

Sometimes (depending on whether threading is enabled or not) it goes through for the first few time steps and stops later:

Time = 0.005

Reconstructing FV fields

    Reconstructing volScalarFields

        air
        nut

    Reconstructing volVectorFields

        U


--> FOAM FATAL IO ERROR:
Expected a ')' while reading binaryBlock, found on line 27 an error

file: /cluster/work/lav/apps/OpenFOAM/OpenFOAM-lav/tutorials/combustion/coldEngineFoam/freePiston/processors/0.005/U at line 27.

    From function Foam::Istream &Foam::Istream::readEnd(const char *)
    in file db/IOstreams/IOstreams/Istream.C at line 109.

FOAM exiting
TagsCollated

Activities

MattijsJ

2017-11-03 09:54

reporter  

controlDict (1,513 bytes)   
/*--------------------------------*- C++ -*----------------------------------*\
| =========                 |                                                 |
| \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox           |
|  \\    /   O peration     | Version:  dev                                   |
|   \\  /    A nd           | Web:      www.OpenFOAM.org                      |
|    \\/     M anipulation  |                                                 |
\*---------------------------------------------------------------------------*/
FoamFile
{
    version     2.0;
    format      ascii;
    class       dictionary;
    object      controlDict;
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

DebugSwitches
{
    collated    1;
}

OptimisationSwitches
{
    fileHandler collated;
    maxThreadFileBufferSize 100;
}

application             coldEngineFoam;

startFrom               startTime;

startTime               0;

stopAt                  endTime;

endTime                 0.0003;

deltaT                  5.0e-7;

writeControl            adjustableRunTime;

writeInterval           0.0001;

purgeWrite              0;

writeFormat             binary;

writePrecision          6;

writeCompression        off;

timeFormat              general;

timePrecision           8;

runTimeModifiable       true;

adjustTimeStep          yes;

maxCo                   0.25;

// ************************************************************************* //
controlDict (1,513 bytes)   

MattijsJ

2017-11-03 10:01

reporter   ~0008986

I cannot repeat this. Tried:

- coldEngineFoam tutorial
- decompose into 4 with scotch
- changed (to speed up simulation)
writeInterval 0.0001;
endTime 0.0003;
- reconstructPar

I am running this on a shared memory machine.

- are you running with NFS storage?
- could you compare the conflicting file against runs that did succeed?
- any particular setting that always works (you mentioned threading)

sushpa

2017-11-04 12:41

reporter   ~0008999

Are you able to reproduce it with:

OptimisationSwitches {
    maxMasterFileBufferSize 0;
    maxThreadFileBufferSize 0;
    fileHandler collated;
}

For me this issue now seems reproducible with the above, using both collated and masterCollated. Decomposition method has no influence nor MPI ranks (as far as I can tell). The issue applies only when threading is turned off.

In decomposedBlockData.C (line numbers may vary):

   832 while
   833 (
   834 proci < nProcs
   835 && (
   836 totalSize+recvSizes[proci]
   837 < fileOperations::masterUncollatedFileOperation::
   838 maxMasterFileBufferSize
   839 )
   840 )
   841 {
   842 totalSize += recvSizes[proci];
   843 proci++;
   844 }

When the comparison fails, ranks 1..n do not write out any data to the files (e.g. processors/0.0001/polyMesh/points only has data from rank 0).

If it still doesn't show up in your trials, I can put together and upload a successful and unsuccessful case and logs (with DebugSwitch on). My recent tries were on a local workstation, 4 cores, no NFS.

I can work around it at the moment by using collated file handler with maxMasterFileBufferSize 2e9.

As an aside: is it a misconfiguration if I specify masterCollated with maxMasterFileBufferSize 0? Or what would be the intended behaviour here?

MattijsJ

2017-11-07 16:20

reporter   ~0009014

I have not been able to reproduce it (with your settings) but not really looked at the code yet. I guess if you specify maxMasterFileBufferSize all communication should be blocking ('scheduled' in OpenFOAM speak).

MattijsJ

2017-11-10 12:19

reporter  

decomposedBlockData.C (27,445 bytes)   
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | Copyright (C) 2017 OpenFOAM Foundation
     \\/     M anipulation  |
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.

    OpenFOAM is free software: you can redistribute it and/or modify it
    under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    for more details.

    You should have received a copy of the GNU General Public License
    along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.

\*---------------------------------------------------------------------------*/

#include "decomposedBlockData.H"
#include "OPstream.H"
#include "IPstream.H"
#include "PstreamBuffers.H"
#include "OFstream.H"
#include "IFstream.H"
#include "IStringStream.H"
#include "dictionary.H"
#include "objectRegistry.H"
#include "SubList.H"
#include "labelPair.H"
#include "masterUncollatedFileOperation.H"

// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //

namespace Foam
{
    defineTypeNameAndDebug(decomposedBlockData, 0);
}

// * * * * * * * * * * * * * * * * Constructors  * * * * * * * * * * * * * * //

Foam::decomposedBlockData::decomposedBlockData
(
    const label comm,
    const IOobject& io,
    const UPstream::commsTypes commsType
)
:
    regIOobject(io),
    commsType_(commsType),
    comm_(comm)
{
    // Temporary warning
    if (io.readOpt() == IOobject::MUST_READ_IF_MODIFIED)
    {
        WarningInFunction
            << "decomposedBlockData " << name()
            << " constructed with IOobject::MUST_READ_IF_MODIFIED"
            " but decomposedBlockData does not support automatic rereading."
            << endl;
    }
    if
    (
        (
            io.readOpt() == IOobject::MUST_READ
         || io.readOpt() == IOobject::MUST_READ_IF_MODIFIED
        )
     || (io.readOpt() == IOobject::READ_IF_PRESENT && headerOk())
    )
    {
        read();
    }
}


Foam::decomposedBlockData::decomposedBlockData
(
    const label comm,
    const IOobject& io,
    const UList<char>& list,
    const UPstream::commsTypes commsType
)
:
    regIOobject(io),
    commsType_(commsType),
    comm_(comm)
{
    // Temporary warning
    if (io.readOpt() == IOobject::MUST_READ_IF_MODIFIED)
    {
        WarningInFunction
            << "decomposedBlockData " << name()
            << " constructed with IOobject::MUST_READ_IF_MODIFIED"
            " but decomposedBlockData does not support automatic rereading."
            << endl;
    }

    if
    (
        (
            io.readOpt() == IOobject::MUST_READ
         || io.readOpt() == IOobject::MUST_READ_IF_MODIFIED
        )
     || (io.readOpt() == IOobject::READ_IF_PRESENT && headerOk())
    )
    {
        read();
    }
    else
    {
        List<char>::operator=(list);
    }
}


Foam::decomposedBlockData::decomposedBlockData
(
    const label comm,
    const IOobject& io,
    const Xfer<List<char>>& list,
    const UPstream::commsTypes commsType
)
:
    regIOobject(io),
    commsType_(commsType),
    comm_(comm)
{
    // Temporary warning
    if (io.readOpt() == IOobject::MUST_READ_IF_MODIFIED)
    {
        WarningInFunction
            << "decomposedBlockData " << name()
            << " constructed with IOobject::MUST_READ_IF_MODIFIED"
            " but decomposedBlockData does not support automatic rereading."
            << endl;
    }

    List<char>::transfer(list());

    if
    (
        (
            io.readOpt() == IOobject::MUST_READ
         || io.readOpt() == IOobject::MUST_READ_IF_MODIFIED
        )
     || (io.readOpt() == IOobject::READ_IF_PRESENT && headerOk())
    )
    {
        read();
    }
}


// * * * * * * * * * * * * * * * Destructor  * * * * * * * * * * * * * * * * //

Foam::decomposedBlockData::~decomposedBlockData()
{}


// * * * * * * * * * * * * * * * Members Functions * * * * * * * * * * * * * //

bool Foam::decomposedBlockData::readMasterHeader(IOobject& io, Istream& is)
{
    if (debug)
    {
        Pout<< "decomposedBlockData::readMasterHeader:"
            << " stream:" << is.name() << endl;
    }

    // Master-only reading of header
    is.fatalCheck("read(Istream&)");

    List<char> data(is);
    is.fatalCheck("read(Istream&) : reading entry");
    string buf(data.begin(), data.size());
    IStringStream str(is.name(), buf);

    return io.readHeader(str);
}


void Foam::decomposedBlockData::writeHeader
(
    Ostream& os,
    const IOstream::versionNumber version,
    const IOstream::streamFormat format,
    const word& type,
    const string& note,
    const fileName& location,
    const word& name
)
{
    IOobject::writeBanner(os)
        << "FoamFile\n{\n"
        << "    version     " << version << ";\n"
        << "    format      " << format << ";\n"
        << "    class       " << type << ";\n";
    if (note.size())
    {
        os  << "    note        " << note << ";\n";
    }

    if (location.size())
    {
        os  << "    location    " << location << ";\n";
    }

    os  << "    object      " << name << ";\n"
        << "}" << nl;

    IOobject::writeDivider(os) << nl;
}


Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlock
(
    const label blocki,
    Istream& is,
    IOobject& headerIO
)
{
    if (debug)
    {
        Pout<< "decomposedBlockData::readBlock:"
            << " stream:" << is.name() << " attempt to read block " << blocki
            << endl;
    }

    is.fatalCheck("read(Istream&)");

    List<char> data;
    autoPtr<ISstream> realIsPtr;

    if (blocki == 0)
    {
        is >> data;
        is.fatalCheck("read(Istream&) : reading entry");

        string buf(data.begin(), data.size());
        realIsPtr = new IStringStream(is.name(), buf);

        // Read header
        if (!headerIO.readHeader(realIsPtr()))
        {
            FatalIOErrorInFunction(realIsPtr())
                << "problem while reading header for object "
                << is.name() << exit(FatalIOError);
        }
    }
    else
    {
        // Read master for header
        is >> data;
        is.fatalCheck("read(Istream&) : reading entry");

        IOstream::versionNumber ver(IOstream::currentVersion);
        IOstream::streamFormat fmt;
        {
            string buf(data.begin(), data.size());
            IStringStream headerStream(is.name(), buf);

            // Read header
            if (!headerIO.readHeader(headerStream))
            {
                FatalIOErrorInFunction(headerStream)
                    << "problem while reading header for object "
                    << is.name() << exit(FatalIOError);
            }
            ver = headerStream.version();
            fmt = headerStream.format();
        }

        for (label i = 1; i < blocki+1; i++)
        {
            // Read data, override old data
            is >> data;
            is.fatalCheck("read(Istream&) : reading entry");
        }
        string buf(data.begin(), data.size());
        realIsPtr = new IStringStream(is.name(), buf);

        // Apply master stream settings to realIsPtr
        realIsPtr().format(fmt);
        realIsPtr().version(ver);
    }
    return realIsPtr;
}


bool Foam::decomposedBlockData::readBlocks
(
    const label comm,
    autoPtr<ISstream>& isPtr,
    List<char>& data,
    const UPstream::commsTypes commsType
)
{
    if (debug)
    {
        Pout<< "decomposedBlockData::readBlocks:"
            << " stream:" << (isPtr.valid() ? isPtr().name() : "invalid")
            << " commsType:" << Pstream::commsTypeNames[commsType]
            << " comm:" << comm << endl;
    }

    bool ok = false;

    if (commsType == UPstream::commsTypes::scheduled)
    {
        if (UPstream::master(comm))
        {
            Istream& is = isPtr();
            is.fatalCheck("read(Istream&)");

            // Read master data
            {
                is >> data;
                is.fatalCheck("read(Istream&) : reading entry");
            }

            // Read slave data
            for
            (
                label proci = 1;
                proci < UPstream::nProcs(comm);
                proci++
            )
            {
                List<char> elems(is);
                is.fatalCheck("read(Istream&) : reading entry");

                OPstream os
                (
                    UPstream::commsTypes::scheduled,
                    proci,
                    0,
                    UPstream::msgType(),
                    comm
                );
                os << elems;
            }

            ok = is.good();
        }
        else
        {
            IPstream is
            (
                UPstream::commsTypes::scheduled,
                UPstream::masterNo(),
                0,
                UPstream::msgType(),
                comm
            );
            is >> data;
        }
    }
    else
    {
        PstreamBuffers pBufs
        (
            UPstream::commsTypes::nonBlocking,
            UPstream::msgType(),
            comm
        );

        if (UPstream::master(comm))
        {
            Istream& is = isPtr();
            is.fatalCheck("read(Istream&)");

            // Read master data
            {
                is >> data;
                is.fatalCheck("read(Istream&) : reading entry");
            }

            // Read slave data
            for
            (
                label proci = 1;
                proci < UPstream::nProcs(comm);
                proci++
            )
            {
                List<char> elems(is);
                is.fatalCheck("read(Istream&) : reading entry");

                UOPstream os(proci, pBufs);
                os << elems;
            }
        }

        labelList recvSizes;
        pBufs.finishedSends(recvSizes);

        if (!UPstream::master(comm))
        {
            UIPstream is(UPstream::masterNo(), pBufs);
            is >> data;
        }
    }

    Pstream::scatter(ok, Pstream::msgType(), comm);

    return ok;
}


Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlocks
(
    const label comm,
    const fileName& fName,
    autoPtr<ISstream>& isPtr,
    IOobject& headerIO,
    const UPstream::commsTypes commsType
)
{
    if (debug)
    {
        Pout<< "decomposedBlockData::readBlocks:"
            << " stream:" << (isPtr.valid() ? isPtr().name() : "invalid")
            << " commsType:" << Pstream::commsTypeNames[commsType] << endl;
    }

    bool ok = false;

    List<char> data;
    autoPtr<ISstream> realIsPtr;

    if (commsType == UPstream::commsTypes::scheduled)
    {
        if (UPstream::master(comm))
        {
            Istream& is = isPtr();
            is.fatalCheck("read(Istream&)");

            // Read master data
            {
                is >> data;
                is.fatalCheck("read(Istream&) : reading entry");

                string buf(data.begin(), data.size());
                realIsPtr = new IStringStream(fName, buf);

                // Read header
                if (!headerIO.readHeader(realIsPtr()))
                {
                    FatalIOErrorInFunction(realIsPtr())
                        << "problem while reading header for object "
                        << is.name() << exit(FatalIOError);
                }
            }

            // Read slave data
            for
            (
                label proci = 1;
                proci < UPstream::nProcs(comm);
                proci++
            )
            {
                is >> data;
                is.fatalCheck("read(Istream&) : reading entry");

                OPstream os
                (
                    UPstream::commsTypes::scheduled,
                    proci,
                    0,
                    UPstream::msgType(),
                    comm
                );
                os << data;
            }

            ok = is.good();
        }
        else
        {
            IPstream is
            (
                UPstream::commsTypes::scheduled,
                UPstream::masterNo(),
                0,
                UPstream::msgType(),
                comm
            );
            is >> data;

            string buf(data.begin(), data.size());
            realIsPtr = new IStringStream(fName, buf);
        }
    }
    else
    {
        PstreamBuffers pBufs
        (
            UPstream::commsTypes::nonBlocking,
            UPstream::msgType(),
            comm
        );

        if (UPstream::master(comm))
        {
            Istream& is = isPtr();
            is.fatalCheck("read(Istream&)");

            // Read master data
            {
                is >> data;
                is.fatalCheck("read(Istream&) : reading entry");

                string buf(data.begin(), data.size());
                realIsPtr = new IStringStream(fName, buf);

                // Read header
                if (!headerIO.readHeader(realIsPtr()))
                {
                    FatalIOErrorInFunction(realIsPtr())
                        << "problem while reading header for object "
                        << is.name() << exit(FatalIOError);
                }
            }

            // Read slave data
            for
            (
                label proci = 1;
                proci < UPstream::nProcs(comm);
                proci++
            )
            {
                List<char> elems(is);
                is.fatalCheck("read(Istream&) : reading entry");

                UOPstream os(proci, pBufs);
                os << elems;
            }

            ok = is.good();
        }

        labelList recvSizes;
        pBufs.finishedSends(recvSizes);

        if (!UPstream::master(comm))
        {
            UIPstream is(UPstream::masterNo(), pBufs);
            is >> data;

            string buf(data.begin(), data.size());
            realIsPtr = new IStringStream(fName, buf);
        }
    }

    Pstream::scatter(ok, Pstream::msgType(), comm);

    // version
    string versionString(realIsPtr().version().str());
    Pstream::scatter(versionString,  Pstream::msgType(), comm);
    realIsPtr().version(IStringStream(versionString)());

    // stream
    {
        OStringStream os;
        os << realIsPtr().format();
        string formatString(os.str());
        Pstream::scatter(formatString,  Pstream::msgType(), comm);
        realIsPtr().format(formatString);
    }

    word name(headerIO.name());
    Pstream::scatter(name, Pstream::msgType(), comm);
    headerIO.rename(name);
    Pstream::scatter(headerIO.headerClassName(), Pstream::msgType(), comm);
    Pstream::scatter(headerIO.note(), Pstream::msgType(), comm);
    //Pstream::scatter(headerIO.instance(), Pstream::msgType(), comm);
    //Pstream::scatter(headerIO.local(), Pstream::msgType(), comm);

    return realIsPtr;
}


void Foam::decomposedBlockData::gather
(
    const label comm,
    const label data,
    labelList& datas
)
{
    const label nProcs = UPstream::nProcs(comm);
    datas.setSize(nProcs);

    char* data0Ptr = reinterpret_cast<char*>(datas.begin());

    List<int> recvOffsets;
    List<int> recvSizes;
    if (UPstream::master())
    {
        recvOffsets.setSize(nProcs);
        forAll(recvOffsets, proci)
        {
            recvOffsets[proci] =
                reinterpret_cast<char*>(&datas[proci])
              - data0Ptr;
        }
        recvSizes.setSize(nProcs, sizeof(label));
    }

    UPstream::gather
    (
        reinterpret_cast<const char*>(&data),
        sizeof(label),
        data0Ptr,
        recvSizes,
        recvOffsets,
        comm
    );
}


void Foam::decomposedBlockData::gatherSlaveData
(
    const label comm,
    const UList<char>& data,
    const labelUList& recvSizes,

    const label startProc,
    const label nProcs,

    List<int>& sliceOffsets,
    List<char>& recvData
)
{
    // Calculate master data
    List<int> sliceSizes;
    if (UPstream::master(comm))
    {
        const label numProcs = UPstream::nProcs(comm);

        sliceSizes.setSize(numProcs, 0);
        sliceOffsets.setSize(numProcs+1, 0);

        int totalSize = 0;
        label proci = startProc;
        for (label i = 0; i < nProcs; i++)
        {
            sliceSizes[proci] = int(recvSizes[proci]);
            sliceOffsets[proci] = totalSize;
            totalSize += sliceSizes[proci];
            proci++;
        }
        sliceOffsets[proci] = totalSize;
        recvData.setSize(totalSize);
    }

    int nSend = 0;
    if
    (
       !UPstream::master(comm)
     && (UPstream::myProcNo(comm) >= startProc)
     && (UPstream::myProcNo(comm) < startProc+nProcs)
    )
    {
        nSend = data.byteSize();
    }

    UPstream::gather
    (
        data.begin(),
        nSend,

        recvData.begin(),
        sliceSizes,
        sliceOffsets,
        comm
    );
}


bool Foam::decomposedBlockData::writeBlocks
(
    const label comm,
    autoPtr<OSstream>& osPtr,
    List<std::streamoff>& start,
    const UList<char>& data,

    const labelUList& recvSizes,
    const bool haveSlaveData,
    const List<char>& slaveData,

    const UPstream::commsTypes commsType,
    const bool syncReturnState
)
{
    if (debug)
    {
        Pout<< "decomposedBlockData::writeBlocks:"
            << " stream:" << (osPtr.valid() ? osPtr().name() : "invalid")
            << " data:" << data.size()
            << " haveSlaveData:" << haveSlaveData
            << " (master only) slaveData:" << slaveData.size()
            << " commsType:" << Pstream::commsTypeNames[commsType] << endl;
    }

    const label nProcs = UPstream::nProcs(comm);


    bool ok = true;

    if (haveSlaveData)
    {
        // Already have gathered the slave data. communicator only used to
        // check who is the master

        if (UPstream::master(comm))
        {
            OSstream& os = osPtr();

            start.setSize(nProcs);

            // Write master data
            {
                os << nl << "// Processor" << UPstream::masterNo() << nl;
                start[UPstream::masterNo()] = os.stdStream().tellp();
                os << data;
            }

            // Write slaves

            label slaveOffset = 0;

            for (label proci = 1; proci < nProcs; proci++)
            {
                os << nl << nl << "// Processor" << proci << nl;
                start[proci] = os.stdStream().tellp();

                os << SubList<char>(slaveData, recvSizes[proci], slaveOffset);

                slaveOffset += recvSizes[proci];
            }

            ok = os.good();
        }
    }
    else if (commsType == UPstream::commsTypes::scheduled)
    {
        if (UPstream::master(comm))
        {
            start.setSize(nProcs);

            OSstream& os = osPtr();

            // Write master data
            {
                os << nl << "// Processor" << UPstream::masterNo() << nl;
                start[UPstream::masterNo()] = os.stdStream().tellp();
                os << data;
            }
            // Write slaves
            List<char> elems;
            for (label proci = 1; proci < nProcs; proci++)
            {
                elems.setSize(recvSizes[proci]);
                IPstream::read
                (
                    UPstream::commsTypes::scheduled,
                    proci,
                    elems.begin(),
                    elems.size(),
                    Pstream::msgType(),
                    comm
                );

                os << nl << nl << "// Processor" << proci << nl;
                start[proci] = os.stdStream().tellp();
                os << elems;
            }

            ok = os.good();
        }
        else
        {
            UOPstream::write
            (
                UPstream::commsTypes::scheduled,
                UPstream::masterNo(),
                data.begin(),
                data.byteSize(),
                Pstream::msgType(),
                comm
            );
        }
    }
    else
    {
        // Write master data
        if (UPstream::master(comm))
        {
            start.setSize(nProcs);

            OSstream& os = osPtr();

            os << nl << "// Processor" << UPstream::masterNo() << nl;
            start[UPstream::masterNo()] = os.stdStream().tellp();
            os << data;
        }


        // Find out how many processor can be received into
        // maxMasterFileBufferSize

        // Starting slave processor and number of processors
        labelPair startAndSize(1, nProcs-1);

        while (startAndSize[1] > 0)
        {
            labelPair masterData(startAndSize);
            if (UPstream::master(comm))
            {
                label totalSize = recvSizes[masterData[0]];
                label proci = masterData[0]+1;
                while
                (
                    proci < nProcs
                 && (
                        totalSize+recvSizes[proci]
                      < fileOperations::masterUncollatedFileOperation::
                            maxMasterFileBufferSize
                    )
                )
                {
                    totalSize += recvSizes[proci];
                    proci++;
                }

                masterData[1] = proci-masterData[0];
            }

            // Scatter masterData
            UPstream::scatter
            (
                reinterpret_cast<const char*>(masterData.cdata()),
                List<int>(nProcs, sizeof(masterData)),
                List<int>(nProcs, 0),
                reinterpret_cast<char*>(startAndSize.data()),
                sizeof(startAndSize),
                comm
            );

            if (startAndSize[0] == nProcs || startAndSize[1] == 0)
            {
                break;
            }


            // Gather data from (a slice of) the slaves
            List<int> sliceOffsets;
            List<char> recvData;
            gatherSlaveData
            (
                comm,
                data,
                recvSizes,

                startAndSize[0],    // startProc,
                startAndSize[1],    // nProcs,

                sliceOffsets,
                recvData
            );

            if (UPstream::master(comm))
            {
                OSstream& os = osPtr();

                // Write slaves
                for
                (
                    label proci = startAndSize[0];
                    proci < startAndSize[0]+startAndSize[1];
                    proci++
                )
                {
                    os << nl << nl << "// Processor" << proci << nl;
                    start[proci] = os.stdStream().tellp();

                    os <<
                        SubList<char>
                        (
                            recvData,
                            sliceOffsets[proci+1]-sliceOffsets[proci],
                            sliceOffsets[proci]
                        );
                }
            }

            startAndSize[0] += startAndSize[1];
        }

        if (UPstream::master(comm))
        {
            ok = osPtr().good();
        }
    }

    if (syncReturnState)
    {
        //- Enable to get synchronised error checking. Is the one that keeps
        //  slaves as slow as the master (which does all the writing)
        Pstream::scatter(ok, Pstream::msgType(), comm);
    }

    return ok;
}


bool Foam::decomposedBlockData::read()
{
    autoPtr<ISstream> isPtr;
    fileName objPath(fileHandler().filePath(false, *this, word::null));
    if (UPstream::master(comm_))
    {
        isPtr.reset(new IFstream(objPath));
        IOobject::readHeader(isPtr());
    }

    List<char>& data = *this;
    return readBlocks(comm_, isPtr, data, commsType_);
}


bool Foam::decomposedBlockData::writeData(Ostream& os) const
{
    const List<char>& data = *this;

    string str
    (
        reinterpret_cast<const char*>(data.cbegin()),
        data.byteSize()
    );

    IOobject io(*this);
    if (Pstream::master())
    {
        IStringStream is(name(), str);
        io.readHeader(is);
    }

    // Scatter header information

    // version
    string versionString(os.version().str());
    Pstream::scatter(versionString);

    // stream
    string formatString;
    {
        OStringStream os;
        os << os.format();
        formatString  = os.str();
        Pstream::scatter(formatString);
    }

    //word masterName(name());
    //Pstream::scatter(masterName);

    Pstream::scatter(io.headerClassName());
    Pstream::scatter(io.note());
    //Pstream::scatter(io.instance(), Pstream::msgType(), comm);
    //Pstream::scatter(io.local(), Pstream::msgType(), comm);

    fileName masterLocation(instance()/db().dbDir()/local());
    Pstream::scatter(masterLocation);

    if (!Pstream::master())
    {
        writeHeader
        (
            os,
            IOstream::versionNumber(IStringStream(versionString)()),
            IOstream::formatEnum(formatString),
            io.headerClassName(),
            io.note(),
            masterLocation,
            name()
        );
    }

    os.writeQuoted(str, false);

    if (!Pstream::master())
    {
        IOobject::writeEndDivider(os);
    }

    return os.good();
}


bool Foam::decomposedBlockData::writeObject
(
    IOstream::streamFormat fmt,
    IOstream::versionNumber ver,
    IOstream::compressionType cmp,
    const bool valid
) const
{
    autoPtr<OSstream> osPtr;
    if (UPstream::master(comm_))
    {
        // Note: always write binary. These are strings so readable
        //       anyway. They have already be tokenised on the sending side.
        osPtr.reset(new OFstream(objectPath(), IOstream::BINARY, ver, cmp));
        IOobject::writeHeader(osPtr());
    }

    labelList recvSizes;
    gather(comm_, this->byteSize(), recvSizes);

    List<std::streamoff> start;
    List<char> slaveData;           // dummy already received slave data
    return writeBlocks
    (
        comm_,
        osPtr,
        start,
        *this,
        recvSizes,
        false,                      // don't have slave data
        slaveData,
        commsType_
    );
}


Foam::label Foam::decomposedBlockData::numBlocks(const fileName& fName)
{
    label nBlocks = 0;

    IFstream is(fName);
    is.fatalCheck("decomposedBlockData::numBlocks(const fileName&)");

    if (!is.good())
    {
        return nBlocks;
    }

    // Skip header
    token firstToken(is);

    if
    (
        is.good()
     && firstToken.isWord()
     && firstToken.wordToken() == "FoamFile"
    )
    {
        dictionary headerDict(is);
        is.version(headerDict.lookup("version"));
        is.format(headerDict.lookup("format"));
    }

    List<char> data;
    while (is.good())
    {
        token sizeToken(is);
        if (!sizeToken.isLabel())
        {
            return nBlocks;
        }
        is.putBack(sizeToken);

        is >> data;
        nBlocks++;
    }

    return nBlocks;
}


// ************************************************************************* //
decomposedBlockData.C (27,445 bytes)   

MattijsJ

2017-11-10 12:24

reporter   ~0009019

- I can reproduce your problem now. I was running collated globally (through the etc/controlDict) so it did not re-initialise the collated with the 0 size.
- Could you try attached replacement for db/IOobjects/decomposedBlockData/decomposedBlockData.C. I've tried your case on 2 and 4 processors and think the logic is sound but it'd be nice to have another tester.
- It now always at least receives data for one processor, i.e. proci starts now off from the next processor.

henry

2017-11-17 12:14

manager   ~0009048

Resolved by commit 03b641d2c7575738d94461527f12ce918dbd1922

Issue History

Date Modified Username Field Change
2017-10-31 21:02 sushpa New Issue
2017-10-31 21:02 sushpa Tag Attached: Collated
2017-11-03 09:54 MattijsJ File Added: controlDict
2017-11-03 10:01 MattijsJ Note Added: 0008986
2017-11-04 12:41 sushpa Note Added: 0008999
2017-11-07 16:20 MattijsJ Note Added: 0009014
2017-11-10 12:19 MattijsJ File Added: decomposedBlockData.C
2017-11-10 12:24 MattijsJ Note Added: 0009019
2017-11-17 12:14 henry Assigned To => henry
2017-11-17 12:14 henry Status new => resolved
2017-11-17 12:14 henry Resolution open => fixed
2017-11-17 12:14 henry Fixed in Version => dev
2017-11-17 12:14 henry Note Added: 0009048