about summary refs log tree commit diff
path: root/scrape.c
diff options
context:
space:
mode:
authorHeikki Kallasjoki <fis@zem.fi>2018-12-23 17:28:59 +0000
committerHeikki Kallasjoki <fis+github@zem.fi>2018-12-23 21:50:27 +0000
commit96a47dd416b5892c648ad17d01ce29481e9c5f6d (patch)
tree6fe1a2cb23d7b1f91ae8be12d10d392165d1824c /scrape.c
parentcfaf78325bb636c38166a2a19f05c1c89f010832 (diff)
downloadnano-exporter-96a47dd416b5892c648ad17d01ce29481e9c5f6d.tar.gz
nano-exporter-96a47dd416b5892c648ad17d01ce29481e9c5f6d.tar.xz
nano-exporter-96a47dd416b5892c648ad17d01ce29481e9c5f6d.zip
Allow multiple requests in parallel.
Mostly resolves #19, though a follow-up commit will add a request
timeout to stop overly slow requests from taking up slots.

The implementation basically turns the request sockets non-blocking,
and adds them to the poll loop used to accept new connections. Scrape
writes are collected to a request-specific buffer, one collector at a
time, and then queued for writing. Every time poll reports progress
can be made, that request is processed until the next read/write
operation would block, after which control returns to the poll loop.
Diffstat (limited to 'scrape.c')
-rw-r--r--scrape.c326
1 files changed, 245 insertions, 81 deletions
diff --git a/scrape.c b/scrape.c
index f632e4e..1528cc7 100644
--- a/scrape.c
+++ b/scrape.c
@@ -17,6 +17,8 @@
 #define _POSIX_C_SOURCE 200809L
 
 #include <ctype.h>
+#include <errno.h>
+#include <fcntl.h>
 #include <netdb.h>
 #include <netinet/in.h>
 #include <poll.h>
@@ -35,22 +37,57 @@
 
 #define MAX_LISTEN_SOCKETS 4
 #define MAX_BACKLOG 16
+#define MAX_REQUESTS 16
+
+enum req_state {
+  req_state_inactive,
+  req_state_read,
+  req_state_write_headers,
+  req_state_write_metrics,
+  req_state_write_error,
+};
+
+enum http_parse_state {
+  http_read_start,
+  http_read_path,
+  http_read_version,
+  http_skip_headers_1,
+  http_skip_headers_2,
+};
 
 struct scrape_req {
-  int socket;
+  enum req_state state;
+  union {
+    enum http_parse_state parse_state;
+    unsigned collector;
+  };
   bbuf *buf;
+  char *io;
+  size_t io_size;
 };
 
 struct scrape_server {
-  struct pollfd fds[MAX_LISTEN_SOCKETS];
-  nfds_t nfds;
+  struct scrape_req reqs[MAX_REQUESTS];
+  struct pollfd fds[MAX_LISTEN_SOCKETS + MAX_REQUESTS];
+  nfds_t nfds_listen;
+  nfds_t nfds_req;
 };
 
