diff --git a/bfs/implementations/breadcrumbs/algos/AlgoBreadcrumbs.cpp b/bfs/implementations/breadcrumbs/algos/AlgoBreadcrumbs.cpp index 953593a..543cb11 100644 --- a/bfs/implementations/breadcrumbs/algos/AlgoBreadcrumbs.cpp +++ b/bfs/implementations/breadcrumbs/algos/AlgoBreadcrumbs.cpp @@ -4,12 +4,17 @@ void AlgoBreadcrumbs::loop() { + int msgCount = 0; + Attribute msgCountAttrib = { "MSGCOUNT", sizeof(int), &msgCount }; + vector attribs = pollForAttributes(); if (attribs.size() > 0) { - printf("Attrib length: %d\n", attribs.size()); + cout << attribs.size() << " attributes received!" << endl; + msgCount += attribs.size(); + + sendAttribute(msgCountAttrib); } - _sleep(1000); } bool AlgoBreadcrumbs::loopCondition() diff --git a/bfs/implementations/breadcrumbs/io_procs/VirtualOutputIOProcessor.cpp b/bfs/implementations/breadcrumbs/io_procs/VirtualOutputIOProcessor.cpp index a475031..fd1698a 100644 --- a/bfs/implementations/breadcrumbs/io_procs/VirtualOutputIOProcessor.cpp +++ b/bfs/implementations/breadcrumbs/io_procs/VirtualOutputIOProcessor.cpp @@ -7,10 +7,20 @@ void VirtualOutputIOProcessor::loop() char testValue = 'a' + iterations; Attribute attrib("testKey1", 1, &testValue); getComms()->sendAttribute(attrib); + + if (getComms()->areIncomingAttributesAvailable()) { + vector attribs = getComms()->getIncomingAttributes(); + for (Attribute attr : attribs) + { + cout << "Attribute " << attr.getKey() << " received!" << endl; + } + } + + Sleep(500); iterations--; } bool VirtualOutputIOProcessor::loopCondition() { - return iterations > 0; + return iterations >= 0; } diff --git a/bfs/include/Algorithm.hpp b/bfs/include/Algorithm.hpp index 8745c27..cd51b17 100644 --- a/bfs/include/Algorithm.hpp +++ b/bfs/include/Algorithm.hpp @@ -14,6 +14,7 @@ class Algorithm virtual bool loopCondition() { return false; }; vector pollForAttributes(); + void sendAttribute(Attribute attrib); private: size_t numIoProcs = 0; AlgorithmServer* server; diff --git a/bfs/include/AlgorithmServer.hpp b/bfs/include/AlgorithmServer.hpp index 4928c52..3113557 100644 --- a/bfs/include/AlgorithmServer.hpp +++ b/bfs/include/AlgorithmServer.hpp @@ -37,6 +37,7 @@ class AlgorithmServer // Updates Algorithm key/value store with IO key/value updates vector getAllIncomingAttributes(); + void sendAttributeToAll(Attribute attrib); void startServer(); int stopServer(); diff --git a/bfs/include/DataSyncThread.hpp b/bfs/include/DataSyncThread.hpp index 616f9f3..bcd47e5 100644 --- a/bfs/include/DataSyncThread.hpp +++ b/bfs/include/DataSyncThread.hpp @@ -11,6 +11,7 @@ #include #include #include +#include #include "Attribute.hpp" #include "CMakeConfig.h" @@ -33,6 +34,10 @@ class DataSyncThread HANDLE hThread = INVALID_HANDLE_VALUE; DWORD dwThreadId; + // The timeout for exiting the data sync thread with attributes that have not been + // polled for yet. It is in ms + const static size_t EXIT_PENDING_ATTRIBUTE_TIMEOUT = 1000; + // Flag for signalling this thread to terminate bool continueThread = false; diff --git a/bfs/src/algos/Algorithm.cpp b/bfs/src/algos/Algorithm.cpp index 4e7991b..91a4a41 100644 --- a/bfs/src/algos/Algorithm.cpp +++ b/bfs/src/algos/Algorithm.cpp @@ -18,3 +18,8 @@ vector Algorithm::pollForAttributes() { return server->getAllIncomingAttributes(); } + +void Algorithm :: sendAttribute(Attribute attrib) +{ + server->sendAttributeToAll(attrib); +} diff --git a/bfs/src/comms/AlgorithmServer.cpp b/bfs/src/comms/AlgorithmServer.cpp index 3521eb4..91e4bfb 100644 --- a/bfs/src/comms/AlgorithmServer.cpp +++ b/bfs/src/comms/AlgorithmServer.cpp @@ -163,6 +163,14 @@ vector AlgorithmServer::getAllIncomingAttributes() return newAttribs; } +void AlgorithmServer :: sendAttributeToAll(Attribute attrib) +{ + for (DataSyncThread* dst : clientThreads) + { + dst->sendAttribute(attrib); + } +} + void AlgorithmServer::startServer() { diff --git a/bfs/src/comms/DataSyncThread.cpp b/bfs/src/comms/DataSyncThread.cpp index 9d22dec..4e7c6b8 100644 --- a/bfs/src/comms/DataSyncThread.cpp +++ b/bfs/src/comms/DataSyncThread.cpp @@ -40,15 +40,23 @@ void DataSyncThread::threadRuntime() Attribute attr(bAttr, value); - // : AttributeUpdate(, ) - cout << this << ": AttributeUpdate(" << attr.getKey() << ", " << attr.getLength() << ")" << endl; - // Storing the attrib update addIncomingAttribute(attr); } } continueThread = false; + + // Waiting for all atributes to be read with a timeout: + auto start = clock(); + cout << "Waiting for attributes to be polled!" << endl; + while (areIncomingAttributesAvailable() && + (clock() - start) / (CLOCKS_PER_SEC) * 1000.0 < EXIT_PENDING_ATTRIBUTE_TIMEOUT); + if (areIncomingAttributesAvailable()) + { + cout << "WARNING: Attributes are still left to be polled but data sync thread is exiting." << endl; + } + if (attribMutex != NULL) { CloseHandle(attribMutex); attribMutex = NULL; @@ -66,7 +74,7 @@ void DataSyncThread::threadRuntime() cout << "Graceful socket shutdown failed! Still more data to read!" << endl; closesocket(sock); - cout << "Stopping DataSyncThread: " << this << endl; + cout << "Stopped DataSyncThread: " << this << endl; threadRunning = false; } @@ -257,6 +265,7 @@ vector DataSyncThread::getIncomingAttributes() { newAttribVector.push_back(attrib); } + incomingAttributes.clear(); ReleaseMutex(attribMutex); } else {