Skip to content

Commit

Permalink
Adding test io processor and onside comms stop
Browse files Browse the repository at this point in the history
  • Loading branch information
grf14003 committed Jan 29, 2020
1 parent 3595c6f commit 7808d97
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 15 deletions.
9 changes: 8 additions & 1 deletion breadcrumbs/include/Attribute.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,15 @@ class Attribute
this->value = value;
}

~Attribute() {
Attribute(string key, size_t length, void* value)
{
this->key = key;
this->length = length;
this->value = value;
}

~Attribute() {

}

// Getters and setters
Expand Down
8 changes: 7 additions & 1 deletion breadcrumbs/include/DataSyncThread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class DataSyncThread
HANDLE hThread = INVALID_HANDLE_VALUE;
DWORD dwThreadId;
bool continueThread = false;
bool threadRunning = false;

std::vector<Attribute> incomingAttributes;
HANDLE attribMutex;
Expand All @@ -40,6 +41,7 @@ class DataSyncThread
attribMutex = CreateMutex(NULL, false, NULL);
};


// Synchronous functions (only called from thread)
void threadRuntime();
static DWORD threadInit(LPVOID pThreadArgs)
Expand All @@ -49,14 +51,18 @@ class DataSyncThread
return NULL;
}
int recvBytes(void* buffer, size_t numBytes);
int sendBytes(char* buffer, size_t numBytes);
void addIncomingAttribute(Attribute attrib);

// Instantiates a datasyncthread on the heap connected to given server.
int connectToAlgorithm(char* serverName);

// Async control (only called outside of thread)
void startComms();
bool isRunning();
bool stopComms();
bool isClientConnected() { return continueThread; };
unsigned int getSocketNumber() { return (unsigned int) socket; };
int connectToAlgorithm(char* serverName);
bool areIncomingAttributesAvailable();
std::vector<Attribute> *getIncomingAttributes();
bool sendAttribute(Attribute attrib);
Expand Down
85 changes: 72 additions & 13 deletions breadcrumbs/src/comms/DataSyncThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

void DataSyncThread::threadRuntime()
{

threadRunning = true;
printf("Thread runtime: %d\n", incomingAttributes);

/*
Expand Down Expand Up @@ -52,6 +52,10 @@ void DataSyncThread::threadRuntime()
CloseHandle(attribMutex);
attribMutex = NULL;
}

WSACleanup();
printf("Done running thread!");
threadRunning = false;
}

int DataSyncThread::recvBytes(void* buffer, size_t numBytes)
Expand All @@ -77,6 +81,29 @@ int DataSyncThread::recvBytes(void* buffer, size_t numBytes)
return numBytes;
}

int DataSyncThread::sendBytes(char* buffer, size_t numBytes)
{
size_t bytesSent = 0;
int iResult;

while (bytesSent < numBytes)
{
iResult = send(sock, buffer + bytesSent, numBytes - bytesSent, 0);
if (iResult < 0)
{
printf("send failed with error: %d\n", WSAGetLastError());
return iResult;
}
else if (iResult == 0) {
printf("send returned 0, connection closed.\n");
return iResult;
}
else
bytesSent += iResult;
}
return numBytes;
}

void DataSyncThread::addIncomingAttribute(Attribute attrib)
{
DWORD result = WaitForSingleObject(attribMutex, INFINITE);
Expand All @@ -95,14 +122,27 @@ void DataSyncThread::startComms()
Initializes the communication thread
*/
printf("Start comms: %d\n", incomingAttributes);
continueThread = true;
hThread = CreateThread(
NULL, // default security attributes
0, // use default stack size
threadInit, // thread function name
this, // argument to thread function
0, // use default creation flags
&dwThreadId); // returns the thread identifier
if (!threadRunning)
{
threadRunning = false;
continueThread = true;
hThread = CreateThread(
NULL, // default security attributes
0, // use default stack size
threadInit, // thread function name
this, // argument to thread function
0, // use default creation flags
&dwThreadId); // returns the thread identifier

while (1)
if (threadRunning)
break;
}
}

bool DataSyncThread::isRunning()
{
return threadRunning;
}

bool DataSyncThread::stopComms()
Expand All @@ -111,6 +151,7 @@ bool DataSyncThread::stopComms()
Stops the communication thread
*/
continueThread = false;
while (isRunning());
return true;
}

Expand Down Expand Up @@ -162,14 +203,17 @@ int DataSyncThread::connectToAlgorithm(char* serverName)
}
break;
}

freeaddrinfo(result);

if (sock == INVALID_SOCKET) {
printf("Unable to connect to server!\n");
WSACleanup();
return 1;
}
else
{
return 0;
}
}

std::vector<Attribute> *DataSyncThread::getIncomingAttributes()
Expand All @@ -194,8 +238,23 @@ bool DataSyncThread::areIncomingAttributesAvailable()
return incomingAttributes.size() > 0;
}

bool sendAttribute(Attribute attrib)
bool DataSyncThread::sendAttribute(Attribute attrib)
{
// TODO: Implement this to send attribute in the format described in the recieve section.
return false;
// two extra bytes, one for command 0x00 and one for length:
int streamLength = attrib.getLength() + ATTRIB_KEY_SIZE + 2;
char *bytes = new char[streamLength];
int iter = 0;
bytes[iter++] = 0x00;

string key = attrib.getKey();
for (int i = 0; i < ATTRIB_KEY_SIZE; i++)
bytes[iter++] = key[i];

bytes[iter++] = attrib.getLength();

for (int i = 0; i < attrib.getLength(); i++)
bytes[iter++] = ((char*) attrib.getValue())[i];

int result = sendBytes(bytes, streamLength);
return result == streamLength;
}
13 changes: 13 additions & 0 deletions breadcrumbs/src/main/VirtualOutputProcessor.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@

#include "DataSyncThread.hpp"


int main()
{
DataSyncThread client(NULL);
client.connectToAlgorithm("localhost");
client.startComms();

char testValue = 'a';
Attribute attrib = *new Attribute("testKey1", 1, &testValue);
client.sendAttribute(attrib);

client.stopComms();

return 0;
}

0 comments on commit 7808d97

Please sign in to comment.