Skip to content

Commit

Permalink
Finishing basic attribute update framework
Browse files Browse the repository at this point in the history
  • Loading branch information
grf14003 committed Jan 16, 2020
1 parent a01e7a7 commit 7ddbee6
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 24 deletions.
2 changes: 2 additions & 0 deletions breadcrumbs/include/Algorithm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class Algorithm

virtual void loop() = 0;
virtual bool loopCondition() { return false; };

vector<Attribute>* pollForAttributes();
private:
size_t numIoProcs = 0;
AlgorithmServer* server;
Expand Down
8 changes: 6 additions & 2 deletions breadcrumbs/include/AlgorithmServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@ class AlgorithmServer
}

// Updates Algorithm key/value store with IO key/value updates
void pollForUpdates();
vector<Attribute>* getAllIncomingAttributes();
void startServer();
int stopServer();

DWORD lockClientThreadsMutex();
void unlockClientThreadsMutex();

private:
SOCKET *ServerSocket = NULL;

Expand All @@ -48,7 +51,8 @@ class AlgorithmServer
bool continueThread = true;

size_t numClients;
std::vector<DataSyncThread> clientThreads;
HANDLE clientThreadsMutex = NULL;
vector<DataSyncThread*> clientThreads;
};

#endif
5 changes: 4 additions & 1 deletion breadcrumbs/include/DataSyncThread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ class DataSyncThread
DWORD dwThreadId;
bool continueThread = false;

HANDLE attribMutex;
std::vector<Attribute> incomingAttributes;
HANDLE attribMutex;
public:
DataSyncThread(SOCKET s)
{
sock = s;
attribMutex = CreateMutex(NULL, false, NULL);
};

// Synchronous functions (only called from thread)
Expand All @@ -56,7 +57,9 @@ class DataSyncThread
bool isClientConnected() { return continueThread; };
unsigned int getSocketNumber() { return (unsigned int) socket; };
int connectToAlgorithm(char* serverName);
bool areIncomingAttributesAvailable();
std::vector<Attribute> *getIncomingAttributes();
bool sendAttribute(Attribute attrib);
};

#endif
3 changes: 1 addition & 2 deletions breadcrumbs/src/Breadcrumbs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ int main()
while (algorithm->loopCondition())
{
algorithm->loop();
_sleep(1000);
}

while (1);

return 0;
}
9 changes: 7 additions & 2 deletions breadcrumbs/src/algos/AlgoBreadcrumbs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@

void AlgoBreadcrumbs::loop()
{
printf("Breadcrumbs algorithm loop!\n");
vector<Attribute>* attribs = pollForAttributes();
// printf("Attrib length: %d\n", attribs->size());
if (attribs->size() > 0)
{
printf("Attrib length: %d\n", attribs->size());
}
}

bool AlgoBreadcrumbs::loopCondition()
{
return --iterations >= 0;
return true;
}
5 changes: 5 additions & 0 deletions breadcrumbs/src/algos/Algorithm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,8 @@ Algorithm::~Algorithm()
{
delete server;
}

vector<Attribute>* Algorithm::pollForAttributes()
{
return server->getAllIncomingAttributes();
}
68 changes: 52 additions & 16 deletions breadcrumbs/src/comms/AlgorithmServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ AlgorithmServer::~AlgorithmServer()
void AlgorithmServer::serverThreadRuntime()
{
int iResult;
clientThreadsMutex = CreateMutex(NULL, false, NULL);

SOCKET ListenSocket = INVALID_SOCKET;
SOCKET ClientSocket = INVALID_SOCKET;
Expand Down Expand Up @@ -88,26 +89,33 @@ void AlgorithmServer::serverThreadRuntime()
}

