diff --git a/bfs/implementations/breadcrumbs/algos/AlgoBreadcrumbs.cpp b/bfs/implementations/breadcrumbs/algos/AlgoBreadcrumbs.cpp index 543cb11..dd7bdf3 100644 --- a/bfs/implementations/breadcrumbs/algos/AlgoBreadcrumbs.cpp +++ b/bfs/implementations/breadcrumbs/algos/AlgoBreadcrumbs.cpp @@ -7,14 +7,19 @@ void AlgoBreadcrumbs::loop() int msgCount = 0; Attribute msgCountAttrib = { "MSGCOUNT", sizeof(int), &msgCount }; - vector attribs = pollForAttributes(); + map attribs = pollForAttributesMap(); if (attribs.size() > 0) { - cout << attribs.size() << " attributes received!" << endl; - msgCount += attribs.size(); + auto testKeyIter = attribs.find(string("testKey1")); + if (testKeyIter != attribs.end()) + { + char value = *((char*) (*testKeyIter).second.getValue()); + cout << "Test key 1 value updated to " << value << endl; + } sendAttribute(msgCountAttrib); } + Sleep(10); } bool AlgoBreadcrumbs::loopCondition() diff --git a/bfs/include/Algorithm.hpp b/bfs/include/Algorithm.hpp index cd51b17..08c1b71 100644 --- a/bfs/include/Algorithm.hpp +++ b/bfs/include/Algorithm.hpp @@ -13,8 +13,9 @@ class Algorithm virtual void loop() = 0; virtual bool loopCondition() { return false; }; - vector pollForAttributes(); - void sendAttribute(Attribute attrib); + vector pollForAttributes() { return server->getAllIncomingAttributes(); }; + map pollForAttributesMap() { return server->getAllIncomingAttributesMap(); }; + void sendAttribute(Attribute attrib) { server->sendAttributeToAll(attrib); }; private: size_t numIoProcs = 0; AlgorithmServer* server; diff --git a/bfs/include/AlgorithmServer.hpp b/bfs/include/AlgorithmServer.hpp index 3113557..278acca 100644 --- a/bfs/include/AlgorithmServer.hpp +++ b/bfs/include/AlgorithmServer.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include "CMakeConfig.h" #include "DataSyncThread.hpp" @@ -37,6 +38,7 @@ class AlgorithmServer // Updates Algorithm key/value store with IO key/value updates vector getAllIncomingAttributes(); + map getAllIncomingAttributesMap(); void sendAttributeToAll(Attribute attrib); void startServer(); int stopServer(); diff --git a/bfs/include/DataSyncThread.hpp b/bfs/include/DataSyncThread.hpp index 797ccd2..7d7e23b 100644 --- a/bfs/include/DataSyncThread.hpp +++ b/bfs/include/DataSyncThread.hpp @@ -78,6 +78,7 @@ class DataSyncThread unsigned int getSocketNumber() { return (unsigned int) socket; }; bool areIncomingAttributesAvailable(); vector getIncomingAttributes(); + map getIncomingAttributesMap(); bool sendAttribute(Attribute attrib); }; diff --git a/bfs/src/algos/Algorithm.cpp b/bfs/src/algos/Algorithm.cpp index 91a4a41..7ae56b8 100644 --- a/bfs/src/algos/Algorithm.cpp +++ b/bfs/src/algos/Algorithm.cpp @@ -13,13 +13,3 @@ Algorithm::~Algorithm() { delete server; } - -vector Algorithm::pollForAttributes() -{ - return server->getAllIncomingAttributes(); -} - -void Algorithm :: sendAttribute(Attribute attrib) -{ - server->sendAttributeToAll(attrib); -} diff --git a/bfs/src/comms/AlgorithmServer.cpp b/bfs/src/comms/AlgorithmServer.cpp index 91e4bfb..c0f72f3 100644 --- a/bfs/src/comms/AlgorithmServer.cpp +++ b/bfs/src/comms/AlgorithmServer.cpp @@ -68,7 +68,6 @@ void AlgorithmServer::serverThreadRuntime() iResult = listen(ListenSocket, SOMAXCONN); if (iResult == SOCKET_ERROR) { - //write_log("listen failed with error\n", 1, logger->filename, logger); printf("listen failed with error: %d\n", WSAGetLastError()); closesocket(ListenSocket); WSACleanup(); @@ -79,15 +78,10 @@ void AlgorithmServer::serverThreadRuntime() // Accept a client socket while (continueThread) { - write_log("Listening for clients...\n", 1, logger->filename, logger); fprintf(stderr, "Listening for clients...\n"); ClientSocket = accept(ListenSocket, NULL, NULL); - write_log("Hello new client!\n", 1, logger->filename, logger); fprintf(stderr, "Hello new client!\n"); if (ClientSocket == INVALID_SOCKET) { - write_log("accept failed with error: ", 1, logger->filename, logger); - write_log(std::to_string(WSAGetLastError()), 1, logger->filename, logger); - write_log("\n", 1, logger->filename, logger); printf("accept failed with error: %d\n", WSAGetLastError()); AcceptFailures++; if (AcceptFailures >= MAX_ACCEPT_FAILURES) @@ -126,7 +120,6 @@ void AlgorithmServer::serverThreadRuntime() unlockClientThreadsMutex(); } else { - write_log("Could not acquire mutex to add new client thread\n", 1, logger->filename, logger); printf("Could not acquire mutex to add new client thread\n"); } } @@ -146,7 +139,6 @@ vector AlgorithmServer::getAllIncomingAttributes() Polls DataSyncThreads for new updates Updates master storage accordingly */ - Logger* logger = getLogger(); vector newAttribs; if (lockClientThreadsMutex() == STATUS_WAIT_0) { @@ -158,11 +150,29 @@ vector AlgorithmServer::getAllIncomingAttributes() unlockClientThreadsMutex(); } else { - printf("Could not acquire mutex to get incoming attributes.\n"); + cout << "Could not acquire mutex to get incoming attributes." << endl; } return newAttribs; } +map AlgorithmServer :: getAllIncomingAttributesMap() +{ + map combinedAttribMap; + if (lockClientThreadsMutex() == STATUS_WAIT_0) + { + for (DataSyncThread* dst : clientThreads) + { + map toAdd = dst->getIncomingAttributesMap(); + combinedAttribMap.insert(toAdd.begin(), toAdd.end()); + } + unlockClientThreadsMutex(); + } + else { + cout << "Could not acquire mutex to get incoming attributes." << endl; + } + return combinedAttribMap; +} + void AlgorithmServer :: sendAttributeToAll(Attribute attrib) { for (DataSyncThread* dst : clientThreads) diff --git a/bfs/src/comms/DataSyncThread.cpp b/bfs/src/comms/DataSyncThread.cpp index dd9424b..d1d5d30 100644 --- a/bfs/src/comms/DataSyncThread.cpp +++ b/bfs/src/comms/DataSyncThread.cpp @@ -275,6 +275,24 @@ vector DataSyncThread :: getIncomingAttributes() return newAttribVector; } +map DataSyncThread :: getIncomingAttributesMap() +{ + map incomingAttribMap; + if (!areIncomingAttributesAvailable()) + return incomingAttribMap; + + DWORD result = WaitForSingleObject(attribMutex, INFINITE); + if (result == WAIT_OBJECT_0) { + incomingAttribMap = map(incomingAttributes); + incomingAttributes.clear(); + ReleaseMutex(attribMutex); + } + else { + printf("Failed to acquire mutex, wait returned %x, error: %d\n", result, GetLastError()); + } + return incomingAttribMap; +} + bool DataSyncThread::areIncomingAttributesAvailable() { if (!threadRunning)