diff options
author | Leah Neukirchen <leah@vuxu.org> | 2023-11-08 20:24:45 +0100 |
---|---|---|
committer | Leah Neukirchen <leah@vuxu.org> | 2023-11-08 20:25:06 +0100 |
commit | 2ee4684a1f63f24ae4017f8e0e1811b2944c5a7a (patch) | |
tree | ba57c561a4585d012202c76ff6bf52f164eea9df | |
parent | ce5a3ad8a4199ed421b50c2fe506648700c82e11 (diff) | |
download | nitro-2ee4684a1f63f24ae4017f8e0e1811b2944c5a7a.tar.gz nitro-2ee4684a1f63f24ae4017f8e0e1811b2944c5a7a.tar.xz nitro-2ee4684a1f63f24ae4017f8e0e1811b2944c5a7a.zip |
complete rewrite, use poll and custom deadlines
-rw-r--r-- | Makefile | 1 | ||||
-rw-r--r-- | nitro.c | 807 |
2 files changed, 464 insertions, 344 deletions
diff --git a/Makefile b/Makefile index 88979ce..3faa54a 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,4 @@ CFLAGS=-g -O2 -Wall -Wno-unused-parameter -Wextra -Wwrite-strings -LDLIBS=-luv ALL=nitro diff --git a/nitro.c b/nitro.c index e3ac580..18e2787 100644 --- a/nitro.c +++ b/nitro.c @@ -1,18 +1,30 @@ #include <sys/types.h> #include <sys/socket.h> +#include <sys/stat.h> +#include <sys/wait.h> #include <sys/un.h> #include <assert.h> +#include <dirent.h> #include <errno.h> +#include <fcntl.h> +#include <limits.h> +#include <poll.h> +#include <signal.h> #include <stdio.h> #include <stdlib.h> #include <string.h> +#include <time.h> #include <unistd.h> -#include <uv.h> +typedef int64_t deadline; /* milliseconds since epoch */ -uv_loop_t *loop; -int controlsock; +deadline time_now() +{ + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + return (int64_t)now.tv_sec * 1000 + now.tv_nsec / 1000000; +} enum global_state { GLBL_UP, @@ -37,196 +49,152 @@ enum process_events { EVNT_WANT_DOWN, EVNT_WANT_RESTART, EVNT_EXITED, - // EVNT_DIED, health check failed + // EVNT_DIED, health check failed }; -struct process { - struct process *next; - +struct service { char name[64]; - char tag[64]; - uv_timer_t timer; - uv_process_t main; -// uv_process_t log; - uv_pipe_t log_pipe; - uv_file logfd[2]; - uint64_t start; + deadline start; + deadline deadline; + int timeout; + pid_t pid; + int logpipe[2]; enum process_state state; + char seen; + char islog; +} services[512]; - uv_write_t wr_handle; - char log_buffer[4096]; -}; - -struct process *process_head; - -void disarm_timeout(struct process *p); -void process_step(struct process *p, enum process_events ev); +int max_service = 0; +int controlsock; +int nullfd; +int selfpipe[2]; +int globallog[2]; -uv_file globallogfd[2]; -uv_pipe_t log_input; +volatile sig_atomic_t want_rescan; +volatile sig_atomic_t want_shutdown; void -callback_exit(uv_process_t *proc, int64_t exit_status, int term_signal) +proc_launch(int i) { - struct process *p = proc->data; - - disarm_timeout(p); + unsigned char status; + int alivepipefd[2]; + if (pipe(alivepipefd) < 0) + abort(); + fcntl(alivepipefd[0], F_SETFD, FD_CLOEXEC); + fcntl(alivepipefd[1], F_SETFD, FD_CLOEXEC); + + pid_t child = fork(); + if (child == 0) { + chdir(services[i].name); + + if (strcmp(services[i].name, "LOG") == 0) { + dup2(globallog[0], 0); + dup2(1, 2); + } else if (services[i].islog) { + dup2(services[i].logpipe[0], 0); + } else { + dup2(nullfd, 0); + if (services[i].logpipe[1] != -1) + dup2(services[i].logpipe[1], 1); + else if (globallog[1] != -1) + dup2(globallog[1], 1); + } - printf("pid %d exited with status %ld and signal %d\n", - proc->pid, exit_status, term_signal); + setsid(); - // uv_close((uv_handle_t *)proc, 0); + execl("run", "run", (char *)0); - p->main.pid = 0; + status = (errno == ENOENT ? 127 : 126); + write(alivepipefd[1], &status, 1); + _exit(status); + } else if (child < 0) { + abort(); /* XXX handle retry */ + } - process_step(p, EVNT_EXITED); -} + close(alivepipefd[1]); + if (read(alivepipefd[0], &status, 1) == 1) { + printf("exec failed with status %d\n", status); + close(alivepipefd[0]); -void -callback_timeout(uv_timer_t *handle) -{ - struct process *p = handle->data; - process_step(p, EVNT_TIMEOUT); -} + services[i].state = PROC_FATAL; + services[i].pid = 0; + services[i].start = time_now(); + services[i].timeout = 0; + services[i].deadline = 0; + return; + } + close(alivepipefd[0]); -void -arm_timeout(struct process *p, uint64_t timeout) -{ - uv_timer_init(loop, &p->timer); - p->timer.data = p; - uv_timer_start(&p->timer, callback_timeout, timeout, 0); + services[i].pid = child; + services[i].start = time_now(); + services[i].state = PROC_STARTING; + services[i].timeout = 2000; + services[i].deadline = 0; } void -disarm_timeout(struct process *p) +proc_shutdown(int i) { - uv_timer_stop(&p->timer); + if (services[i].pid) + kill(services[i].pid, SIGTERM); + + if (services[i].state != PROC_SHUTDOWN && + services[i].state != PROC_RESTART) { + services[i].state = PROC_SHUTDOWN; + services[i].timeout = 7000; + services[i].deadline = 0; + } } void -fixed_stream_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) +proc_kill(int i) { - struct process *p = handle->data; - - buf->base = p->log_buffer; - buf->len = 4096; -} + if (services[i].pid) + kill(services[i].pid, SIGKILL); -void -done(uv_write_t *req, int status) -{ + assert(services[i].state == PROC_SHUTDOWN || + services[i].state == PROC_RESTART); } void -read_proc_log(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) +proc_cleanup(int i) { - struct process *p = stream->data; - - if (nread < 0) { - printf("LOG ERROR: %s\n", uv_strerror(nread)); - return; + services[i].pid = 0; + services[i].timeout = 0; + services[i].deadline = 0; + services[i].state = PROC_STOPPED; + + if (global_state != GLBL_UP) { + if (services[i].logpipe[0] > 0) + close(services[i].logpipe[0]); + if (services[i].logpipe[1] > 0) + close(services[i].logpipe[1]); } - if (nread == UV_EOF) - return; - - uv_buf_t b[] = { - { .base = p->tag, .len = strlen(p->tag) }, - { .base = buf->base, .len = nread }, - }; - - /* - printf("isref: %d\n", uv_has_ref((uv_handle_t *)&p->wr_handle)); - printf("got log: %.*s\n", nread, buf->base); - */ - - uv_write(&p->wr_handle, (uv_stream_t *)&log_input, b, 2, done); } void -proc_launch(struct process *p) -{ - p->main = (uv_process_t){ 0 }; - p->main.data = p; - - uv_pipe(p->logfd, 0, 0); +proc_zap(int i) { + // XXX clean up log pipe? - uv_stdio_container_t child_stdio[3]; - child_stdio[0].flags = UV_INHERIT_FD; - child_stdio[0].data.fd = 0; - child_stdio[1].flags = UV_INHERIT_FD; - child_stdio[1].data.fd = p->logfd[1]; - child_stdio[2].flags = UV_INHERIT_FD; - child_stdio[2].data.fd = 2; - - uv_pipe_init(loop, &p->log_pipe, 0); - p->log_pipe.data = p; - - uv_pipe_open(&p->log_pipe, p->logfd[0]); - uv_read_start((uv_stream_t *)&p->log_pipe, fixed_stream_buffer, read_proc_log); - - uv_process_options_t options = { 0 }; - options.stdio_count = 3; - options.stdio = child_stdio; - options.exit_cb = callback_exit; - - char path[1024]; - snprintf(path, sizeof path, "%s/run", p->name); - - options.args = (char *[]){path, 0}; - options.file = path; - - int r = uv_spawn(loop, &p->main, &options); - if (r) { - fprintf(stderr, "uv_spawn: %s\n", uv_strerror(r)); - p->state = PROC_FATAL; - return; + if (!services[i].seen) { + printf("can garbage-collect %s\n", services[i].name); + services[i] = services[--max_service]; } - - p->start = uv_now(loop); - p->state = PROC_STARTING; - snprintf(p->tag, sizeof p->tag, "%s[%d]: ", p->name, p->main.pid); - - arm_timeout(p, 100); } -void -proc_shutdown(struct process *p) -{ - if (p->main.pid) - kill(p->main.pid, SIGTERM); - p->state = PROC_SHUTDOWN; - arm_timeout(p, 15000); -} void -proc_kill(struct process *p) +process_step(int i, enum process_events ev) { - if (p->main.pid) - kill(p->main.pid, SIGKILL); - - assert(p->state == PROC_SHUTDOWN || p->state == PROC_RESTART); -} - -void -proc_cleanup(struct process *p) -{ -// uv_close((uv_handle_t *)&p->main, 0); -// uv_close((uv_handle_t *)&p->log_pipe, 0); -// uv_close((uv_handle_t *)&p->timer, 0); - close(p->logfd[0]); - close(p->logfd[1]); -} - -void -process_step(struct process *p, enum process_events ev) -{ - printf("%d[%d] got %d %d\n", p->state, p->main.pid, ev, global_state); + printf("process %s[%d] state %d step %d\n", + services[i].name, services[i].pid, + services[i].state, ev); switch (ev) { case EVNT_WANT_UP: if (global_state != GLBL_UP) break; - switch (p->state) { + switch (services[i].state) { case PROC_STARTING: case PROC_UP: case PROC_DEAD: @@ -235,30 +203,32 @@ process_step(struct process *p, enum process_events ev) break; case PROC_SHUTDOWN: - p->state = PROC_RESTART; + services[i].state = PROC_RESTART; break; case PROC_STOPPED: case PROC_FATAL: case PROC_DELAY: - proc_launch(p); + proc_launch(i); break; } break; case EVNT_WANT_DOWN: - switch (p->state) { + switch (services[i].state) { case PROC_STARTING: case PROC_UP: case PROC_DEAD: case PROC_RESTART: case PROC_SHUTDOWN: - proc_shutdown(p); + proc_shutdown(i); break; case PROC_FATAL: case PROC_DELAY: - p->state = PROC_STOPPED; + services[i].state = PROC_STOPPED; + services[i].timeout = 0; + services[i].deadline = 0; case PROC_STOPPED: /* ignore, is down */ @@ -269,74 +239,81 @@ process_step(struct process *p, enum process_events ev) case EVNT_WANT_RESTART: if (global_state != GLBL_UP) break; - switch (p->state) { + switch (services[i].state) { case PROC_STARTING: case PROC_UP: case PROC_DEAD: case PROC_RESTART: case PROC_SHUTDOWN: - proc_shutdown(p); - p->state = PROC_RESTART; + proc_shutdown(i); + services[i].state = PROC_RESTART; break; case PROC_STOPPED: case PROC_FATAL: case PROC_DELAY: - proc_launch(p); + proc_launch(i); break; } break; case EVNT_EXITED: - switch (p->state) { + services[i].timeout = 0; + services[i].deadline = 0; + switch (services[i].state) { case PROC_STARTING: case PROC_UP: case PROC_DEAD: case PROC_RESTART: - proc_cleanup(p); + proc_cleanup(i); + if (global_state != GLBL_UP) + break; // XXX check too many restarts - uint64_t now = uv_now(loop); - if (now - p->start > 2000) { - proc_launch(p); + deadline now = time_now(); + if (now - services[i].start > 2000) { + proc_launch(i); } else { - p->state = PROC_DELAY; - arm_timeout(p, 1000); + services[i].state = PROC_DELAY; + services[i].timeout = 1000; + services[i].deadline = 0; } break; case PROC_SHUTDOWN: - proc_cleanup(p); - p->state = PROC_STOPPED; + proc_cleanup(i); + proc_zap(i); break; - case PROC_STOPPED: /* can't happen */ - case PROC_FATAL: /* can't happen */ - case PROC_DELAY: /* can't happen */ + case PROC_STOPPED: /* can't happen */ + case PROC_FATAL: /* can't happen */ + case PROC_DELAY: /* can't happen */ assert(!"invalid state transition"); break; } break; + case EVNT_TIMEOUT: - disarm_timeout(p); - switch (p->state) { + services[i].timeout = 0; + services[i].deadline = 0; + switch (services[i].state) { case PROC_DELAY: - proc_launch(p); + proc_launch(i); break; case PROC_STARTING: /* detect failed start */ - p->state = PROC_UP; + services[i].state = PROC_UP; break; case PROC_DEAD: - proc_shutdown(p); + proc_shutdown(i); break; case PROC_RESTART: case PROC_SHUTDOWN: - proc_kill(p); + proc_kill(i); break; case PROC_UP: @@ -349,49 +326,144 @@ process_step(struct process *p, enum process_events ev) } } -void -do_shutdown(int state) -{ - global_state = state; - for (struct process *p = process_head; p; p = p->next) - process_step(p, EVNT_WANT_DOWN); -} void -callback_signal(uv_signal_t *handle, int signum) +on_signal(int sig) { - switch (signum) { - case SIGINT: - do_shutdown(GLBL_WANT_SHUTDOWN); - break; + int old_errno = errno; + switch (sig) { + case SIGCHLD: + write(selfpipe[1], "", 1); + break; + case SIGINT: /* XXX for debugging */ case SIGTERM: - uv_signal_stop(handle); - uv_stop(loop); + want_shutdown = 1; + break; + case SIGHUP: + want_rescan = 1; break; } + + errno = old_errno; } -void -fixed_log_buffer(uv_handle_t *, size_t suggested_size, uv_buf_t *buf) +int +find_service(char *name) { - static char buffer[4096]; - buf->base = buffer; - buf->len = sizeof buffer; + for (int i = 0; i < max_service; i++) + if (strcmp(services[i].name, name) == 0) + return i; + + return -1; +} + +int +add_service(char *name) +{ + int i; + for (i = 0; i < max_service; i++) + if (strcmp(services[i].name, name) == 0) + break; + + if (i == max_service) { + max_service++; + + strcpy(services[i].name, name); + services[i].pid = 0; + services[i].logpipe[0] = -1; + services[i].logpipe[1] = -1; + services[i].state = PROC_DELAY; + services[i].timeout = 1; + services[i].deadline = 0; + services[i].islog = 0; + } + + services[i].seen = 1; + return i; } void -read_log(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) +rescan(int first) { - if (nread < 0) { - fprintf(stderr, "log error: %s\n", uv_strerror(nread)); - uv_read_stop(stream); - return; + int i; + for (i = 0; i < max_service; i++) + services[i].seen = 0; + + DIR *dir = opendir("."); + if (!dir) + abort(); + + struct dirent *ent; + while ((ent = readdir(dir))) { + char *name = ent->d_name; + struct stat st; + + if (name[0] == '.') + continue; + if (stat(name, &st) < 0) + continue; + if (!S_ISDIR(st.st_mode)) + continue; + + printf("SCAN %s\n", name); + + int i = add_service(name); + + if (strcmp(name, "LOG") == 0 && globallog[0] == -1) { + printf("making global pipe\n"); + pipe(globallog); + fcntl(globallog[1], F_SETFL, O_NONBLOCK); + fcntl(globallog[0], F_SETFD, FD_CLOEXEC); + fcntl(globallog[1], F_SETFD, FD_CLOEXEC); + } + + char buf[PATH_MAX]; + + if (first) { + snprintf(buf, sizeof buf, "%s/down", name); + if (stat(buf, &st) == 0) { + services[i].state = PROC_STOPPED; + services[i].timeout = 0; + } + } + + snprintf(buf, sizeof buf, "%s/log", name); + printf("buf=%s\n" ,buf); + + if (stat(buf, &st) == 0 && S_ISDIR(st.st_mode)) { + printf("SCAN %s\n", buf); + int j = add_service(buf); + services[j].islog = 1; + if (services[j].logpipe[0] == -1) { + pipe(services[i].logpipe); + fcntl(services[i].logpipe[0], F_SETFD, FD_CLOEXEC); + fcntl(services[i].logpipe[1], F_SETFD, FD_CLOEXEC); + services[j].logpipe[0] = services[i].logpipe[0]; + services[j].logpipe[1] = services[i].logpipe[1]; + } + } } - if (nread == UV_EOF) - return; + closedir(dir); + + for (i = 0; i < max_service; i++) + if (!services[i].seen) + process_step(i, EVNT_WANT_DOWN); +} - printf("LOG: %.*s", (int)nread, buf->base); +void +do_shutdown(int state) +{ + global_state = state; + for (int i = 0; i < max_service; i++) { + if (services[i].islog) + continue; + if (strcmp(services[i].name, "LOG") == 0) + continue; + + printf("want down %d %d\n", i, services[i].state); + process_step(i, EVNT_WANT_DOWN); + } } void @@ -419,6 +491,8 @@ open_control_socket() { perror("nitro: socket"); exit(111); } + fcntl(controlsock, F_SETFD, FD_CLOEXEC); + unlink(path); mode_t mask = umask(0077); int r = bind(controlsock, (struct sockaddr *)&addr, sizeof addr); @@ -429,37 +503,6 @@ open_control_socket() { } } -struct process * -new_service(char *name) -{ - struct stat st; - if (name[0] == '.') - return 0; - if (stat(name, &st) < 0) /* we are in SVDIR */ - return 0; - if (!S_ISDIR(st.st_mode)) - return 0; - - struct process *p = calloc(sizeof (struct process), 1); - strcpy(p->name, name); - p->state = PROC_DELAY; - - p->next = process_head; - process_head = p; - - return p; -} - -struct process * -find_service(char *name) -{ - for (struct process *p = process_head; p; p = p->next) - if (strcmp(p->name, name) == 0) - return p; - - return new_service(name); -} - int charsig(char c) { @@ -474,22 +517,15 @@ charsig(char c) case '2': return SIGUSR2; case 't': return SIGTERM; case 'k': return SIGKILL; + default: return 0; } - - return 0; } void -callback_control_socket(uv_poll_t *handle, int status, int events) -{ - if (status < 0) { - fprintf(stderr, "poll error: %s\n", uv_strerror(status)); - return; - } - +handle_control_sock() { char buf[256]; struct sockaddr_un src; - socklen_t srclen; + socklen_t srclen = 0; ssize_t r = recvfrom(controlsock, buf, sizeof buf, MSG_DONTWAIT, (struct sockaddr *)&src, &srclen); @@ -504,55 +540,59 @@ callback_control_socket(uv_poll_t *handle, int status, int events) return; } - printf("got %ld from %d [%d %d]\n", r, srclen, status, events); + printf(">>> got %ld from %d\n", r, srclen); if (r == 0) return; + buf[r] = 0; // chop trailing newline - if (buf[strlen(buf) - 1] == '\n') - buf[strlen(buf) - 1] = 0; + if (buf[r - 1] == '\n') + buf[r - 1] = 0; switch (buf[0]) { case 'l': - for (struct process *p = process_head; p; p = p->next) { - printf("%s[%d] %d\n", p->name, p->main.pid, p->state); + for (int i = 0; i < max_service; i++) { + printf(":: %s[%d] %d\n", services[i].name, services[i].pid, services[i].state); } goto ok; + case 's': + want_rescan = 1; + goto ok; case 'u': case 'd': case 'r': { - struct process *p = find_service(buf + 1); - if (p) { - if (buf[0] == 'u') - process_step(p, EVNT_WANT_UP); - else if (buf[0] == 'd') - process_step(p, EVNT_WANT_DOWN); - else if (buf[0] == 'r') - process_step(p, EVNT_WANT_RESTART); - goto ok; - } - goto fail; + int i = find_service(buf + 1); + if (i < 0) + goto fail; + + if (buf[0] == 'u') + process_step(i, EVNT_WANT_UP); + else if (buf[0] == 'd') + process_step(i, EVNT_WANT_DOWN); + else if (buf[0] == 'r') + process_step(i, EVNT_WANT_RESTART); + goto ok; } case 'S': - do_shutdown(GLBL_WANT_SHUTDOWN); + want_shutdown = 1; goto ok; case 'R': do_shutdown(GLBL_WANT_REBOOT); goto ok; default: - if (charsig(buf[0])) { - struct process *p = find_service(buf + 1); - if (p && p->main.pid) { - kill(p->main.pid, charsig(buf[0])); - goto ok; - } + if (!charsig(buf[0])) + goto fail; + int i = find_service(buf + 1); + printf("%s is %d\n", buf+1, i); + if (i >= 0 && services[i].pid) { + kill(services[i].pid, charsig(buf[0])); + goto ok; } goto fail; } - ok: if (srclen > 0) { char reply[] = "ok\n"; @@ -560,7 +600,6 @@ ok: MSG_DONTWAIT, (struct sockaddr *)&src, srclen); } return; - fail: if (srclen > 0) { char reply[] = "error\n"; @@ -569,106 +608,188 @@ fail: } } -void -load_services() -{ - DIR *dir = opendir("."); - if (!dir) - abort(); - - struct dirent *ent; - while ((ent = readdir(dir))) - new_service(ent->d_name); - closedir(dir); -} +#define CHLD 0 +#define CTRL 1 -void callback_stop_check(uv_check_t *handle) +void +has_died(pid_t pid, int status) { - if (global_state == GLBL_UP) - return; + for (int i = 0; i < max_service; i++) { + if (services[i].pid == pid) { + printf("service %s[%d] has died with status %d\n", + services[i].name, pid, status); + process_step(i, EVNT_EXITED); + return; + } - int up = 0; - for (struct process *p = process_head; p; p = p->next) { - printf("DBG %s %d %d\n", p->name, p->main.pid, p->state); - if (!(p->state == PROC_STOPPED || p->state == PROC_FATAL)) - up++; - } - if (up) { - printf("shutdown waiting for %d processes\n", up); - return; + // XXX handle logger? } - - uv_stop(loop); } int main(int argc, char *argv[]) { - if (argc < 1) - return 111; + int i; - loop = uv_default_loop(); - uv_disable_stdio_inheritance(); + chdir(argv[1]); - const char *dir = "/var/service"; - if (argc == 2) - dir = argv[1]; + nullfd = open("/dev/null", O_RDONLY | O_CLOEXEC); + if (nullfd < 0) { + perror("nitro: open /dev/null"); - if (chdir(dir) < 0) { - perror("nitro: chdir"); - return 111; + // use a closed pipe instead + int fd[2]; + pipe(fd); + nullfd = fd[0]; + close(fd[1]); } - signal(SIGPIPE, SIG_IGN); + pipe(selfpipe); + fcntl(selfpipe[0], F_SETFL, O_NONBLOCK); + fcntl(selfpipe[1], F_SETFL, O_NONBLOCK); + fcntl(selfpipe[0], F_SETFD, FD_CLOEXEC); + fcntl(selfpipe[1], F_SETFD, FD_CLOEXEC); + + globallog[0] = -1; + globallog[1] = -1; + + sigset_t allset; + sigfillset(&allset); + sigaction(SIGCHLD, &(struct sigaction){ + .sa_handler = on_signal, + .sa_mask = allset, + .sa_flags = SA_NOCLDSTOP | SA_RESTART, + }, 0); + sigaction(SIGHUP, &(struct sigaction){ + .sa_handler = on_signal, + .sa_mask = allset, + }, 0); + sigaction(SIGINT, &(struct sigaction){ + .sa_handler = on_signal, + .sa_mask = allset, + }, 0); +/* for debugging + sigaction(SIGTERM, &(struct sigaction){ + .sa_handler = on_signal, + .sa_mask = allset, + }, 0); +*/ - uv_signal_t sigterm; - uv_signal_init(loop, &sigterm); - uv_signal_start(&sigterm, callback_signal, SIGTERM); + open_control_socket(); - uv_signal_t sigint; - uv_signal_init(loop, &sigint); - uv_signal_start(&sigint, callback_signal, SIGINT); + global_state = GLBL_UP; - open_control_socket(); + rescan(1); - /* we use plain poll access for the control socket, so we can - handle requests without any memory overhead. */ - uv_poll_t control_poll; - uv_poll_init(loop, &control_poll, controlsock); + struct pollfd fds[2]; + fds[CHLD].fd = selfpipe[0]; + fds[CHLD].events = POLLIN; + fds[CTRL].fd = controlsock; + fds[CTRL].events = POLLIN; - uv_poll_start(&control_poll, UV_READABLE, callback_control_socket); + while (1) { + deadline now = time_now(); - uv_pipe(globallogfd, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE); - uv_pipe_t log_pipe; - uv_pipe_init(loop, &log_pipe, 0); - uv_pipe_open(&log_pipe, globallogfd[0]); - uv_pipe_init(loop, &log_input, 0); - uv_pipe_open(&log_input, globallogfd[1]); - uv_read_start((uv_stream_t *)&log_pipe, fixed_log_buffer, read_log); + int timeout = -1; - global_state = GLBL_UP; + printf("\nnow = %ld\n", now); + for (i = 0; i < max_service; i++) { + printf("-> %s[%d] %d %d %ld\n", services[i].name, + services[i].pid, services[i].state, + services[i].timeout, + services[i].deadline + ); + } - load_services(); + for (i = 0; i < max_service; i++) { +printf("TO %s %d\n", services[i].name, services[i].timeout); + if (services[i].timeout <= 0) + continue; - printf("nitro up at %d\n", getpid()); + if (services[i].deadline == 0) + services[i].deadline = now + services[i].timeout; - for (struct process *p = process_head; p; p = p->next) { - char path[1024]; - struct stat st; - snprintf(path, sizeof path, "%s/down", p->name); - if (stat(path, &st)) - process_step(p, EVNT_WANT_DOWN); - else - process_step(p, EVNT_WANT_UP); - } + if (services[i].deadline <= now) { + printf("timeout for %d\n", i); + process_step(i, EVNT_TIMEOUT); + } - uv_check_t stop_check; - uv_check_init(loop, &stop_check); - uv_check_start(&stop_check, callback_stop_check); + if (services[i].timeout <= 0) + continue; - uv_run(loop, UV_RUN_DEFAULT); + if (services[i].deadline == 0) + services[i].deadline = now + services[i].timeout; - uv_loop_close(loop); - return 0; + int64_t wait_for = services[i].deadline - now; + if (wait_for > 0) { + if (timeout == -1 || (wait_for < timeout)) + timeout = wait_for; + } + } + + printf("poll(timeout=%d)\n", timeout); + + poll(fds, 2, timeout); + + if (fds[CHLD].revents & POLLIN) { + char ch; + + printf("data on self pipe\n"); + + while (read(selfpipe[0], &ch, 1) == 1) + ; + errno = 0; + } + + while (1) { + int wstatus = 0; + int r = waitpid(-1, &wstatus, WNOHANG); + if (r == 0 || (r < 0 && errno == ECHILD)) + break; + if (r < 0) + abort(); + has_died(r, wstatus); + } + + if (fds[CTRL].revents & POLLIN) { + handle_control_sock(); + } + + if (want_rescan) { + rescan(0); + want_rescan = 0; + } + + if (want_shutdown) { + do_shutdown(GLBL_WANT_SHUTDOWN); + } + + if (global_state != GLBL_UP) { + int up = 0; + int uplog = 0; + for (i = 0; i < max_service; i++) { + printf("DBG %s %d %d\n", services[i].name, services[i].pid, services[i].state); + if (!(services[i].state == PROC_STOPPED || + services[i].state == PROC_FATAL)) { + up++; + if (services[i].islog) + uplog++; + if (strcmp(services[i].name, "LOG")) + uplog++; + } + } + if (up) { + printf("shutdown waiting for %d processes\n", up); + if (up == uplog) { + printf("signalling %d log processes\n", uplog); + for (int i = 0; i < max_service; i++) + if (services[i].islog || strcmp(services[i].name, "LOG") == 0) + process_step(i, EVNT_WANT_DOWN); + } + } else { + break; + } + } + } } |