patch-2.4.10 linux/net/sunrpc/xprt.c
- Lines: 484
- Date: Fri Sep 21 11:24:50 2001
- Orig file: v2.4.9/linux/net/sunrpc/xprt.c
- Orig date: Mon Aug 27 12:41:50 2001
diff -u --recursive --new-file v2.4.9/linux/net/sunrpc/xprt.c linux/net/sunrpc/xprt.c
@@ -75,10 +75,6 @@
* Local variables
*/
-/* Spinlock for critical sections in the code. */
-spinlock_t xprt_sock_lock = SPIN_LOCK_UNLOCKED;
-spinlock_t xprt_lock = SPIN_LOCK_UNLOCKED;
-
#ifdef RPC_DEBUG
# undef RPC_DEBUG_DATA
# define RPCDBG_FACILITY RPCDBG_XPRT
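The two global spinlocks deleted above turn into per-transport locks. The matching field additions live in include/linux/sunrpc/xprt.h, which this file's diff does not show; judging from the uses below and the spin_lock_init() calls near the end of the patch, the struct gains roughly the following (snd_task itself predates the patch):

    struct rpc_xprt {
            /* ... existing fields ... */
            spinlock_t       sock_lock;  /* guards snd_task and socket sends        */
            spinlock_t       xprt_lock;  /* guards cwnd/cong and the free slot list */
            struct rpc_task *snd_task;   /* task currently allowed to send          */
    };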
@@ -172,6 +168,44 @@
}
/*
+ * Serialize write access to sockets, in order to prevent different
+ * requests from interfering with each other.
+ * Also prevents TCP socket reconnections from colliding with writes.
+ */
+static int
+xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+ int retval;
+ spin_lock_bh(&xprt->sock_lock);
+ if (!xprt->snd_task)
+ xprt->snd_task = task;
+ else if (xprt->snd_task != task) {
+ dprintk("RPC: %4d TCP write queue full (task %d)\n",
+ task->tk_pid, xprt->snd_task->tk_pid);
+ task->tk_timeout = 0;
+ task->tk_status = -EAGAIN;
+ rpc_sleep_on(&xprt->sending, task, NULL, NULL);
+ }
+ retval = xprt->snd_task == task;
+ spin_unlock_bh(&xprt->sock_lock);
+ return retval;
+}
+
+/*
+ * Releases the socket for use by other requests.
+ */
+static void
+xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+ spin_lock_bh(&xprt->sock_lock);
+ if (xprt->snd_task == task) {
+ xprt->snd_task = NULL;
+ rpc_wake_up_next(&xprt->sending);
+ }
+ spin_unlock_bh(&xprt->sock_lock);
+}
+
+/*
* Write data to socket.
*/
static inline int
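The pair added above implements a claim-or-sleep discipline: the first task to ask becomes the designated sender; any other task is parked on the xprt->sending wait queue with -EAGAIN and is woken when the owner releases. A minimal user-space model of that shape, with pthreads standing in for the kernel spinlock and wait queue (all names here are illustrative, not from the patch):

    #include <pthread.h>
    #include <stddef.h>

    struct channel {
            pthread_mutex_t lock;           /* models xprt->sock_lock */
            pthread_cond_t  sending;        /* models xprt->sending   */
            void           *owner;          /* models xprt->snd_task  */
    };

    /* Return 1 if we now own the channel, 0 if the caller must wait. */
    static int channel_lock_write(struct channel *ch, void *task)
    {
            int owned;

            pthread_mutex_lock(&ch->lock);
            if (ch->owner == NULL)
                    ch->owner = task;       /* first comer becomes the sender */
            owned = (ch->owner == task);
            pthread_mutex_unlock(&ch->lock);
            return owned;                   /* the kernel code sleeps on 0 instead */
    }

    static void channel_release_write(struct channel *ch, void *task)
    {
            pthread_mutex_lock(&ch->lock);
            if (ch->owner == task) {
                    ch->owner = NULL;
                    pthread_cond_signal(&ch->sending);  /* rpc_wake_up_next() */
            }
            pthread_mutex_unlock(&ch->lock);
    }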
@@ -285,7 +319,10 @@
if (xprt->nocong)
return;
- spin_lock_bh(&xprt_sock_lock);
+ /*
+ * Note: we're in a BH context
+ */
+ spin_lock(&xprt->xprt_lock);
cwnd = xprt->cwnd;
if (result >= 0) {
if (xprt->cong < cwnd || time_before(jiffies, xprt->congtime))
@@ -313,7 +350,7 @@
xprt->cwnd = cwnd;
out:
- spin_unlock_bh(&xprt_sock_lock);
+ spin_unlock(&xprt->xprt_lock);
}
/*
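The one-line comment added above carries the locking convention for the whole patch: this function is reached from the socket callbacks, i.e. in bottom-half context, so a plain spin_lock() is enough, while process-context paths take the _bh variants of the same lock so a bottom half cannot interrupt them while it is held. A compressed sketch of the pairing (function names assumed, not from the patch):

    /* One lock, two entry points, depending on the caller's context. */
    static void adjust_cwnd_from_bh(struct rpc_xprt *xprt)
    {
            spin_lock(&xprt->xprt_lock);    /* BHs are already disabled here */
            /* ... recompute xprt->cwnd ... */
            spin_unlock(&xprt->xprt_lock);
    }

    static void reserve_from_process_context(struct rpc_xprt *xprt)
    {
            spin_lock_bh(&xprt->xprt_lock); /* mask BHs, or a BH taking the
                                             * same lock would deadlock us  */
            /* ... touch the same state ... */
            spin_unlock_bh(&xprt->xprt_lock);
    }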
@@ -394,6 +431,8 @@
/*
* Reconnect a broken TCP connection.
+ *
+ * Note: This cannot collide with the TCP reads, as both run from rpciod
*/
void
xprt_reconnect(struct rpc_task *task)
@@ -416,15 +455,10 @@
return;
}
- spin_lock(&xprt_lock);
- if (xprt->connecting) {
- task->tk_timeout = 0;
- rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
- spin_unlock(&xprt_lock);
+ if (!xprt_lock_write(xprt, task))
return;
- }
- xprt->connecting = 1;
- spin_unlock(&xprt_lock);
+ if (xprt_connected(xprt))
+ goto out_write;
status = -ENOTCONN;
if (!inet) {
@@ -439,6 +473,7 @@
/* Reset TCP record info */
xprt->tcp_offset = 0;
+ xprt->tcp_reclen = 0;
xprt->tcp_copied = 0;
xprt->tcp_more = 0;
@@ -467,24 +502,22 @@
dprintk("RPC: %4d connect status %d connected %d\n",
task->tk_pid, status, xprt_connected(xprt));
- spin_lock_bh(&xprt_sock_lock);
+ spin_lock_bh(&xprt->sock_lock);
if (!xprt_connected(xprt)) {
task->tk_timeout = xprt->timeout.to_maxval;
- rpc_sleep_on(&xprt->reconn, task, xprt_reconn_status, NULL);
- spin_unlock_bh(&xprt_sock_lock);
+ rpc_sleep_on(&xprt->sending, task, xprt_reconn_status, NULL);
+ spin_unlock_bh(&xprt->sock_lock);
return;
}
- spin_unlock_bh(&xprt_sock_lock);
+ spin_unlock_bh(&xprt->sock_lock);
}
defer:
- spin_lock(&xprt_lock);
- xprt->connecting = 0;
if (status < 0) {
rpc_delay(task, 5*HZ);
task->tk_status = -ENOTCONN;
}
- rpc_wake_up(&xprt->reconn);
- spin_unlock(&xprt_lock);
+ out_write:
+ xprt_release_write(xprt, task);
}
/*
@@ -499,10 +532,7 @@
dprintk("RPC: %4d xprt_reconn_timeout %d\n",
task->tk_pid, task->tk_status);
- spin_lock(&xprt_lock);
- xprt->connecting = 0;
- rpc_wake_up(&xprt->reconn);
- spin_unlock(&xprt_lock);
+ xprt_release_write(xprt, task);
}
/*
@@ -699,10 +729,6 @@
struct iovec riov;
int want, result;
- if (xprt->tcp_offset >= xprt->tcp_reclen + sizeof(xprt->tcp_recm)) {
- xprt->tcp_offset = 0;
- xprt->tcp_reclen = 0;
- }
if (xprt->tcp_offset >= sizeof(xprt->tcp_recm))
goto done;
@@ -718,10 +744,6 @@
want -= result;
} while (want);
- /* Is this another fragment in the last message */
- if (!xprt->tcp_more)
- xprt->tcp_copied = 0; /* No, so we're reading a new message */
-
/* Get the record length and mask out the last fragment bit */
xprt->tcp_reclen = ntohl(xprt->tcp_recm);
xprt->tcp_more = (xprt->tcp_reclen & 0x80000000) ? 0 : 1;
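The word parsed here is the RPC-over-TCP record marker (RFC 1831): one 32-bit big-endian value whose top bit flags the last fragment of a record and whose low 31 bits give the fragment length. The sending side builds it the same way later in this file (htonl(0x80000000|...)). A stand-alone demo of the encoding:

    #include <stdio.h>
    #include <stdint.h>
    #include <arpa/inet.h>

    int main(void)
    {
            /* last fragment, 124 bytes of payload */
            uint32_t recm = htonl(0x80000000u | 124);
            uint32_t host = ntohl(recm);

            printf("reclen = %u, more fragments = %d\n",
                   (unsigned)(host & 0x7fffffffu),   /* mask the flag bit  */
                   (host & 0x80000000u) ? 0 : 1);    /* same test as above */
            return 0;   /* prints: reclen = 124, more fragments = 0 */
    }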
@@ -744,7 +766,7 @@
if (xprt->tcp_copied >= sizeof(xprt->tcp_xid) || !avail)
goto done;
- want = min(unsigned int, sizeof(xprt->tcp_xid) - xprt->tcp_copied, avail);
+ want = min_t(unsigned int, sizeof(xprt->tcp_xid) - xprt->tcp_copied, avail);
do {
dprintk("RPC: reading xid (%d bytes)\n", want);
riov.iov_base = ((u8*) &xprt->tcp_xid) + xprt->tcp_copied;
@@ -771,7 +793,7 @@
if (req->rq_rlen <= xprt->tcp_copied || !avail)
goto done;
- want = min(unsigned int, req->rq_rlen - xprt->tcp_copied, avail);
+ want = min_t(unsigned int, req->rq_rlen - xprt->tcp_copied, avail);
do {
dprintk("RPC: %4d TCP receiving %d bytes\n",
req->rq_task->tk_pid, want);
@@ -805,7 +827,7 @@
int want, result = 0;
while (avail) {
- want = min(unsigned int, avail, sizeof(dummy));
+ want = min_t(unsigned int, avail, sizeof(dummy));
riov.iov_base = dummy;
riov.iov_len = want;
dprintk("RPC: TCP skipping %d bytes\n", want);
@@ -843,14 +865,15 @@
/* Read in a new fragment marker if necessary */
/* Can we ever really expect to get completely empty fragments? */
- if ((result = tcp_read_fraghdr(xprt)) <= 0)
+ if ((result = tcp_read_fraghdr(xprt)) < 0)
return result;
avail = result;
/* Read in the xid if necessary */
- if ((result = tcp_read_xid(xprt, avail)) <= 0)
+ if ((result = tcp_read_xid(xprt, avail)) < 0)
return result;
- avail = result;
+ if (!(avail = result))
+ goto out_ok;
/* Find and lock the request corresponding to this xid */
req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
@@ -868,9 +891,14 @@
if ((result = tcp_read_discard(xprt, avail)) < 0)
return result;
+ out_ok:
dprintk("RPC: tcp_input_record done (off %d reclen %d copied %d)\n",
xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_copied);
result = xprt->tcp_reclen;
+ xprt->tcp_reclen = 0;
+ xprt->tcp_offset = 0;
+ if (!xprt->tcp_more)
+ xprt->tcp_copied = 0;
return result;
}
@@ -885,11 +913,19 @@
rpciod_wake_up();
}
+int xprt_tcp_pending(void)
+{
+ int retval;
+
+ spin_lock_bh(&rpc_queue_lock);
+ retval = !list_empty(&rpc_xprt_pending);
+ spin_unlock_bh(&rpc_queue_lock);
+ return retval;
+}
+
static inline
void xprt_append_pending(struct rpc_xprt *xprt)
{
- if (!list_empty(&xprt->rx_pending))
- return;
spin_lock_bh(&rpc_queue_lock);
if (list_empty(&xprt->rx_pending)) {
list_add(&xprt->rx_pending, rpc_xprt_pending.prev);
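Two related changes in this hunk: the new xprt_tcp_pending() samples the global pending list only while holding rpc_queue_lock, and xprt_append_pending() loses its unlocked list_empty() fast path, so the test and the list_add() now happen under the same lock; presumably the unlocked peek could act on a stale view of the list pointers. A user-space model of the corrected shape (pthreads in place of the spinlock; names are illustrative):

    #include <pthread.h>

    static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;

    struct pending { int linked; };     /* models xprt->rx_pending */

    static void append_pending(struct pending *p)
    {
            pthread_mutex_lock(&queue_lock);
            if (!p->linked)             /* test and link under one lock */
                    p->linked = 1;      /* models list_add(...)         */
            pthread_mutex_unlock(&queue_lock);
    }

    static int tcp_pending(struct pending *p)
    {
            int ret;

            pthread_mutex_lock(&queue_lock);
            ret = p->linked;            /* models !list_empty(...)      */
            pthread_mutex_unlock(&queue_lock);
            return ret;
    }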
@@ -1003,11 +1039,10 @@
case TCP_ESTABLISHED:
if (xprt_test_and_set_connected(xprt))
break;
- spin_lock_bh(&xprt_sock_lock);
+ spin_lock(&xprt->sock_lock);
if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
rpc_wake_up_task(xprt->snd_task);
- rpc_wake_up(&xprt->reconn);
- spin_unlock_bh(&xprt_sock_lock);
+ spin_unlock(&xprt->sock_lock);
break;
case TCP_SYN_SENT:
case TCP_SYN_RECV:
@@ -1041,10 +1076,10 @@
return;
if (!xprt_test_and_set_wspace(xprt)) {
- spin_lock_bh(&xprt_sock_lock);
+ spin_lock(&xprt->sock_lock);
if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
rpc_wake_up_task(xprt->snd_task);
- spin_unlock_bh(&xprt_sock_lock);
+ spin_unlock(&xprt->sock_lock);
}
if (test_bit(SOCK_NOSPACE, &sock->flags)) {
@@ -1067,14 +1102,14 @@
/* Wait until we have enough socket memory */
- if (sock_wspace(sk) < min(int, sk->sndbuf,XPRT_MIN_WRITE_SPACE))
+ if (sock_wspace(sk) < min_t(int, sk->sndbuf,XPRT_MIN_WRITE_SPACE))
return;
if (!xprt_test_and_set_wspace(xprt)) {
- spin_lock_bh(&xprt_sock_lock);
+ spin_lock(&xprt->sock_lock);
if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
rpc_wake_up_task(xprt->snd_task);
- spin_unlock_bh(&xprt_sock_lock);
+ spin_unlock(&xprt->sock_lock);
}
if (sk->sleep && waitqueue_active(sk->sleep))
@@ -1100,55 +1135,6 @@
rpc_wake_up_task(task);
}
-
-/*
- * Serialize access to sockets, in order to prevent different
- * requests from interfering with each other.
- */
-static int
-xprt_down_transmit(struct rpc_task *task)
-{
- struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
- struct rpc_rqst *req = task->tk_rqstp;
-
- spin_lock_bh(&xprt_sock_lock);
- spin_lock(&xprt_lock);
- if (xprt->snd_task && xprt->snd_task != task) {
- dprintk("RPC: %4d TCP write queue full (task %d)\n",
- task->tk_pid, xprt->snd_task->tk_pid);
- task->tk_timeout = 0;
- task->tk_status = -EAGAIN;
- rpc_sleep_on(&xprt->sending, task, NULL, NULL);
- } else if (!xprt->snd_task) {
- xprt->snd_task = task;
-#ifdef RPC_PROFILE
- req->rq_xtime = jiffies;
-#endif
- req->rq_bytes_sent = 0;
- }
- spin_unlock(&xprt_lock);
- spin_unlock_bh(&xprt_sock_lock);
- return xprt->snd_task == task;
-}
-
-/*
- * Releases the socket for use by other requests.
- */
-static inline void
-xprt_up_transmit(struct rpc_task *task)
-{
- struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
-
- if (xprt->snd_task && xprt->snd_task == task) {
- spin_lock_bh(&xprt_sock_lock);
- spin_lock(&xprt_lock);
- xprt->snd_task = NULL;
- rpc_wake_up_next(&xprt->sending);
- spin_unlock(&xprt_lock);
- spin_unlock_bh(&xprt_sock_lock);
- }
-}
-
/*
* Place the actual RPC call.
* We have to copy the iovec because sendmsg fiddles with its contents.
@@ -1182,9 +1168,12 @@
*marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
}
- if (!xprt_down_transmit(task))
+ if (!xprt_lock_write(xprt, task))
return;
+#ifdef RPC_PROFILE
+ req->rq_xtime = jiffies;
+#endif
do_xprt_transmit(task);
}
@@ -1252,12 +1241,12 @@
switch (status) {
case -ENOMEM:
/* Protect against (udp|tcp)_write_space */
- spin_lock_bh(&xprt_sock_lock);
+ spin_lock_bh(&xprt->sock_lock);
if (!xprt_wspace(xprt)) {
task->tk_timeout = req->rq_timeout.to_current;
rpc_sleep_on(&xprt->sending, task, NULL, NULL);
}
- spin_unlock_bh(&xprt_sock_lock);
+ spin_unlock_bh(&xprt->sock_lock);
return;
case -EAGAIN:
/* Keep holding the socket if it is blocked */
@@ -1268,6 +1257,9 @@
if (!xprt->stream)
return;
default:
+ if (xprt->stream)
+ xprt_disconnect(xprt);
+ req->rq_bytes_sent = 0;
goto out_release;
}
@@ -1278,7 +1270,7 @@
rpc_add_timer(task, xprt_timer);
rpc_unlock_task(task);
out_release:
- xprt_up_transmit(task);
+ xprt_release_write(xprt, task);
}
/*
@@ -1313,7 +1305,7 @@
dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
task->tk_pid, xprt->cong, xprt->cwnd);
- spin_lock_bh(&xprt_sock_lock);
+ spin_lock_bh(&xprt->xprt_lock);
xprt_reserve_status(task);
if (task->tk_rqstp) {
task->tk_timeout = 0;
@@ -1324,7 +1316,7 @@
task->tk_status = -EAGAIN;
rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
}
- spin_unlock_bh(&xprt_sock_lock);
+ spin_unlock_bh(&xprt->xprt_lock);
dprintk("RPC: %4d xprt_reserve returns %d\n",
task->tk_pid, task->tk_status);
return task->tk_status;
@@ -1397,7 +1389,11 @@
struct rpc_xprt *xprt = task->tk_xprt;
struct rpc_rqst *req;
- xprt_up_transmit(task);
+ if (xprt->snd_task == task) {
+ if (xprt->stream)
+ xprt_disconnect(xprt);
+ xprt_release_write(xprt, task);
+ }
if (!(req = task->tk_rqstp))
return;
task->tk_rqstp = NULL;
@@ -1411,7 +1407,7 @@
rpc_remove_wait_queue(task);
}
- spin_lock_bh(&xprt_sock_lock);
+ spin_lock_bh(&xprt->xprt_lock);
req->rq_next = xprt->free;
xprt->free = req;
@@ -1419,7 +1415,7 @@
xprt->cong -= RPC_CWNDSCALE;
xprt_clear_backlog(xprt);
- spin_unlock_bh(&xprt_sock_lock);
+ spin_unlock_bh(&xprt->xprt_lock);
}
/*
@@ -1476,6 +1472,8 @@
} else
xprt->cwnd = RPC_INITCWND;
xprt->congtime = jiffies;
+ spin_lock_init(&xprt->sock_lock);
+ spin_lock_init(&xprt->xprt_lock);
init_waitqueue_head(&xprt->cong_wait);
/* Set timeout parameters */
@@ -1489,7 +1487,6 @@
xprt->pending = RPC_INIT_WAITQ("xprt_pending");
xprt->sending = RPC_INIT_WAITQ("xprt_sending");
xprt->backlog = RPC_INIT_WAITQ("xprt_backlog");
- xprt->reconn = RPC_INIT_WAITQ("xprt_reconn");
/* initialize free list */
for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
@@ -1625,7 +1622,6 @@
rpc_wake_up(&xprt->sending);
rpc_wake_up(&xprt->pending);
rpc_wake_up(&xprt->backlog);
- rpc_wake_up(&xprt->reconn);
if (waitqueue_active(&xprt->cong_wait))
wake_up(&xprt->cong_wait);
}