aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ws.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ws.c')
-rw-r--r--fs/ws.c239
1 files changed, 239 insertions, 0 deletions
diff --git a/fs/ws.c b/fs/ws.c
new file mode 100644
index 0000000..c7cb3ec
--- /dev/null
+++ b/fs/ws.c
@@ -0,0 +1,239 @@
+// WebSocket server.
+// Side thread that gets spawned.
+
+#define WBY_STATIC
+#define WBY_IMPLEMENTATION
+#define WBY_USE_FIXED_TYPES
+#define WBY_USE_ASSERT
+#include "mmx/web.h"
+
+#include "common.h"
+
+static struct wby_server server;
+static struct wby_con *con = NULL;
+
+static int fill_fd_set_with_ws_sockets(fd_set *read_fds, fd_set *write_fds, fd_set *except_fds) {
+ // Based on web.h:1936 (start of wby_update)
+
+ int max_fd = 0;
+ FD_SET(server.socket, read_fds);
+ FD_SET(server.socket, except_fds);
+ max_fd = WBY_SOCK(server.socket);
+
+ if (con == NULL) { return max_fd; }
+
+ struct wby_connection *conn = (struct wby_connection *) con;
+ wby_socket socket = WBY_SOCK(conn->socket);
+ FD_SET(socket, read_fds);
+ FD_SET(socket, except_fds);
+ if (conn->state == WBY_CON_STATE_SEND_CONTINUE) {
+ FD_SET(socket, write_fds);
+ }
+
+ if (socket > max_fd) { max_fd = socket; }
+ return max_fd;
+}
+
+static void receive_tabfs_request_then_send_to_browser() {
+ char *request_data = common_receive_tabfs_to_ws(fill_fd_set_with_ws_sockets);
+ if (request_data == NULL) {
+ return;
+ }
+
+ if (con == NULL) {
+ common_send_ws_to_tabfs(NULL);
+ return;
+ }
+
+ wby_frame_begin(con, WBY_WSOP_TEXT_FRAME);
+ wby_write(con, request_data, strlen(request_data));
+ wby_frame_end(con);
+
+ /* pthread_mutex_lock(&queue_mutex); */
+
+ /* if (con == NULL) goto done; */
+
+ /* for (request_id id = 0; id < REQUEST_RESPONSE_QUEUE_SIZE; id++) { */
+ /* if (queue[id].state == SEND_REQUEST) { */
+ /* char *request = queue[id].request; */
+
+ /* wby_frame_begin(con, WBY_WSOP_TEXT_FRAME); */
+ /* wby_write(con, request, strlen(request)); */
+ /* wby_frame_end(con); */
+
+ /* queue[id].state = RECEIVE_RESPONSE; */
+ /* free(request); */
+ /* queue[id].request = NULL; */
+ /* } */
+ /* } */
+
+ /* done: */
+ /* pthread_mutex_unlock(&queue_mutex); */
+}
+
+static int
+dispatch(struct wby_con *connection, void *userdata)
+{
+ return 1;
+}
+
+static int
+websocket_connect(struct wby_con *connection, void *userdata)
+{
+ /* connection bound userdata */
+ connection->user_data = NULL;
+ if (0 == strcmp(connection->request.uri, "/"))
+ return 0;
+ return 1;
+}
+
+static void
+websocket_connected(struct wby_con *connection, void *userdata)
+{
+ printf("WebSocket connected\n");
+ con = connection;
+}
+
+#define MAX_DATA_LENGTH 131072
+
+static int
+websocket_frame(struct wby_con *connection, const struct wby_frame *frame, void *userdata)
+{
+ unsigned char *data = calloc(1, MAX_DATA_LENGTH); // Will be freed at receiver (tabfs).
+
+ int i = 0;
+ DEBUG("WebSocket frame incoming\n");
+ DEBUG(" Frame OpCode: %d\n", frame->opcode);
+ DEBUG(" Final frame?: %s\n", (frame->flags & WBY_WSF_FIN) ? "yes" : "no");
+ DEBUG(" Masked? : %s\n", (frame->flags & WBY_WSF_MASKED) ? "yes" : "no");
+ DEBUG(" Data Length : %d\n", (int) frame->payload_length);
+
+ if ((unsigned long) frame->payload_length > MAX_DATA_LENGTH) {
+ printf("Data too long!\n");
+ exit(1);
+ }
+
+ while (i < frame->payload_length) {
+ unsigned char buffer[16];
+ int remain = frame->payload_length - i;
+ size_t read_size = remain > (int) sizeof buffer ? sizeof buffer : (size_t) remain;
+ size_t k;
+
+ DEBUG("%08x ", (int) i);
+ if (0 != wby_read(connection, buffer, read_size))
+ break;
+ for (k = 0; k < read_size; ++k)
+ DEBUG("%02x ", buffer[k]);
+ for (k = read_size; k < 16; ++k)
+ DEBUG(" ");
+ DEBUG(" | ");
+ for (k = 0; k < read_size; ++k)
+ DEBUG("%c", isprint(buffer[k]) ? buffer[k] : '?');
+ DEBUG("\n");
+ for (k = 0; k < read_size; ++k)
+ data[i + k] = buffer[k];
+ i += (int)read_size;
+ }
+
+ if ((int) strlen((const char *) data) != frame->payload_length) {
+ printf("Null in data! [%s]\n", data);
+ }
+
+ common_send_ws_to_tabfs((char *) data);
+
+ // Will be freed at the receiver end.
+ /* cJSON *resp = cJSON_Parse((const char *) data); */
+
+ /* cJSON *id_item = cJSON_GetObjectItemCaseSensitive(resp, "id"); */
+ /* if (id_item == NULL) { */
+ /* printf("No id in response!\n"); */
+ /* exit(1); */
+ /* } */
+ /* request_id id = id_item->valueint; */
+
+ /* pthread_mutex_lock(&queue_mutex); */
+
+ /* if (queue[id].state != RECEIVE_RESPONSE) { */
+ /* printf("Got response to request in wrong state!\n"); */
+ /* exit(1); */
+ /* } */
+ /* queue[id].state = HANDLE_RESPONSE; */
+ /* queue[id].response = resp; */
+
+ /* pthread_cond_signal(&queue_cv); */
+ /* pthread_mutex_unlock(&queue_mutex); */
+
+ return 0;
+}
+
+static void
+websocket_closed(struct wby_con *connection, void *userdata)
+{
+ printf("WebSocket closed\n");
+
+ if (con == connection) con = NULL;
+}
+
+static void
+test_log(const char* text)
+{
+ DEBUG("[debug] %s\n", text);
+}
+
+void await_io_demand_or_timeout() {
+ /* pthread_mutex_lock(&queue_mutex); */
+
+ /* struct timeval now; */
+ /* gettimeofday(&now, NULL); */
+
+ /* struct timespec tsp; */
+ /* tsp.tv_sec = now.tv_sec; */
+ /* tsp.tv_nsec = now.tv_usec * 1000; */
+
+ /* tsp.tv_nsec += 200 * 1000000; // wait for 200ms max */
+ /* pthread_cond_timedwait(&queue_cv, &queue_mutex, &tsp); */
+
+ /* pthread_mutex_unlock(&queue_mutex); */
+}
+
+void *websocket_main(void *threadid)
+{
+ void *memory = NULL;
+ wby_size needed_memory = 0;
+
+ struct wby_config config;
+ memset(&config, 0, sizeof config);
+ config.userdata = NULL;
+ config.address = "127.0.0.1";
+ config.port = 8888;
+ config.connection_max = 4;
+ config.request_buffer_size = 2048;
+ config.io_buffer_size = 8192;
+ config.log = test_log;
+ config.dispatch = dispatch;
+ config.ws_connect = websocket_connect;
+ config.ws_connected = websocket_connected;
+ config.ws_frame = websocket_frame;
+ config.ws_closed = websocket_closed;
+
+ wby_init(&server, &config, &needed_memory);
+ memory = calloc(needed_memory, 1);
+ wby_start(&server, memory);
+
+ printf("Awaiting WebSocket connection from Chrome extension.\n");
+ for (;;) {
+ // FIXME: makes reconnect impossible. :<
+ /* await_io_demand_or_timeout(); */
+
+ receive_tabfs_request_then_send_to_browser();
+
+ wby_update(&server); // We receive stuff during this phase.
+ }
+
+ wby_stop(&server);
+ free(memory);
+#if defined(_WIN32)
+ WSACleanup();
+#endif
+ return 0;
+}