diff options
| author | Omar Rizwan <omar.rizwan@gmail.com> | 2018-11-13 00:58:12 -0800 | 
|---|---|---|
| committer | Omar Rizwan <omar.rizwan@gmail.com> | 2018-11-13 00:58:12 -0800 | 
| commit | 8e4e7a2a959d0ad0e615b1fabe8b7e00e4deef2e (patch) | |
| tree | aba3678d99bc6fec5dc9a496a063b4a757e32a05 /fs | |
| parent | 47d855c7309dfb86093fd4cd272935e3b24a6ee9 (diff) | |
Starting to multithread the server.
Diffstat (limited to 'fs')
| -rw-r--r-- | fs/hello.c | 106 | 
1 files changed, 54 insertions, 52 deletions
@@ -2,6 +2,7 @@  #include <fcntl.h>  #include <string.h>  #include <stdlib.h> +#include <pthread.h>  #include <fuse.h>  #define WBY_STATIC @@ -13,14 +14,6 @@  #include "cJSON/cJSON.h"  #include "cJSON/cJSON.c" -#define MAX_WSCONN 8 -struct server_state { -    int quit; -    unsigned frame_counter; -    struct wby_con *conn[MAX_WSCONN]; -    int conn_count; -}; -  struct wby_server server;  struct wby_con *con; @@ -47,6 +40,9 @@ struct response {      } body;  }; +pthread_cond_t response_cv = PTHREAD_COND_INITIALIZER; +pthread_mutex_t response_mutex = PTHREAD_MUTEX_INITIALIZER; +  struct response response = (struct response) { .op = NONE };  static const char  *file_path      = "/hello.txt"; @@ -73,11 +69,18 @@ static void send_req(cJSON *req) {      cJSON_Delete(req); \  } while (0) -static void await_response(enum opcode op) { +static struct response await_response(enum opcode op) { +  pthread_mutex_lock(&response_mutex); +    memset(&response, 0, sizeof response); -  do { -    wby_update(&server); -  } while (response.op == NONE); +  while (response.op == NONE) { +    pthread_cond_wait(&response_cv, &response_mutex); +  } + +  struct response ret = response; +  pthread_mutex_unlock(&response_mutex); + +  return ret;  }  static int @@ -90,15 +93,15 @@ hello_getattr(const char *path, struct stat *stbuf)          cJSON_AddStringToObject(req, "path", path);        }); -    await_response(GETATTR); -    if (response.error != 0) { -      printf("error re getattr(%s): %d\n", path, response.error); -      return -response.error; +    struct response resp = await_response(GETATTR); +    if (resp.error != 0) { +      printf("error re getattr(%s): %d\n", path, resp.error); +      return -resp.error;      } -    stbuf->st_mode = response.body.getattr.st_mode; -    stbuf->st_nlink = response.body.getattr.st_nlink; -    stbuf->st_size = response.body.getattr.st_size; +    stbuf->st_mode = resp.body.getattr.st_mode; +    stbuf->st_nlink = resp.body.getattr.st_nlink; +    stbuf->st_size = resp.body.getattr.st_size;      printf("returning re getattr(%s)\n", path);      /* if (strcmp(path, "/") == 0) { /\* The root directory of our file system. *\/ */      /*     stbuf->st_mode = S_IFDIR | 0755; */ @@ -110,6 +113,7 @@ hello_getattr(const char *path, struct stat *stbuf)      /* } else /\* We reject everything else. *\/ */      /*     return -ENOENT; */ +      return 0;  } @@ -137,9 +141,10 @@ hello_readdir(const char *path, void *buf, fuse_fill_dir_t filler,        });      printf("awaiting response to readdir(%s)\n", path); -    await_response(READDIR); -    struct readdir *readdir = &response.body.readdir; -    printf("response: %d files\n", readdir->num_entries); +    struct response resp = await_response(READDIR); + +    struct readdir *readdir = &resp.body.readdir; +    printf("response: %d files\n", (int) readdir->num_entries);      for (size_t i = 0; i < readdir->num_entries; ++i) {          filler(buf, readdir->entries[i], NULL, 0); @@ -183,18 +188,16 @@ dispatch(struct wby_con *connection, void *userdata)  static int  websocket_connect(struct wby_con *connection, void *userdata)  { -    struct server_state *state = (struct server_state*)userdata;      /* connection bound userdata */      connection->user_data = NULL; -    if (0 == strcmp(connection->request.uri, "/") && state->conn_count < MAX_WSCONN) +    if (0 == strcmp(connection->request.uri, "/"))          return 0; -    else return 1; +    return 1;  }  static void  websocket_connected(struct wby_con *connection, void *userdata)  { -    struct server_state *state = (struct server_state*)userdata;      printf("WebSocket connected\n");      con = connection;  } @@ -241,6 +244,8 @@ websocket_frame(struct wby_con *connection, const struct wby_frame *frame, void        printf("Null in data! [%s]\n", data);      } +    pthread_mutex_lock(&response_mutex); +      cJSON *ret = cJSON_Parse((const char *) data);      cJSON *op_item = cJSON_GetObjectItemCaseSensitive(ret, "op"); @@ -275,23 +280,16 @@ websocket_frame(struct wby_con *connection, const struct wby_frame *frame, void   done:      if (ret) cJSON_Delete(ret); + +    pthread_cond_signal(&response_cv); +    pthread_mutex_unlock(&response_mutex);      return 0;  }  static void  websocket_closed(struct wby_con *connection, void *userdata)  { -    int i; -    struct server_state *state = (struct server_state*)userdata;      printf("WebSocket closed\n"); -    for (i = 0; i < state->conn_count; i++) { -        if (state->conn[i] == connection) { -            int remain = state->conn_count - i; -            memmove(state->conn + i, state->conn + i + 1, (size_t)remain * sizeof(struct wby_con*)); -            --state->conn_count; -            break; -        } -    }  }  static void @@ -300,19 +298,17 @@ test_log(const char* text)      printf("[debug] %s\n", text);  } -int -main(int argc, char **argv) +void *websocket_main(void *threadid)  { -  void *memory = NULL; +    void *memory = NULL;      wby_size needed_memory = 0; -    struct server_state state;      struct wby_config config;      memset(&config, 0, sizeof config); -    config.userdata = &state; +    config.userdata = NULL;      config.address = "127.0.0.1";      config.port = 8888; -    config.connection_max = 1; +    config.connection_max = 4;      config.request_buffer_size = 2048;      config.io_buffer_size = 8192;      config.log = test_log; @@ -326,18 +322,24 @@ main(int argc, char **argv)      memory = calloc(needed_memory, 1);      wby_start(&server, memory); -    memset(&state, 0, sizeof state); -      printf("Awaiting WebSocket connection from Chrome extension.\n"); -    while (con == NULL) { +    for (;;) {          wby_update(&server);      } + +    wby_stop(&server); +    free(memory); +#if defined(_WIN32) +    WSACleanup(); +#endif +    return 0; +} + +int +main(int argc, char **argv) +{ +    pthread_t websocket_thread; +    pthread_create(&websocket_thread, NULL, websocket_main, NULL);      return fuse_main(argc, argv, &hello_filesystem_operations, NULL); -/*     wby_stop(&server); */ -/*     free(memory); */ -/* #if defined(_WIN32) */ -/*     WSACleanup(); */ -/* #endif */ -/*     return 0; */ -  //  +  }  | 
