From 3c8570ac52848544b33c805a764fbd00f244958e Mon Sep 17 00:00:00 2001 From: human Date: Sun, 3 Jan 2021 12:23:20 +0200 Subject: multi-threaded tabfs.c --- extension/background.js | 3 +- fs/Makefile | 12 +- fs/tabfs.c | 498 ++++++++++++++++++++++++++++++++++++------------ 3 files changed, 381 insertions(+), 132 deletions(-) diff --git a/extension/background.js b/extension/background.js index 1b1cad0..370e106 100644 --- a/extension/background.js +++ b/extension/background.js @@ -632,7 +632,7 @@ async function onMessage(req) { // timeout is very useful because some operations just hang // (like trying to take a screenshot, until the tab is focused) didTimeout = true; console.error('timeout'); - port.postMessage({ op: req.op, error: unix.ETIMEDOUT }); + port.postMessage({ id: req.id, op: req.op, error: unix.ETIMEDOUT }); }, 1000); /* console.time(req.op + ':' + req.path);*/ @@ -654,6 +654,7 @@ async function onMessage(req) { clearTimeout(timeout); console.log('resp', response); + response.id = req.id; port.postMessage(response); } }; diff --git a/fs/Makefile b/fs/Makefile index 97ca01a..37f0f3d 100644 --- a/fs/Makefile +++ b/fs/Makefile @@ -7,25 +7,26 @@ OSXFUSE_ROOT = /usr/local # Root for libraries from FreeBSD's ports FREEBSD_ROOT = /usr/local -CFLAGS_EXTRA = -DFUSE_USE_VERSION=26 -D_FILE_OFFSET_BITS=64 -Wall -Wno-unused-function -g +CFLAGS ?= -O2 +CFLAGS_EXTRA = -DFUSE_USE_VERSION=26 -D_FILE_OFFSET_BITS=64 -Wall -Wextra -Wno-unused-result -g ifeq ($(shell uname -s),Linux) CFLAGS += $(CFLAGS_EXTRA) - LIBS = -lfuse + LIBS = -lfuse -pthread endif ifeq ($(shell uname -s),Darwin) - CFLAGS = -I$(OSXFUSE_ROOT)/include/osxfuse/fuse -L$(OSXFUSE_ROOT)/lib -D_DARWIN_USE_64_BIT_INODE $(CFLAGS_EXTRA) + CFLAGS += -I$(OSXFUSE_ROOT)/include/osxfuse/fuse -L$(OSXFUSE_ROOT)/lib -D_DARWIN_USE_64_BIT_INODE $(CFLAGS_EXTRA) LIBS = -losxfuse endif ifeq ($(shell uname -s),FreeBSD) CFLAGS += -L$(FREEBSD_ROOT)/lib -I$(FREEBSD_ROOT)/include $(CFLAGS_EXTRA) - LIBS = -lfuse + LIBS = -lfuse -pthread endif all: $(TARGETS) tabfs: tabfs.c - cc $(CFLAGS) -o $@ $^ $(LIBS) + $(CC) $(CFLAGS) $(LDFLAGS) -o $@ $^ $(LIBS) clean: rm -f $(TARGETS) *.o @@ -34,3 +35,4 @@ clean: unmount: killall -9 tabfs || true diskutil unmount force mnt || true + fusermount -u mnt || true diff --git a/fs/tabfs.c b/fs/tabfs.c index fecec22..d00e96c 100644 --- a/fs/tabfs.c +++ b/fs/tabfs.c @@ -1,231 +1,477 @@ -// This file should rarely need to be changed. (which is intentional, -// because it is a pain to program here, it's a pain to recompile and -// reload it, and it's a pain to debug it.) Most of the real meat of -// TabFS is on the extension side, not here. - -#include -#include -#include -#include #include +#include +#include #include +#include +#include +#include +#include + #include #include "vendor/frozen.h" #include "vendor/frozen.c" -FILE* l; +#define eprintln(fmt, ...) fprintf(stderr, fmt "\n", ##__VA_ARGS__) + +// protects: +// - writing to stdout +// - the "waiters" global +static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER; + +struct resumedata { + unsigned int id; + int msgpipe[2]; + void *data; + size_t size; +}; + +static struct resumedata **waiters; +static size_t numwaiters; + +static void read_or_die(int fd, void *buf, size_t sz) { + size_t sofar = 0; + while (sofar < sz) { + ssize_t rv = read(fd, (char *)buf+sofar, sz-sofar); + if (rv == -1) { + if (errno == EINTR || errno == EAGAIN) continue; + perror("read error"); + exit(1); + } + if (rv == 0) exit(1); + sofar += (size_t)rv; + } +} +static void write_or_die(int fd, void *buf, size_t sz) { + size_t sofar = 0; + while (sofar < sz) { + ssize_t rv = write(fd, (char *)buf+sofar, sz-sofar); + if (rv == -1) { + if (errno == EINTR || errno == EAGAIN) continue; + perror("write error"); + exit(1); + } + if (rv == 0) exit(1); + sofar += (size_t)rv; + } +} + +// documented somewhere in https://developer.chrome.com/docs/apps/nativeMessaging/ +#define MAX_MESSAGE_SIZE (1024*1024) -static void send_request(const char *fmt, ...) { - va_list args; va_start(args, fmt); +static int do_exchange(unsigned int id, + char **datap, size_t *sizep, + const char *fmt, ...) { + *datap = NULL; + *sizep = 0; - char request_data[1024*1024]; // max size of native->Chrome message - struct json_out out = JSON_OUT_BUF(request_data, sizeof(request_data)); - unsigned int request_len = json_vprintf(&out, fmt, args); + char jsonbuf[MAX_MESSAGE_SIZE]; + struct json_out out = JSON_OUT_BUF(jsonbuf, sizeof(jsonbuf)); + va_list args; + va_start(args, fmt); + size_t request_size = (size_t)json_vprintf(&out, fmt, args); va_end(args); + if (request_size > sizeof(jsonbuf)) { + eprintln("warning: request too big to send (%zu > %zu)", + request_size, sizeof(jsonbuf)); + return -EMSGSIZE; + } + + struct resumedata mydata = { + .id = id, + .msgpipe = {-1, -1}, + .data = NULL, + .size = 0, + }; + if (-1 == pipe(mydata.msgpipe)) { + perror("exchange: pipe"); + return -EIO; + } + + pthread_mutex_lock(&write_lock); - write(1, (char *) &request_len, 4); // stdout - unsigned int bytes_written = 0; - while (bytes_written < request_len) { - bytes_written += write(1, request_data, request_len); + uint32_t size_4bytes = request_size; + + write_or_die(STDOUT_FILENO, &size_4bytes, sizeof(size_4bytes)); + write_or_die(STDOUT_FILENO, jsonbuf, request_size); + + waiters = realloc(waiters, (numwaiters+1)*sizeof(*waiters)); + waiters[numwaiters] = &mydata; + numwaiters += 1; + + pthread_mutex_unlock(&write_lock); + + char c; + read_or_die(mydata.msgpipe[0], &c, 1); + + close(mydata.msgpipe[0]); + close(mydata.msgpipe[1]); + + int err; + if (1 == json_scanf(mydata.data, mydata.size, "{error: %d}", &err)) { + free(mydata.data); + return -err; } - /* fprintf(l, "req[%s]\n", request_data); fflush(l); */ + + *datap = mydata.data; + *sizep = mydata.size; + + return 0; } -static int await_response(char **resp) { - unsigned int response_len; - read(0, (char *) &response_len, 4); // stdin - char *response_data = malloc(response_len); - unsigned int bytes_read = 0; - while (bytes_read < response_len) { - bytes_read += read(0, response_data + bytes_read, response_len); +static void *reader_main(void *ud) { + (void)ud; + for (;;) { + uint32_t size_4bytes; + read_or_die(STDIN_FILENO, &size_4bytes, sizeof(size_4bytes)); + size_t insize = size_4bytes; + + char *data = malloc(insize); + read_or_die(STDIN_FILENO, data, insize); + + unsigned int id; + if (1 != json_scanf(data, insize, "{id: %u}", &id)) { + eprintln("reader: warning: got a message without an id, ignoring"); + free(data); + continue; + } + + pthread_mutex_lock(&write_lock); + int found = 0; + unsigned int i = numwaiters; + while (i --> 0) { + if (waiters[i]->id == id) { + char c = '!'; + waiters[i]->data = data; + waiters[i]->size = insize; + write_or_die(waiters[i]->msgpipe[1], &c, 1); + memmove(&waiters[i], &waiters[i+1], + (numwaiters-(i+1))*sizeof(*waiters)); + numwaiters -= 1; + found = 1; + break; + } + } + if (!found) { + eprintln("reader: warning: got a message for nonexistent waiter %u", id); + free(data); + } + pthread_mutex_unlock(&write_lock); } - /* fprintf(l, "resp(%d; expected %d)[%s]\n", bytes_read, response_len, response_data); fflush(l); */ - if (response_data == NULL) { - // Connection is dead. - *resp = "{ \"error\": 5 }"; - return strlen(*resp); + return NULL; +} + +static int count_fmt_args(const char *s) { + int cnt = 0; + for (; *s; s++) { + if (*s == '%') { + if (*(s+1) != '%') cnt++; + else s++; + } } + return cnt; +} + +#define exchange_json(datap, sizep, keys_fmt, ...) \ + do { \ + unsigned int id = pthread_self(); \ + int req_rv = do_exchange(id, datap, sizep, \ + "{id: %u, " keys_fmt "}", \ + id, ##__VA_ARGS__); \ + if (req_rv != 0) return req_rv; \ + } while (0) - *resp = response_data; - return response_len; -} - -#define receive_response(fmt, ...) \ - do { \ - char *resp; int resp_len; \ - resp_len = await_response(&resp); \ - if (!resp_len) return -EIO; \ - \ - int err; \ - if (json_scanf(resp, resp_len, "{error: %d}", &err) && err) { \ - free(resp); return -err; \ - } \ - \ - json_scanf(resp, resp_len, fmt, __VA_ARGS__); \ - free(resp); \ +#define parse_and_free_response(data, size, keys_fmt, ...) \ + do { \ + if (*keys_fmt == '\0') { \ + /* empty format string, skip the work */ \ + free(data); data = NULL; \ + } else { \ + int num_expected = count_fmt_args(keys_fmt); \ + int num_scanned = json_scanf(data, size, \ + "{" keys_fmt "}", \ + ##__VA_ARGS__); \ + if (num_scanned == num_expected) { \ + free(data); data = NULL; \ + } else { \ + eprintln("%s: could only parse %d of %d keys!", \ + __func__, num_expected, num_scanned); \ + free(data); data = NULL; \ + return -EIO; \ + } \ + } \ } while (0) static int tabfs_getattr(const char *path, struct stat *stbuf) { - send_request("{op: %Q, path: %Q}", "getattr", path); + char *rdata; + size_t rsize; + exchange_json(&rdata, &rsize, + "op: %Q, path: %Q", + "getattr", path); memset(stbuf, 0, sizeof(struct stat)); - receive_response("{st_mode: %d, st_nlink: %d, st_size: %d}", - &stbuf->st_mode, &stbuf->st_nlink, &stbuf->st_size); + parse_and_free_response(rdata, rsize, + "st_mode: %d, st_nlink: %d, st_size: %d", + &stbuf->st_mode, &stbuf->st_nlink, &stbuf->st_size); + return 0; } static int tabfs_readlink(const char *path, char *buf, size_t size) { - send_request("{op: %Q, path: %Q}", "readlink", path); + char *rdata; + size_t rsize; + exchange_json(&rdata, &rsize, + "op: %Q, path: %Q", + "readlink", path); - char *scan_buf; int scan_len; - receive_response("{buf: %V}", &scan_buf, &scan_len); - memcpy(buf, scan_buf, scan_len < size ? scan_len : size); free(scan_buf); + char *scan_buf; + int scan_len; + parse_and_free_response(rdata, rsize, + "buf: %V", + &scan_buf, &scan_len); + + // fuse.h: + // "If the linkname is too long to fit in the buffer, it should be truncated." + if ((size_t)scan_len >= size) scan_len = size-1; + + memcpy(buf, scan_buf, scan_len); + buf[scan_len] = '\0'; + + free(scan_buf); return 0; } static int tabfs_open(const char *path, struct fuse_file_info *fi) { - send_request("{op: %Q, path: %Q, flags: %d}", "open", path, fi->flags); + char *data; + size_t size; + exchange_json(&data, &size, + "op: %Q, path: %Q, flags: %d", + "open", path, fi->flags); - receive_response("{fh: %d}", &fi->fh); + parse_and_free_response(data, size, + "fh: %d", + &fi->fh); return 0; } -static int -tabfs_read(const char *path, char *buf, size_t size, off_t offset, - struct fuse_file_info *fi) { - send_request("{op: %Q, path: %Q, size: %d, offset: %d, fh: %d, flags: %d}", - "read", path, size, offset, fi->fh, fi->flags); +static int tabfs_read(const char *path, + char *buf, + size_t size, + off_t offset, + struct fuse_file_info *fi) { + char *rdata; + size_t rsize; + exchange_json(&rdata, &rsize, + "op: %Q, path: %Q, size: %d, offset: %d, fh: %d, flags: %d", + "read", path, size, offset, fi->fh, fi->flags); char *scan_buf; int scan_len; - receive_response("{buf: %V}", &scan_buf, &scan_len); - memcpy(buf, scan_buf, scan_len < size ? scan_len : size); free(scan_buf); + parse_and_free_response(rdata, rsize, + "buf: %V", + &scan_buf, &scan_len); + + if ((size_t)scan_len > size) scan_len = size; + memcpy(buf, scan_buf, scan_len); + free(scan_buf); return scan_len; } -static int -tabfs_write(const char *path, const char *buf, size_t size, off_t offset, - struct fuse_file_info *fi) { - - send_request("{op: %Q, path: %Q, buf: %V, offset: %d, fh: %d, flags: %d}", - "write", path, buf, size, offset, fi->fh, fi->flags); - - int ret; receive_response("{size: %d}", &ret); return ret; +static int tabfs_write(const char *path, + const char *data, + size_t size, + off_t offset, + struct fuse_file_info *fi) { + char *rdata; + size_t rsize; + exchange_json(&rdata, &rsize, + "op: %Q, path: %Q, buf: %V, offset: %d, fh: %d, flags: %d", + "write", path, data, size, offset, fi->fh, fi->flags); + + int ret; + parse_and_free_response(rdata, rsize, + "size: %d", + &ret); + + return ret; } static int tabfs_release(const char *path, struct fuse_file_info *fi) { - send_request("{op: %Q, path: %Q, fh: %d}", - "release", path, fi->fh); + char *data; + size_t size; + exchange_json(&data, &size, + "op: %Q, path: %Q, fh: %d", + "release", path, fi->fh); + + parse_and_free_response(data, size, ""); - receive_response("{}", NULL); return 0; } static int tabfs_opendir(const char *path, struct fuse_file_info *fi) { - send_request("{op: %Q, path: %Q, flags: %d}", - "opendir", path, fi->flags); - - receive_response("{fh: %d}", &fi->fh); + char *rdata; + size_t rsize; + exchange_json(&rdata, &rsize, + "op: %Q, path: %Q, flags: %d", + "opendir", path, fi->flags); + + parse_and_free_response(rdata, rsize, + "fh: %d", + &fi->fh); + return 0; } -static int -tabfs_readdir(const char *path, void *buf, fuse_fill_dir_t filler, - off_t offset, struct fuse_file_info *fi) { - send_request("{op: %Q, path: %Q, offset: %d}", - "readdir", path, offset); +static int tabfs_readdir(const char *path, + void *buf, + fuse_fill_dir_t filler, + off_t offset, + struct fuse_file_info *fi) { + (void)fi; - char *resp; int resp_len; - resp_len = await_response(&resp); - if (!resp_len) return -EIO; + char *rdata; + size_t rsize; + exchange_json(&rdata, &rsize, + "op: %Q, path: %Q, offset: %d", + "readdir", path, offset); struct json_token t; - for (int i = 0; json_scanf_array_elem(resp, resp_len, ".entries", i, &t) > 0; i++) { - char entry[t.len + 1]; snprintf(entry, t.len + 1, "%.*s", t.len, t.ptr); + for (int i = 0; json_scanf_array_elem(rdata, rsize, ".entries", i, &t) > 0; i++) { + char entry[t.len+1]; + memcpy(entry, t.ptr, t.len); + entry[t.len] = '\0'; filler(buf, entry, NULL, 0); } - free(resp); + parse_and_free_response(rdata, rsize, ""); + return 0; } static int tabfs_releasedir(const char *path, struct fuse_file_info *fi) { - send_request("{op: %Q, path: %Q, fh: %d}", - "releasedir", path, fi->fh); + char *rdata; + size_t rsize; + exchange_json(&rdata, &rsize, + "op: %Q, path: %Q, fh: %d", + "releasedir", path, fi->fh); + + parse_and_free_response(rdata, rsize, ""); - receive_response("{}", NULL); return 0; } static int tabfs_truncate(const char *path, off_t size) { - send_request("{op: %Q, path: %Q, size: %d}", - "truncate", path, size); + char *rdata; + size_t rsize; + exchange_json(&rdata, &rsize, + "op: %Q, path: %Q, size: %d", + "truncate", path, size); + + parse_and_free_response(rdata, rsize, ""); - receive_response("{}", NULL); return 0; } static int tabfs_unlink(const char *path) { - send_request("{op: %Q, path: %Q}", "unlink", path); + char *rdata; + size_t rsize; + exchange_json(&rdata, &rsize, + "op: %Q, path: %Q", + "unlink", path); + + parse_and_free_response(rdata, rsize, ""); - receive_response("{}", NULL); return 0; } static int tabfs_mkdir(const char *path, mode_t mode) { - send_request("{op: %Q, path: %Q, mode: %d}", "mkdir", path, mode); + char *rdata; + size_t rsize; + exchange_json(&rdata, &rsize, + "op: %Q, path: %Q, mode: %d", + "mkdir", path, mode); + + parse_and_free_response(rdata, rsize, ""); - receive_response("{}", NULL); return 0; } static int tabfs_create(const char *path, mode_t mode, struct fuse_file_info *fi) { - send_request("{op: %Q, path: %Q, mode: %d}", "mkdir", path, mode); + (void)fi; + + char *rdata; + size_t rsize; + exchange_json(&rdata, &rsize, + "op: %Q, path: %Q, mode: %d", + "mkdir", path, mode); + + parse_and_free_response(rdata, rsize, ""); - receive_response("{}", NULL); return 0; } -static struct fuse_operations tabfs_filesystem_operations = { +static const struct fuse_operations tabfs_oper = { .getattr = tabfs_getattr, .readlink = tabfs_readlink, - .open = tabfs_open, - .read = tabfs_read, - .write = tabfs_write, - .release = tabfs_release, + .open = tabfs_open, + .read = tabfs_read, + .write = tabfs_write, + .release = tabfs_release, - .opendir = tabfs_opendir, - .readdir = tabfs_readdir, + .opendir = tabfs_opendir, + .readdir = tabfs_readdir, .releasedir = tabfs_releasedir, - .truncate = tabfs_truncate, - .unlink = tabfs_unlink, + .truncate = tabfs_truncate, + .unlink = tabfs_unlink, - .mkdir = tabfs_mkdir, - .create = tabfs_create + .mkdir = tabfs_mkdir, + .create = tabfs_create, }; int main(int argc, char **argv) { - char killcmd[1000]; - sprintf(killcmd, "pgrep tabfs | grep -v %d | xargs kill -9", getpid()); + (void)argc; + + freopen("log.txt", "a", stderr); + setvbuf(stderr, NULL, _IONBF, 0); + + char killcmd[128]; + sprintf(killcmd, "pgrep tabfs | grep -v %d | xargs kill -9 2>/dev/null", getpid()); system(killcmd); -#ifdef __APPLE__ - system("diskutil umount force mnt > /dev/null"); -#elif __FreeBSD__ - system("umount -f mnt"); + +#if defined(__APPLE__) + system("diskutil umount force mnt >/dev/null"); +#elif defined(__FreeBSD__) + system("umount -f mnt 2>/dev/null"); #else - system("fusermount -u mnt"); + system("fusermount -u mnt 2>/dev/null"); #endif - l = fopen("log.txt", "w"); - for (int i = 0; i < argc; i++) { - fprintf(l, "arg%d: [%s]\n", i, argv[i]); fflush(l); + mkdir("mnt", 0755); + + pthread_t thread; + int err = pthread_create(&thread, NULL, reader_main, NULL); + if (err != 0) { + eprintln("pthread_create: %s", strerror(err)); + exit(1); } - char* fuse_argv[] = {argv[0], "-odirect_io", "-s", "-f", "mnt"}; - return fuse_main(5, fuse_argv, &tabfs_filesystem_operations, NULL); + pthread_detach(thread); + + char *fuse_argv[] = { + argv[0], + "-f", + "-oauto_unmount", + "-odirect_io", + "mnt", + NULL, + }; + return fuse_main( + (sizeof(fuse_argv)/sizeof(*fuse_argv))-1, + (char **)&fuse_argv, + &tabfs_oper, + NULL); } -- cgit v1.2.3 From 57331e2092ec8d7cd5ebb4b8b6ec2bdf2d2b5661 Mon Sep 17 00:00:00 2001 From: human Date: Mon, 4 Jan 2021 11:40:22 +0200 Subject: macfuse fix attempt --- fs/tabfs.c | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/fs/tabfs.c b/fs/tabfs.c index d00e96c..5c27faf 100644 --- a/fs/tabfs.c +++ b/fs/tabfs.c @@ -57,7 +57,7 @@ static void write_or_die(int fd, void *buf, size_t sz) { } // documented somewhere in https://developer.chrome.com/docs/apps/nativeMessaging/ -#define MAX_MESSAGE_SIZE (1024*1024) +#define MAX_MESSAGE_SIZE (size_t)(1024*1024) static int do_exchange(unsigned int id, char **datap, size_t *sizep, @@ -65,16 +65,17 @@ static int do_exchange(unsigned int id, *datap = NULL; *sizep = 0; - char jsonbuf[MAX_MESSAGE_SIZE]; - struct json_out out = JSON_OUT_BUF(jsonbuf, sizeof(jsonbuf)); + char *jsonbuf = malloc(MAX_MESSAGE_SIZE); + struct json_out out = JSON_OUT_BUF(jsonbuf, MAX_MESSAGE_SIZE); va_list args; va_start(args, fmt); size_t request_size = (size_t)json_vprintf(&out, fmt, args); va_end(args); - if (request_size > sizeof(jsonbuf)) { + if (request_size > MAX_MESSAGE_SIZE) { eprintln("warning: request too big to send (%zu > %zu)", - request_size, sizeof(jsonbuf)); + request_size, MAX_MESSAGE_SIZE); + free(jsonbuf); return -EMSGSIZE; } @@ -86,6 +87,7 @@ static int do_exchange(unsigned int id, }; if (-1 == pipe(mydata.msgpipe)) { perror("exchange: pipe"); + free(jsonbuf); return -EIO; } @@ -96,6 +98,8 @@ static int do_exchange(unsigned int id, write_or_die(STDOUT_FILENO, &size_4bytes, sizeof(size_4bytes)); write_or_die(STDOUT_FILENO, jsonbuf, request_size); + free(jsonbuf); jsonbuf = NULL; + waiters = realloc(waiters, (numwaiters+1)*sizeof(*waiters)); waiters[numwaiters] = &mydata; numwaiters += 1; @@ -175,7 +179,7 @@ static int count_fmt_args(const char *s) { #define exchange_json(datap, sizep, keys_fmt, ...) \ do { \ - unsigned int id = pthread_self(); \ + unsigned int id = (uintptr_t)pthread_self(); \ int req_rv = do_exchange(id, datap, sizep, \ "{id: %u, " keys_fmt "}", \ id, ##__VA_ARGS__); \ @@ -464,7 +468,9 @@ int main(int argc, char **argv) { char *fuse_argv[] = { argv[0], "-f", +#if !defined(__APPLE__) "-oauto_unmount", +#endif "-odirect_io", "mnt", NULL, -- cgit v1.2.3