Skip to content

Commit

Permalink
Adding ability for algorithm to send attributes. Also adding catch co…
Browse files Browse the repository at this point in the history
…ndition to delay data sync thread shutdown when there are still pending attibutes that can be polled.
  • Loading branch information
grf14003 committed May 7, 2020
1 parent 79a4c3b commit 273a7fd
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 7 deletions.
9 changes: 7 additions & 2 deletions bfs/implementations/breadcrumbs/algos/AlgoBreadcrumbs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@

void AlgoBreadcrumbs::loop()
{
int msgCount = 0;
Attribute msgCountAttrib = { "MSGCOUNT", sizeof(int), &msgCount };

vector<Attribute> attribs = pollForAttributes();
if (attribs.size() > 0)
{
printf("Attrib length: %d\n", attribs.size());
cout << attribs.size() << " attributes received!" << endl;
msgCount += attribs.size();

sendAttribute(msgCountAttrib);
}
_sleep(1000);
}

bool AlgoBreadcrumbs::loopCondition()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,20 @@ void VirtualOutputIOProcessor::loop()
char testValue = 'a' + iterations;
Attribute attrib("testKey1", 1, &testValue);
getComms()->sendAttribute(attrib);

if (getComms()->areIncomingAttributesAvailable()) {
vector<Attribute> attribs = getComms()->getIncomingAttributes();
for (Attribute attr : attribs)
{
cout << "Attribute " << attr.getKey() << " received!" << endl;
}
}

Sleep(500);
iterations--;
}

bool VirtualOutputIOProcessor::loopCondition()
{
return iterations > 0;
return iterations >= 0;
}
1 change: 1 addition & 0 deletions bfs/include/Algorithm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class Algorithm
virtual bool loopCondition() { return false; };

vector<Attribute> pollForAttributes();
void sendAttribute(Attribute attrib);
private:
size_t numIoProcs = 0;
AlgorithmServer* server;
Expand Down
1 change: 1 addition & 0 deletions bfs/include/AlgorithmServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class AlgorithmServer

// Updates Algorithm key/value store with IO key/value updates
vector<Attribute> getAllIncomingAttributes();
void sendAttributeToAll(Attribute attrib);
void startServer();
int stopServer();

Expand Down
5 changes: 5 additions & 0 deletions bfs/include/DataSyncThread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <stdio.h>
#include <vector>
#include <iostream>
#include <ctime>

#include "Attribute.hpp"
#include "CMakeConfig.h"
Expand All @@ -33,6 +34,10 @@ class DataSyncThread
HANDLE hThread = INVALID_HANDLE_VALUE;
DWORD dwThreadId;

// The timeout for exiting the data sync thread with attributes that have not been
// polled for yet. It is in ms
const static size_t EXIT_PENDING_ATTRIBUTE_TIMEOUT = 1000;

// Flag for signalling this thread to terminate
bool continueThread = false;

Expand Down
5 changes: 5 additions & 0 deletions bfs/src/algos/Algorithm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,8 @@ vector<Attribute> Algorithm::pollForAttributes()
{
return server->getAllIncomingAttributes();
}

void Algorithm :: sendAttribute(Attribute attrib)
{
server->sendAttributeToAll(attrib);
}
8 changes: 8 additions & 0 deletions bfs/src/comms/AlgorithmServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ vector<Attribute> AlgorithmServer::getAllIncomingAttributes()
return newAttribs;
}

void AlgorithmServer :: sendAttributeToAll(Attribute attrib)
{
for (DataSyncThread* dst : clientThreads)
{
dst->sendAttribute(attrib);
}
}

void AlgorithmServer::startServer()
{

Expand Down
17 changes: 13 additions & 4 deletions bfs/src/comms/DataSyncThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,23 @@ void DataSyncThread::threadRuntime()

Attribute attr(bAttr, value);

// <DataSyncThreadAddress>: AttributeUpdate(<key>, <size>)
cout << this << ": AttributeUpdate(" << attr.getKey() << ", " << attr.getLength() << ")" << endl;

// Storing the attrib update
addIncomingAttribute(attr);
}
}

continueThread = false;

// Waiting for all atributes to be read with a timeout:
auto start = clock();
cout << "Waiting for attributes to be polled!" << endl;
while (areIncomingAttributesAvailable() &&
(clock() - start) / (CLOCKS_PER_SEC) * 1000.0 < EXIT_PENDING_ATTRIBUTE_TIMEOUT);
if (areIncomingAttributesAvailable())
{
cout << "WARNING: Attributes are still left to be polled but data sync thread is exiting." << endl;
}

if (attribMutex != NULL) {
CloseHandle(attribMutex);
attribMutex = NULL;
Expand All @@ -66,7 +74,7 @@ void DataSyncThread::threadRuntime()
cout << "Graceful socket shutdown failed! Still more data to read!" << endl;
closesocket(sock);

cout << "Stopping DataSyncThread: " << this << endl;
cout << "Stopped DataSyncThread: " << this << endl;
threadRunning = false;
}

Expand Down Expand Up @@ -257,6 +265,7 @@ vector<Attribute> DataSyncThread::getIncomingAttributes()
{
newAttribVector.push_back(attrib);
}
incomingAttributes.clear();
ReleaseMutex(attribMutex);
}
else {
Expand Down

0 comments on commit 273a7fd

Please sign in to comment.