about summary refs log tree commit diff
path: root/src/aio/aio.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/aio/aio.c')
-rw-r--r--src/aio/aio.c37
1 files changed, 21 insertions, 16 deletions
diff --git a/src/aio/aio.c b/src/aio/aio.c
index 38943bc0..f7955c44 100644
--- a/src/aio/aio.c
+++ b/src/aio/aio.c
@@ -41,13 +41,6 @@
  * blocked permanently.
  */
 
-struct aio_args {
-	struct aiocb *cb;
-	int op;
-	int err;
-	sem_t sem;
-};
-
 struct aio_thread {
 	pthread_t td;
 	struct aiocb *cb;
@@ -65,6 +58,13 @@ struct aio_queue {
 	struct aio_thread *head;
 };
 
+struct aio_args {
+	struct aiocb *cb;
+	struct aio_queue *q;
+	int op;
+	sem_t sem;
+};
+
 static pthread_rwlock_t maplock = PTHREAD_RWLOCK_INITIALIZER;
 static struct aio_queue *****map;
 static volatile int aio_fd_cnt;
@@ -196,12 +196,11 @@ static void *io_thread_func(void *ctx)
 	size_t len = cb->aio_nbytes;
 	off_t off = cb->aio_offset;
 
-	struct aio_queue *q = __aio_get_queue(fd, 1);
+	struct aio_queue *q = args->q;
 	ssize_t ret;
 
-	args->err = q ? 0 : EAGAIN;
+	pthread_mutex_lock(&q->lock);
 	sem_post(&args->sem);
-	if (!q) return 0;
 
 	at.op = op;
 	at.running = 1;
@@ -213,7 +212,6 @@ static void *io_thread_func(void *ctx)
 	at.prev = 0;
 	if ((at.next = q->head)) at.next->prev = &at;
 	q->head = &at;
-	q->ref++;
 
 	if (!q->init) {
 		int seekable = lseek(fd, 0, SEEK_CUR) >= 0;
@@ -272,9 +270,18 @@ static int submit(struct aiocb *cb, int op)
 	pthread_attr_t a;
 	sigset_t allmask, origmask;
 	pthread_t td;
-	struct aio_args args = { .cb = cb, .op = op };
+	struct aio_queue *q = __aio_get_queue(cb->aio_fildes, 1);
+	struct aio_args args = { .cb = cb, .op = op, .q = q };
 	sem_init(&args.sem, 0, 0);
 
+	if (!q) {
+		if (cb->aio_fildes < 0) errno = EBADF;
+		else errno = EAGAIN;
+		return -1;
+	}
+	q->ref++;
+	pthread_mutex_unlock(&q->lock);
+
 	if (cb->aio_sigevent.sigev_notify == SIGEV_THREAD) {
 		if (cb->aio_sigevent.sigev_notify_attributes)
 			a = *cb->aio_sigevent.sigev_notify_attributes;
@@ -291,6 +298,8 @@ static int submit(struct aiocb *cb, int op)
 	pthread_sigmask(SIG_BLOCK, &allmask, &origmask);
 	cb->__err = EINPROGRESS;
 	if (pthread_create(&td, &a, io_thread_func, &args)) {
+		pthread_mutex_lock(&q->lock);
+		__aio_unref_queue(q);
 		errno = EAGAIN;
 		ret = -1;
 	}
@@ -298,10 +307,6 @@ static int submit(struct aiocb *cb, int op)
 
 	if (!ret) {
 		while (sem_wait(&args.sem));
-		if (args.err) {
-			errno = args.err;
-			ret = -1;
-		}
 	}
 
 	return ret;