Skip to content

Commit

Permalink
Adding function for polling for attributes as maps
Browse files Browse the repository at this point in the history
  • Loading branch information
grf14003 committed May 8, 2020
1 parent 725a42a commit dc14af5
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 24 deletions.
11 changes: 8 additions & 3 deletions bfs/implementations/breadcrumbs/algos/AlgoBreadcrumbs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@ void AlgoBreadcrumbs::loop()
int msgCount = 0;
Attribute msgCountAttrib = { "MSGCOUNT", sizeof(int), &msgCount };

vector<Attribute> attribs = pollForAttributes();
map<string, Attribute> attribs = pollForAttributesMap();
if (attribs.size() > 0)
{
cout << attribs.size() << " attributes received!" << endl;
msgCount += attribs.size();
auto testKeyIter = attribs.find(string("testKey1"));
if (testKeyIter != attribs.end())
{
char value = *((char*) (*testKeyIter).second.getValue());
cout << "Test key 1 value updated to " << value << endl;
}

sendAttribute(msgCountAttrib);
}
Sleep(10);
}

bool AlgoBreadcrumbs::loopCondition()
Expand Down
5 changes: 3 additions & 2 deletions bfs/include/Algorithm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ class Algorithm
virtual void loop() = 0;
virtual bool loopCondition() { return false; };

vector<Attribute> pollForAttributes();
void sendAttribute(Attribute attrib);
vector<Attribute> pollForAttributes() { return server->getAllIncomingAttributes(); };
map<string, Attribute> pollForAttributesMap() { return server->getAllIncomingAttributesMap(); };
void sendAttribute(Attribute attrib) { server->sendAttributeToAll(attrib); };
private:
size_t numIoProcs = 0;
AlgorithmServer* server;
Expand Down
2 changes: 2 additions & 0 deletions bfs/include/AlgorithmServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <stdlib.h>
#include <stdio.h>
#include <vector>
#include <map>

#include "CMakeConfig.h"
#include "DataSyncThread.hpp"
Expand All @@ -37,6 +38,7 @@ class AlgorithmServer

// Updates Algorithm key/value store with IO key/value updates
vector<Attribute> getAllIncomingAttributes();
map<string, Attribute> getAllIncomingAttributesMap();
void sendAttributeToAll(Attribute attrib);
void startServer();
int stopServer();
Expand Down
1 change: 1 addition & 0 deletions bfs/include/DataSyncThread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class DataSyncThread
unsigned int getSocketNumber() { return (unsigned int) socket; };
bool areIncomingAttributesAvailable();
vector<Attribute> getIncomingAttributes();
map<string, Attribute> getIncomingAttributesMap();
bool sendAttribute(Attribute attrib);
};

Expand Down
10 changes: 0 additions & 10 deletions bfs/src/algos/Algorithm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,3 @@ Algorithm::~Algorithm()
{
delete server;
}

vector<Attribute> Algorithm::pollForAttributes()
{
return server->getAllIncomingAttributes();
}

void Algorithm :: sendAttribute(Attribute attrib)
{
server->sendAttributeToAll(attrib);
}
28 changes: 19 additions & 9 deletions bfs/src/comms/AlgorithmServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ void AlgorithmServer::serverThreadRuntime()

iResult = listen(ListenSocket, SOMAXCONN);
if (iResult == SOCKET_ERROR) {
//write_log("listen failed with error\n", 1, logger->filename, logger);
printf("listen failed with error: %d\n", WSAGetLastError());
closesocket(ListenSocket);
WSACleanup();
Expand All @@ -79,15 +78,10 @@ void AlgorithmServer::serverThreadRuntime()
// Accept a client socket
while (continueThread)
{
write_log("Listening for clients...\n", 1, logger->filename, logger);
fprintf(stderr, "Listening for clients...\n");
ClientSocket = accept(ListenSocket, NULL, NULL);
write_log("Hello new client!\n", 1, logger->filename, logger);
fprintf(stderr, "Hello new client!\n");
if (ClientSocket == INVALID_SOCKET) {
write_log("accept failed with error: ", 1, logger->filename, logger);
write_log(std::to_string(WSAGetLastError()), 1, logger->filename, logger);
write_log("\n", 1, logger->filename, logger);
printf("accept failed with error: %d\n", WSAGetLastError());
AcceptFailures++;
if (AcceptFailures >= MAX_ACCEPT_FAILURES)
Expand Down Expand Up @@ -126,7 +120,6 @@ void AlgorithmServer::serverThreadRuntime()
unlockClientThreadsMutex();
}
else {
write_log("Could not acquire mutex to add new client thread\n", 1, logger->filename, logger);
printf("Could not acquire mutex to add new client thread\n");
}
}
Expand All @@ -146,7 +139,6 @@ vector<Attribute> AlgorithmServer::getAllIncomingAttributes()
Polls DataSyncThreads for new updates
Updates master storage accordingly
*/
Logger* logger = getLogger();
vector<Attribute> newAttribs;
if (lockClientThreadsMutex() == STATUS_WAIT_0)
{
Expand All @@ -158,11 +150,29 @@ vector<Attribute> AlgorithmServer::getAllIncomingAttributes()
unlockClientThreadsMutex();
}
else {
printf("Could not acquire mutex to get incoming attributes.\n");
cout << "Could not acquire mutex to get incoming attributes." << endl;
}
return newAttribs;
}

map<string, Attribute> AlgorithmServer :: getAllIncomingAttributesMap()
{
map<string, Attribute> combinedAttribMap;
if (lockClientThreadsMutex() == STATUS_WAIT_0)
{
for (DataSyncThread* dst : clientThreads)
{
map<string, Attribute> toAdd = dst->getIncomingAttributesMap();
combinedAttribMap.insert(toAdd.begin(), toAdd.end());
}
unlockClientThreadsMutex();
}
else {
cout << "Could not acquire mutex to get incoming attributes." << endl;
}
return combinedAttribMap;
}

void AlgorithmServer :: sendAttributeToAll(Attribute attrib)
{
for (DataSyncThread* dst : clientThreads)
Expand Down
18 changes: 18 additions & 0 deletions bfs/src/comms/DataSyncThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,24 @@ vector<Attribute> DataSyncThread :: getIncomingAttributes()
return newAttribVector;
}

map<string, Attribute> DataSyncThread :: getIncomingAttributesMap()
{
map<string, Attribute> incomingAttribMap;
if (!areIncomingAttributesAvailable())
return incomingAttribMap;

DWORD result = WaitForSingleObject(attribMutex, INFINITE);
if (result == WAIT_OBJECT_0) {
incomingAttribMap = map<string, Attribute>(incomingAttributes);
incomingAttributes.clear();
ReleaseMutex(attribMutex);
}
else {
printf("Failed to acquire mutex, wait returned %x, error: %d\n", result, GetLastError());
}
return incomingAttribMap;
}

bool DataSyncThread::areIncomingAttributesAvailable()
{
if (!threadRunning)
Expand Down

0 comments on commit dc14af5

Please sign in to comment.