// Reaping any old data sync threads
auto it = clientThreads.begin();
while (clientThreads.size() > 0 && it != clientThreads.end())
if (lockClientThreadsMutex() == STATUS_WAIT_0)
{
if (!(*it).isClientConnected())
auto it = clientThreads.begin();
while (clientThreads.size() > 0 && it != clientThreads.end())
{
clientThreads.erase(it);
if (!(**it).isClientConnected())
{
clientThreads.erase(it);
}
else {
++it;
}
}

// Creating a new Data sync thread
if (clientThreads.size() < numClients) {
DataSyncThread* client = new DataSyncThread(ClientSocket);
clientThreads.push_back(client);
client->startComms();
}
else {
++it;
printf("Client attempted connection when (%d) clients are already connected", numClients);
}
}

// Creating a new Data sync thread
if (clientThreads.size() < numClients) {
DataSyncThread client = *new DataSyncThread(ClientSocket);
clientThreads.push_back(client);
client.startComms();
unlockClientThreadsMutex();
}
else {
printf("Client attempted connection when (%d) clients are already connected", numClients);
printf("Could not acquire mutex to add new client thread\n");
}
}

Expand All @@ -116,19 +124,30 @@ void AlgorithmServer::serverThreadRuntime()
WSACleanup();
ServerSocket = NULL;
continueThread = false;
CloseHandle(clientThreadsMutex);
return;
}

void AlgorithmServer::pollForUpdates()
vector<Attribute>* AlgorithmServer::getAllIncomingAttributes()
{
/*
Polls DataSyncThreads for new updates
Updates master storage accordingly
*/
for (DataSyncThread dst : clientThreads)
vector<Attribute>* newAttribs = new vector<Attribute>;
if (lockClientThreadsMutex() == STATUS_WAIT_0)
{
// TODO: Implement this!
for (DataSyncThread* dst : clientThreads)
{
vector<Attribute>* toAdd = dst->getIncomingAttributes();
newAttribs->insert(newAttribs->end(), toAdd->begin(), toAdd->end());
}
unlockClientThreadsMutex();
}
else {
printf("Could not acquire mutex to get incoming attributes.\n");
}
return newAttribs;
}

void AlgorithmServer::startServer()
Expand Down Expand Up @@ -161,3 +180,20 @@ int AlgorithmServer::stopServer()
}
return 0;
}

DWORD AlgorithmServer::lockClientThreadsMutex()
{
if (clientThreadsMutex != NULL)
{
return WaitForSingleObject(clientThreadsMutex, INFINITE);
}
return -1;
}

void AlgorithmServer::unlockClientThreadsMutex()
{
if (clientThreadsMutex != NULL)
{
ReleaseMutex(clientThreadsMutex);
}
}
21 changes: 20 additions & 1 deletion breadcrumbs/src/comms/DataSyncThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

void DataSyncThread::threadRuntime()
{
attribMutex = CreateMutex(NULL, false, NULL);

printf("Thread runtime: %d\n", incomingAttributes);

/*
Handles the data sync between the other end of the socket
Expand Down Expand Up @@ -83,13 +84,17 @@ void DataSyncThread::addIncomingAttribute(Attribute attrib)
incomingAttributes.push_back(attrib);
ReleaseMutex(attribMutex);
}
else {
printf("1 Failed to acquire mutex, wait returned %x, error: %d\n", result, GetLastError());
}
}

void DataSyncThread::startComms()
{
/*
Initializes the communication thread
*/
printf("Start comms: %d\n", incomingAttributes);
continueThread = true;
hThread = CreateThread(
NULL, // default security attributes
Expand Down Expand Up @@ -177,6 +182,20 @@ std::vector<Attribute> *DataSyncThread::getIncomingAttributes()
(*newVector).push_back(attrib);
ReleaseMutex(attribMutex);
}
else {
printf("2 Failed to acquire mutex, wait returned %x, error: %d\n", result, GetLastError());
}

return newVector;
}

bool DataSyncThread::areIncomingAttributesAvailable()
{
return incomingAttributes.size() > 0;
}

bool sendAttribute(Attribute attrib)
{
// TODO: Implement this to send attribute in the format described in the recieve section.
return false;
}

0 comments on commit 7ddbee6

Please sign in to comment.