Skip to content

Commit

Permalink
Stabalizing server code and fixing attrib update decoding
Browse files Browse the repository at this point in the history
  • Loading branch information
grf14003 committed Jan 16, 2020
1 parent 0c24eb9 commit b947e5a
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 42 deletions.
7 changes: 4 additions & 3 deletions breadcrumbs/include/AlgorithmServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include "CMakeConfig.h"
#include "DataSyncThread.hpp"

#define ALGORITHM_PORT "10101"
#define MAX_ACCEPT_FAILURES 5

#pragma comment (lib, "Ws2_32.lib")
Expand All @@ -39,12 +38,14 @@ class AlgorithmServer
// Updates Algorithm key/value store with IO key/value updates
void pollForUpdates();
void startServer();
void stopServer();
int stopServer();

private:
SOCKET *ServerSocket = NULL;

HANDLE hThread;
DWORD dwThreadId;
bool continueThread = false;
bool continueThread = true;

size_t numClients;
std::vector<DataSyncThread> clientThreads;
Expand Down
2 changes: 1 addition & 1 deletion breadcrumbs/include/Attribute.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

typedef struct BinaryAttributeStructure {
char key[8];
unsigned char length[2];
char length;
} bAttrib;

#endif
6 changes: 4 additions & 2 deletions breadcrumbs/include/DataSyncThread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class DataSyncThread
public:
DataSyncThread(SOCKET s) { sock = s; };

// Synchronous functions
// Synchronous functions (only called from thread)
void threadRuntime();
static DWORD threadInit(LPVOID pThreadArgs)
{
Expand All @@ -42,9 +42,11 @@ class DataSyncThread
}
int recvBytes(void* buffer, size_t numBytes);

// Async control
// Async control (only called outside of thread)
void startComms();
bool stopComms();
bool isClientConnected() { return continueThread; };
unsigned int getSocketNumber() { return (unsigned int) socket; };
int connectToAlgorithm(char* serverName);
};

Expand Down
2 changes: 2 additions & 0 deletions breadcrumbs/src/Breadcrumbs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,7 @@ int main()
algorithm->loop();
}

while (1);

return 0;
}
66 changes: 35 additions & 31 deletions breadcrumbs/src/comms/AlgorithmServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ void AlgorithmServer::serverThreadRuntime()
WSACleanup();
return;
}
ServerSocket = &ListenSocket;

// Setup the TCP listening socket
iResult = bind(ListenSocket, result->ai_addr, (int)result->ai_addrlen);
Expand All @@ -69,52 +70,51 @@ void AlgorithmServer::serverThreadRuntime()
return;
}

// Select stuff
FD_SET readSet;

int AcceptFailures = 0;
// Accept a client socket
while (continueThread)
{
int fds;

FD_ZERO(&readSet);
FD_SET(ListenSocket, &readSet);

if ((fds = select(0, &readSet, NULL, NULL, NULL)) == SOCKET_ERROR)
{
printf("select() returned with error %d\n", WSAGetLastError());
break;
printf("Listening for clients...\n");
ClientSocket = accept(ListenSocket, NULL, NULL);
printf("Hello new client!\n");
if (ClientSocket == INVALID_SOCKET) {
printf("accept failed with error: %d\n", WSAGetLastError());
AcceptFailures++;
if (AcceptFailures >= MAX_ACCEPT_FAILURES)
break;
}
else {
AcceptFailures = 0;
}

if (fds != 0)
// Reaping any old data sync threads
auto it = clientThreads.begin();
while (clientThreads.size() > 0 && it != clientThreads.end())
{
ClientSocket = accept(ListenSocket, NULL, NULL);
if (ClientSocket == INVALID_SOCKET) {
printf("accept failed with error: %d\n", WSAGetLastError());
AcceptFailures++;
if (AcceptFailures >= MAX_ACCEPT_FAILURES)
break;
if (!(*it).isClientConnected())
{
clientThreads.erase(it);
}
else {
AcceptFailures = 0;
++it;
}
}

// Creating a new Data sync thread
if (clientThreads.size() < numClients) {
DataSyncThread client = *new DataSyncThread(ClientSocket);
clientThreads.push_back(client);
client.startComms();
}
else {
printf("Client attempted connection when (%d) clients are already connected", numClients);
}
// Creating a new Data sync thread
if (clientThreads.size() < numClients) {
DataSyncThread client = *new DataSyncThread(ClientSocket);
clientThreads.push_back(client);
client.startComms();
}
else {
printf("Client attempted connection when (%d) clients are already connected", numClients);
}
}

// cleanup
closesocket(ListenSocket);
WSACleanup();
ServerSocket = NULL;
continueThread = false;
return;
}
Expand Down Expand Up @@ -153,7 +153,11 @@ void AlgorithmServer::startServer()
&dwThreadId); // returns the thread identifier
}

void AlgorithmServer::stopServer()
int AlgorithmServer::stopServer()
{
continueThread = false;
if (continueThread && ServerSocket != NULL) {
continueThread = false;
return shutdown(*ServerSocket, SD_BOTH);
}
return 0;
}
18 changes: 13 additions & 5 deletions breadcrumbs/src/comms/DataSyncThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ void DataSyncThread::threadRuntime()

while (continueThread && iResult > 0)
{

printf("Waiting to receive bytes\n");

// Master while loop that will receive incoming messages
char commandByte;
iResult = recvBytes(&commandByte, 1);
Expand All @@ -20,18 +23,23 @@ void DataSyncThread::threadRuntime()
switch (commandByte)
{
case 0:
printf("Attribute update...\n");
bAttrib attrib;
iResult = recvBytes(&attrib, sizeof(bAttrib));
if (iResult <= 0)
break;
printf("Attribute update received, key:%.8s len:%us\n", attrib.key, (unsigned short) attrib.length);
void* value = malloc((unsigned short)attrib.length);
iResult = recvBytes(value, (unsigned short)attrib.length);

printf("Attribute update received, key:%.8s len:%x\n", attrib.key, attrib.length);
if (attrib.length <= 0)
break;

void* value = malloc(attrib.length);
iResult = recvBytes(value, attrib.length);
if (iResult <= 0)
break;
// TODO: Storing the attrib update

break;
// TODO: Storing the attrib update

}
}

Expand Down

0 comments on commit b947e5a

Please sign in to comment.