| author | Heikki Kallasjoki <fis@zem.fi> | 2018-12-23 17:28:59 +0000 |
|---|---|---|
| committer | Heikki Kallasjoki <fis+github@zem.fi> | 2018-12-23 21:50:27 +0000 |
| commit | 96a47dd416b5892c648ad17d01ce29481e9c5f6d (patch) | |
| tree | 6fe1a2cb23d7b1f91ae8be12d10d392165d1824c | |
| parent | cfaf78325bb636c38166a2a19f05c1c89f010832 (diff) | |
Allow multiple requests in parallel.
Mostly resolves #19, though a follow-up commit will add a request timeout to stop overly slow requests from occupying slots indefinitely. The implementation makes the request sockets non-blocking and adds them to the same poll loop that is used to accept new connections. Scrape writes are collected into a request-specific buffer, one collector at a time, and then queued for writing. Whenever poll reports that progress can be made on a request, the request is processed until the next read or write operation would block, at which point control returns to the poll loop.
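The core pattern here — switching a socket to non-blocking mode and treating EAGAIN/EWOULDBLOCK as "come back after polling" — can be sketched in isolation. This is a minimal illustration using the same POSIX calls the patch uses, not part of the commit itself; `make_nonblocking` and `write_some` are hypothetical helper names:

```c
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Switch an accepted socket to non-blocking mode, as req_start() below does. */
static int make_nonblocking(int fd) {
  int flags = fcntl(fd, F_GETFL);
  if (flags == -1)
    return -1;
  return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* Attempt one write; distinguish "would block, poll again" (0) from
 * real progress (> 0) and errors (-1), mirroring the EAGAIN /
 * EWOULDBLOCK checks in the patch's req_process(). */
static ssize_t write_some(int fd, const char *buf, size_t len) {
  ssize_t n = write(fd, buf, len);
  if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
    return 0; /* no progress possible now: return to the poll loop */
  return n;
}
```

The patch applies exactly this idea in `req_start()` and in the write loop of `req_process()` in the diff below.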
-rw-r--r-- | scrape.c | 326 |
1 file changed, 245 insertions(+), 81 deletions(-)
```diff
diff --git a/scrape.c b/scrape.c
index f632e4e..1528cc7 100644
--- a/scrape.c
+++ b/scrape.c
@@ -17,6 +17,8 @@
 #define _POSIX_C_SOURCE 200809L
 
 #include <ctype.h>
+#include <errno.h>
+#include <fcntl.h>
 #include <netdb.h>
 #include <netinet/in.h>
 #include <poll.h>
@@ -35,22 +37,57 @@
 #define MAX_LISTEN_SOCKETS 4
 #define MAX_BACKLOG 16
+#define MAX_REQUESTS 16
+
+enum req_state {
+  req_state_inactive,
+  req_state_read,
+  req_state_write_headers,
+  req_state_write_metrics,
+  req_state_write_error,
+};
+
+enum http_parse_state {
+  http_read_start,
+  http_read_path,
+  http_read_version,
+  http_skip_headers_1,
+  http_skip_headers_2,
+};
 
 struct scrape_req {
-  int socket;
+  enum req_state state;
+  union {
+    enum http_parse_state parse_state;
+    unsigned collector;
+  };
   bbuf *buf;
+  char *io;
+  size_t io_size;
 };
 
 struct scrape_server {
-  struct pollfd fds[MAX_LISTEN_SOCKETS];
-  nfds_t nfds;
+  struct scrape_req reqs[MAX_REQUESTS];
+  struct pollfd fds[MAX_LISTEN_SOCKETS + MAX_REQUESTS];
+  nfds_t nfds_listen;
+  nfds_t nfds_req;
 };
 
-static bool handle_http(struct scrape_req *req);
+static void req_start(struct scrape_server *srv, int socket);
+static void req_close(struct scrape_server *srv, unsigned r);
+static void req_process(struct scrape_server *srv, unsigned r, unsigned ncoll, const struct collector *coll[], void *coll_ctx[]);
+
+// TCP socket server
 
 scrape_server *scrape_listen(const char *port) {
   scrape_server *srv = must_malloc(sizeof *srv);
-  srv->nfds = 0;
+
+  srv->nfds_listen = 0;
+  srv->nfds_req = 0;
+  for (unsigned i = 0; i < MAX_REQUESTS; i++) {
+    srv->reqs[i].state = req_state_inactive;
+    srv->reqs[i].buf = 0;
+  }
 
   int ret;
@@ -69,7 +106,7 @@ scrape_server *scrape_listen(const char *port) {
     return false;
   }
 
-  for (struct addrinfo *a = addrs; a && srv->nfds < MAX_LISTEN_SOCKETS; a = a->ai_next) {
+  for (struct addrinfo *a = addrs; a && srv->nfds_listen < MAX_LISTEN_SOCKETS; a = a->ai_next) {
     int s = socket(a->ai_family, a->ai_socktype, a->ai_protocol);
     if (s == -1) {
       perror("socket");
@@ -96,13 +133,13 @@ scrape_server *scrape_listen(const char *port) {
       continue;
     }
 
-      srv->fds[srv->nfds].fd = s;
-      srv->fds[srv->nfds].events = POLLIN;
-      srv->nfds++;
+      srv->fds[srv->nfds_listen].fd = s;
+      srv->fds[srv->nfds_listen].events = POLLIN;
+      srv->nfds_listen++;
     }
   }
 
-  if (srv->nfds == 0) {
+  if (srv->nfds_listen == 0) {
     fprintf(stderr, "failed to bind any sockets\n");
     return 0;
   }
@@ -111,19 +148,18 @@ scrape_server *scrape_listen(const char *port) {
 }
 
 void scrape_serve(scrape_server *srv, unsigned ncoll, const struct collector *coll[], void *coll_ctx[]) {
-  struct scrape_req req;
-  req.buf = bbuf_alloc(BUF_INITIAL, BUF_MAX);
-
   int ret;
 
   while (1) {
-    ret = poll(srv->fds, srv->nfds, -1);
+    ret = poll(srv->fds, srv->nfds_listen + srv->nfds_req, -1);
     if (ret == -1) {
       perror("poll");
       break;
     }
 
-    for (nfds_t i = 0; i < srv->nfds; i++) {
+    // handle incoming connections
+
+    for (nfds_t i = 0; i < srv->nfds_listen; i++) {
       if (srv->fds[i].revents == 0)
         continue;
       if (srv->fds[i].revents != POLLIN) {
@@ -131,29 +167,46 @@ void scrape_serve(scrape_server *srv, unsigned ncoll, const struct collector *co
         return;
       }
 
-      req.socket = accept(srv->fds[i].fd, 0, 0);
-      if (req.socket == -1) {
+      int s = accept(srv->fds[i].fd, 0, 0);
+      if (s == -1) {
         perror("accept");
         continue;
       }
 
-      if (handle_http(&req)) {
-        for (unsigned c = 0; c < ncoll; c++)
-          coll[c]->collect(&req, coll_ctx[c]);
-      }
-      close(req.socket);
+      req_start(srv, s);
+    }
+
+    // handle ongoing requests
+
+    for (nfds_t i = srv->nfds_listen; i < srv->nfds_listen + srv->nfds_req; i++) {
+      if (srv->fds[i].fd < 0 || srv->fds[i].revents == 0)
+        continue;
+
+      unsigned r = i - srv->nfds_listen;
+      if ((srv->fds[i].revents & ~(POLLIN | POLLOUT)) != 0)
+        req_close(srv, r);
+      else
+        req_process(srv, r, ncoll, coll, coll_ctx);
     }
   }
 }
 
 void scrape_close(scrape_server *srv) {
-  for (nfds_t i = 0; i < srv->nfds; i++)
+  for (nfds_t i = 0; i < srv->nfds_listen; i++)
     close(srv->fds[i].fd);
+  for (unsigned r = 0; r < MAX_REQUESTS; r++)
+    if (srv->reqs[r].state != req_state_inactive)
+      req_close(srv, r);
+  if (srv->reqs[0].buf)
+    bbuf_free(srv->reqs[0].buf);
   free(srv);
 }
 
+// scrape write API implementation
+
 void scrape_write(scrape_req *req, const char *metric, const struct label *labels, double value) {
-  bbuf_reset(req->buf);
+  if (req->state != req_state_write_metrics)
+    return;
 
   bbuf_puts(req->buf, metric);
@@ -168,16 +221,72 @@
   }
 
   bbuf_putf(req->buf, " %.16g\n", value);
-
-  size_t buf_len;
-  const char *buf = bbuf_get(req->buf, &buf_len);
-  write_all(req->socket, buf, buf_len);
 }
 
 void scrape_write_raw(scrape_req *req, const void *buf, size_t len) {
-  write_all(req->socket, buf, len);
+  bbuf_put(req->buf, buf, len);
 }
 
+// request state management
+
+static void req_start(struct scrape_server *srv, int s) {
+  int flags = fcntl(s, F_GETFL);
+  if (flags == -1 || fcntl(s, F_SETFL, flags | O_NONBLOCK) == -1) {
+    perror("fcntl");
+    close(s);
+    return;
+  }
+
+  unsigned r = 0;
+  while (r < MAX_REQUESTS && srv->reqs[r].state != req_state_inactive)
+    r++;
+  if (r == MAX_REQUESTS) {
+    close(s);
+    return;
+  }
+
+  if (r >= srv->nfds_req)
+    srv->nfds_req = r + 1;
+
+  struct scrape_req *req = &srv->reqs[r];
+  struct pollfd *pfd = &srv->fds[srv->nfds_listen + r];
+
+  req->state = req_state_read;
+  req->parse_state = http_read_start;
+  if (!req->buf)
+    req->buf = bbuf_alloc(BUF_INITIAL, BUF_MAX);
+
+  pfd->fd = s;
+  pfd->events = POLLIN;
+  pfd->revents = POLLIN; // pretend, to do the first read immediately
+}
+
+static void req_close(struct scrape_server *srv, unsigned r) {
+  srv->reqs[r].state = req_state_inactive;
+  if (r == 0) {
+    // keep the reqs[0] buffer for reuse
+    bbuf_reset(srv->reqs[0].buf);
+  } else {
+    bbuf_free(srv->reqs[r].buf);
+    srv->reqs[r].buf = 0;
+  }
+
+  nfds_t n = srv->nfds_listen + r;
+  close(srv->fds[n].fd);
+
+  srv->fds[n].fd = -1;
+  while (srv->nfds_req > 0 && srv->fds[srv->nfds_listen + srv->nfds_req - 1].fd < 0)
+    srv->nfds_req--;
+}
+
+enum http_parse_result {
+  http_parse_incomplete,
+  http_parse_valid,
+  http_parse_invalid,
+};
+
+static enum http_parse_result http_parse(int socket, enum http_parse_state *state, bbuf *buf);
+
 static const char http_success[] =
   "HTTP/1.1 200 OK\r\n"
   "Server: nano-exporter\r\n"
@@ -194,86 +303,141 @@ static const char http_error[] =
   "This is not a general-purpose HTTP server.\r\n"
   ;
 
-static bool handle_http(struct scrape_req *req) {
-  unsigned char http_buf[1024];
-  bbuf *tmp_buf = req->buf;
+static void req_process(struct scrape_server *srv, unsigned r, unsigned ncoll, const struct collector *coll[], void *coll_ctx[]) {
+  // TODO: add timeout support
+
+  struct scrape_req *req = &srv->reqs[r];
+  struct pollfd *pfd = &srv->fds[srv->nfds_listen + r];
+
+  if (req->state == req_state_inactive)
+    return;
+
+  if (req->state == req_state_read) {
+    enum http_parse_result ret = http_parse(pfd->fd, &req->parse_state, req->buf);
+
+    if (ret == http_parse_incomplete)
+      return; // try again after polling
+
+    if (ret == http_parse_valid) {
+      req->state = req_state_write_headers;
+      req->io = (char *) http_success;
+      req->io_size = sizeof http_success - 1;
+    } else {
+      req->state = req_state_write_error;
+      req->io = (char *) http_error;
+      req->io_size = sizeof http_error - 1;
+    }
 
-  enum {
-    state_read_method,
-    state_read_path,
-    state_read_version,
-    state_skip_headers_1,
-    state_skip_headers_2
-  } state = state_read_method;
-  bbuf_reset(tmp_buf);
+    pfd->events = POLLOUT;
+  }
+
+rewrite:
+  while (req->io_size > 0) {
+    ssize_t wrote = write(pfd->fd, req->io, req->io_size);
+
+    if (wrote == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
+      return; // try again after polling
+    if (wrote <= 0) {
+      req_close(srv, r);
+      return;
+    }
+
+    req->io += wrote;
+    req->io_size -= wrote;
+  }
+
+  if (req->state == req_state_write_error) {
+    req_close(srv, r);
+    return;
+  }
+
+  if (req->state == req_state_write_headers) {
+    req->state = req_state_write_metrics;
+    req->collector = 0;
+  }
+
+  while (req->collector < ncoll) {
+    bbuf_reset(req->buf);
+    coll[req->collector]->collect(req, coll_ctx[req->collector]);
+    req->collector++;
+
+    if (bbuf_len(req->buf) > 0) {
+      req->io = bbuf_get(req->buf, &req->io_size);
+      goto rewrite;
+    }
+  }
+
+  req_close(srv, r);
+}
+
+// HTTP protocol functions
+
+static enum http_parse_result http_parse(int socket, enum http_parse_state *state, bbuf *buf) {
+  unsigned char http_buf[1024];
 
   while (true) {
-    ssize_t got = read(req->socket, &http_buf, sizeof http_buf);
-    if (got <= 0)
-      return false;
+    ssize_t got = read(socket, &http_buf, sizeof http_buf);
+
+    if (got == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
+      return http_parse_incomplete;
+    else if (got <= 0)
+      return http_parse_invalid;
+
     for (ssize_t i = 0; i < got; i++) {
       int c = http_buf[i];
       if (c == '\r')
         continue;
 
-      switch (state) {
-      case state_read_method:
+      switch (*state) {
+      case http_read_start:
         if (c == ' ') {
-          if (bbuf_cmp(tmp_buf, "GET") != 0)
-            goto fail;
-          state = state_read_path;
-          bbuf_reset(tmp_buf);
+          if (bbuf_cmp(buf, "GET") != 0)
+            return http_parse_invalid;
+          *state = http_read_path;
+          bbuf_reset(buf);
           break;
         }
-        if (!isalnum(c) || bbuf_len(tmp_buf) >= 16)
-          goto fail;
-        bbuf_putc(tmp_buf, c);
+        if (!isalnum(c) || bbuf_len(buf) >= 16)
+          return http_parse_invalid;
+        bbuf_putc(buf, c);
         break;
 
-      case state_read_path:
+      case http_read_path:
         if (c == ' ') {
-          if (bbuf_cmp(tmp_buf, "/metrics") != 0)
-            goto fail;
-          state = state_read_version;
-          bbuf_reset(tmp_buf);
+          if (bbuf_cmp(buf, "/metrics") != 0)
+            return http_parse_invalid;
+          *state = http_read_version;
+          bbuf_reset(buf);
           break;
         }
-        if (!isprint(c) || c == '\n' || bbuf_len(tmp_buf) >= 128)
-          goto fail;
-        bbuf_putc(tmp_buf, c);
+        if (!isprint(c) || c == '\n' || bbuf_len(buf) >= 128)
+          return http_parse_invalid;
+        bbuf_putc(buf, c);
         break;
 
-      case state_read_version:
+      case http_read_version:
         if (c == '\n') {
-          if (bbuf_cmp(tmp_buf, "HTTP/1.1") != 0)
-            goto fail;
-          state = state_skip_headers_1;
-          bbuf_reset(tmp_buf);
+          if (bbuf_cmp(buf, "HTTP/1.1") != 0)
+            return http_parse_invalid;
+          *state = http_skip_headers_1;
+          bbuf_reset(buf);
           break;
         }
-        if (!isgraph(c) || bbuf_len(tmp_buf) >= 16)
-          goto fail;
-        bbuf_putc(tmp_buf, c);
+        if (!isgraph(c) || bbuf_len(buf) >= 16)
+          return http_parse_invalid;
+        bbuf_putc(buf, c);
         break;
 
-      case state_skip_headers_1:
+      case http_skip_headers_1:
        if (c == '\n')
-          goto succeed;
-        state = state_skip_headers_2;
+          return http_parse_valid;
+        *state = http_skip_headers_2;
         break;
 
-      case state_skip_headers_2:
+      case http_skip_headers_2:
         if (c == '\n')
-          state = state_skip_headers_1;
+          *state = http_skip_headers_1;
         break;
       }
     }
   }
-
-succeed:
-  write_all(req->socket, http_success, sizeof http_success - 1);
-  return true;
-
-fail:
-  write_all(req->socket, http_error, sizeof http_error - 1);
-  return false;
 }
```
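For context on how collectors interact with this change: a `collect` callback now runs to completion against the per-request buffer, and `req_process()` afterwards drains that buffer to the socket in non-blocking chunks. A minimal hypothetical collector might look as follows (the `example_*` names are invented for illustration; passing `0` for `labels` is assumed to mean "no labels", and the `struct collector` registration layout is defined in headers not shown in this diff):

```c
#include "scrape.h" /* scrape_req, scrape_write */

struct example_ctx {
  double value;
};

/* Hypothetical collector callback: every scrape_write() call appends
 * to the request's bbuf rather than writing to the socket, so the
 * callback never has to deal with EAGAIN itself. */
static void example_collect(scrape_req *req, void *ctx_ptr) {
  struct example_ctx *ctx = ctx_ptr;
  scrape_write(req, "example_metric", 0, ctx->value);
}
```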