aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
authorOmar Rizwan <omar.rizwan@gmail.com>2018-11-24 00:33:04 -0800
committerOmar Rizwan <omar.rizwan@gmail.com>2018-11-24 00:33:04 -0800
commitc8fad64066650ed6089bf3bf39dbf3f5389f3a04 (patch)
tree365a6a0b68a087e2fb8d7549be93a358ba41bed6 /fs
parent528467a55bf5b7ca1692ec0341191abf334d9419 (diff)
Multithreaded. Is this gonna help?
Only tested with single-thread mode still on so far.
Diffstat (limited to 'fs')
-rw-r--r--fs/hello.c128
1 files changed, 89 insertions, 39 deletions
diff --git a/fs/hello.c b/fs/hello.c
index ef77584..1609c6e 100644
--- a/fs/hello.c
+++ b/fs/hello.c
@@ -17,52 +17,86 @@
struct wby_server server;
struct wby_con *con = NULL;
-pthread_mutex_t request_data_mutex = PTHREAD_MUTEX_INITIALIZER;
-char *request_data = NULL;
+pthread_cond_t queue_cv = PTHREAD_COND_INITIALIZER;
+pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+enum request_response_state {
+ EMPTY = 0,
+ SEND_REQUEST,
+ RECEIVE_RESPONSE,
+ HANDLE_RESPONSE
+};
+
+struct request_response {
+ enum request_response_state state;
+ char *request;
+ cJSON *response;
+};
-pthread_cond_t response_cv = PTHREAD_COND_INITIALIZER;
-pthread_mutex_t response_mutex = PTHREAD_MUTEX_INITIALIZER;
-cJSON *response = NULL;
+#define REQUEST_RESPONSE_QUEUE_SIZE 128
+typedef int request_id;
+struct request_response queue[REQUEST_RESPONSE_QUEUE_SIZE];
-static const char *file_path = "/hello.txt";
-static const char file_content[] = "Hello World!\n";
-static const size_t file_size = sizeof(file_content)/sizeof(char) - 1;
+static request_id enqueue_request(cJSON *req) {
+ pthread_mutex_lock(&queue_mutex);
-static void dispatch_send_req(cJSON *req) {
- pthread_mutex_lock(&request_data_mutex);
+ // Look for the first free slot.
+ request_id id;
+ for (id = 0; id < REQUEST_RESPONSE_QUEUE_SIZE; id++) {
+ if (queue[id].state == EMPTY) break;
+ }
+ if (id >= REQUEST_RESPONSE_QUEUE_SIZE) {
+ printf("Request-response queue is full!\n");
+ exit(1);
+ }
+ cJSON_AddNumberToObject(req, "id", id);
- request_data = cJSON_Print(req);
- printf("%s\n", request_data);
+ queue[id].state = SEND_REQUEST;
+ queue[id].request = cJSON_Print(req);
+ queue[id].response = NULL;
- pthread_mutex_unlock(&request_data_mutex);
+ printf("%s\n", queue[id].request);
+
+ pthread_mutex_unlock(&queue_mutex);
+
+ return id;
}
-void send_req_if_any() {
- pthread_mutex_lock(&request_data_mutex);
+void send_any_enqueued_requests() {
+ pthread_mutex_lock(&queue_mutex);
- if (con == NULL || request_data == NULL) goto done;
+ if (con == NULL) goto done;
- wby_frame_begin(con, WBY_WSOP_TEXT_FRAME);
- wby_write(con, request_data, strlen(request_data));
- wby_frame_end(con);
+ for (request_id id = 0; id < REQUEST_RESPONSE_QUEUE_SIZE; id++) {
+ if (queue[id].state == SEND_REQUEST) {
+ char *request = queue[id].request;
- free(request_data);
- request_data = NULL;
+ wby_frame_begin(con, WBY_WSOP_TEXT_FRAME);
+ wby_write(con, request, strlen(request));
+ wby_frame_end(con);
+
+ queue[id].state = RECEIVE_RESPONSE;
+ free(request);
+ queue[id].request = NULL;
+ }
+ }
done:
- pthread_mutex_unlock(&request_data_mutex);
+ pthread_mutex_unlock(&queue_mutex);
}
-static cJSON *await_response() {
- pthread_mutex_lock(&response_mutex);
+static cJSON *await_response(request_id id) {
+ pthread_mutex_lock(&queue_mutex);
- response = NULL;
- while (response == NULL) {
- pthread_cond_wait(&response_cv, &response_mutex);
+ while (queue[id].state != HANDLE_RESPONSE) {
+ pthread_cond_wait(&queue_cv, &queue_mutex);
}
- cJSON *resp = response;
- pthread_mutex_unlock(&response_mutex);
+ cJSON *resp = queue[id].response;
+ queue[id].state = EMPTY;
+ queue[id].response = NULL;
+
+ pthread_mutex_unlock(&queue_mutex);
return resp;
}
@@ -73,18 +107,17 @@ static cJSON *await_response() {
cJSON *req = NULL; \
cJSON *resp = NULL; \
\
- pthread_mutex_lock(&request_data_mutex); \
+ pthread_mutex_lock(&queue_mutex); \
int disconnected = (con == NULL); \
- pthread_mutex_unlock(&request_data_mutex); \
+ pthread_mutex_unlock(&queue_mutex); \
if (disconnected) { ret = -EIO; goto done; } \
\
req = cJSON_CreateObject(); \
cJSON_AddStringToObject(req, "op", op); \
req_body \
\
- dispatch_send_req(req); \
- \
- resp = await_response();\
+ request_id id = enqueue_request(req); \
+ resp = await_response(id); \
\
cJSON *error_item = cJSON_GetObjectItemCaseSensitive(resp, "error"); \
if (error_item) { \
@@ -269,10 +302,27 @@ websocket_frame(struct wby_con *connection, const struct wby_frame *frame, void
printf("Null in data! [%s]\n", data);
}
- pthread_mutex_lock(&response_mutex);
- response = cJSON_Parse((const char *) data);
- pthread_cond_signal(&response_cv);
- pthread_mutex_unlock(&response_mutex);
+ // Will be freed at the receiver end.
+ cJSON *resp = cJSON_Parse((const char *) data);
+
+ cJSON *id_item = cJSON_GetObjectItemCaseSensitive(resp, "id");
+ if (id_item == NULL) {
+ printf("No id in response!\n");
+ exit(1);
+ }
+ request_id id = id_item->valueint;
+
+ pthread_mutex_lock(&queue_mutex);
+ printf("got resp");
+ if (queue[id].state != RECEIVE_RESPONSE) {
+ printf("Got response to request in wrong state!\n");
+ exit(1);
+ }
+ queue[id].state = HANDLE_RESPONSE;
+ queue[id].response = resp;
+
+ pthread_cond_signal(&queue_cv);
+ pthread_mutex_unlock(&queue_mutex);
return 0;
}
@@ -316,7 +366,7 @@ void *websocket_main(void *threadid)
printf("Awaiting WebSocket connection from Chrome extension.\n");
for (;;) {
- send_req_if_any();
+ send_any_enqueued_requests();
wby_update(&server);
}