patch-2.3.22 linux/net/sunrpc/xprt.c
Next file: linux/net/unix/af_unix.c
Previous file: linux/net/sunrpc/sched.c
Back to the patch index
Back to the overall index
- Lines: 1278
- Date: Fri Oct 15 09:15:26 1999
- Orig file: v2.3.21/linux/net/sunrpc/xprt.c
- Orig date: Sat Oct 9 11:47:50 1999
diff -u --recursive --new-file v2.3.21/linux/net/sunrpc/xprt.c linux/net/sunrpc/xprt.c
@@ -31,12 +31,16 @@
* primitives that `transparently' work for processes as well as async
* tasks that rely on callbacks.
*
- * Copyright (C) 1995, 1996, Olaf Kirch <okir@monad.swb.de>
+ * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
*
* TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
* TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
* TCP NFS related read + write fixes
* (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
+ *
+ * Rewrite of larges part of the code in order to stabilize TCP stuff.
+ * Fix behaviour when socket buffer is full.
+ * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
*/
#define __KERNEL_SYSCALLS__
@@ -62,6 +66,8 @@
#include <asm/uaccess.h>
#define SOCK_HAS_USER_DATA
+/* Following value should be > 32k + RPC overhead */
+#define XPRT_MIN_WRITE_SPACE 35000
/*
* Local variables
@@ -70,6 +76,9 @@
static struct rpc_xprt * sock_list = NULL;
#endif
+/* Spinlock for critical sections in the code. */
+spinlock_t xprt_lock = SPIN_LOCK_UNLOCKED;
+
#ifdef RPC_DEBUG
# undef RPC_DEBUG_DATA
# define RPCDBG_FACILITY RPCDBG_XPRT
@@ -84,11 +93,13 @@
* Local functions
*/
static void xprt_request_init(struct rpc_task *, struct rpc_xprt *);
+static void do_xprt_transmit(struct rpc_task *);
static void xprt_transmit_status(struct rpc_task *task);
+static void xprt_transmit_timeout(struct rpc_task *task);
static void xprt_receive_status(struct rpc_task *task);
static void xprt_reserve_status(struct rpc_task *task);
+static void xprt_disconnect(struct rpc_xprt *);
static void xprt_reconn_timeout(struct rpc_task *task);
-static void xprt_reconn_status(struct rpc_task *task);
static struct socket *xprt_create_socket(int, struct sockaddr_in *,
struct rpc_timeout *);
@@ -144,39 +155,35 @@
* Adjust the iovec to move on 'n' bytes
*/
-extern inline void xprt_move_iov(struct msghdr *msg, struct iovec *niv, int amount)
+extern inline void
+xprt_move_iov(struct msghdr *msg, struct iovec *niv, int amount)
{
struct iovec *iv=msg->msg_iov;
+ int i;
/*
* Eat any sent iovecs
*/
-
- while(iv->iov_len < amount)
- {
- amount-=iv->iov_len;
+ while(iv->iov_len <= amount) {
+ amount -= iv->iov_len;
iv++;
msg->msg_iovlen--;
}
-
- msg->msg_iov=niv;
-
+
/*
* And chew down the partial one
*/
-
niv[0].iov_len = iv->iov_len-amount;
niv[0].iov_base =((unsigned char *)iv->iov_base)+amount;
iv++;
-
+
/*
* And copy any others
*/
-
- for(amount=1;amount<msg->msg_iovlen; amount++)
- {
- niv[amount]=*iv++;
- }
+ for(i = 1; i < msg->msg_iovlen; i++)
+ niv[i]=*iv++;
+
+ msg->msg_iov=niv;
}
/*
@@ -184,43 +191,42 @@
*/
static inline int
-xprt_sendmsg(struct rpc_xprt *xprt)
+xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
struct socket *sock = xprt->sock;
struct msghdr msg;
mm_segment_t oldfs;
int result;
+ int slen = req->rq_slen - req->rq_bytes_sent;
struct iovec niv[MAX_IOVEC];
+ if (slen == 0)
+ return 0;
+
xprt_pktdump("packet data:",
- xprt->snd_buf.io_vec->iov_base,
- xprt->snd_buf.io_vec->iov_len);
+ req->rq_svec->iov_base,
+ req->rq_svec->iov_len);
msg.msg_flags = MSG_DONTWAIT;
- msg.msg_iov = xprt->snd_buf.io_vec;
- msg.msg_iovlen = xprt->snd_buf.io_nr;
+ msg.msg_iov = req->rq_svec;
+ msg.msg_iovlen = req->rq_snr;
msg.msg_name = (struct sockaddr *) &xprt->addr;
msg.msg_namelen = sizeof(xprt->addr);
msg.msg_control = NULL;
msg.msg_controllen = 0;
/* Dont repeat bytes */
-
- if(xprt->snd_sent)
- xprt_move_iov(&msg, niv, xprt->snd_sent);
-
+ if (req->rq_bytes_sent)
+ xprt_move_iov(&msg, niv, req->rq_bytes_sent);
+
oldfs = get_fs(); set_fs(get_ds());
- result = sock_sendmsg(sock, &msg, xprt->snd_buf.io_len);
+ result = sock_sendmsg(sock, &msg, slen);
set_fs(oldfs);
- dprintk("RPC: xprt_sendmsg(%d) = %d\n",
- xprt->snd_buf.io_len, result);
+ dprintk("RPC: xprt_sendmsg(%d) = %d\n", slen, result);
- if (result >= 0) {
- xprt->snd_buf.io_len -= result;
- xprt->snd_sent += result;
+ if (result >= 0)
return result;
- }
switch (result) {
case -ECONNREFUSED:
@@ -229,9 +235,14 @@
*/
break;
case -EAGAIN:
- return 0;
- case -ENOTCONN: case -EPIPE:
+ if (sock->flags & SO_NOSPACE)
+ result = -ENOMEM;
+ break;
+ case -ENOTCONN:
+ case -EPIPE:
/* connection broken */
+ if (xprt->stream)
+ result = -ENOTCONN;
break;
default:
printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
@@ -252,7 +263,6 @@
mm_segment_t oldfs;
int result;
-#if LINUX_VERSION_CODE >= 0x020100
msg.msg_flags = MSG_DONTWAIT;
msg.msg_iov = iov;
msg.msg_iovlen = nr;
@@ -264,20 +274,6 @@
oldfs = get_fs(); set_fs(get_ds());
result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
set_fs(oldfs);
-#else
- int alen = sizeof(sin);
- msg.msg_flags = 0;
- msg.msg_iov = iov;
- msg.msg_iovlen = nr;
- msg.msg_name = &sin;
- msg.msg_namelen = sizeof(sin);
- msg.msg_control = NULL;
- msg.msg_controllen = 0;
-
- oldfs = get_fs(); set_fs(get_ds());
- result = sock->ops->recvmsg(sock, &msg, len, 1, 0, &alen);
- set_fs(oldfs);
-#endif
dprintk("RPC: xprt_recvmsg(iov %p, len %d) = %d\n",
iov, len, result);
@@ -354,13 +350,15 @@
{
struct sock *sk = xprt->inet;
+ xprt_disconnect(xprt);
+
#ifdef SOCK_HAS_USER_DATA
sk->user_data = NULL;
#endif
sk->data_ready = xprt->old_data_ready;
- sk->no_check = 0;
sk->state_change = xprt->old_state_change;
sk->write_space = xprt->old_write_space;
+ sk->no_check = 0;
sock_release(xprt->sock);
/*
@@ -378,9 +376,16 @@
xprt_disconnect(struct rpc_xprt *xprt)
{
dprintk("RPC: disconnected transport %p\n", xprt);
+ xprt->connected = 0;
+ xprt->tcp_offset = 0;
+ xprt->tcp_more = 0;
+ xprt->tcp_total = 0;
+ xprt->tcp_reclen = 0;
+ xprt->tcp_copied = 0;
+ xprt->tcp_rqstp = NULL;
+ xprt->rx_pending_flag = 0;
rpc_wake_up_status(&xprt->pending, -ENOTCONN);
rpc_wake_up_status(&xprt->sending, -ENOTCONN);
- xprt->connected = 0;
}
/*
@@ -398,22 +403,33 @@
task->tk_pid, xprt, xprt->connected);
task->tk_status = 0;
+ if (xprt->shutdown)
+ return;
+
+ if (!xprt->stream)
+ return;
+
+ start_bh_atomic();
+ if (xprt->connected) {
+ end_bh_atomic();
+ return;
+ }
if (xprt->connecting) {
task->tk_timeout = xprt->timeout.to_maxval;
rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
+ end_bh_atomic();
return;
}
xprt->connecting = 1;
+ end_bh_atomic();
/* Create an unconnected socket */
- if (!(sock = xprt_create_socket(xprt->prot, NULL, &xprt->timeout)))
+ if (!(sock = xprt_create_socket(xprt->prot, NULL, &xprt->timeout))) {
+ xprt->connecting = 0;
goto defer;
+ }
-#if LINUX_VERSION_CODE >= 0x020100
inet = sock->sk;
-#else
- inet = (struct sock *) sock->data;
-#endif
inet->data_ready = xprt->inet->data_ready;
inet->state_change = xprt->inet->state_change;
inet->write_space = xprt->inet->write_space;
@@ -422,18 +438,18 @@
#endif
dprintk("RPC: %4d closing old socket\n", task->tk_pid);
- xprt_disconnect(xprt);
xprt_close(xprt);
- /* Reset to new socket and default congestion */
+ /* Reset to new socket */
xprt->sock = sock;
xprt->inet = inet;
- xprt->cwnd = RPC_INITCWND;
/* Now connect it asynchronously. */
dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
sizeof(xprt->addr), O_NONBLOCK);
+
+ xprt->connecting = 0;
if (status < 0) {
if (status != -EINPROGRESS && status != -EALREADY) {
printk("RPC: TCP connect error %d!\n", -status);
@@ -447,37 +463,19 @@
start_bh_atomic();
if (!xprt->connected) {
rpc_sleep_on(&xprt->reconn, task,
- xprt_reconn_status, xprt_reconn_timeout);
+ NULL, xprt_reconn_timeout);
end_bh_atomic();
return;
}
end_bh_atomic();
}
- xprt->connecting = 0;
- rpc_wake_up(&xprt->reconn);
- return;
defer:
- task->tk_timeout = 30 * HZ;
- rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
- xprt->connecting = 0;
-}
-
-/*
- * Reconnect status
- */
-static void
-xprt_reconn_status(struct rpc_task *task)
-{
- struct rpc_xprt *xprt = task->tk_xprt;
-
- dprintk("RPC: %4d xprt_reconn_status %d\n",
- task->tk_pid, task->tk_status);
- if (!xprt->connected && task->tk_status != -ETIMEDOUT) {
- task->tk_timeout = 30 * HZ;
- rpc_sleep_on(&xprt->reconn, task, NULL, xprt_reconn_timeout);
- }
+ start_bh_atomic();
+ if (!xprt->connected)
+ rpc_wake_up_next(&xprt->reconn);
+ end_bh_atomic();
}
/*
@@ -490,11 +488,19 @@
dprintk("RPC: %4d xprt_reconn_timeout %d\n",
task->tk_pid, task->tk_status);
task->tk_status = -ENOTCONN;
- task->tk_xprt->connecting = 0;
+ start_bh_atomic();
+ if (task->tk_xprt->connecting)
+ task->tk_xprt->connecting = 0;
+ if (!task->tk_xprt->connected)
+ task->tk_status = -ENOTCONN;
+ else
+ task->tk_status = -ETIMEDOUT;
+ end_bh_atomic();
task->tk_timeout = 0;
rpc_wake_up_task(task);
}
+extern spinlock_t rpc_queue_lock;
/*
* Look up the RPC request corresponding to a reply.
*/
@@ -503,22 +509,28 @@
{
struct rpc_task *head, *task;
struct rpc_rqst *req;
+ unsigned long oldflags;
int safe = 0;
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
if ((head = xprt->pending.task) != NULL) {
task = head;
do {
if ((req = task->tk_rqstp) && req->rq_xid == xid)
- return req;
+ goto out;
task = task->tk_next;
if (++safe > 100) {
printk("xprt_lookup_rqst: loop in Q!\n");
- return NULL;
+ goto out_bad;
}
} while (task != head);
}
dprintk("RPC: unknown XID %08x in reply.\n", xid);
- return NULL;
+ out_bad:
+ req = NULL;
+ out:
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+ return req;
}
/*
@@ -559,11 +571,13 @@
dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
task->tk_status = copied;
- rpc_wake_up_task(task);
+ if (!RPC_IS_RUNNING(task))
+ rpc_wake_up_task(task);
return;
}
-/* We have set things up such that we perform the checksum of the UDP
+/*
+ * We have set things up such that we perform the checksum of the UDP
* packet in parallel with the copies into the RPC client iovec. -DaveM
*/
static int csum_partial_copy_to_page_cache(struct iovec *iov,
@@ -609,7 +623,8 @@
return 0;
}
-/* Input handler for RPC replies. Called from a bottom half and hence
+/*
+ * Input handler for RPC replies. Called from a bottom half and hence
* atomic.
*/
static inline void
@@ -621,12 +636,15 @@
int err, repsize, copied;
dprintk("RPC: udp_data_ready...\n");
- if (!(xprt = xprt_from_sock(sk)))
+ if (!(xprt = xprt_from_sock(sk))) {
+ printk("RPC: udp_data_ready request not found!\n");
return;
+ }
+
dprintk("RPC: udp_data_ready client %p\n", xprt);
if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
- return;
+ goto out_err;
repsize = skb->len - sizeof(struct udphdr);
if (repsize < 4) {
@@ -646,6 +664,7 @@
if ((copied = rovr->rq_rlen) > repsize)
copied = repsize;
+ rovr->rq_damaged = 1;
/* Suck it into the iovec, verify checksum if not done by hw. */
if (csum_partial_copy_to_page_cache(rovr->rq_rvec, skb, copied))
goto dropit;
@@ -658,6 +677,8 @@
dropit:
skb_free_datagram(sk, skb);
return;
+out_err:
+ return;
}
/*
@@ -679,6 +700,7 @@
int result, maxcpy, reclen, avail, want;
dprintk("RPC: tcp_input_record\n");
+
offset = xprt->tcp_offset;
result = -EAGAIN;
if (offset < 4 || (!xprt->tcp_more && offset < 8)) {
@@ -687,11 +709,6 @@
riov.iov_base = xprt->tcp_recm.data + offset;
riov.iov_len = want;
result = xprt_recvmsg(xprt, &riov, 1, want);
- if (!result)
- {
- dprintk("RPC: empty TCP record.\n");
- return -ENOTCONN;
- }
if (result < 0)
goto done;
offset += result;
@@ -733,9 +750,9 @@
dprintk("RPC: %4d TCP receiving %d bytes\n",
req->rq_task->tk_pid, want);
+ /* Request must be re-encoded before retransmit */
+ req->rq_damaged = 1;
result = xprt_recvmsg(xprt, xprt->tcp_iovec, req->rq_rnr, want);
- if (!result && want)
- result = -EAGAIN;
if (result < 0)
goto done;
xprt->tcp_copied += result;
@@ -754,12 +771,10 @@
xprt->tcp_copied = 0;
xprt->tcp_rqstp = NULL;
}
- /* Request must be re-encoded before retransmit */
- req->rq_damaged = 1;
}
/* Skip over any trailing bytes on short reads */
- while (avail) {
+ while (avail > 0) {
static u8 dummy[64];
want = MIN(avail, sizeof(dummy));
@@ -767,8 +782,6 @@
riov.iov_len = want;
dprintk("RPC: TCP skipping %d bytes\n", want);
result = xprt_recvmsg(xprt, &riov, 1, want);
- if (!result && want)
- result=-EAGAIN;
if (result < 0)
goto done;
offset += result;
@@ -789,55 +802,40 @@
return result;
}
-static __inline__ void tcp_output_record(struct rpc_xprt *xprt)
-{
- if(xprt->snd_sent && xprt->snd_task)
- dprintk("RPC: write space\n");
- if(xprt->write_space == 0)
- {
- xprt->write_space = 1;
- if (xprt->snd_task && !RPC_IS_RUNNING(xprt->snd_task))
- {
- if(xprt->snd_sent)
- dprintk("RPC: Write wakeup snd_sent =%d\n",
- xprt->snd_sent);
- rpc_wake_up_task(xprt->snd_task);
- }
- }
-}
-
/*
* TCP task queue stuff
*/
-static struct rpc_xprt *rpc_rx_xprt_pending = NULL; /* Chain by rx_pending of rpc_xprt's */
-static struct rpc_xprt *rpc_tx_xprt_pending = NULL; /* Chain by tx_pending of rpc_xprt's */
+static struct rpc_xprt *rpc_xprt_pending = NULL; /* Chain by rx_pending of rpc_xprt's */
/*
* This is protected from tcp_data_ready and the stack as its run
* inside of the RPC I/O daemon
*/
-
-void rpciod_tcp_dispatcher(void)
+static void
+do_rpciod_tcp_dispatcher(void)
{
struct rpc_xprt *xprt;
int result;
dprintk("rpciod_tcp_dispatcher: Queue Running\n");
-
+
/*
* Empty each pending socket
*/
-
- while((xprt=rpc_rx_xprt_pending)!=NULL)
- {
+
+ while(1) {
int safe_retry=0;
-
- rpc_rx_xprt_pending=xprt->rx_pending;
- xprt->rx_pending_flag=0;
-
+
+ if ((xprt = rpc_xprt_pending) == NULL) {
+ break;
+ }
+ xprt->rx_pending_flag = 0;
+ rpc_xprt_pending=xprt->rx_pending;
+ xprt->rx_pending = NULL;
+
dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt);
-
+
do
{
if (safe_retry++ > 50)
@@ -845,28 +843,30 @@
result = tcp_input_record(xprt);
}
while (result >= 0);
-
+
switch (result) {
case -EAGAIN:
- continue;
case -ENOTCONN:
case -EPIPE:
- xprt_disconnect(xprt);
continue;
default:
printk(KERN_WARNING "RPC: unexpected error %d from tcp_input_record\n",
result);
}
}
+}
- while((xprt=rpc_tx_xprt_pending)!=NULL)
- {
- rpc_tx_xprt_pending = xprt->tx_pending;
- xprt->tx_pending_flag = 0;
- tcp_output_record(xprt);
- }
+void rpciod_tcp_dispatcher(void)
+{
+ start_bh_atomic();
+ do_rpciod_tcp_dispatcher();
+ end_bh_atomic();
}
+int xprt_tcp_pending(void)
+{
+ return rpc_xprt_pending != NULL;
+}
extern inline void tcp_rpciod_queue(void)
{
@@ -890,6 +890,7 @@
printk("Not a socket with xprt %p\n", sk);
return;
}
+
dprintk("RPC: tcp_data_ready client %p\n", xprt);
dprintk("RPC: state %x conn %d dead %d zapped %d\n",
sk->state, xprt->connected,
@@ -898,24 +899,16 @@
* If we are not waiting for the RPC bh run then
* we are now
*/
- if (!xprt->rx_pending_flag)
- {
- int start_queue=0;
+ if (!xprt->rx_pending_flag) {
+ dprintk("RPC: xprt queue %p\n", rpc_xprt_pending);
- dprintk("RPC: xprt queue %p\n", rpc_rx_xprt_pending);
- if(rpc_rx_xprt_pending==NULL)
- start_queue=1;
+ xprt->rx_pending=rpc_xprt_pending;
+ rpc_xprt_pending=xprt;
xprt->rx_pending_flag=1;
- xprt->rx_pending=rpc_rx_xprt_pending;
- rpc_rx_xprt_pending=xprt;
- if (start_queue)
- {
- tcp_rpciod_queue();
- start_queue=0;
- }
- }
- else
+ } else
dprintk("RPC: xprt queued already %p\n", xprt);
+ tcp_rpciod_queue();
+
}
@@ -931,17 +924,32 @@
sk->state, xprt->connected,
sk->dead, sk->zapped);
- if (sk->state == TCP_ESTABLISHED && !xprt->connected) {
+ switch(sk->state) {
+ case TCP_ESTABLISHED:
+ if (xprt->connected)
+ break;
xprt->connected = 1;
xprt->connecting = 0;
rpc_wake_up(&xprt->reconn);
- } else if (sk->zapped) {
- rpc_wake_up_status(&xprt->pending, -ENOTCONN);
- rpc_wake_up_status(&xprt->sending, -ENOTCONN);
+ rpc_wake_up_next(&xprt->sending);
+ tcp_rpciod_queue();
+ break;
+ case TCP_CLOSE:
+ if (xprt->connecting)
+ break;
+ xprt_disconnect(xprt);
rpc_wake_up_status(&xprt->reconn, -ENOTCONN);
+ break;
+ default:
+ break;
}
+
}
+/*
+ * The following 2 routines allow a task to sleep while socket memory is
+ * low.
+ */
static void
tcp_write_space(struct sock *sk)
{
@@ -949,17 +957,43 @@
if (!(xprt = xprt_from_sock(sk)))
return;
- if (!xprt->tx_pending_flag) {
- int start_queue = 0;
- if (rpc_tx_xprt_pending == NULL)
- start_queue = 1;
- xprt->tx_pending_flag = 1;
- xprt->tx_pending = rpc_tx_xprt_pending;
- rpc_tx_xprt_pending = xprt;
- if (start_queue)
- tcp_rpciod_queue();
- }
+ /* Wait until we have enough socket memory */
+ if (sock_wspace(sk) < min(sk->sndbuf,XPRT_MIN_WRITE_SPACE))
+ return;
+
+ if (xprt->write_space)
+ return;
+
+ xprt->write_space = 1;
+
+ if (!xprt->snd_task)
+ rpc_wake_up_next(&xprt->sending);
+ else if (!RPC_IS_RUNNING(xprt->snd_task))
+ rpc_wake_up_task(xprt->snd_task);
+}
+
+static void
+udp_write_space(struct sock *sk)
+{
+ struct rpc_xprt *xprt;
+
+ if (!(xprt = xprt_from_sock(sk)))
+ return;
+
+
+ /* Wait until we have enough socket memory */
+ if (sock_wspace(sk) < min(sk->sndbuf,XPRT_MIN_WRITE_SPACE))
+ return;
+
+ if (xprt->write_space)
+ return;
+
+ xprt->write_space = 1;
+ if (!xprt->snd_task)
+ rpc_wake_up_next(&xprt->sending);
+ else if (!RPC_IS_RUNNING(xprt->snd_task))
+ rpc_wake_up_task(xprt->snd_task);
}
/*
@@ -982,32 +1016,50 @@
rpc_wake_up_task(task);
}
+
/*
- * (Partly) transmit the RPC packet
- * Note that task->tk_status is either 0 or negative on return.
- * Only when the reply is received will the status be set to a
- * positive value.
+ * Serialize access to sockets, in order to prevent different
+ * requests from interfering with each other.
*/
-static inline int
-xprt_transmit_some(struct rpc_xprt *xprt, struct rpc_task *task)
+static int
+xprt_down_transmit(struct rpc_task *task)
{
+ struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
struct rpc_rqst *req = task->tk_rqstp;
- int result;
- task->tk_status = 0;
- if ((result = xprt_sendmsg(xprt)) >= 0) {
- if (!xprt->snd_buf.io_len || !xprt->stream) {
- rpc_wake_up_next(&xprt->sending);
- return req->rq_slen;
- }
- result = -EAGAIN;
- } else if (xprt->stream) {
- if (result == -ENOTCONN || result == -EPIPE) {
- xprt_disconnect(xprt);
- result = -ENOTCONN;
- }
+ start_bh_atomic();
+ spin_lock(&xprt_lock);
+ if (xprt->snd_task && xprt->snd_task != task) {
+ dprintk("RPC: %4d TCP write queue full (task %d)\n",
+ task->tk_pid, xprt->snd_task->tk_pid);
+ task->tk_timeout = req->rq_timeout.to_current;
+ rpc_sleep_on(&xprt->sending, task, xprt_transmit, NULL);
+ } else if (!xprt->snd_task) {
+ xprt->snd_task = task;
+#ifdef RPC_PROFILE
+ req->rq_xtime = jiffies;
+#endif
+ req->rq_bytes_sent = 0;
+ }
+ spin_unlock(&xprt_lock);
+ end_bh_atomic();
+ return xprt->snd_task == task;
+}
+
+/*
+ * Releases the socket for use by other requests.
+ */
+static void
+xprt_up_transmit(struct rpc_task *task)
+{
+ struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
+
+ if (xprt->snd_task && xprt->snd_task == task) {
+ start_bh_atomic();
+ xprt->snd_task = NULL;
+ rpc_wake_up_next(&xprt->sending);
+ end_bh_atomic();
}
- return task->tk_status = result;
}
/*
@@ -1020,71 +1072,65 @@
struct rpc_timeout *timeo;
struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
- int status;
dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid,
*(u32 *)(req->rq_svec[0].iov_base));
- if (xprt->shutdown) {
+ if (xprt->shutdown)
task->tk_status = -EIO;
+
+ if (task->tk_status < 0)
return;
+
+ /* Reset timeout parameters */
+ timeo = &req->rq_timeout;
+ if (timeo->to_retries < 0) {
+ dprintk("RPC: %4d xprt_transmit reset timeo\n",
+ task->tk_pid);
+ timeo->to_retries = xprt->timeout.to_retries;
+ timeo->to_current = timeo->to_initval;
}
- /* If we're not already in the process of transmitting our call,
- * set up everything as needed. */
- if (xprt->snd_task != task) {
- /* Write the record marker */
- if (xprt->stream) {
- u32 marker;
+ /* set up everything as needed. */
+ /* Write the record marker */
+ if (xprt->stream) {
+ u32 marker;
- if (!xprt->connected) {
- task->tk_status = -ENOTCONN;
- return;
- }
- marker = htonl(0x80000000|(req->rq_slen-4));
- *((u32 *) req->rq_svec[0].iov_base) = marker;
- }
+ marker = htonl(0x80000000|(req->rq_slen-4));
+ *((u32 *) req->rq_svec[0].iov_base) = marker;
- /* Reset timeout parameters */
- timeo = &req->rq_timeout;
- if (timeo->to_retries < 0) {
- dprintk("RPC: %4d xprt_transmit reset timeo\n",
- task->tk_pid);
- timeo->to_retries = xprt->timeout.to_retries;
- timeo->to_current = timeo->to_initval;
- }
+ }
-#ifdef RPC_PROFILE
- req->rq_xtime = jiffies;
-#endif
- req->rq_gotit = 0;
+ if (!xprt_down_transmit(task))
+ return;
- if (xprt->snd_task) {
- dprintk("RPC: %4d TCP write queue full (task %d)\n",
- task->tk_pid, xprt->snd_task->tk_pid);
- rpc_sleep_on(&xprt->sending, task,
- xprt_transmit_status, NULL);
- return;
- }
- xprt->snd_buf = req->rq_snd_buf;
- xprt->snd_task = task;
- xprt->snd_sent = 0;
+ do_xprt_transmit(task);
+}
+
+static void
+do_xprt_transmit(struct rpc_task *task)
+{
+ struct rpc_rqst *req = task->tk_rqstp;
+ struct rpc_xprt *xprt = req->rq_xprt;
+ int status, retry = 0;
+
+ if (xprt->shutdown) {
+ task->tk_status = -EIO;
+ goto out_release;
}
/* For fast networks/servers we have to put the request on
* the pending list now:
*/
- start_bh_atomic();
+ req->rq_gotit = 0;
status = rpc_add_wait_queue(&xprt->pending, task);
if (!status)
task->tk_callback = NULL;
- end_bh_atomic();
- if (status)
- {
+ if (status) {
printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
task->tk_status = status;
- return;
+ goto out_release;
}
/* Continue transmitting the packet/record. We must be careful
@@ -1093,27 +1139,55 @@
*/
while (1) {
xprt->write_space = 0;
- if (xprt_transmit_some(xprt, task) != -EAGAIN) {
+ status = xprt_sendmsg(xprt, req);
+
+ if (status < 0)
+ break;
+
+ if (xprt->stream) {
+ req->rq_bytes_sent += status;
+
+ if (req->rq_bytes_sent >= req->rq_slen)
+ goto out_release;
+ }
+
+ if (status < req->rq_slen)
+ status = -EAGAIN;
+
+ if (status >= 0 || !xprt->stream) {
dprintk("RPC: %4d xmit complete\n", task->tk_pid);
- xprt->snd_task = NULL;
- return;
+ goto out_release;
}
- /*d*/dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
- task->tk_pid, xprt->snd_buf.io_len,
+ dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
+ task->tk_pid, req->rq_slen - req->rq_bytes_sent,
req->rq_slen);
- task->tk_status = 0;
- start_bh_atomic();
- if (!xprt->write_space) {
- /* Remove from pending */
- rpc_remove_wait_queue(task);
- rpc_sleep_on(&xprt->sending, task,
- xprt_transmit_status, NULL);
- end_bh_atomic();
- return;
- }
+
+ if (retry++ > 50)
+ break;
+ }
+
+ task->tk_status = (status == -ENOMEM) ? -EAGAIN : status;
+
+ /* We don't care if we got a reply, so don't protect
+ * against bh. */
+ if (task->tk_rpcwait == &xprt->pending)
+ rpc_remove_wait_queue(task);
+
+ /* Protect against (udp|tcp)_write_space */
+ start_bh_atomic();
+ if (status == -ENOMEM || status == -EAGAIN) {
+ task->tk_timeout = req->rq_timeout.to_current;
+ if (!xprt->write_space)
+ rpc_sleep_on(&xprt->sending, task, xprt_transmit_status,
+ xprt_transmit_timeout);
end_bh_atomic();
+ return;
}
+ end_bh_atomic();
+
+out_release:
+ xprt_up_transmit(task);
}
/*
@@ -1126,19 +1200,27 @@
struct rpc_xprt *xprt = task->tk_client->cl_xprt;
dprintk("RPC: %4d transmit_status %d\n", task->tk_pid, task->tk_status);
- if (xprt->snd_task == task)
- {
- if (task->tk_status < 0)
- {
- xprt->snd_task = NULL;
- xprt_disconnect(xprt);
- }
- else
- xprt_transmit(task);
+ if (xprt->snd_task == task) {
+ task->tk_status = 0;
+ do_xprt_transmit(task);
+ return;
}
}
/*
+ * RPC transmit timeout handler.
+ */
+static void
+xprt_transmit_timeout(struct rpc_task *task)
+{
+ dprintk("RPC: %4d transmit_timeout %d\n", task->tk_pid, task->tk_status);
+ task->tk_status = -ETIMEDOUT;
+ task->tk_timeout = 0;
+ rpc_wake_up_task(task);
+ xprt_up_transmit(task);
+}
+
+/*
* Wait for the reply to our call.
* When the callback is invoked, the congestion window should have
* been updated already.
@@ -1150,25 +1232,33 @@
struct rpc_xprt *xprt = req->rq_xprt;
dprintk("RPC: %4d xprt_receive\n", task->tk_pid);
- if (xprt->connected == 0) {
- task->tk_status = -ENOTCONN;
- return;
- }
/*
- * Wait until rq_gotit goes non-null, or timeout elapsed.
+ * Wait until rq_gotit goes non-null, or timeout elapsed.
*/
task->tk_timeout = req->rq_timeout.to_current;
start_bh_atomic();
+ if (task->tk_rpcwait)
+ rpc_remove_wait_queue(task);
+
+ if (task->tk_status < 0 || xprt->shutdown) {
+ end_bh_atomic();
+ goto out;
+ }
+
if (!req->rq_gotit) {
rpc_sleep_on(&xprt->pending, task,
xprt_receive_status, xprt_timer);
+ end_bh_atomic();
+ return;
}
end_bh_atomic();
dprintk("RPC: %4d xprt_receive returns %d\n",
task->tk_pid, task->tk_status);
+ out:
+ xprt_receive_status(task);
}
static void
@@ -1176,8 +1266,9 @@
{
struct rpc_xprt *xprt = task->tk_xprt;
- if (xprt->stream && xprt->tcp_rqstp == task->tk_rqstp)
+ if (xprt->tcp_rqstp == task->tk_rqstp)
xprt->tcp_rqstp = NULL;
+
}
/*
@@ -1194,7 +1285,7 @@
dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
task->tk_pid, xprt->cong, xprt->cwnd);
- if ((!RPCXPRT_CONGESTED(xprt) && xprt->free)) {
+ if (!RPCXPRT_CONGESTED(xprt) && xprt->free) {
xprt_reserve_status(task);
task->tk_timeout = 0;
} else if (!task->tk_timeout) {
@@ -1223,40 +1314,30 @@
/* NOP */
} else if (task->tk_rqstp) {
/* We've already been given a request slot: NOP */
- } else if (!RPCXPRT_CONGESTED(xprt)) {
+ } else if (!RPCXPRT_CONGESTED(xprt) && xprt->free) {
/* OK: There's room for us. Grab a free slot and bump
* congestion value */
- req = xprt->free;
- if (!req)
- goto bad_list;
- if (req->rq_xid)
- goto bad_used;
+ spin_lock(&xprt_lock);
+ if (!(req = xprt->free)) {
+ spin_unlock(&xprt_lock);
+ goto out_nofree;
+ }
xprt->free = req->rq_next;
+ req->rq_next = NULL;
+ spin_unlock(&xprt_lock);
xprt->cong += RPC_CWNDSCALE;
task->tk_rqstp = req;
- req->rq_next = NULL;
xprt_request_init(task, xprt);
- } else {
- task->tk_status = -EAGAIN;
- }
- if (xprt->free && !RPCXPRT_CONGESTED(xprt))
- rpc_wake_up_next(&xprt->backlog);
+ if (xprt->free)
+ xprt_clear_backlog(xprt);
+ } else
+ goto out_nofree;
return;
-bad_list:
- printk(KERN_ERR
- "RPC: %4d inconsistent free list (cong %ld cwnd %ld)\n",
- task->tk_pid, xprt->cong, xprt->cwnd);
- rpc_debug = ~0;
- goto bummer;
-bad_used:
- printk(KERN_ERR "RPC: used rqst slot %p on free list!\n", req);
-bummer:
- task->tk_status = -EIO;
- xprt->free = NULL;
- return;
+out_nofree:
+ task->tk_status = -EAGAIN;
}
/*
@@ -1298,13 +1379,15 @@
dprintk("RPC: %4d release request %p\n", task->tk_pid, req);
+ spin_lock(&xprt_lock);
+ req->rq_next = xprt->free;
+ xprt->free = req;
+ spin_unlock(&xprt_lock);
+
/* remove slot from queue of pending */
start_bh_atomic();
if (task->tk_rpcwait) {
printk("RPC: task of released request still queued!\n");
-#ifdef RPC_DEBUG
- printk("RPC: (task is on %s)\n", rpc_qname(task->tk_rpcwait));
-#endif
rpc_del_timer(task);
rpc_remove_wait_queue(task);
}
@@ -1313,31 +1396,7 @@
/* Decrease congestion value. */
xprt->cong -= RPC_CWNDSCALE;
-#if 0
- /* If congestion threshold is not yet reached, pass on the request slot.
- * This looks kind of kludgy, but it guarantees backlogged requests
- * are served in order.
- * N.B. This doesn't look completely safe, as the task is still
- * on the backlog list after wake-up.
- */
- if (!RPCXPRT_CONGESTED(xprt)) {
- struct rpc_task *next = rpc_wake_up_next(&xprt->backlog);
-
- if (next && next->tk_rqstp == 0) {
- xprt->cong += RPC_CWNDSCALE;
- next->tk_rqstp = req;
- xprt_request_init(next, xprt);
- return;
- }
- }
-#endif
-
- req->rq_next = xprt->free;
- xprt->free = req;
-
- /* If not congested, wake up the next backlogged process */
- if (!RPCXPRT_CONGESTED(xprt))
- rpc_wake_up_next(&xprt->backlog);
+ xprt_clear_backlog(xprt);
}
/*
@@ -1382,11 +1441,7 @@
dprintk("RPC: setting up %s transport...\n",
proto == IPPROTO_UDP? "UDP" : "TCP");
-#if LINUX_VERSION_CODE >= 0x020100
inet = sock->sk;
-#else
- inet = (struct sock *) sock->data;
-#endif
if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
return NULL;
@@ -1398,7 +1453,8 @@
xprt->addr = *ap;
xprt->prot = proto;
xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
- xprt->cwnd = RPC_INITCWND;
+ xprt->congtime = jiffies;
+ init_waitqueue_head(&xprt->cong_wait);
#ifdef SOCK_HAS_USER_DATA
inet->user_data = xprt;
#else
@@ -1410,11 +1466,14 @@
xprt->old_write_space = inet->write_space;
if (proto == IPPROTO_UDP) {
inet->data_ready = udp_data_ready;
+ inet->write_space = udp_write_space;
inet->no_check = UDP_CSUM_NORCV;
+ xprt->cwnd = RPC_INITCWND;
} else {
inet->data_ready = tcp_data_ready;
inet->state_change = tcp_state_change;
inet->write_space = tcp_write_space;
+ xprt->cwnd = RPC_MAXCWND;
xprt->nocong = 1;
}
xprt->connected = 1;
@@ -1487,6 +1546,7 @@
(proto == IPPROTO_UDP)? "udp" : "tcp", proto);
type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
+
if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
printk("RPC: can't create socket (%d).\n", -err);
goto failed;
@@ -1543,6 +1603,21 @@
rpc_wake_up(&xprt->pending);
rpc_wake_up(&xprt->backlog);
rpc_wake_up(&xprt->reconn);
+ wake_up(&xprt->cong_wait);
+}
+
+/*
+ * Clear the xprt backlog queue
+ */
+int
+xprt_clear_backlog(struct rpc_xprt *xprt) {
+ if (!xprt)
+ return 0;
+ if (RPCXPRT_CONGESTED(xprt))
+ return 0;
+ rpc_wake_up_next(&xprt->backlog);
+ wake_up(&xprt->cong_wait);
+ return 1;
}
/*
FUNET's LINUX-ADM group, linux-adm@nic.funet.fi
TCL-scripts by Sam Shen (who was at: slshen@lbl.gov)