diff options
Diffstat (limited to 'ipc/msg.c')
-rw-r--r-- | ipc/msg.c | 204 |
1 files changed, 97 insertions, 107 deletions
diff --git a/ipc/msg.c b/ipc/msg.c index c6521c205cb4..e12307d0c920 100644 --- a/ipc/msg.c +++ b/ipc/msg.c @@ -51,19 +51,14 @@ struct msg_receiver { long r_msgtype; long r_maxsize; - /* - * Mark r_msg volatile so that the compiler - * does not try to get smart and optimize - * it. We rely on this for the lockless - * receive algorithm. - */ - struct msg_msg *volatile r_msg; + struct msg_msg *r_msg; }; /* one msg_sender for each sleeping sender */ struct msg_sender { struct list_head list; struct task_struct *tsk; + size_t msgsz; }; #define SEARCH_ANY 1 @@ -159,45 +154,72 @@ static int newque(struct ipc_namespace *ns, struct ipc_params *params) return msq->q_perm.id; } -static inline void ss_add(struct msg_queue *msq, struct msg_sender *mss) +static inline bool msg_fits_inqueue(struct msg_queue *msq, size_t msgsz) +{ + return msgsz + msq->q_cbytes <= msq->q_qbytes && + 1 + msq->q_qnum <= msq->q_qbytes; +} + +static inline void ss_add(struct msg_queue *msq, + struct msg_sender *mss, size_t msgsz) { mss->tsk = current; + mss->msgsz = msgsz; __set_current_state(TASK_INTERRUPTIBLE); list_add_tail(&mss->list, &msq->q_senders); } static inline void ss_del(struct msg_sender *mss) { - if (mss->list.next != NULL) + if (mss->list.next) list_del(&mss->list); } -static void ss_wakeup(struct list_head *h, int kill) +static void ss_wakeup(struct msg_queue *msq, + struct wake_q_head *wake_q, bool kill) { struct msg_sender *mss, *t; + struct task_struct *stop_tsk = NULL; + struct list_head *h = &msq->q_senders; list_for_each_entry_safe(mss, t, h, list) { if (kill) mss->list.next = NULL; - wake_up_process(mss->tsk); + + /* + * Stop at the first task we don't wakeup, + * we've already iterated the original + * sender queue. + */ + else if (stop_tsk == mss->tsk) + break; + /* + * We are not in an EIDRM scenario here, therefore + * verify that we really need to wakeup the task. + * To maintain current semantics and wakeup order, + * move the sender to the tail on behalf of the + * blocked task. + */ + else if (!msg_fits_inqueue(msq, mss->msgsz)) { + if (!stop_tsk) + stop_tsk = mss->tsk; + + list_move_tail(&mss->list, &msq->q_senders); + continue; + } + + wake_q_add(wake_q, mss->tsk); } } -static void expunge_all(struct msg_queue *msq, int res) +static void expunge_all(struct msg_queue *msq, int res, + struct wake_q_head *wake_q) { struct msg_receiver *msr, *t; list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) { - msr->r_msg = NULL; /* initialize expunge ordering */ - wake_up_process(msr->r_tsk); - /* - * Ensure that the wakeup is visible before setting r_msg as - * the receiving end depends on it: either spinning on a nil, - * or dealing with -EAGAIN cases. See lockless receive part 1 - * and 2 in do_msgrcv(). - */ - smp_wmb(); /* barrier (B) */ - msr->r_msg = ERR_PTR(res); + wake_q_add(wake_q, msr->r_tsk); + WRITE_ONCE(msr->r_msg, ERR_PTR(res)); } } @@ -213,11 +235,13 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp) { struct msg_msg *msg, *t; struct msg_queue *msq = container_of(ipcp, struct msg_queue, q_perm); + WAKE_Q(wake_q); - expunge_all(msq, -EIDRM); - ss_wakeup(&msq->q_senders, 1); + expunge_all(msq, -EIDRM, &wake_q); + ss_wakeup(msq, &wake_q, true); msg_rmid(ns, msq); ipc_unlock_object(&msq->q_perm); + wake_up_q(&wake_q); rcu_read_unlock(); list_for_each_entry_safe(msg, t, &msq->q_messages, m_list) { @@ -372,6 +396,9 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd, freeque(ns, ipcp); goto out_up; case IPC_SET: + { + WAKE_Q(wake_q); + if (msqid64.msg_qbytes > ns->msg_ctlmnb && !capable(CAP_SYS_RESOURCE)) { err = -EPERM; @@ -386,15 +413,21 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd, msq->q_qbytes = msqid64.msg_qbytes; msq->q_ctime = get_seconds(); - /* sleeping receivers might be excluded by + /* + * Sleeping receivers might be excluded by * stricter permissions. */ - expunge_all(msq, -EAGAIN); - /* sleeping senders might be able to send + expunge_all(msq, -EAGAIN, &wake_q); + /* + * Sleeping senders might be able to send * due to a larger queue size. */ - ss_wakeup(&msq->q_senders, 0); - break; + ss_wakeup(msq, &wake_q, false); + ipc_unlock_object(&msq->q_perm); + wake_up_q(&wake_q); + + goto out_unlock1; + } default: err = -EINVAL; goto out_unlock1; @@ -566,7 +599,8 @@ static int testmsg(struct msg_msg *msg, long type, int mode) return 0; } -static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg) +static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg, + struct wake_q_head *wake_q) { struct msg_receiver *msr, *t; @@ -577,27 +611,14 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg) list_del(&msr->r_list); if (msr->r_maxsize < msg->m_ts) { - /* initialize pipelined send ordering */ - msr->r_msg = NULL; - wake_up_process(msr->r_tsk); - /* barrier (B) see barrier comment below */ - smp_wmb(); - msr->r_msg = ERR_PTR(-E2BIG); + wake_q_add(wake_q, msr->r_tsk); + WRITE_ONCE(msr->r_msg, ERR_PTR(-E2BIG)); } else { - msr->r_msg = NULL; msq->q_lrpid = task_pid_vnr(msr->r_tsk); msq->q_rtime = get_seconds(); - wake_up_process(msr->r_tsk); - /* - * Ensure that the wakeup is visible before - * setting r_msg, as the receiving can otherwise - * exit - once r_msg is set, the receiver can - * continue. See lockless receive part 1 and 2 - * in do_msgrcv(). Barrier (B). - */ - smp_wmb(); - msr->r_msg = msg; + wake_q_add(wake_q, msr->r_tsk); + WRITE_ONCE(msr->r_msg, msg); return 1; } } @@ -613,6 +634,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext, struct msg_msg *msg; int err; struct ipc_namespace *ns; + WAKE_Q(wake_q); ns = current->nsproxy->ipc_ns; @@ -654,10 +676,8 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext, if (err) goto out_unlock0; - if (msgsz + msq->q_cbytes <= msq->q_qbytes && - 1 + msq->q_qnum <= msq->q_qbytes) { + if (msg_fits_inqueue(msq, msgsz)) break; - } /* queue full, wait: */ if (msgflg & IPC_NOWAIT) { @@ -666,7 +686,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext, } /* enqueue the sender and prepare to block */ - ss_add(msq, &s); + ss_add(msq, &s, msgsz); if (!ipc_rcu_getref(msq)) { err = -EIDRM; @@ -686,7 +706,6 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext, err = -EIDRM; goto out_unlock0; } - ss_del(&s); if (signal_pending(current)) { @@ -695,10 +714,11 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext, } } + msq->q_lspid = task_tgid_vnr(current); msq->q_stime = get_seconds(); - if (!pipelined_send(msq, msg)) { + if (!pipelined_send(msq, msg, &wake_q)) { /* no one is waiting for this message, enqueue it */ list_add_tail(&msg->m_list, &msq->q_messages); msq->q_cbytes += msgsz; @@ -712,6 +732,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext, out_unlock0: ipc_unlock_object(&msq->q_perm); + wake_up_q(&wake_q); out_unlock1: rcu_read_unlock(); if (msg != NULL) @@ -829,6 +850,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl struct msg_queue *msq; struct ipc_namespace *ns; struct msg_msg *msg, *copy = NULL; + WAKE_Q(wake_q); ns = current->nsproxy->ipc_ns; @@ -893,7 +915,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl msq->q_cbytes -= msg->m_ts; atomic_sub(msg->m_ts, &ns->msg_bytes); atomic_dec(&ns->msg_hdrs); - ss_wakeup(&msq->q_senders, 0); + ss_wakeup(msq, &wake_q, false); goto out_unlock0; } @@ -919,71 +941,38 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl rcu_read_unlock(); schedule(); - /* Lockless receive, part 1: - * Disable preemption. We don't hold a reference to the queue - * and getting a reference would defeat the idea of a lockless - * operation, thus the code relies on rcu to guarantee the - * existence of msq: + /* + * Lockless receive, part 1: + * We don't hold a reference to the queue and getting a + * reference would defeat the idea of a lockless operation, + * thus the code relies on rcu to guarantee the existence of + * msq: * Prior to destruction, expunge_all(-EIRDM) changes r_msg. * Thus if r_msg is -EAGAIN, then the queue not yet destroyed. - * rcu_read_lock() prevents preemption between reading r_msg - * and acquiring the q_perm.lock in ipc_lock_object(). */ rcu_read_lock(); - /* Lockless receive, part 2: - * Wait until pipelined_send or expunge_all are outside of - * wake_up_process(). There is a race with exit(), see - * ipc/mqueue.c for the details. The correct serialization - * ensures that a receiver cannot continue without the wakeup - * being visibible _before_ setting r_msg: - * - * CPU 0 CPU 1 - * <loop receiver> - * smp_rmb(); (A) <-- pair -. <waker thread> - * <load ->r_msg> | msr->r_msg = NULL; - * | wake_up_process(); - * <continue> `------> smp_wmb(); (B) - * msr->r_msg = msg; + /* + * Lockless receive, part 2: + * The work in pipelined_send() and expunge_all(): + * - Set pointer to message + * - Queue the receiver task for later wakeup + * - Wake up the process after the lock is dropped. * - * Where (A) orders the message value read and where (B) orders - * the write to the r_msg -- done in both pipelined_send and - * expunge_all. - */ - for (;;) { - /* - * Pairs with writer barrier in pipelined_send - * or expunge_all. - */ - smp_rmb(); /* barrier (A) */ - msg = (struct msg_msg *)msr_d.r_msg; - if (msg) - break; - - /* - * The cpu_relax() call is a compiler barrier - * which forces everything in this loop to be - * re-loaded. - */ - cpu_relax(); - } - - /* Lockless receive, part 3: - * If there is a message or an error then accept it without - * locking. + * Should the process wake up before this wakeup (due to a + * signal) it will either see the message and continue ... */ + msg = READ_ONCE(msr_d.r_msg); if (msg != ERR_PTR(-EAGAIN)) goto out_unlock1; - /* Lockless receive, part 3: - * Acquire the queue spinlock. - */ + /* + * ... or see -EAGAIN, acquire the lock to check the message + * again. + */ ipc_lock_object(&msq->q_perm); - /* Lockless receive, part 4: - * Repeat test after acquiring the spinlock. - */ - msg = (struct msg_msg *)msr_d.r_msg; + msg = msr_d.r_msg; if (msg != ERR_PTR(-EAGAIN)) goto out_unlock0; @@ -998,6 +987,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl out_unlock0: ipc_unlock_object(&msq->q_perm); + wake_up_q(&wake_q); out_unlock1: rcu_read_unlock(); if (IS_ERR(msg)) { |