summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--rvnit.c624
1 files changed, 624 insertions, 0 deletions
diff --git a/rvnit.c b/rvnit.c
new file mode 100644
index 0000000..a93d2d0
--- /dev/null
+++ b/rvnit.c
@@ -0,0 +1,624 @@
+/* accept4 */
+#define _GNU_SOURCE
+
+#include <sys/wait.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+//#include <sys/stat.h>
+
+#include <ctype.h>
+#include <dirent.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <pthread.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+
+#define MAX_SV 512
+
+struct entry {
+	char level;
+	char name[62];
+	char logged;
+	pid_t pid;
+	pid_t display_pid;  // not reset to 0, needed for oneshots
+	int logfd[2];
+	time_t start;
+	int status;
+	enum {
+		UNDEF,
+		UP,
+		DOWN,
+	} state;
+};
+
+struct entry services[MAX_SV];
+int level;
+
+pthread_t main_thread;
+pthread_t socket_thread;
+pthread_t logger_thread;
+
+int selflogfd[2];
+
+sig_atomic_t got_intr;
+sig_atomic_t got_sigusr1;
+
+void
+on_sigint(int sig)
+{
+	(void)sig;
+	got_intr = 1;
+}
+
+void
+on_sigusr1(int sig)
+{
+	(void)sig;
+	got_sigusr1 = 1;
+}
+
+void
+on_sigusr2(int sig)
+{
+	(void)sig;
+}
+
+void
+restart(int i)
+{
+	if (services[i].pid > 0)
+		return;
+	if (services[i].state != UP)
+		return;
+
+	int delay = 0;
+	time_t now = time(0);
+	if (services[i].start == now)
+		delay = 1;
+
+	int loggerpipe[2] = { -1, -1 };
+
+	if (services[i].logfd[0] == -1) {
+		pipe(services[i].logfd);
+		fcntl(services[i].logfd[0], F_SETFL, O_NONBLOCK);
+		fcntl(services[i].logfd[1], F_SETFL, O_NONBLOCK);
+
+		pthread_kill(logger_thread, SIGUSR2);
+	}
+
+	if (services[i].name[2] == 'L') {
+		for (int j = 0; j < MAX_SV; j++) {
+			if (strcmp(services[i].name + 3,
+			    services[j].name + 3) == 0 &&
+			    services[j].name[2] == 'D') {
+				if (services[j].logfd[0] == -1) {
+					// hook up logger
+					pipe(loggerpipe);
+					services[j].logfd[0] = loggerpipe[0];
+					services[j].logfd[1] = loggerpipe[1];
+					services[j].logged = 1;
+				} else {
+					// recover loggerpipe
+					loggerpipe[0] = services[j].logfd[0];
+					loggerpipe[1] = services[j].logfd[1];
+				}
+				break;
+			}
+		}
+	}
+
+	pid_t child = fork();
+	if (child == 0) {
+		dup2(services[i].logfd[1], 1);
+		if (loggerpipe[0] != -1) { // loggers get read end of loggerpipe
+			dup2(loggerpipe[0], 0);
+			close(loggerpipe[0]);
+			close(loggerpipe[1]);
+		}
+
+		close(services[i].logfd[0]);
+		close(services[i].logfd[1]);
+		sleep(delay);
+		execl(services[i].name,
+		    services[i].name,
+		    (char *)0);
+		exit(-1);
+	} else if (child > 0) {
+		fcntl(services[i].logfd[0], F_SETFD, FD_CLOEXEC);
+		fcntl(services[i].logfd[1], F_SETFD, FD_CLOEXEC);
+		if (loggerpipe[0] != -1) {
+			fcntl(loggerpipe[0], F_SETFD, FD_CLOEXEC);
+			fcntl(loggerpipe[1], F_SETFD, FD_CLOEXEC);
+		}
+		services[i].pid = services[i].display_pid = child;
+		services[i].start = now;
+	} else {
+		abort();
+	}
+}
+
+void *
+socket_loop(void* ignored)
+{
+	(void)ignored;
+
+	const char *path = "/tmp/rvnit.sock";
+
+	struct sockaddr_un addr = { 0 };
+	addr.sun_family = AF_UNIX;
+	strncpy(addr.sun_path, path, sizeof addr.sun_path - 1);
+	int listenfd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
+	if (listenfd < 0) {
+		perror("socket");
+		exit(111);
+	}
+	unlink(path);
+	int r = bind(listenfd, (struct sockaddr *)&addr, sizeof addr);
+	if (r < 0) {
+		perror("bind");
+		exit(111);
+	}
+
+	r = listen(listenfd, SOMAXCONN);
+        if (r < 0) {
+                perror("listen");
+                exit(111);
+        }
+
+	while (1) {
+		int connfd = accept4(listenfd, 0, 0, SOCK_CLOEXEC);
+		if (connfd < 0)
+			continue;
+
+		char cmd = 0;
+		read(connfd, &cmd, 1);
+		write(connfd, "ok\n", 3);
+
+		if (cmd == 'l') {
+			dprintf(connfd, "level=%d\n", level);
+		}
+
+		if (cmd == 's') {
+			for (int i = 0; i < MAX_SV; i++) {
+				if (services[i].name[2] == 'D' ||
+				    services[i].name[2] == 'L')
+					dprintf(connfd, "%s pid=%d status=%d\n",
+					    services[i].name,
+					    services[i].pid,
+					    services[i].status);
+			}
+		}
+
+		if (cmd == 'd' || cmd == 'u') {
+			char buf[63];
+			ssize_t rd = read(connfd, buf, sizeof buf);
+			if (rd > 0) {
+				buf[rd] = 0;
+				printf("got %c|%s|\n", cmd, buf);
+
+				for (int i = 0; i < MAX_SV; i++) {
+					if (strcmp(services[i].name, buf) == 0) {
+						if (cmd == 'd') {
+							printf("setting %s down\n",
+							    services[i].name);
+							services[i].state = DOWN;
+						} else if (cmd == 'u') {
+							printf("setting %s up\n",
+							    services[i].name);
+							services[i].state = UP;
+						}
+					}
+				}
+			}
+
+			pthread_kill(main_thread, SIGUSR1);
+		}
+
+		if (cmd == 'X') {
+			pthread_kill(main_thread, SIGPWR);
+		}
+
+		close(connfd);
+	}
+
+	return 0;
+}
+
+void *
+logger_loop(void* ignored)
+{
+	(void)ignored;
+
+//	sigaction(SIGUSR2, &(struct sigaction){.sa_handler=on_sigusr2}, 0);
+
+	while (1) {
+		struct pollfd fds[MAX_SV];
+		nfds_t nfds = 0;
+
+		fds[nfds].fd = selflogfd[0];
+		fds[nfds].events = POLLIN;
+		nfds++;
+
+		for (int i = 0; i < MAX_SV; i++) {
+			if (/*services[i].pid > 0 && */ !services[i].logged && services[i].logfd[0] > 0) {
+				fds[nfds].fd = services[i].logfd[0];
+				fds[nfds].events = POLLIN;
+				nfds++;
+			}
+		}
+
+//		printf("waiting for %d logs\n", (int)nfds);
+		int n = poll(fds, nfds, -1);
+		if (n < 0)
+			continue;
+
+		if (n == 1 && (fds[0].revents & POLLHUP) &&
+		    !(fds[0].revents & POLLIN)) {
+			/* selflog was closed, end thread */
+			break;
+		}
+
+		for (nfds_t j = 0; j < nfds; j++) {
+			// XXX HUP, NVAL
+			if (!(fds[j].revents & POLLIN))
+				continue;
+
+			char buf[4096];
+			ssize_t rd = read(fds[j].fd, buf, sizeof buf);
+			if (rd < 0) {
+				perror("read");
+				continue;
+			}
+			if (fds[j].fd == selflogfd[0]) {
+				printf("%s[%d]: %.*s",
+				    "rvnit",
+				    (int)getpid(),
+				    (int)rd,
+				    buf);
+				continue;
+			}
+			for (int i = 0; i < MAX_SV; i++) {
+				if (services[i].logfd[0] == fds[j].fd) {
+					printf("%s[%d]: %.*s",
+					    services[i].name,
+					    services[i].display_pid,
+					    (int)rd,
+					    buf
+						);
+					break;
+				}
+			}
+		}
+	}
+
+	return 0;
+}
+
+int
+reap(pid_t pid, int status)
+{
+	for (int i = 0; i < MAX_SV; i++) {
+		if (services[i].pid != pid)
+			continue;
+
+		services[i].pid = 0;
+		services[i].status = status;
+
+		dprintf(2, "reaped %s[%d] with status %d during level=%d\n",
+		    services[i].name, pid, status, level);
+
+		return i;
+	}
+
+	return -1;
+}
+
+int
+main()
+{
+	pipe(selflogfd);
+	fcntl(selflogfd[0], F_SETFL, O_NONBLOCK);
+	fcntl(selflogfd[1], F_SETFL, O_NONBLOCK);
+	fcntl(selflogfd[0], F_SETFD, FD_CLOEXEC);
+	fcntl(selflogfd[1], F_SETFD, FD_CLOEXEC);
+
+	sigaction(SIGUSR2, &(struct sigaction){.sa_handler=on_sigusr2}, 0);
+
+	main_thread = pthread_self();
+	pthread_create(&socket_thread, 0, socket_loop, 0);
+	pthread_create(&logger_thread, 0, logger_loop, 0);
+
+	if (chdir("sv") < 0) {
+		perror("chdir");
+		return 111;
+	}
+
+	DIR *dir = opendir(".");
+	if (!dir)
+		return 111;
+
+	int i = 0;
+
+	struct dirent *ent;
+	while ((ent = readdir(dir))) {
+		if (!isdigit(ent->d_name[0]) || !isdigit(ent->d_name[1]))
+			continue;
+
+		services[i].level = 10 * (ent->d_name[0] - '0') +
+		    (ent->d_name[1] - '0');
+		(void)snprintf(services[i].name, sizeof services[i].name,
+		    "%s", ent->d_name);
+		// follow symlinks
+		services[i].state = access(ent->d_name, X_OK) == 0 ? UP : DOWN;
+		services[i].logfd[0] = -1;
+		services[i].logfd[1] = -1;
+		services[i].logged = 0;
+		i++;
+	}
+
+	closedir(dir);
+
+	dprintf(selflogfd[1], "booting\n");
+
+	for (level = 0; level < 100; level++) {
+		/* spawn all of level */
+		int oneshot = 0;
+
+		// spawn loggers first
+		for (i = 0; i < MAX_SV; i++) {
+			if (services[i].level != level)
+				continue;
+
+			if (services[i].state != UP)
+				continue;
+
+			if (services[i].name[2] == 'L')
+				restart(i);
+		}
+
+		// spawn oneshots and daemons
+		for (i = 0; i < MAX_SV; i++) {
+			if (services[i].level != level)
+				continue;
+
+			if (services[i].state != UP)
+				continue;
+
+			if (services[i].name[2] == 'S') {
+				restart(i);
+				oneshot++;
+			} else if (services[i].name[2] == 'D') {
+				restart(i);
+			}
+		}
+
+		while (oneshot) {
+			int status = 0;
+			int pid = wait(&status);
+
+			if (pid < 0 && errno == ECHILD)
+				break;
+
+			int i = reap(pid, status);
+			if (i < 0)
+				continue;
+
+			if (services[i].level != level)
+				continue;
+
+			if (services[i].name[2] == 'S') {
+				oneshot--;
+			} else if (services[i].name[2] == 'D' ||
+			    services[i].name[2] == 'L') {
+				restart(i);
+			}
+		}
+	}
+
+	sigaction(SIGINT, &(struct sigaction){.sa_handler=on_sigint}, 0);
+	sigaction(SIGPWR, &(struct sigaction){.sa_handler=on_sigint}, 0);
+	sigaction(SIGUSR1, &(struct sigaction){.sa_handler=on_sigusr1}, 0);
+
+	dprintf(selflogfd[1], "system up\n");
+
+	while (1) {
+		if (got_intr)
+			break;
+
+		if (got_sigusr1) {
+			printf("rescanning state\n");
+			for (i = 0; i < MAX_SV; i++) {
+				if (services[i].name[2] == 'D' ||
+				    services[i].name[2] == 'L') {
+					if (services[i].state == UP &&
+					    services[i].pid == 0)
+						restart(i);
+					if (services[i].state == DOWN &&
+					    services[i].pid > 0)
+						kill(services[i].pid, SIGTERM);
+				}
+			}
+
+			got_sigusr1 = 0;
+		}
+
+		int status = 0;
+		errno = 0;
+		int pid = wait(&status);
+
+		if (pid < 0) {
+			if (errno == ECHILD)
+				break;
+			if (errno == EINTR)
+				continue;
+		}
+
+		int i = reap(pid, status);
+		if (i < 0)
+			continue;
+
+		if (services[i].name[2] == 'D' ||
+		    services[i].name[2] == 'L') {
+			dprintf(selflogfd[1], "%s terminated with status %d\n", services[i].name, services[i].status);
+			if (services[i].state == UP) {
+				dprintf(selflogfd[1], "restarting %s\n", services[i].name);
+				restart(i);
+			}
+		}
+	}
+
+	dprintf(selflogfd[1], "shutting down\n");
+
+//	sigaction(SIGINT, &(struct sigaction){.sa_handler=SIG_DFL}, 0);
+
+
+	struct timespec timeout = {7, 0};
+	sigset_t childset;
+	sigemptyset(&childset);
+	sigaddset(&childset, SIGCHLD);
+
+
+	for (level = 99; level >= 0; level--) {
+		/* kill all of level */
+		int oneshot = 0;
+		int daemons = 0;
+
+		for (i = 0; i < MAX_SV; i++) {
+			if (services[i].level != level)
+				continue;
+
+			if (services[i].name[2] == 'K' &&
+			    services[i].state == UP) {
+				dprintf(2, "todo: shutdown oneshot %s\n", services[i].name);
+				oneshot++;
+			}
+			if (services[i].name[2] == 'D' &&
+			    services[i].pid > 0) {
+				dprintf(2, "todo: sending sigterm to %s\n", services[i].name);
+				daemons++;
+			}
+		}
+
+		dprintf(2, "level=%d oneshot=%d daemons=%d\n",
+		    level, oneshot, daemons);
+	}
+
+	for (level = 99; level >= 0; level--) {
+		/* kill all of level */
+		int oneshot = 0;
+		int daemons = 0;
+
+		for (i = 0; i < MAX_SV; i++) {
+			if (services[i].level != level)
+				continue;
+
+			if (services[i].name[2] == 'K' &&
+			    services[i].state == UP) {
+				dprintf(selflogfd[1], "starting shutdown oneshot %s\n", services[i].name);
+				restart(i);
+				oneshot++;
+			}
+			if (services[i].name[2] == 'D' &&
+			    services[i].pid > 0) {
+				dprintf(selflogfd[1], "sending sigterm to %s\n", services[i].name);
+				kill(services[i].pid, SIGTERM);
+				kill(services[i].pid, SIGCONT);
+				daemons++;
+			}
+		}
+
+		dprintf(2, "level=%d oneshot=%d daemons=%d\n",
+		    level, oneshot, daemons);
+		usleep(100000);
+
+		// XXX figure out logger shutdown
+
+		while (oneshot) {
+			int status = 0;
+			int pid = wait(&status);
+
+			if (pid < 0 && errno == ECHILD)
+				break;
+
+			int i = reap(pid, status);
+			if (i < 0)
+				continue;
+
+			if (services[i].level != level)
+				continue;
+
+			if (services[i].name[2] == 'K') {
+				dprintf(selflogfd[1], "oneshot %s exited with status %d\n", services[i].name, services[i].status);
+				oneshot--;
+			} else if (services[i].name[2] == 'D') {
+				dprintf(selflogfd[1], "daemon %s exited with status %d\n", services[i].name, services[i].status);
+//				if (services[i].logged)
+//					close(services[i].logfd[1]);
+				daemons--;
+			}
+		}
+
+		// only daemons are left, wait up to 7s before sending
+		// them SIGKILL.
+
+		if (!daemons)
+			continue;
+
+		sigprocmask(SIG_BLOCK, &childset, 0);
+
+		while (daemons) {
+			int status = 0;
+			int pid = waitpid(-1, &status, WNOHANG);
+			if (pid == 0) { // nothing to reap
+				if (sigtimedwait(&childset, 0, &timeout) == SIGCHLD)
+					continue;
+				if (errno == EAGAIN) { // hit timeout
+					printf("need kill\n");
+					for (i = 0; i < MAX_SV; i++) {
+						if (services[i].level != level)
+							continue;
+						if (services[i].name[2] == 'D' &&
+						    services[i].pid > 0)
+							kill(services[i].pid, SIGKILL);
+					}
+					// XXX ensure this is only run once,
+					// then force next level
+					continue;
+				}
+			}
+
+			if (pid < 0 && errno == ECHILD)
+				break;
+
+			int i = reap(pid, status);
+			if (i < 0)
+				continue;
+
+			if (services[i].level != level)
+				continue;
+
+			if (services[i].name[2] == 'K') {
+				// can't happen
+				dprintf(selflogfd[1], "oneshot %s exited with status %d\n", services[i].name, services[i].status);
+				oneshot--;
+			} else if (services[i].name[2] == 'D') {
+				dprintf(selflogfd[1], "daemon %s exited with status %d\n", services[i].name, services[i].status);
+//				if (services[i].logged)
+//					close(services[i].logfd[1]);
+				daemons--;
+			}
+		}
+
+		sigprocmask(SIG_UNBLOCK, &childset, 0);
+	}
+
+	dprintf(selflogfd[1], "shutdown\n");
+	close(selflogfd[1]);
+	pthread_join(logger_thread, 0);
+}