diff options
-rw-r--r-- | rvnit.c | 624 |
1 files changed, 624 insertions, 0 deletions
diff --git a/rvnit.c b/rvnit.c new file mode 100644 index 0000000..a93d2d0 --- /dev/null +++ b/rvnit.c @@ -0,0 +1,624 @@ +/* accept4 */ +#define _GNU_SOURCE + +#include <sys/wait.h> +#include <sys/socket.h> +#include <sys/un.h> +//#include <sys/stat.h> + +#include <ctype.h> +#include <dirent.h> +#include <errno.h> +#include <fcntl.h> +#include <poll.h> +#include <pthread.h> +#include <signal.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include <unistd.h> + +#define MAX_SV 512 + +struct entry { + char level; + char name[62]; + char logged; + pid_t pid; + pid_t display_pid; // not reset to 0, needed for oneshots + int logfd[2]; + time_t start; + int status; + enum { + UNDEF, + UP, + DOWN, + } state; +}; + +struct entry services[MAX_SV]; +int level; + +pthread_t main_thread; +pthread_t socket_thread; +pthread_t logger_thread; + +int selflogfd[2]; + +sig_atomic_t got_intr; +sig_atomic_t got_sigusr1; + +void +on_sigint(int sig) +{ + (void)sig; + got_intr = 1; +} + +void +on_sigusr1(int sig) +{ + (void)sig; + got_sigusr1 = 1; +} + +void +on_sigusr2(int sig) +{ + (void)sig; +} + +void +restart(int i) +{ + if (services[i].pid > 0) + return; + if (services[i].state != UP) + return; + + int delay = 0; + time_t now = time(0); + if (services[i].start == now) + delay = 1; + + int loggerpipe[2] = { -1, -1 }; + + if (services[i].logfd[0] == -1) { + pipe(services[i].logfd); + fcntl(services[i].logfd[0], F_SETFL, O_NONBLOCK); + fcntl(services[i].logfd[1], F_SETFL, O_NONBLOCK); + + pthread_kill(logger_thread, SIGUSR2); + } + + if (services[i].name[2] == 'L') { + for (int j = 0; j < MAX_SV; j++) { + if (strcmp(services[i].name + 3, + services[j].name + 3) == 0 && + services[j].name[2] == 'D') { + if (services[j].logfd[0] == -1) { + // hook up logger + pipe(loggerpipe); + services[j].logfd[0] = loggerpipe[0]; + services[j].logfd[1] = loggerpipe[1]; + services[j].logged = 1; + } else { + // recover loggerpipe + loggerpipe[0] = services[j].logfd[0]; + loggerpipe[1] = services[j].logfd[1]; + } + break; + } + } + } + + pid_t child = fork(); + if (child == 0) { + dup2(services[i].logfd[1], 1); + if (loggerpipe[0] != -1) { // loggers get read end of loggerpipe + dup2(loggerpipe[0], 0); + close(loggerpipe[0]); + close(loggerpipe[1]); + } + + close(services[i].logfd[0]); + close(services[i].logfd[1]); + sleep(delay); + execl(services[i].name, + services[i].name, + (char *)0); + exit(-1); + } else if (child > 0) { + fcntl(services[i].logfd[0], F_SETFD, FD_CLOEXEC); + fcntl(services[i].logfd[1], F_SETFD, FD_CLOEXEC); + if (loggerpipe[0] != -1) { + fcntl(loggerpipe[0], F_SETFD, FD_CLOEXEC); + fcntl(loggerpipe[1], F_SETFD, FD_CLOEXEC); + } + services[i].pid = services[i].display_pid = child; + services[i].start = now; + } else { + abort(); + } +} + +void * +socket_loop(void* ignored) +{ + (void)ignored; + + const char *path = "/tmp/rvnit.sock"; + + struct sockaddr_un addr = { 0 }; + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, path, sizeof addr.sun_path - 1); + int listenfd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0); + if (listenfd < 0) { + perror("socket"); + exit(111); + } + unlink(path); + int r = bind(listenfd, (struct sockaddr *)&addr, sizeof addr); + if (r < 0) { + perror("bind"); + exit(111); + } + + r = listen(listenfd, SOMAXCONN); + if (r < 0) { + perror("listen"); + exit(111); + } + + while (1) { + int connfd = accept4(listenfd, 0, 0, SOCK_CLOEXEC); + if (connfd < 0) + continue; + + char cmd = 0; + read(connfd, &cmd, 1); + write(connfd, "ok\n", 3); + + if (cmd == 'l') { + dprintf(connfd, "level=%d\n", level); + } + + if (cmd == 's') { + for (int i = 0; i < MAX_SV; i++) { + if (services[i].name[2] == 'D' || + services[i].name[2] == 'L') + dprintf(connfd, "%s pid=%d status=%d\n", + services[i].name, + services[i].pid, + services[i].status); + } + } + + if (cmd == 'd' || cmd == 'u') { + char buf[63]; + ssize_t rd = read(connfd, buf, sizeof buf); + if (rd > 0) { + buf[rd] = 0; + printf("got %c|%s|\n", cmd, buf); + + for (int i = 0; i < MAX_SV; i++) { + if (strcmp(services[i].name, buf) == 0) { + if (cmd == 'd') { + printf("setting %s down\n", + services[i].name); + services[i].state = DOWN; + } else if (cmd == 'u') { + printf("setting %s up\n", + services[i].name); + services[i].state = UP; + } + } + } + } + + pthread_kill(main_thread, SIGUSR1); + } + + if (cmd == 'X') { + pthread_kill(main_thread, SIGPWR); + } + + close(connfd); + } + + return 0; +} + +void * +logger_loop(void* ignored) +{ + (void)ignored; + +// sigaction(SIGUSR2, &(struct sigaction){.sa_handler=on_sigusr2}, 0); + + while (1) { + struct pollfd fds[MAX_SV]; + nfds_t nfds = 0; + + fds[nfds].fd = selflogfd[0]; + fds[nfds].events = POLLIN; + nfds++; + + for (int i = 0; i < MAX_SV; i++) { + if (/*services[i].pid > 0 && */ !services[i].logged && services[i].logfd[0] > 0) { + fds[nfds].fd = services[i].logfd[0]; + fds[nfds].events = POLLIN; + nfds++; + } + } + +// printf("waiting for %d logs\n", (int)nfds); + int n = poll(fds, nfds, -1); + if (n < 0) + continue; + + if (n == 1 && (fds[0].revents & POLLHUP) && + !(fds[0].revents & POLLIN)) { + /* selflog was closed, end thread */ + break; + } + + for (nfds_t j = 0; j < nfds; j++) { + // XXX HUP, NVAL + if (!(fds[j].revents & POLLIN)) + continue; + + char buf[4096]; + ssize_t rd = read(fds[j].fd, buf, sizeof buf); + if (rd < 0) { + perror("read"); + continue; + } + if (fds[j].fd == selflogfd[0]) { + printf("%s[%d]: %.*s", + "rvnit", + (int)getpid(), + (int)rd, + buf); + continue; + } + for (int i = 0; i < MAX_SV; i++) { + if (services[i].logfd[0] == fds[j].fd) { + printf("%s[%d]: %.*s", + services[i].name, + services[i].display_pid, + (int)rd, + buf + ); + break; + } + } + } + } + + return 0; +} + +int +reap(pid_t pid, int status) +{ + for (int i = 0; i < MAX_SV; i++) { + if (services[i].pid != pid) + continue; + + services[i].pid = 0; + services[i].status = status; + + dprintf(2, "reaped %s[%d] with status %d during level=%d\n", + services[i].name, pid, status, level); + + return i; + } + + return -1; +} + +int +main() +{ + pipe(selflogfd); + fcntl(selflogfd[0], F_SETFL, O_NONBLOCK); + fcntl(selflogfd[1], F_SETFL, O_NONBLOCK); + fcntl(selflogfd[0], F_SETFD, FD_CLOEXEC); + fcntl(selflogfd[1], F_SETFD, FD_CLOEXEC); + + sigaction(SIGUSR2, &(struct sigaction){.sa_handler=on_sigusr2}, 0); + + main_thread = pthread_self(); + pthread_create(&socket_thread, 0, socket_loop, 0); + pthread_create(&logger_thread, 0, logger_loop, 0); + + if (chdir("sv") < 0) { + perror("chdir"); + return 111; + } + + DIR *dir = opendir("."); + if (!dir) + return 111; + + int i = 0; + + struct dirent *ent; + while ((ent = readdir(dir))) { + if (!isdigit(ent->d_name[0]) || !isdigit(ent->d_name[1])) + continue; + + services[i].level = 10 * (ent->d_name[0] - '0') + + (ent->d_name[1] - '0'); + (void)snprintf(services[i].name, sizeof services[i].name, + "%s", ent->d_name); + // follow symlinks + services[i].state = access(ent->d_name, X_OK) == 0 ? UP : DOWN; + services[i].logfd[0] = -1; + services[i].logfd[1] = -1; + services[i].logged = 0; + i++; + } + + closedir(dir); + + dprintf(selflogfd[1], "booting\n"); + + for (level = 0; level < 100; level++) { + /* spawn all of level */ + int oneshot = 0; + + // spawn loggers first + for (i = 0; i < MAX_SV; i++) { + if (services[i].level != level) + continue; + + if (services[i].state != UP) + continue; + + if (services[i].name[2] == 'L') + restart(i); + } + + // spawn oneshots and daemons + for (i = 0; i < MAX_SV; i++) { + if (services[i].level != level) + continue; + + if (services[i].state != UP) + continue; + + if (services[i].name[2] == 'S') { + restart(i); + oneshot++; + } else if (services[i].name[2] == 'D') { + restart(i); + } + } + + while (oneshot) { + int status = 0; + int pid = wait(&status); + + if (pid < 0 && errno == ECHILD) + break; + + int i = reap(pid, status); + if (i < 0) + continue; + + if (services[i].level != level) + continue; + + if (services[i].name[2] == 'S') { + oneshot--; + } else if (services[i].name[2] == 'D' || + services[i].name[2] == 'L') { + restart(i); + } + } + } + + sigaction(SIGINT, &(struct sigaction){.sa_handler=on_sigint}, 0); + sigaction(SIGPWR, &(struct sigaction){.sa_handler=on_sigint}, 0); + sigaction(SIGUSR1, &(struct sigaction){.sa_handler=on_sigusr1}, 0); + + dprintf(selflogfd[1], "system up\n"); + + while (1) { + if (got_intr) + break; + + if (got_sigusr1) { + printf("rescanning state\n"); + for (i = 0; i < MAX_SV; i++) { + if (services[i].name[2] == 'D' || + services[i].name[2] == 'L') { + if (services[i].state == UP && + services[i].pid == 0) + restart(i); + if (services[i].state == DOWN && + services[i].pid > 0) + kill(services[i].pid, SIGTERM); + } + } + + got_sigusr1 = 0; + } + + int status = 0; + errno = 0; + int pid = wait(&status); + + if (pid < 0) { + if (errno == ECHILD) + break; + if (errno == EINTR) + continue; + } + + int i = reap(pid, status); + if (i < 0) + continue; + + if (services[i].name[2] == 'D' || + services[i].name[2] == 'L') { + dprintf(selflogfd[1], "%s terminated with status %d\n", services[i].name, services[i].status); + if (services[i].state == UP) { + dprintf(selflogfd[1], "restarting %s\n", services[i].name); + restart(i); + } + } + } + + dprintf(selflogfd[1], "shutting down\n"); + +// sigaction(SIGINT, &(struct sigaction){.sa_handler=SIG_DFL}, 0); + + + struct timespec timeout = {7, 0}; + sigset_t childset; + sigemptyset(&childset); + sigaddset(&childset, SIGCHLD); + + + for (level = 99; level >= 0; level--) { + /* kill all of level */ + int oneshot = 0; + int daemons = 0; + + for (i = 0; i < MAX_SV; i++) { + if (services[i].level != level) + continue; + + if (services[i].name[2] == 'K' && + services[i].state == UP) { + dprintf(2, "todo: shutdown oneshot %s\n", services[i].name); + oneshot++; + } + if (services[i].name[2] == 'D' && + services[i].pid > 0) { + dprintf(2, "todo: sending sigterm to %s\n", services[i].name); + daemons++; + } + } + + dprintf(2, "level=%d oneshot=%d daemons=%d\n", + level, oneshot, daemons); + } + + for (level = 99; level >= 0; level--) { + /* kill all of level */ + int oneshot = 0; + int daemons = 0; + + for (i = 0; i < MAX_SV; i++) { + if (services[i].level != level) + continue; + + if (services[i].name[2] == 'K' && + services[i].state == UP) { + dprintf(selflogfd[1], "starting shutdown oneshot %s\n", services[i].name); + restart(i); + oneshot++; + } + if (services[i].name[2] == 'D' && + services[i].pid > 0) { + dprintf(selflogfd[1], "sending sigterm to %s\n", services[i].name); + kill(services[i].pid, SIGTERM); + kill(services[i].pid, SIGCONT); + daemons++; + } + } + + dprintf(2, "level=%d oneshot=%d daemons=%d\n", + level, oneshot, daemons); + usleep(100000); + + // XXX figure out logger shutdown + + while (oneshot) { + int status = 0; + int pid = wait(&status); + + if (pid < 0 && errno == ECHILD) + break; + + int i = reap(pid, status); + if (i < 0) + continue; + + if (services[i].level != level) + continue; + + if (services[i].name[2] == 'K') { + dprintf(selflogfd[1], "oneshot %s exited with status %d\n", services[i].name, services[i].status); + oneshot--; + } else if (services[i].name[2] == 'D') { + dprintf(selflogfd[1], "daemon %s exited with status %d\n", services[i].name, services[i].status); +// if (services[i].logged) +// close(services[i].logfd[1]); + daemons--; + } + } + + // only daemons are left, wait up to 7s before sending + // them SIGKILL. + + if (!daemons) + continue; + + sigprocmask(SIG_BLOCK, &childset, 0); + + while (daemons) { + int status = 0; + int pid = waitpid(-1, &status, WNOHANG); + if (pid == 0) { // nothing to reap + if (sigtimedwait(&childset, 0, &timeout) == SIGCHLD) + continue; + if (errno == EAGAIN) { // hit timeout + printf("need kill\n"); + for (i = 0; i < MAX_SV; i++) { + if (services[i].level != level) + continue; + if (services[i].name[2] == 'D' && + services[i].pid > 0) + kill(services[i].pid, SIGKILL); + } + // XXX ensure this is only run once, + // then force next level + continue; + } + } + + if (pid < 0 && errno == ECHILD) + break; + + int i = reap(pid, status); + if (i < 0) + continue; + + if (services[i].level != level) + continue; + + if (services[i].name[2] == 'K') { + // can't happen + dprintf(selflogfd[1], "oneshot %s exited with status %d\n", services[i].name, services[i].status); + oneshot--; + } else if (services[i].name[2] == 'D') { + dprintf(selflogfd[1], "daemon %s exited with status %d\n", services[i].name, services[i].status); +// if (services[i].logged) +// close(services[i].logfd[1]); + daemons--; + } + } + + sigprocmask(SIG_UNBLOCK, &childset, 0); + } + + dprintf(selflogfd[1], "shutdown\n"); + close(selflogfd[1]); + pthread_join(logger_thread, 0); +} |