-static bool handle_http(struct scrape_req *req);
+static void req_start(struct scrape_server *srv, int socket);
+static void req_close(struct scrape_server *srv, unsigned r);
+static void req_process(struct scrape_server *srv, unsigned r, unsigned ncoll, const struct collector *coll[], void *coll_ctx[]);
+
+// TCP socket server
 
 scrape_server *scrape_listen(const char *port) {
   scrape_server *srv = must_malloc(sizeof *srv);
-  srv->nfds = 0;
+
+  srv->nfds_listen = 0;
+  srv->nfds_req = 0;
+  for (unsigned i = 0; i < MAX_REQUESTS; i++) {
+    srv->reqs[i].state = req_state_inactive;
+    srv->reqs[i].buf = 0;
+  }
 
   int ret;
 
@@ -69,7 +106,7 @@ scrape_server *scrape_listen(const char *port) {
       return false;
     }
 
-    for (struct addrinfo *a = addrs; a && srv->nfds < MAX_LISTEN_SOCKETS; a = a->ai_next) {
+    for (struct addrinfo *a = addrs; a && srv->nfds_listen < MAX_LISTEN_SOCKETS; a = a->ai_next) {
       int s = socket(a->ai_family, a->ai_socktype, a->ai_protocol);
       if (s == -1) {
         perror("socket");
@@ -96,13 +133,13 @@ scrape_server *scrape_listen(const char *port) {
         continue;
       }
 
-      srv->fds[srv->nfds].fd = s;
-      srv->fds[srv->nfds].events = POLLIN;
-      srv->nfds++;
+      srv->fds[srv->nfds_listen].fd = s;
+      srv->fds[srv->nfds_listen].events = POLLIN;
+      srv->nfds_listen++;
     }
   }
 
-  if (srv->nfds == 0) {
+  if (srv->nfds_listen == 0) {
     fprintf(stderr, "failed to bind any sockets\n");
     return 0;
   }
@@ -111,19 +148,18 @@ scrape_server *scrape_listen(const char *port) {
 }
 
 void scrape_serve(scrape_server *srv, unsigned ncoll, const struct collector *coll[], void *coll_ctx[]) {
-  struct scrape_req req;
-  req.buf = bbuf_alloc(BUF_INITIAL, BUF_MAX);
-
   int ret;
 
   while (1) {
-    ret = poll(srv->fds, srv->nfds, -1);
+    ret = poll(srv->fds, srv->nfds_listen + srv->nfds_req, -1);
     if (ret == -1) {
       perror("poll");
       break;
     }
 
-    for (nfds_t i = 0; i < srv->nfds; i++) {
+    // handle incoming connections
+
+    for (nfds_t i = 0; i < srv->nfds_listen; i++) {
       if (srv->fds[i].revents == 0)
         continue;
       if (srv->fds[i].revents != POLLIN) {
@@ -131,29 +167,46 @@ void scrape_serve(scrape_server *srv, unsigned ncoll, const struct collector *co
         return;
       }
 
-      req.socket = accept(srv->fds[i].fd, 0, 0);
-      if (req.socket == -1) {
+      int s = accept(srv->fds[i].fd, 0, 0);
+      if (s == -1) {
         perror("accept");
         continue;
       }
 
-      if (handle_http(&req)) {
-        for (unsigned c = 0; c < ncoll; c++)
-          coll[c]->collect(&req, coll_ctx[c]);
-      }
-      close(req.socket);
+      req_start(srv, s);
+    }
+
+    // handle ongoing requests
+
+    for (nfds_t i = srv->nfds_listen; i < srv->nfds_listen + srv->nfds_req; i++) {
+      if (srv->fds[i].fd < 0 || srv->fds[i].revents == 0)
+        continue;
+
+      unsigned r = i - srv->nfds_listen;
+      if ((srv->fds[i].revents & ~(POLLIN | POLLOUT)) != 0)
+        req_close(srv, r);
+      else
+        req_process(srv, r, ncoll, coll, coll_ctx);
     }
   }
 }
 
 void scrape_close(scrape_server *srv) {
-  for (nfds_t i = 0; i < srv->nfds; i++)
+  for (nfds_t i = 0; i < srv->nfds_listen; i++)
     close(srv->fds[i].fd);
+  for (unsigned r = 0; r < MAX_REQUESTS; r++)
+    if (srv->reqs[r].state != req_state_inactive)
+      req_close(srv, r);
+  if (srv->reqs[0].buf)
+    bbuf_free(srv->reqs[0].buf);
   free(srv);
 }
 
+// scrape write API implementation
+
 void scrape_write(scrape_req *req, const char *metric, const struct label *labels, double value) {
-  bbuf_reset(req->buf);
+  if (req->state != req_state_write_metrics)
+    return;
 
   bbuf_puts(req->buf, metric);
 
@@ -168,16 +221,72 @@ void scrape_write(scrape_req *req, const char *metric, const struct label *label
   }
 
   bbuf_putf(req->buf, " %.16g\n", value);
-
-  size_t buf_len;
-  const char *buf = bbuf_get(req->buf, &buf_len);
-  write_all(req->socket, buf, buf_len);
 }
 
 void scrape_write_raw(scrape_req *req, const void *buf, size_t len) {
-  write_all(req->socket, buf, len);
+  bbuf_put(req->buf, buf, len);
 }
 
+// request state management
+
+static void req_start(struct scrape_server *srv, int s) {
+  int flags = fcntl(s, F_GETFL);
+  if (flags == -1 || fcntl(s, F_SETFL, flags | O_NONBLOCK) == -1) {
+    perror("fcntl");
+    close(s);
+    return;
+  }
+
+  unsigned r = 0;
+  while (r < MAX_REQUESTS && srv->reqs[r].state != req_state_inactive)
+    r++;
+  if (r == MAX_REQUESTS) {
+    close(s);
+    return;
+  }
+
+  if (r >= srv->nfds_req)
+    srv->nfds_req = r + 1;
+
+  struct scrape_req *req = &srv->reqs[r];
+  struct pollfd *pfd = &srv->fds[srv->nfds_listen + r];
+
+  req->state = req_state_read;
+  req->parse_state = http_read_start;
+  if (!req->buf)
+    req->buf = bbuf_alloc(BUF_INITIAL, BUF_MAX);
+
+  pfd->fd = s;
+  pfd->events = POLLIN;
+  pfd->revents = POLLIN;  // pretend, to do the first read immediately
+}
+
+static void req_close(struct scrape_server *srv, unsigned r) {
+  srv->reqs[r].state = req_state_inactive;
+  if (r == 0) {
+    // keep the reqs[0] buffer for reuse
+    bbuf_reset(srv->reqs[0].buf);
+  } else {
+    bbuf_free(srv->reqs[r].buf);
+    srv->reqs[r].buf = 0;
+  }
+
+  nfds_t n = srv->nfds_listen + r;
+  close(srv->fds[n].fd);
+
+  srv->fds[n].fd = -1;
+  while (srv->nfds_req > 0 && srv->fds[srv->nfds_listen + srv->nfds_req - 1].fd < 0)
+    srv->nfds_req--;
+}
+
+enum http_parse_result {
+  http_parse_incomplete,
+  http_parse_valid,
+  http_parse_invalid,
+};
+
+static enum http_parse_result http_parse(int socket, enum http_parse_state *state, bbuf *buf);
+
 static const char http_success[] =
     "HTTP/1.1 200 OK\r\n"
     "Server: nano-exporter\r\n"
@@ -194,86 +303,141 @@ static const char http_error[] =
     "This is not a general-purpose HTTP server.\r\n"
     ;
 
-static bool handle_http(struct scrape_req *req) {
-  unsigned char http_buf[1024];
-  bbuf *tmp_buf = req->buf;
+static void req_process(struct scrape_server *srv, unsigned r, unsigned ncoll, const struct collector *coll[], void *coll_ctx[]) {
+  // TODO: add timeout support
+
+  struct scrape_req *req = &srv->reqs[r];
+  struct pollfd *pfd = &srv->fds[srv->nfds_listen + r];
+
+  if (req->state == req_state_inactive)
+    return;
+
+  if (req->state == req_state_read) {
+    enum http_parse_result ret = http_parse(pfd->fd, &req->parse_state, req->buf);
+
+    if (ret == http_parse_incomplete)
+      return;  // try again after polling
+
+    if (ret == http_parse_valid) {
+      req->state = req_state_write_headers;
+      req->io = (char *) http_success;
+      req->io_size = sizeof http_success - 1;
+    } else {
+      req->state = req_state_write_error;
+      req->io = (char *) http_error;
+      req->io_size = sizeof http_error - 1;
+    }
 
-  enum {
-    state_read_method,
-    state_read_path,
-    state_read_version,
-    state_skip_headers_1,
-    state_skip_headers_2
-  } state = state_read_method;
-  bbuf_reset(tmp_buf);
+    pfd->events = POLLOUT;
+  }
+
+rewrite:
+  while (req->io_size > 0) {
+    ssize_t wrote = write(pfd->fd, req->io, req->io_size);
+
+    if (wrote == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
+      return;  // try again after polling
+    if (wrote <= 0) {
+      req_close(srv, r);
+      return;
+    }
+
+    req->io += wrote;
+    req->io_size -= wrote;
+  }
+
+  if (req->state == req_state_write_error) {
+    req_close(srv, r);
+    return;
+  }
+
+  if (req->state == req_state_write_headers) {
+    req->state = req_state_write_metrics;
+    req->collector = 0;
+  }
+
+  while (req->collector < ncoll) {
+    bbuf_reset(req->buf);
+    coll[req->collector]->collect(req, coll_ctx[req->collector]);
+    req->collector++;
+
+    if (bbuf_len(req->buf) > 0) {
+      req->io = bbuf_get(req->buf, &req->io_size);
+      goto rewrite;
+    }
+  }
+
+  req_close(srv, r);
+}
+
+// HTTP protocol functions
+
+static enum http_parse_result http_parse(int socket, enum http_parse_state *state, bbuf *buf) {
+  unsigned char http_buf[1024];
 
   while (true) {
-    ssize_t got = read(req->socket, &http_buf, sizeof http_buf);
-    if (got <= 0)
-      return false;
+    ssize_t got = read(socket, &http_buf, sizeof http_buf);
+
+    if (got == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
+      return http_parse_incomplete;
+    else if (got <= 0)
+      return http_parse_invalid;
+
     for (ssize_t i = 0; i < got; i++) {
       int c = http_buf[i];
       if (c == '\r')
         continue;
 
-      switch (state) {
-        case state_read_method:
+      switch (*state) {
+        case http_read_start:
           if (c == ' ') {
-            if (bbuf_cmp(tmp_buf, "GET") != 0)
-              goto fail;
-            state = state_read_path;
-            bbuf_reset(tmp_buf);
+            if (bbuf_cmp(buf, "GET") != 0)
+              return http_parse_invalid;
+            *state = http_read_path;
+            bbuf_reset(buf);
             break;
           }
-          if (!isalnum(c) || bbuf_len(tmp_buf) >= 16)
-            goto fail;
-          bbuf_putc(tmp_buf, c);
+          if (!isalnum(c) || bbuf_len(buf) >= 16)
+            return http_parse_invalid;
+          bbuf_putc(buf, c);
           break;
 
-        case state_read_path:
+        case http_read_path:
           if (c == ' ') {
-            if (bbuf_cmp(tmp_buf, "/metrics") != 0)
-              goto fail;
-            state = state_read_version;
-            bbuf_reset(tmp_buf);
+            if (bbuf_cmp(buf, "/metrics") != 0)
+              return http_parse_invalid;
+            *state = http_read_version;
+            bbuf_reset(buf);
             break;
           }
-          if (!isprint(c) || c == '\n' || bbuf_len(tmp_buf) >= 128)
-            goto fail;
-          bbuf_putc(tmp_buf, c);
+          if (!isprint(c) || c == '\n' || bbuf_len(buf) >= 128)
+            return http_parse_invalid;
+          bbuf_putc(buf, c);
           break;
 
-        case state_read_version:
+        case http_read_version:
           if (c == '\n') {
-            if (bbuf_cmp(tmp_buf, "HTTP/1.1") != 0)
-              goto fail;
-            state = state_skip_headers_1;
-            bbuf_reset(tmp_buf);
+            if (bbuf_cmp(buf, "HTTP/1.1") != 0)
+              return http_parse_invalid;
+            *state = http_skip_headers_1;
+            bbuf_reset(buf);
             break;
           }
-          if (!isgraph(c) || bbuf_len(tmp_buf) >= 16)
-            goto fail;
-          bbuf_putc(tmp_buf, c);
+          if (!isgraph(c) || bbuf_len(buf) >= 16)
+            return http_parse_invalid;
+          bbuf_putc(buf, c);
           break;
 
-        case state_skip_headers_1:
+        case http_skip_headers_1:
           if (c == '\n')
-            goto succeed;
-          state = state_skip_headers_2;
+            return http_parse_valid;
+          *state = http_skip_headers_2;
           break;
-        case state_skip_headers_2:
+        case http_skip_headers_2:
           if (c == '\n')
-            state = state_skip_headers_1;
+            *state = http_skip_headers_1;
           break;
       }
     }
   }
-
-succeed:
-  write_all(req->socket, http_success, sizeof http_success - 1);
-  return true;
-
-fail:
-  write_all(req->socket, http_error, sizeof http_error - 1);
-  return false;
 }