patch-2.2.18 linux/net/sunrpc/xprt.c
- Lines: 2112
- Date: Fri Sep 15 22:10:44 2000
- Orig file: v2.2.17/net/sunrpc/xprt.c
- Orig date: Fri Apr 21 12:47:17 2000
diff -u --new-file --recursive --exclude-from /usr/src/exclude v2.2.17/net/sunrpc/xprt.c linux/net/sunrpc/xprt.c
@@ -31,7 +31,7 @@
* primitives that `transparently' work for processes as well as async
* tasks that rely on callbacks.
*
- * Copyright (C) 1995, 1996, Olaf Kirch <okir@monad.swb.de>
+ * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
*
* TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
* TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
@@ -56,17 +56,20 @@
#include <linux/file.h>
#include <net/sock.h>
+#include <net/checksum.h>
#include <asm/uaccess.h>
-#define SOCK_HAS_USER_DATA
+/* The following value should be > 32k + RPC overhead */
+#define XPRT_MIN_WRITE_SPACE (35000 + MIN_WRITE_SPACE)
+#define RPCXPRT_TIMEOUT 300*HZ
/*
* Local variables
*/
-#ifndef SOCK_HAS_USER_DATA
-static struct rpc_xprt * sock_list = NULL;
-#endif
+
+/* Spinlock for critical sections in the code. */
+spinlock_t xprt_sock_lock = SPIN_LOCK_UNLOCKED;
#ifdef RPC_DEBUG
# undef RPC_DEBUG_DATA
@@ -82,13 +85,15 @@
* Local functions
*/
static void xprt_request_init(struct rpc_task *, struct rpc_xprt *);
-static void xprt_transmit_status(struct rpc_task *task);
-static void xprt_receive_status(struct rpc_task *task);
-static void xprt_reserve_status(struct rpc_task *task);
-static void xprt_reconn_timeout(struct rpc_task *task);
+static void do_xprt_transmit(struct rpc_task *);
+static void __xprt_reserve_status(struct rpc_task *task);
+static void __xprt_disconnect(struct rpc_xprt *);
static void xprt_reconn_status(struct rpc_task *task);
-static struct socket *xprt_create_socket(int, struct sockaddr_in *,
- struct rpc_timeout *);
+static struct socket *xprt_create_socket(int, struct rpc_timeout *);
+static int xprt_bind_socket(struct rpc_xprt *, struct socket *);
+static void __xprt_append_pending(struct rpc_xprt *);
+static void __xprt_remove_pending(struct rpc_xprt *);
+static int __xprt_clear_backlog(struct rpc_xprt *xprt);
#ifdef RPC_DEBUG_DATA
/*
@@ -127,98 +132,88 @@
static inline struct rpc_xprt *
xprt_from_sock(struct sock *sk)
{
-#ifndef SOCK_HAS_USER_DATA
- struct rpc_xprt *xprt;
-
- for (xprt = sock_list; xprt && sk != xprt->inet; xprt = xprt->link)
- ;
- return xprt;
-#else
return (struct rpc_xprt *) sk->user_data;
-#endif
}
/*
* Adjust the iovec to move on 'n' bytes
*/
-extern inline void xprt_move_iov(struct msghdr *msg, struct iovec *niv, int amount)
+extern inline void
+xprt_move_iov(struct msghdr *msg, struct iovec *niv, unsigned amount)
{
struct iovec *iv=msg->msg_iov;
+ int i;
/*
* Eat any sent iovecs
*/
-
- while(iv->iov_len < amount)
- {
- amount-=iv->iov_len;
+ while (iv->iov_len <= amount) {
+ amount -= iv->iov_len;
iv++;
msg->msg_iovlen--;
}
-
- msg->msg_iov=niv;
-
+
/*
* And chew down the partial one
*/
-
niv[0].iov_len = iv->iov_len-amount;
niv[0].iov_base =((unsigned char *)iv->iov_base)+amount;
iv++;
-
+
/*
* And copy any others
*/
-
- for(amount=1;amount<msg->msg_iovlen; amount++)
- {
- niv[amount]=*iv++;
- }
+ for(i = 1; i < msg->msg_iovlen; i++)
+ niv[i]=*iv++;
+
+ msg->msg_iov=niv;
}
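
As a standalone illustration of this iovec surgery, here is a minimal userspace sketch built on the POSIX struct iovec; iov_advance() is a hypothetical helper, not part of the patch. Like xprt_move_iov() above, it skips fully-consumed entries, trims the partially-sent one, and copies the untouched tail:

    #include <sys/uio.h>

    /* Advance an iovec array by 'sent' bytes into 'out'; returns the
     * number of entries left to send. */
    static int iov_advance(const struct iovec *in, int nr, size_t sent,
                           struct iovec *out)
    {
        int i, n = 0;

        /* Skip entries that were sent completely. */
        while (nr > 0 && in->iov_len <= sent) {
            sent -= in->iov_len;
            in++;
            nr--;
        }
        /* Trim the partially-sent entry... */
        if (nr > 0) {
            out[n].iov_base = (char *) in->iov_base + sent;
            out[n].iov_len  = in->iov_len - sent;
            n++; in++; nr--;
        }
        /* ...and copy the untouched tail verbatim. */
        for (i = 0; i < nr; i++)
            out[n++] = in[i];
        return n;
    }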
/*
* Write data to socket.
*/
-static inline int
-xprt_sendmsg(struct rpc_xprt *xprt)
+static int
+xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
struct socket *sock = xprt->sock;
struct msghdr msg;
mm_segment_t oldfs;
int result;
+ int slen = req->rq_slen - req->rq_bytes_sent;
struct iovec niv[MAX_IOVEC];
+ if (slen <= 0)
+ return 0;
+
+ if (!sock)
+ return -ENOTCONN;
+
xprt_pktdump("packet data:",
- xprt->snd_buf.io_vec->iov_base,
- xprt->snd_buf.io_vec->iov_len);
+ req->rq_svec->iov_base,
+ req->rq_svec->iov_len);
- msg.msg_flags = MSG_DONTWAIT;
- msg.msg_iov = xprt->snd_buf.io_vec;
- msg.msg_iovlen = xprt->snd_buf.io_nr;
+ msg.msg_flags = MSG_DONTWAIT|MSG_NOSIGNAL;
+ msg.msg_iov = req->rq_svec;
+ msg.msg_iovlen = req->rq_snr;
msg.msg_name = (struct sockaddr *) &xprt->addr;
msg.msg_namelen = sizeof(xprt->addr);
msg.msg_control = NULL;
msg.msg_controllen = 0;
/* Dont repeat bytes */
-
- if(xprt->snd_sent)
- xprt_move_iov(&msg, niv, xprt->snd_sent);
-
+ if (req->rq_bytes_sent)
+ xprt_move_iov(&msg, niv, req->rq_bytes_sent);
+
oldfs = get_fs(); set_fs(get_ds());
- result = sock_sendmsg(sock, &msg, xprt->snd_buf.io_len);
+ result = sock_sendmsg(sock, &msg, slen);
set_fs(oldfs);
- dprintk("RPC: xprt_sendmsg(%d) = %d\n",
- xprt->snd_buf.io_len, result);
+ dprintk("RPC: xprt_sendmsg(%d) = %d\n", slen, result);
- if (result >= 0) {
- xprt->snd_buf.io_len -= result;
- xprt->snd_sent += result;
+ if (result >= 0)
return result;
- }
switch (result) {
case -ECONNREFUSED:
@@ -227,13 +222,17 @@
*/
break;
case -EAGAIN:
- return 0;
- case -ENOTCONN: case -EPIPE:
+ if (sock->flags & SO_NOSPACE)
+ result = -ENOMEM;
+ break;
+ case -ENOTCONN:
+ case -EPIPE:
/* connection broken */
+ if (xprt->stream)
+ result = -ENOTCONN;
break;
default:
printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
- result = 0;
}
return result;
}
@@ -241,41 +240,33 @@
/*
* Read data from socket
*/
-static inline int
-xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, int len)
+static int
+xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, unsigned len, unsigned shift)
{
struct socket *sock = xprt->sock;
- struct sockaddr_in sin;
struct msghdr msg;
mm_segment_t oldfs;
+ struct iovec niv[MAX_IOVEC];
int result;
-#if LINUX_VERSION_CODE >= 0x020100
- msg.msg_flags = MSG_DONTWAIT;
- msg.msg_iov = iov;
- msg.msg_iovlen = nr;
- msg.msg_name = &sin;
- msg.msg_namelen = sizeof(sin);
- msg.msg_control = NULL;
- msg.msg_controllen = 0;
+ if (!sock)
+ return -ENOTCONN;
- oldfs = get_fs(); set_fs(get_ds());
- result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
- set_fs(oldfs);
-#else
- int alen = sizeof(sin);
- msg.msg_flags = 0;
+ msg.msg_flags = MSG_DONTWAIT|MSG_NOSIGNAL;
msg.msg_iov = iov;
msg.msg_iovlen = nr;
- msg.msg_name = &sin;
- msg.msg_namelen = sizeof(sin);
+ msg.msg_name = NULL;
+ msg.msg_namelen = 0;
msg.msg_control = NULL;
msg.msg_controllen = 0;
+ /* Adjust the iovec if we've already filled it */
+ if (shift)
+ xprt_move_iov(&msg, niv, shift);
+
oldfs = get_fs(); set_fs(get_ds());
- result = sock->ops->recvmsg(sock, &msg, len, 1, 0, &alen);
+ result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
set_fs(oldfs);
-#endif
dprintk("RPC: xprt_recvmsg(iov %p, len %d) = %d\n",
iov, len, result);
@@ -288,7 +279,7 @@
* We use a time-smoothed congestion estimator to avoid heavy oscillation.
*/
static void
-xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
+__xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
unsigned long cwnd = xprt->cwnd;
@@ -304,14 +295,14 @@
cwnd = RPC_MAXCWND;
else
pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
- xprt->congtime = jiffies + ((cwnd * HZ) << 2) / RPC_CWNDSCALE;
+ xprt->congtime = jiffies + ((cwnd * HZ/10) << 2) / RPC_CWNDSCALE;
dprintk("RPC: cong %08lx, cwnd was %08lx, now %08lx, "
"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
(xprt->congtime-jiffies)*1000/HZ);
} else if (result == -ETIMEDOUT) {
if ((cwnd >>= 1) < RPC_CWNDSCALE)
cwnd = RPC_CWNDSCALE;
- xprt->congtime = jiffies + ((cwnd * HZ) << 3) / RPC_CWNDSCALE;
+ xprt->congtime = jiffies + ((cwnd * HZ/10) << 3) / RPC_CWNDSCALE;
dprintk("RPC: cong %ld, cwnd was %ld, now %ld, "
"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
(xprt->congtime-jiffies)*1000/HZ);
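
Both arms of the estimator are classic additive-increase/multiplicative-decrease in fixed point: grow slowly while replies arrive, halve on timeout, and clamp to [one slot, RPC_MAXCWND]. The following userspace sketch uses illustrative CWNDSCALE/MAXCWND constants, and its additive step stands in for code outside this hunk, so treat it as a model rather than the patch itself:

    #define CWNDSCALE 256UL            /* one request slot, fixed point */
    #define MAXCWND   (8 * CWNDSCALE)

    /* AIMD update: roughly one extra slot per window of replies,
     * halved on timeout; cwnd is assumed to start at >= CWNDSCALE. */
    static unsigned long adjust_cwnd(unsigned long cwnd, int timed_out)
    {
        if (!timed_out) {
            cwnd += (CWNDSCALE * CWNDSCALE + (cwnd >> 1)) / cwnd;
            if (cwnd > MAXCWND)
                cwnd = MAXCWND;
        } else {
            cwnd >>= 1;
            if (cwnd < CWNDSCALE)
                cwnd = CWNDSCALE;
        }
        return cwnd;
    }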
@@ -327,21 +318,82 @@
int
xprt_adjust_timeout(struct rpc_timeout *to)
{
- if (to->to_exponential)
- to->to_current <<= 1;
- else
- to->to_current += to->to_increment;
- if (to->to_maxval && to->to_current >= to->to_maxval) {
- to->to_current = to->to_maxval;
- to->to_retries = 0;
+ if (to->to_retries > 0) {
+ if (to->to_exponential)
+ to->to_current <<= 1;
+ else
+ to->to_current += to->to_increment;
+ if (to->to_maxval && to->to_current >= to->to_maxval)
+ to->to_current = to->to_maxval;
+ } else {
+ if (to->to_exponential)
+ to->to_initval <<= 1;
+ else
+ to->to_initval += to->to_increment;
+ if (to->to_maxval && to->to_initval >= to->to_maxval)
+ to->to_initval = to->to_maxval;
+ to->to_current = to->to_initval;
}
+
if (!to->to_current) {
printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
to->to_current = 5 * HZ;
}
pprintk("RPC: %lu %s\n", jiffies,
to->to_retries? "retrans" : "timeout");
- return (to->to_retries)--;
+ return to->to_retries-- > 0;
+}
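
Condensed, the new rule is: while retries remain, back off to_current; once they are exhausted, inflate the base value itself so the next series of calls starts out slower. A sketch with a hypothetical struct backoff mirroring the rpc_timeout fields:

    struct backoff {
        unsigned long initval, increment, maxval, current_to;
        int exponential, retries;
    };

    /* Exponential doubling or linear increment, clamped to maxval;
     * after the last retry the inflated base becomes the new current.
     * Returns nonzero while retries remain, like xprt_adjust_timeout(). */
    static int backoff_next(struct backoff *to)
    {
        unsigned long *v = (to->retries > 0) ? &to->current_to : &to->initval;

        *v = to->exponential ? (*v << 1) : (*v + to->increment);
        if (to->maxval && *v >= to->maxval)
            *v = to->maxval;
        if (to->retries <= 0)
            to->current_to = to->initval;
        return to->retries-- > 0;
    }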
+
+static inline void
+__xprt_disable_tcp_timer(struct rpc_xprt *xprt)
+{
+ xprt->tcp_timeout = 0;
+}
+
+static inline void
+__xprt_delete_tcp_timer(struct rpc_xprt *xprt)
+{
+ if (timer_pending(&xprt->tcp_timer))
+ del_timer(&xprt->tcp_timer);
+}
+
+/*
+ * Safe for use outside BH contexts.
+ */
+static inline void
+xprt_delete_tcp_timer(struct rpc_xprt *xprt)
+{
+ start_bh_atomic();
+ __xprt_delete_tcp_timer(xprt);
+ end_bh_atomic();
+}
+
+static void
+xprt_tcp_timeout(struct rpc_xprt *xprt)
+{
+ if (xprt->tcp_timeout) {
+ __xprt_disable_tcp_timer(xprt);
+ __xprt_disconnect(xprt);
+ }
+}
+
+static inline void
+__xprt_add_tcp_timer(struct rpc_xprt *xprt, long timeout)
+{
+ if ((xprt->tcp_timeout = timeout) == 0)
+ return;
+ mod_timer(&xprt->tcp_timer, jiffies + xprt->tcp_timeout);
+}
+
+/*
+ * Safe for use outside BH contexts.
+ */
+static inline void
+xprt_add_tcp_timer(struct rpc_xprt *xprt, long timeout)
+{
+ start_bh_atomic();
+ __xprt_add_tcp_timer(xprt, timeout);
+ end_bh_atomic();
}
/*
@@ -350,24 +402,39 @@
static void
xprt_close(struct rpc_xprt *xprt)
{
- struct sock *sk = xprt->inet;
+ struct socket *sock;
+ struct sock *sk;
+
+ start_bh_atomic();
+ if (!(sock = xprt->sock)) {
+ end_bh_atomic();
+ return;
+ }
+
+ __xprt_disable_tcp_timer(xprt);
+ xprt_clear_connected(xprt);
+ __xprt_remove_pending(xprt);
+ rpc_wake_up_status(&xprt->pending, -ENOTCONN);
+
+ sk = xprt->inet;
+ xprt->inet = NULL;
+ xprt->sock = NULL;
-#ifdef SOCK_HAS_USER_DATA
sk->user_data = NULL;
-#endif
sk->data_ready = xprt->old_data_ready;
sk->state_change = xprt->old_state_change;
sk->write_space = xprt->old_write_space;
- if (xprt->file)
- fput(xprt->file);
- else
- sock_release(xprt->sock);
+ end_bh_atomic();
+
+ xprt_delete_tcp_timer(xprt);
+
+ sock_release(sock);
/*
* TCP doesnt require the rpciod now - other things may
* but rpciod handles that not us.
*/
- if(xprt->stream && !xprt->connecting)
+ if(xprt->stream)
rpciod_down();
}
@@ -375,12 +442,11 @@
* Mark a transport as disconnected
*/
static void
-xprt_disconnect(struct rpc_xprt *xprt)
+__xprt_disconnect(struct rpc_xprt *xprt)
{
dprintk("RPC: disconnected transport %p\n", xprt);
- rpc_wake_up_status(&xprt->pending, -ENOTCONN);
- rpc_wake_up_status(&xprt->sending, -ENOTCONN);
- xprt->connected = 0;
+ xprt_clear_connected(xprt);
+ __xprt_append_pending(xprt);
}
/*
@@ -390,151 +456,145 @@
xprt_reconnect(struct rpc_task *task)
{
struct rpc_xprt *xprt = task->tk_xprt;
- struct socket *sock;
- struct sock *inet;
+ struct socket *sock = xprt->sock;
+ struct sock *inet = xprt->inet;
int status;
dprintk("RPC: %4d xprt_reconnect %p connected %d\n",
- task->tk_pid, xprt, xprt->connected);
- task->tk_status = 0;
+ task->tk_pid, xprt, xprt_connected(xprt));
+ if (xprt->shutdown)
+ return;
+
+ if (!xprt->stream)
+ return;
+
+ if (!xprt->addr.sin_port) {
+ task->tk_status = -EIO;
+ return;
+ }
if (xprt->connecting) {
- task->tk_timeout = xprt->timeout.to_maxval;
+ task->tk_timeout = 0;
rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
return;
}
xprt->connecting = 1;
- /* Create an unconnected socket */
- if (!(sock = xprt_create_socket(xprt->prot, NULL, &xprt->timeout)))
- goto defer;
-
-#if LINUX_VERSION_CODE >= 0x020100
- inet = sock->sk;
-#else
- inet = (struct sock *) sock->data;
-#endif
- inet->data_ready = xprt->inet->data_ready;
- inet->state_change = xprt->inet->state_change;
- inet->write_space = xprt->inet->write_space;
-#ifdef SOCK_HAS_USER_DATA
- inet->user_data = xprt;
-#endif
-
- dprintk("RPC: %4d closing old socket\n", task->tk_pid);
- xprt_disconnect(xprt);
- xprt_close(xprt);
+ status = -ENOTCONN;
+ if (!inet) {
+ /* Create an unconnected socket */
+ if (!(sock = xprt_create_socket(xprt->prot, &xprt->timeout)))
+ goto defer;
+ xprt_bind_socket(xprt, sock);
+ inet = sock->sk;
+ }
- /* Reset to new socket and default congestion */
- xprt->sock = sock;
- xprt->inet = inet;
- xprt->cwnd = RPC_INITCWND;
+ xprt->tcp_offset = 0;
+ xprt->tcp_copied = 0;
+ xprt->tcp_more = 0;
/* Now connect it asynchronously. */
dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
sizeof(xprt->addr), O_NONBLOCK);
+
+ xprt_add_tcp_timer(xprt, RPCXPRT_TIMEOUT);
+
if (status < 0) {
- if (status != -EINPROGRESS && status != -EALREADY) {
+ switch (status) {
+ case -EALREADY:
+ case -EINPROGRESS:
+ status = 0;
+ break;
+ case -EISCONN:
+ case -EPIPE:
+ status = 0;
+ xprt_close(xprt);
+ goto defer;
+ default:
printk("RPC: TCP connect error %d!\n", -status);
+ xprt_close(xprt);
goto defer;
}
dprintk("RPC: %4d connect status %d connected %d\n",
- task->tk_pid, status, xprt->connected);
- task->tk_timeout = 60 * HZ;
+ task->tk_pid, status, xprt_connected(xprt));
start_bh_atomic();
- if (!xprt->connected) {
- rpc_sleep_on(&xprt->reconn, task,
- xprt_reconn_status, xprt_reconn_timeout);
+ if (!xprt_connected(xprt)) {
+ task->tk_timeout = xprt->timeout.to_maxval;
+ rpc_sleep_on(&xprt->reconn, task, xprt_reconn_status, NULL);
end_bh_atomic();
return;
}
end_bh_atomic();
}
-
- xprt->connecting = 0;
- rpc_wake_up(&xprt->reconn);
- return;
-
defer:
- task->tk_timeout = 30 * HZ;
- rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
xprt->connecting = 0;
+ if (status < 0) {
+ rpc_delay(task, 5*HZ);
+ task->tk_status = -ENOTCONN;
+ }
+ rpc_wake_up(&xprt->reconn);
}
/*
- * Reconnect status
+ * Reconnect done
*/
static void
xprt_reconn_status(struct rpc_task *task)
{
struct rpc_xprt *xprt = task->tk_xprt;
- dprintk("RPC: %4d xprt_reconn_status %d\n",
- task->tk_pid, task->tk_status);
- if (!xprt->connected && task->tk_status != -ETIMEDOUT) {
- task->tk_timeout = 30 * HZ;
- rpc_sleep_on(&xprt->reconn, task, NULL, xprt_reconn_timeout);
- }
-}
-
-/*
- * Reconnect timeout. We just mark the transport as not being in the
- * process of reconnecting, and leave the rest to the upper layers.
- */
-static void
-xprt_reconn_timeout(struct rpc_task *task)
-{
- dprintk("RPC: %4d xprt_reconn_timeout %d\n",
- task->tk_pid, task->tk_status);
- task->tk_status = -ENOTCONN;
- task->tk_xprt->connecting = 0;
- task->tk_timeout = 0;
- rpc_wake_up_task(task);
+ xprt->connecting = 0;
+ rpc_wake_up(&xprt->reconn);
}
/*
- * Look up the RPC request corresponding to a reply.
+ * Look up the RPC request corresponding to a reply, and then lock it.
*/
-static inline struct rpc_rqst *
+static struct rpc_rqst *
xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
struct rpc_task *head, *task;
struct rpc_rqst *req;
+ unsigned long oldflags;
int safe = 0;
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
if ((head = xprt->pending.task) != NULL) {
task = head;
do {
if ((req = task->tk_rqstp) && req->rq_xid == xid)
- return req;
+ goto out;
task = task->tk_next;
if (++safe > 100) {
printk("xprt_lookup_rqst: loop in Q!\n");
- return NULL;
+ goto out_bad;
}
} while (task != head);
}
dprintk("RPC: unknown XID %08x in reply.\n", xid);
- return NULL;
+ out_bad:
+ req = NULL;
+ out:
+ if (req && !__rpc_lock_task(req->rq_task))
+ req = NULL;
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+ return req;
}
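
Pending tasks hang on a circular list, so the lookup walks until it comes back to the head, with a hard cap as insurance against queue corruption. The same bounded walk in isolation, using a hypothetical struct task node:

    struct task {
        unsigned int xid;
        struct task *next;      /* circular: the last node points at head */
    };

    /* Find the task matching 'xid', refusing to spin forever on a
     * corrupted queue -- the same guard as the 'safe' counter above. */
    static struct task *lookup_xid(struct task *head, unsigned int xid)
    {
        struct task *t = head;
        int safe = 0;

        if (!head)
            return NULL;
        do {
            if (t->xid == xid)
                return t;
            if (++safe > 100)   /* loop in queue: bail out */
                return NULL;
            t = t->next;
        } while (t != head);
        return NULL;
    }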
/*
* Complete reply received.
* The TCP code relies on us to remove the request from xprt->pending.
*/
-static inline void
-xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
+static void
+__xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
struct rpc_task *task = req->rq_task;
- req->rq_rlen = copied;
- req->rq_gotit = 1;
-
/* Adjust congestion window */
- xprt_adjust_cwnd(xprt, copied);
+ __xprt_adjust_cwnd(xprt, copied);
#ifdef RPC_PROFILE
/* Profile only reads for now */
@@ -555,19 +615,31 @@
}
#endif
- /* ... and wake up the process. */
dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
task->tk_status = copied;
+ req->rq_received = 1;
+ /* ... and wake up the process. */
rpc_wake_up_task(task);
return;
}
/*
+ * Safe for use outside BH contexts.
+ */
+static inline void
+xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
+{
+ start_bh_atomic();
+ __xprt_complete_rqst(xprt, req, copied);
+ end_bh_atomic();
+}
+
+/*
* Input handler for RPC replies. Called from a bottom half and hence
* atomic.
*/
-static inline void
+static void
udp_data_ready(struct sock *sk, int len)
{
struct rpc_task *task;
@@ -578,26 +650,34 @@
int err, repsize, copied;
dprintk("RPC: udp_data_ready...\n");
- if (!(xprt = xprt_from_sock(sk)))
- return;
+ if (!(xprt = xprt_from_sock(sk))) {
+ printk("RPC: udp_data_ready request not found!\n");
+ goto wake;
+ }
+
dprintk("RPC: udp_data_ready client %p\n", xprt);
if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
- return;
- repsize = skb->len - 8; /* don't account for UDP header */
+ goto wake;
+ if (xprt->shutdown)
+ goto dropit;
+
+ repsize = skb->len - sizeof(struct udphdr);
if (repsize < 4) {
printk("RPC: impossible RPC reply size %d!\n", repsize);
goto dropit;
}
- /* Look up the request corresponding to the given XID */
- if (!(rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + 8))))
+ /* Look up and lock the request corresponding to the given XID */
+ rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr)));
+ if (!rovr)
goto dropit;
task = rovr->rq_task;
- dprintk("RPC: %4d received reply\n", task->tk_pid);
- xprt_pktdump("packet data:", (u32 *) (skb->h.raw+8), repsize);
+ dprintk("RPC: %4d received reply, (%d bytes)\n", task->tk_pid, repsize);
+ xprt_pktdump("packet data:",
+ (u32 *) (skb->h.raw+sizeof(struct udphdr)), repsize);
if ((copied = rovr->rq_rlen) > repsize)
copied = repsize;
@@ -605,226 +685,284 @@
/* Okay, we have it. Copy datagram... */
memcpy(iov, rovr->rq_rvec, rovr->rq_rnr * sizeof(iov[0]));
/* This needs to stay tied with the usermode skb_copy_dagram... */
- memcpy_tokerneliovec(iov, skb->data+8, copied);
+ memcpy_tokerneliovec(iov, skb->h.raw + sizeof(struct udphdr), copied);
- xprt_complete_rqst(xprt, rovr, copied);
+ __xprt_complete_rqst(xprt, rovr, copied);
-dropit:
+ rpc_unlock_task(task);
+ dropit:
skb_free_datagram(sk, skb);
- return;
+ wake:
+ wake_up_interruptible(sk->sleep);
}
/*
- * TCP record receive routine
- * This is not the most efficient code since we call recvfrom twice--
- * first receiving the record marker and XID, then the data.
- *
- * The optimal solution would be a RPC support in the TCP layer, which
- * would gather all data up to the next record marker and then pass us
- * the list of all TCP segments ready to be copied.
+ * TCP read fragment marker
*/
static inline int
-tcp_input_record(struct rpc_xprt *xprt)
+tcp_read_fraghdr(struct rpc_xprt *xprt)
{
- struct rpc_rqst *req;
- struct iovec *iov;
struct iovec riov;
- u32 offset;
- int result, maxcpy, reclen, avail, want;
+ int want, result;
- dprintk("RPC: tcp_input_record\n");
- offset = xprt->tcp_offset;
- result = -EAGAIN;
- if (offset < 4 || (!xprt->tcp_more && offset < 8)) {
- want = (xprt->tcp_more? 4 : 8) - offset;
- dprintk("RPC: reading header (%d bytes)\n", want);
- riov.iov_base = xprt->tcp_recm.data + offset;
+ if (xprt->tcp_offset >= xprt->tcp_reclen + sizeof(xprt->tcp_recm)) {
+ xprt->tcp_offset = 0;
+ xprt->tcp_reclen = 0;
+ }
+ if (xprt->tcp_offset >= sizeof(xprt->tcp_recm))
+ goto done;
+
+ want = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
+ dprintk("RPC: reading header (%d bytes)\n", want);
+ do {
+ riov.iov_base = ((u8*) &xprt->tcp_recm) + xprt->tcp_offset;
riov.iov_len = want;
- result = xprt_recvmsg(xprt, &riov, 1, want);
- if (!result)
- {
- dprintk("RPC: empty TCP record.\n");
- return -ENOTCONN;
- }
+ result = xprt_recvmsg(xprt, &riov, 1, want, 0);
if (result < 0)
- goto done;
- offset += result;
- if (result < want) {
- result = -EAGAIN;
- goto done;
- }
+ return result;
+ xprt->tcp_offset += result;
+ want -= result;
+ } while (want);
- /* Get the record length and mask out the more_fragments bit */
- reclen = ntohl(xprt->tcp_reclen);
- dprintk("RPC: reclen %08x\n", reclen);
- xprt->tcp_more = (reclen & 0x80000000)? 0 : 1;
- reclen &= 0x7fffffff;
- xprt->tcp_total += reclen;
- xprt->tcp_reclen = reclen;
-
- dprintk("RPC: got xid %08x reclen %d morefrags %d\n",
- xprt->tcp_xid, xprt->tcp_reclen, xprt->tcp_more);
- if (!xprt->tcp_copied
- && (req = xprt_lookup_rqst(xprt, xprt->tcp_xid))) {
- iov = xprt->tcp_iovec;
- memcpy(iov, req->rq_rvec, req->rq_rnr * sizeof(iov[0]));
-#if 0
-*(u32 *)iov->iov_base = req->rq_xid;
-#endif
- iov->iov_base += 4;
- iov->iov_len -= 4;
- xprt->tcp_copied = 4;
- xprt->tcp_rqstp = req;
- }
- } else {
- reclen = xprt->tcp_reclen;
- }
+ /* Is this another fragment of the last message? */
+ if (!xprt->tcp_more)
+ xprt->tcp_copied = 0; /* No, so we're reading a new message */
+
+ /* Get the record length and mask out the last fragment bit */
+ xprt->tcp_reclen = ntohl(xprt->tcp_recm);
+ xprt->tcp_more = (xprt->tcp_reclen & 0x80000000) ? 0 : 1;
+ xprt->tcp_reclen &= 0x7fffffff;
+
+ dprintk("RPC: New record reclen %d morefrags %d\n",
+ xprt->tcp_reclen, xprt->tcp_more);
+ done:
+ return xprt->tcp_reclen + sizeof(xprt->tcp_recm) - xprt->tcp_offset;
+}
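
The four bytes gathered here are the ONC RPC record mark (RFC 1831): a big-endian word whose top bit flags the last fragment of a record and whose low 31 bits carry the fragment length. Decoded in isolation (decode_recmark() is illustrative):

    #include <stdint.h>
    #include <arpa/inet.h>      /* ntohl */

    /* Split a raw record mark into (length, last-fragment) -- the same
     * masking tcp_read_fraghdr() applies to xprt->tcp_recm. */
    static void decode_recmark(uint32_t recm_be, uint32_t *len, int *last_frag)
    {
        uint32_t recm = ntohl(recm_be);

        *last_frag = (recm & 0x80000000u) != 0;
        *len       = recm & 0x7fffffffu;
    }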
+
+/*
+ * TCP read xid
+ */
+static inline int
+tcp_read_xid(struct rpc_xprt *xprt, int avail)
+{
+ struct iovec riov;
+ int want, result;
+
+ if (xprt->tcp_copied >= sizeof(xprt->tcp_xid) || !avail)
+ goto done;
+ want = MIN(sizeof(xprt->tcp_xid) - xprt->tcp_copied, avail);
+ do {
+ dprintk("RPC: reading xid (%d bytes)\n", want);
+ riov.iov_base = ((u8*) &xprt->tcp_xid) + xprt->tcp_copied;
+ riov.iov_len = want;
+ result = xprt_recvmsg(xprt, &riov, 1, want, 0);
+ if (result < 0)
+ return result;
+ xprt->tcp_copied += result;
+ xprt->tcp_offset += result;
+ want -= result;
+ avail -= result;
+ } while (want);
+ done:
+ return avail;
+}
- avail = reclen - (offset - 4);
- if ((req = xprt->tcp_rqstp) && req->rq_xid == xprt->tcp_xid
- && req->rq_task->tk_rpcwait == &xprt->pending) {
- want = MIN(req->rq_rlen - xprt->tcp_copied, avail);
+/*
+ * TCP read and complete request
+ */
+static inline int
+tcp_read_request(struct rpc_xprt *xprt, struct rpc_rqst *req, int avail)
+{
+ int want, result;
+ if (req->rq_rlen <= xprt->tcp_copied || !avail)
+ goto done;
+ want = MIN(req->rq_rlen - xprt->tcp_copied, avail);
+ do {
dprintk("RPC: %4d TCP receiving %d bytes\n",
- req->rq_task->tk_pid, want);
- result = xprt_recvmsg(xprt, xprt->tcp_iovec, req->rq_rnr, want);
- if (!result && want)
- result = -EAGAIN;
+ req->rq_task->tk_pid, want);
+
+ result = xprt_recvmsg(xprt, req->rq_rvec, req->rq_rnr, want, xprt->tcp_copied);
if (result < 0)
- goto done;
+ return result;
xprt->tcp_copied += result;
- offset += result;
+ xprt->tcp_offset += result;
avail -= result;
- if (result < want) {
- result = -EAGAIN;
- goto done;
- }
+ want -= result;
+ } while (want);
- maxcpy = MIN(req->rq_rlen, xprt->tcp_total);
- if (xprt->tcp_copied == maxcpy && !xprt->tcp_more) {
- dprintk("RPC: %4d received reply complete\n",
- req->rq_task->tk_pid);
- xprt_complete_rqst(xprt, req, xprt->tcp_total);
- xprt->tcp_copied = 0;
- xprt->tcp_rqstp = NULL;
- }
- /* Request must be re-encoded before retransmit */
- req->rq_damaged = 1;
- }
+ done:
+ if (req->rq_rlen > xprt->tcp_copied && xprt->tcp_more)
+ return avail;
+ dprintk("RPC: %4d received reply complete\n", req->rq_task->tk_pid);
+ xprt_complete_rqst(xprt, req, xprt->tcp_copied);
- /* Skip over any trailing bytes on short reads */
- while (avail) {
- static u8 dummy[64];
+ return avail;
+}
+
+/*
+ * TCP discard extra bytes from a short read
+ */
+static inline int
+tcp_read_discard(struct rpc_xprt *xprt, int avail)
+{
+ struct iovec riov;
+ static u8 dummy[64];
+ int want, result = 0;
+ while (avail) {
want = MIN(avail, sizeof(dummy));
riov.iov_base = dummy;
riov.iov_len = want;
dprintk("RPC: TCP skipping %d bytes\n", want);
- result = xprt_recvmsg(xprt, &riov, 1, want);
- if (!result && want)
- result=-EAGAIN;
+ result = xprt_recvmsg(xprt, &riov, 1, want, 0);
if (result < 0)
- goto done;
- offset += result;
+ return result;
+ xprt->tcp_offset += result;
avail -= result;
- if (result < want) {
- result = -EAGAIN;
- goto done;
- }
}
- if (!xprt->tcp_more)
- xprt->tcp_total = 0;
- offset = 0;
-
-done:
- dprintk("RPC: tcp_input_record done (off %d total %d copied %d)\n",
- offset, xprt->tcp_total, xprt->tcp_copied);
- xprt->tcp_offset = offset;
- return result;
+ return avail;
}
-static __inline__ void tcp_output_record(struct rpc_xprt *xprt)
+/*
+ * TCP record receive routine
+ * This is not the most efficient code since we call recvfrom thrice--
+ * first receiving the record marker, then the XID, then the data.
+ *
+ * The optimal solution would be a RPC support in the TCP layer, which
+ * would gather all data up to the next record marker and then pass us
+ * the list of all TCP segments ready to be copied.
+ */
+static int
+tcp_input_record(struct rpc_xprt *xprt)
{
- if(xprt->snd_sent && xprt->snd_task)
- dprintk("RPC: write space\n");
- if(xprt->write_space == 0)
- {
- xprt->write_space = 1;
- if (xprt->snd_task && !RPC_IS_RUNNING(xprt->snd_task))
- {
- if(xprt->snd_sent)
- dprintk("RPC: Write wakeup snd_sent =%d\n",
- xprt->snd_sent);
- rpc_wake_up_task(xprt->snd_task);
- }
+ struct rpc_rqst *req = NULL;
+ struct rpc_task *task = NULL;
+ int avail, result;
+
+ dprintk("RPC: tcp_input_record\n");
+
+ if (xprt->shutdown)
+ return -EIO;
+ if (!xprt_connected(xprt))
+ return -ENOTCONN;
+
+ /* Read in a new fragment marker if necessary */
+ /* Can we ever really expect to get completely empty fragments? */
+ if ((result = tcp_read_fraghdr(xprt)) <= 0)
+ return result;
+ avail = result;
+
+ /* Read in the xid if necessary */
+ if ((result = tcp_read_xid(xprt, avail)) <= 0)
+ return result;
+ avail = result;
+
+ /* Find and lock the request corresponding to this xid */
+ req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
+ if (req) {
+ task = req->rq_task;
+ /* Read in the request data */
+ result = tcp_read_request(xprt, req, avail);
+ rpc_unlock_task(task);
+ if (result < 0)
+ return result;
+ avail = result;
}
+
+ /* Skip over any trailing bytes on short reads */
+ if ((result = tcp_read_discard(xprt, avail)) < 0)
+ return result;
+
+ dprintk("RPC: tcp_input_record done (off %d reclen %d copied %d)\n",
+ xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_copied);
+ result = xprt->tcp_reclen;
+ return result;
}
/*
* TCP task queue stuff
*/
-
-static struct rpc_xprt *rpc_rx_xprt_pending = NULL; /* Chain by rx_pending of rpc_xprt's */
-static struct rpc_xprt *rpc_tx_xprt_pending = NULL; /* Chain by tx_pending of rpc_xprt's */
-
-/*
- * This is protected from tcp_data_ready and the stack as its run
- * inside of the RPC I/O daemon
- */
+LIST_HEAD(rpc_xprt_pending); /* List of xprts having pending tcp requests */
-void rpciod_tcp_dispatcher(void)
+static inline
+void tcp_rpciod_queue(void)
{
- struct rpc_xprt *xprt;
- int result;
+ rpciod_wake_up();
+}
+
+static void
+__xprt_append_pending(struct rpc_xprt *xprt)
+{
+ if (list_empty(&xprt->rx_pending)) {
+ list_add(&xprt->rx_pending, rpc_xprt_pending.prev);
+ dprintk("RPC: xprt queue %p\n", xprt);
+ tcp_rpciod_queue();
+ }
+}
+
+static void
+__xprt_remove_pending(struct rpc_xprt *xprt)
+{
+ if (!list_empty(&xprt->rx_pending)) {
+ list_del(&xprt->rx_pending);
+ INIT_LIST_HEAD(&xprt->rx_pending);
+ }
+}
+
+static void
+xprt_remove_pending(struct rpc_xprt *xprt)
+{
+ start_bh_atomic();
+ __xprt_remove_pending(xprt);
+ end_bh_atomic();
+}
+
+static inline
+struct rpc_xprt *xprt_remove_pending_next(void)
+{
+ struct rpc_xprt *xprt = NULL;
+
+ start_bh_atomic();
+ if (!list_empty(&rpc_xprt_pending)) {
+ xprt = list_entry(rpc_xprt_pending.next, struct rpc_xprt, rx_pending);
+ list_del(&xprt->rx_pending);
+ INIT_LIST_HEAD(&xprt->rx_pending);
+ }
+ end_bh_atomic();
+ return xprt;
+}
+
+/*
+ * This is protected from tcp_data_ready and the stack as its run
+ * inside of the RPC I/O daemon
+ */
+void
+__rpciod_tcp_dispatcher(void)
+{
+ struct rpc_xprt *xprt;
+ int safe_retry = 0, result;
dprintk("rpciod_tcp_dispatcher: Queue Running\n");
-
+
/*
* Empty each pending socket
*/
-
- while((xprt=rpc_rx_xprt_pending)!=NULL)
- {
- int safe_retry=0;
-
- rpc_rx_xprt_pending=xprt->rx_pending;
- xprt->rx_pending_flag=0;
-
+ while ((xprt = xprt_remove_pending_next()) != NULL) {
dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt);
-
- do
- {
- if (safe_retry++ > 50)
- break;
- result = tcp_input_record(xprt);
- }
- while (result >= 0);
-
- switch (result) {
- case -EAGAIN:
- continue;
- case -ENOTCONN:
- case -EPIPE:
- xprt_disconnect(xprt);
- continue;
- default:
- printk(KERN_WARNING "RPC: unexpected error %d from tcp_input_record\n",
- result);
- }
- }
- while((xprt=rpc_tx_xprt_pending)!=NULL)
- {
- rpc_tx_xprt_pending = xprt->tx_pending;
- xprt->tx_pending_flag = 0;
- tcp_output_record(xprt);
- }
-}
+ do {
+ result = tcp_input_record(xprt);
+ } while (result >= 0);
+ if (!xprt_connected(xprt) && !xprt->connecting)
+ xprt_close(xprt);
-extern inline void tcp_rpciod_queue(void)
-{
- rpciod_wake_up();
+ if (safe_retry++ > 200) {
+ schedule();
+ safe_retry = 0;
+ }
+ }
}
/*
@@ -839,37 +977,22 @@
struct rpc_xprt *xprt;
dprintk("RPC: tcp_data_ready...\n");
- if (!(xprt = xprt_from_sock(sk)))
- {
+ if (!(xprt = xprt_from_sock(sk))) {
printk("Not a socket with xprt %p\n", sk);
- return;
+ goto wake;
}
+
+ if (xprt->shutdown)
+ goto wake;
+
+ __xprt_append_pending(xprt);
+
dprintk("RPC: tcp_data_ready client %p\n", xprt);
dprintk("RPC: state %x conn %d dead %d zapped %d\n",
- sk->state, xprt->connected,
+ sk->state, xprt_connected(xprt),
sk->dead, sk->zapped);
- /*
- * If we are not waiting for the RPC bh run then
- * we are now
- */
- if (!xprt->rx_pending_flag)
- {
- int start_queue=0;
-
- dprintk("RPC: xprt queue %p\n", rpc_rx_xprt_pending);
- if(rpc_rx_xprt_pending==NULL)
- start_queue=1;
- xprt->rx_pending_flag=1;
- xprt->rx_pending=rpc_rx_xprt_pending;
- rpc_rx_xprt_pending=xprt;
- if (start_queue)
- {
- tcp_rpciod_queue();
- start_queue=0;
- }
- }
- else
- dprintk("RPC: xprt queued already %p\n", xprt);
+ wake:
+ wake_up_interruptible(sk->sleep);
}
@@ -879,41 +1002,88 @@
struct rpc_xprt *xprt;
if (!(xprt = xprt_from_sock(sk)))
- return;
+ goto wake;
dprintk("RPC: tcp_state_change client %p...\n", xprt);
dprintk("RPC: state %x conn %d dead %d zapped %d\n",
- sk->state, xprt->connected,
+ sk->state, xprt_connected(xprt),
sk->dead, sk->zapped);
- if (sk->state == TCP_ESTABLISHED && !xprt->connected) {
- xprt->connected = 1;
- xprt->connecting = 0;
+ switch (sk->state) {
+ case TCP_ESTABLISHED:
+ xprt_set_connected(xprt);
+ if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
+ rpc_wake_up_task(xprt->snd_task);
rpc_wake_up(&xprt->reconn);
- } else if (sk->zapped) {
- rpc_wake_up_status(&xprt->pending, -ENOTCONN);
- rpc_wake_up_status(&xprt->sending, -ENOTCONN);
- rpc_wake_up_status(&xprt->reconn, -ENOTCONN);
+ break;
+ case TCP_SYN_SENT:
+ case TCP_SYN_RECV:
+ break;
+ default:
+ __xprt_disconnect(xprt);
+ break;
}
+ wake:
+ wake_up_interruptible(sk->sleep);
}
+/*
+ * The following 2 routines allow a task to sleep while socket memory is
+ * low.
+ */
static void
tcp_write_space(struct sock *sk)
{
struct rpc_xprt *xprt;
+ unsigned long oldflags;
if (!(xprt = xprt_from_sock(sk)))
return;
- if (!xprt->tx_pending_flag) {
- int start_queue = 0;
+ if (xprt->shutdown)
+ return;
- if (rpc_tx_xprt_pending == NULL)
- start_queue = 1;
- xprt->tx_pending_flag = 1;
- xprt->tx_pending = rpc_tx_xprt_pending;
- rpc_tx_xprt_pending = xprt;
- if (start_queue)
- tcp_rpciod_queue();
- }
+ /* Wait until we have enough socket memory */
+ if (sock_wspace(sk) < min(sk->sndbuf,XPRT_MIN_WRITE_SPACE))
+ return;
+
+ spin_lock_irqsave(&xprt_sock_lock, oldflags);
+ if (xprt->write_space)
+ goto out_unlock;
+
+ xprt->write_space = 1;
+
+ if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
+ rpc_wake_up_task(xprt->snd_task);
+ out_unlock:
+ spin_unlock_irqrestore(&xprt_sock_lock, oldflags);
+ wake_up_interruptible(sk->sleep);
+}
+
+static void
+udp_write_space(struct sock *sk)
+{
+ struct rpc_xprt *xprt;
+ unsigned long oldflags;
+
+ if (!(xprt = xprt_from_sock(sk)))
+ return;
+ if (xprt->shutdown)
+ return;
+
+ /* Wait until we have enough socket memory */
+ if (sock_wspace(sk) < min(sk->sndbuf,XPRT_MIN_WRITE_SPACE))
+ return;
+
+ spin_lock_irqsave(&xprt_sock_lock, oldflags);
+ if (xprt->write_space)
+ goto out_unlock;
+
+ xprt->write_space = 1;
+
+ if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
+ rpc_wake_up_task(xprt->snd_task);
+ out_unlock:
+ spin_unlock_irqrestore(&xprt_sock_lock, oldflags);
+ wake_up_interruptible(sk->sleep);
}
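
Both write_space handlers follow the same stall/wake protocol: ignore callbacks until free buffer space crosses the threshold, latch write_space exactly once under the spinlock, and wake the one queued sender. A userspace analogue with POSIX threads (the wspace_* names are illustrative, not kernel API):

    #include <pthread.h>

    struct wspace {
        pthread_mutex_t lock;
        pthread_cond_t  cond;
        int             space_ok;   /* the 'write_space' latch */
    };
    #define WSPACE_INIT { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0 }

    /* Called by the 'socket' side whenever space is freed. */
    static void wspace_notify(struct wspace *w, size_t free_bytes, size_t need)
    {
        if (free_bytes < need)          /* still below threshold: stay quiet */
            return;
        pthread_mutex_lock(&w->lock);
        if (!w->space_ok) {
            w->space_ok = 1;
            pthread_cond_signal(&w->cond);  /* wake exactly one sender */
        }
        pthread_mutex_unlock(&w->lock);
    }

    /* Called by the sender when the send path reports no buffer space. */
    static void wspace_wait(struct wspace *w)
    {
        pthread_mutex_lock(&w->lock);
        while (!w->space_ok)
            pthread_cond_wait(&w->cond, &w->lock);
        w->space_ok = 0;                /* consume the event */
        pthread_mutex_unlock(&w->lock);
    }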
/*
@@ -924,9 +1094,8 @@
{
struct rpc_rqst *req = task->tk_rqstp;
- if (req) {
- xprt_adjust_cwnd(task->tk_xprt, -ETIMEDOUT);
- }
+ if (req)
+ __xprt_adjust_cwnd(task->tk_xprt, -ETIMEDOUT);
dprintk("RPC: %4d xprt_timer (%s request)\n",
task->tk_pid, req ? "pending" : "backlogged");
@@ -936,32 +1105,47 @@
rpc_wake_up_task(task);
}
+
/*
- * (Partly) transmit the RPC packet
- * Note that task->tk_status is either 0 or negative on return.
- * Only when the reply is received will the status be set to a
- * positive value.
+ * Serialize access to sockets, in order to prevent different
+ * requests from interfering with each other.
*/
-static inline int
-xprt_transmit_some(struct rpc_xprt *xprt, struct rpc_task *task)
+int
+xprt_down_transmit(struct rpc_task *task)
{
+ struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
struct rpc_rqst *req = task->tk_rqstp;
- int result;
- task->tk_status = 0;
- if ((result = xprt_sendmsg(xprt)) >= 0) {
- if (!xprt->snd_buf.io_len || !xprt->stream) {
- rpc_wake_up_next(&xprt->sending);
- return req->rq_slen;
- }
- result = -EAGAIN;
- } else if (xprt->stream) {
- if (result == -ENOTCONN || result == -EPIPE) {
- xprt_disconnect(xprt);
- result = -ENOTCONN;
- }
+ xprt_delete_tcp_timer(xprt);
+ if (xprt->snd_task && xprt->snd_task != task) {
+ dprintk("RPC: %4d TCP write queue full (task %d)\n",
+ task->tk_pid, xprt->snd_task->tk_pid);
+ task->tk_timeout = 0;
+ task->tk_status = -EAGAIN;
+ rpc_sleep_on(&xprt->sending, task, NULL, NULL);
+ } else if (!xprt->snd_task) {
+ xprt->snd_task = task;
+#ifdef RPC_PROFILE
+ req->rq_xtime = jiffies;
+#endif
+ req->rq_bytes_sent = 0;
+ }
+ return xprt->snd_task == task;
+}
+
+/*
+ * Releases the socket for use by other requests.
+ */
+void
+xprt_up_transmit(struct rpc_task *task)
+{
+ struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
+
+ if (xprt->snd_task && xprt->snd_task == task) {
+ xprt->snd_task = NULL;
+ if (!rpc_wake_up_next(&xprt->sending) && xprt->stream)
+ xprt_add_tcp_timer(xprt, RPCXPRT_TIMEOUT);
}
- return task->tk_status = result;
}
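
Taken together, xprt_down_transmit() and xprt_up_transmit() form a try-lock over the socket: the first task claims it, latecomers park on the sending queue, and release passes the socket to the next waiter (or re-arms the idle timer). Stripped of the wait-queue plumbing, and assuming callers are serialized as the kernel's bh protection guarantees, the model looks like this (hypothetical types):

    /* Minimal transmit-lock model; 'owner' is whichever task currently
     * holds the right to write on the socket. */
    struct xmit_lock { void *owner; };

    static int xmit_trylock(struct xmit_lock *l, void *task)
    {
        if (l->owner == NULL)
            l->owner = task;        /* first comer wins */
        return l->owner == task;    /* the holder may re-enter */
    }

    static void xmit_unlock(struct xmit_lock *l, void *task)
    {
        if (l->owner == task)
            l->owner = NULL;        /* next queued sender is woken here */
    }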
/*
@@ -971,129 +1155,137 @@
void
xprt_transmit(struct rpc_task *task)
{
- struct rpc_timeout *timeo;
struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
- int status;
- dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid,
- *(u32 *)(req->rq_svec[0].iov_base));
+ dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid, req->rq_xid);
- if (xprt->shutdown) {
+ if (xprt->shutdown)
task->tk_status = -EIO;
+
+ if (!xprt_connected(xprt))
+ task->tk_status = -ENOTCONN;
+
+ if (task->tk_status < 0)
return;
- }
- /* If we're not already in the process of transmitting our call,
- * set up everything as needed. */
- if (xprt->snd_task != task) {
- /* Write the record marker */
- if (xprt->stream) {
- u32 marker;
+ /* set up everything as needed. */
+ /* Write the record marker */
+ if (xprt->stream) {
+ u32 *marker = req->rq_svec[0].iov_base;
- if (!xprt->connected) {
- task->tk_status = -ENOTCONN;
- return;
- }
- marker = htonl(0x80000000|(req->rq_slen-4));
- *((u32 *) req->rq_svec[0].iov_base) = marker;
- }
+ *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
+ }
- /* Reset timeout parameters */
- timeo = &req->rq_timeout;
- if (timeo->to_retries < 0) {
- dprintk("RPC: %4d xprt_transmit reset timeo\n",
- task->tk_pid);
- timeo->to_retries = xprt->timeout.to_retries;
- timeo->to_current = timeo->to_initval;
- }
+ if (!xprt_down_transmit(task))
+ return;
-#ifdef RPC_PROFILE
- req->rq_xtime = jiffies;
-#endif
- req->rq_gotit = 0;
+ do_xprt_transmit(task);
+}
+
+static void
+do_xprt_transmit(struct rpc_task *task)
+{
+ struct rpc_rqst *req = task->tk_rqstp;
+ struct rpc_xprt *xprt = req->rq_xprt;
+ unsigned long oldflags;
+ int status, retry = 0;
- if (xprt->snd_task) {
- dprintk("RPC: %4d TCP write queue full (task %d)\n",
- task->tk_pid, xprt->snd_task->tk_pid);
- rpc_sleep_on(&xprt->sending, task,
- xprt_transmit_status, NULL);
- return;
- }
- xprt->snd_buf = req->rq_snd_buf;
- xprt->snd_task = task;
- xprt->snd_sent = 0;
- }
/* For fast networks/servers we have to put the request on
* the pending list now:
+ * Note that we don't want the task timing out during the
+ * call to xprt_sendmsg(), so we initially disable the timeout,
+ * and then reset it later...
*/
- start_bh_atomic();
- status = rpc_add_wait_queue(&xprt->pending, task);
- if (!status)
- task->tk_callback = NULL;
- end_bh_atomic();
-
- if (status)
- {
- printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
- task->tk_status = status;
- return;
- }
+ xprt_receive(task);
/* Continue transmitting the packet/record. We must be careful
* to cope with writespace callbacks arriving _after_ we have
* called xprt_sendmsg().
*/
- while (1) {
+ for (;;) {
+ status = -ENOTCONN;
+ if (!xprt->inet)
+ break;
+
xprt->write_space = 0;
- if (xprt_transmit_some(xprt, task) != -EAGAIN) {
- dprintk("RPC: %4d xmit complete\n", task->tk_pid);
- xprt->snd_task = NULL;
- return;
+ status = -ENOMEM;
+ if (sock_wspace(xprt->inet) < req->rq_slen + MIN_WRITE_SPACE)
+ break;
+ status = xprt_sendmsg(xprt, req);
+
+ if (status < 0)
+ break;
+
+ if (xprt->stream) {
+ req->rq_bytes_sent += status;
+
+ if (req->rq_bytes_sent >= req->rq_slen)
+ goto out_receive;
+ } else {
+ if (status >= req->rq_slen)
+ goto out_receive;
+ status = -ENOMEM;
+ break;
}
- /*d*/dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
- task->tk_pid, xprt->snd_buf.io_len,
+ dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
+ task->tk_pid, req->rq_slen - req->rq_bytes_sent,
req->rq_slen);
- task->tk_status = 0;
- start_bh_atomic();
+
+ status = -EAGAIN;
+ if (retry++ > 50)
+ break;
+ }
+ rpc_unlock_task(task);
+
+ /* Note: at this point, task->tk_sleeping has not yet been set,
+ * hence there is no danger of the waking up task being put on
+ * schedq, and being picked up by a parallel run of rpciod().
+ */
+ rpc_wake_up_task(task);
+ if (!RPC_IS_RUNNING(task))
+ goto out_release;
+ if (req->rq_received)
+ goto out_release;
+
+ task->tk_status = status;
+
+ switch (status) {
+ case -ENOMEM:
+ /* Protect against (udp|tcp)_write_space */
+ spin_lock_irqsave(&xprt_sock_lock, oldflags);
if (!xprt->write_space) {
- /* Remove from pending */
- rpc_remove_wait_queue(task);
- rpc_sleep_on(&xprt->sending, task,
- xprt_transmit_status, NULL);
- end_bh_atomic();
+ task->tk_timeout = req->rq_timeout.to_current;
+ rpc_sleep_on(&xprt->sending, task, NULL, NULL);
+ }
+ spin_unlock_irqrestore(&xprt_sock_lock, oldflags);
+ return;
+ case -EAGAIN:
+ case -ECONNREFUSED:
+ case -ENOTCONN:
+ /* Keep holding the socket if it is blocked */
+ if (!xprt->stream) {
+ rpc_delay(task, HZ>>4);
return;
}
- end_bh_atomic();
+ default:
+ goto out_release;
}
-}
-
-/*
- * This callback is invoked when the sending task is forced to sleep
- * because the TCP write buffers are full
- */
-static void
-xprt_transmit_status(struct rpc_task *task)
-{
- struct rpc_xprt *xprt = task->tk_client->cl_xprt;
- dprintk("RPC: %4d transmit_status %d\n", task->tk_pid, task->tk_status);
- if (xprt->snd_task == task)
- {
- if (task->tk_status < 0)
- {
- xprt->snd_task = NULL;
- xprt_disconnect(xprt);
- }
- else
- xprt_transmit(task);
- }
+ out_receive:
+ dprintk("RPC: %4d xmit complete\n", task->tk_pid);
+ /* Set the task's receive timeout value */
+ task->tk_timeout = req->rq_timeout.to_current;
+ rpc_add_timer(task, xprt_timer);
+ rpc_unlock_task(task);
+ out_release:
+ xprt_up_transmit(task);
}
/*
- * Wait for the reply to our call.
+ * Queue the task for a reply to our call.
* When the callback is invoked, the congestion window should have
* been updated already.
*/
@@ -1104,34 +1296,10 @@
struct rpc_xprt *xprt = req->rq_xprt;
dprintk("RPC: %4d xprt_receive\n", task->tk_pid);
- if (xprt->connected == 0) {
- task->tk_status = -ENOTCONN;
- return;
- }
-
- /*
- * Wait until rq_gotit goes non-null, or timeout elapsed.
- */
- task->tk_timeout = req->rq_timeout.to_current;
-
- start_bh_atomic();
- if (!req->rq_gotit) {
- rpc_sleep_on(&xprt->pending, task,
- xprt_receive_status, xprt_timer);
- }
- end_bh_atomic();
- dprintk("RPC: %4d xprt_receive returns %d\n",
- task->tk_pid, task->tk_status);
-}
-
-static void
-xprt_receive_status(struct rpc_task *task)
-{
- struct rpc_xprt *xprt = task->tk_xprt;
-
- if (xprt->stream && xprt->tcp_rqstp == task->tk_rqstp)
- xprt->tcp_rqstp = NULL;
+ req->rq_received= 0;
+ task->tk_timeout = 0;
+ rpc_sleep_locked(&xprt->pending, task, NULL, NULL);
}
/*
@@ -1148,15 +1316,18 @@
dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
task->tk_pid, xprt->cong, xprt->cwnd);
- if ((!RPCXPRT_CONGESTED(xprt) && xprt->free)) {
- xprt_reserve_status(task);
+ start_bh_atomic();
+ __xprt_reserve_status(task);
+ if (task->tk_rqstp) {
task->tk_timeout = 0;
} else if (!task->tk_timeout) {
task->tk_status = -ENOBUFS;
} else {
dprintk("RPC: xprt_reserve waiting on backlog\n");
- rpc_sleep_on(&xprt->backlog, task, xprt_reserve_status, NULL);
+ task->tk_status = -EAGAIN;
+ rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
}
+ end_bh_atomic();
dprintk("RPC: %4d xprt_reserve returns %d\n",
task->tk_pid, task->tk_status);
return task->tk_status;
@@ -1166,7 +1337,7 @@
* Reservation callback
*/
static void
-xprt_reserve_status(struct rpc_task *task)
+__xprt_reserve_status(struct rpc_task *task)
{
struct rpc_xprt *xprt = task->tk_xprt;
struct rpc_rqst *req;
@@ -1177,40 +1348,25 @@
/* NOP */
} else if (task->tk_rqstp) {
/* We've already been given a request slot: NOP */
- } else if (!RPCXPRT_CONGESTED(xprt)) {
+ } else {
+ if (RPCXPRT_CONGESTED(xprt) || !(req = xprt->free))
+ goto out_nofree;
/* OK: There's room for us. Grab a free slot and bump
* congestion value */
- req = xprt->free;
- if (!req)
- goto bad_list;
- if (req->rq_xid)
- goto bad_used;
xprt->free = req->rq_next;
+ req->rq_next = NULL;
xprt->cong += RPC_CWNDSCALE;
task->tk_rqstp = req;
- req->rq_next = NULL;
xprt_request_init(task, xprt);
- } else {
- task->tk_status = -EAGAIN;
- }
- if (xprt->free && !RPCXPRT_CONGESTED(xprt))
- rpc_wake_up_next(&xprt->backlog);
+ if (xprt->free)
+ __xprt_clear_backlog(xprt);
+ }
return;
-bad_list:
- printk(KERN_ERR
- "RPC: %4d inconsistent free list (cong %ld cwnd %ld)\n",
- task->tk_pid, xprt->cong, xprt->cwnd);
- rpc_debug = ~0;
- goto bummer;
-bad_used:
- printk(KERN_ERR "RPC: used rqst slot %p on free list!\n", req);
-bummer:
- task->tk_status = -EIO;
- xprt->free = NULL;
- return;
+out_nofree:
+ task->tk_status = -EAGAIN;
}
/*
@@ -1227,7 +1383,6 @@
dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, req, xid);
task->tk_status = 0;
- req->rq_gotit = 0;
req->rq_timeout = xprt->timeout;
req->rq_task = task;
req->rq_xprt = xprt;
@@ -1245,6 +1400,7 @@
struct rpc_xprt *xprt = task->tk_xprt;
struct rpc_rqst *req;
+ xprt_up_transmit(task);
if (!(req = task->tk_rqstp))
return;
task->tk_rqstp = NULL;
@@ -1252,46 +1408,15 @@
dprintk("RPC: %4d release request %p\n", task->tk_pid, req);
- /* remove slot from queue of pending */
start_bh_atomic();
- if (task->tk_rpcwait) {
- printk("RPC: task of released request still queued!\n");
-#ifdef RPC_DEBUG
- printk("RPC: (task is on %s)\n", rpc_qname(task->tk_rpcwait));
-#endif
- rpc_del_timer(task);
- rpc_remove_wait_queue(task);
- }
- end_bh_atomic();
+ req->rq_next = xprt->free;
+ xprt->free = req;
/* Decrease congestion value. */
xprt->cong -= RPC_CWNDSCALE;
-#if 0
- /* If congestion threshold is not yet reached, pass on the request slot.
- * This looks kind of kludgy, but it guarantees backlogged requests
- * are served in order.
- * N.B. This doesn't look completely safe, as the task is still
- * on the backlog list after wake-up.
- */
- if (!RPCXPRT_CONGESTED(xprt)) {
- struct rpc_task *next = rpc_wake_up_next(&xprt->backlog);
-
- if (next && next->tk_rqstp == 0) {
- xprt->cong += RPC_CWNDSCALE;
- next->tk_rqstp = req;
- xprt_request_init(next, xprt);
- return;
- }
- }
-#endif
-
- req->rq_next = xprt->free;
- xprt->free = req;
-
- /* If not congested, wake up the next backlogged process */
- if (!RPCXPRT_CONGESTED(xprt))
- rpc_wake_up_next(&xprt->backlog);
+ __xprt_clear_backlog(xprt);
+ end_bh_atomic();
}
/*
@@ -1303,7 +1428,7 @@
if (proto == IPPROTO_UDP)
xprt_set_timeout(to, 5, 5 * HZ);
else
- xprt_set_timeout(to, 5, 15 * HZ);
+ xprt_set_timeout(to, 5, 60 * HZ);
}
/*
@@ -1330,56 +1455,36 @@
{
struct rpc_xprt *xprt;
struct rpc_rqst *req;
- struct sock *inet;
int i;
dprintk("RPC: setting up %s transport...\n",
proto == IPPROTO_UDP? "UDP" : "TCP");
-#if LINUX_VERSION_CODE >= 0x020100
- inet = sock->sk;
-#else
- inet = (struct sock *) sock->data;
-#endif
-
if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
return NULL;
memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */
- xprt->file = NULL;
- xprt->sock = sock;
- xprt->inet = inet;
xprt->addr = *ap;
xprt->prot = proto;
xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
- xprt->cwnd = RPC_INITCWND;
-#ifdef SOCK_HAS_USER_DATA
- inet->user_data = xprt;
-#else
- xprt->link = sock_list;
- sock_list = xprt;
-#endif
- xprt->old_data_ready = inet->data_ready;
- xprt->old_state_change = inet->state_change;
- xprt->old_write_space = inet->write_space;
- if (proto == IPPROTO_UDP) {
- inet->data_ready = udp_data_ready;
- } else {
- inet->data_ready = tcp_data_ready;
- inet->state_change = tcp_state_change;
- inet->write_space = tcp_write_space;
+ if (xprt->stream) {
+ xprt->cwnd = RPC_MAXCWND;
xprt->nocong = 1;
- }
- xprt->connected = 1;
+ } else
+ xprt->cwnd = RPC_INITCWND;
+ xprt->congtime = jiffies;
/* Set timeout parameters */
if (to) {
xprt->timeout = *to;
xprt->timeout.to_current = to->to_initval;
xprt->timeout.to_resrvval = to->to_maxval << 1;
- } else {
+ } else
xprt_default_timeout(&xprt->timeout, xprt->prot);
- }
+
+ init_timer(&xprt->tcp_timer);
+ xprt->tcp_timer.data = (unsigned long)xprt;
+ xprt->tcp_timer.function = (void(*)(unsigned long)) xprt_tcp_timeout;
xprt->pending = RPC_INIT_WAITQ("xprt_pending");
xprt->sending = RPC_INIT_WAITQ("xprt_sending");
@@ -1392,50 +1497,15 @@
req->rq_next = NULL;
xprt->free = xprt->slot;
+ INIT_LIST_HEAD(&xprt->rx_pending);
+
dprintk("RPC: created transport %p\n", xprt);
- /*
- * TCP requires the rpc I/O daemon is present
- */
- if(proto==IPPROTO_TCP)
- rpciod_up();
+ xprt_bind_socket(xprt, sock);
return xprt;
}
/*
- * Create and initialize an RPC client given an open file.
- * This is obsolete now.
- */
-#if 0
-struct rpc_xprt *
-xprt_create(struct file *file, struct sockaddr_in *ap, struct rpc_timeout *to)
-{
- struct rpc_xprt *xprt;
- struct socket *sock;
- int proto;
-
- if (!file) {
- printk("RPC: file == NULL in xprt_create!\n");
- return NULL;
- }
-
- sock = &file->f_inode->u.socket_i;
- if (sock->ops->family != PF_INET) {
- printk(KERN_WARNING "RPC: only INET sockets supported\n");
- return NULL;
- }
-
- proto = (sock->type == SOCK_DGRAM)? IPPROTO_UDP : IPPROTO_TCP;
- if ((xprt = xprt_setup(sock, proto, ap, to)) != NULL) {
- xprt->file = file;
- file->f_count++;
- }
-
- return xprt;
-}
-#endif
-
-/*
* Bind to a reserved port
*/
static inline int
@@ -1459,38 +1529,66 @@
return err;
}
+static int
+xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
+{
+ struct sock *sk = sock->sk;
+
+ if (xprt->inet)
+ return -EBUSY;
+
+ start_bh_atomic();
+ sk->user_data = xprt;
+ xprt->old_data_ready = sk->data_ready;
+ xprt->old_state_change = sk->state_change;
+ xprt->old_write_space = sk->write_space;
+ if (xprt->prot == IPPROTO_UDP) {
+ sk->data_ready = udp_data_ready;
+ sk->write_space = udp_write_space;
+ xprt_set_connected(xprt);
+ } else {
+ sk->data_ready = tcp_data_ready;
+ sk->state_change = tcp_state_change;
+ sk->write_space = tcp_write_space;
+ xprt_clear_connected(xprt);
+ }
+
+ /* Reset to new socket */
+ xprt->sock = sock;
+ xprt->inet = sk;
+ end_bh_atomic();
+ /*
+ * TCP requires the rpc I/O daemon is present
+ */
+ if(xprt->stream)
+ rpciod_up();
+
+ return 0;
+}
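
Binding stashes the socket's original callbacks before installing the RPC ones, so xprt_close() can put them back when the transport goes away. The save/replace/restore pattern in miniature, with hypothetical callback fields standing in for data_ready and friends:

    typedef void (*notify_fn)(void *sk);

    struct hooked_sock {
        notify_fn data_ready;        /* live callback slot */
        notify_fn saved_data_ready;  /* original, kept for restore */
    };

    static void hook_install(struct hooked_sock *s, notify_fn mine)
    {
        s->saved_data_ready = s->data_ready;  /* remember the original */
        s->data_ready = mine;                 /* divert into RPC code */
    }

    static void hook_remove(struct hooked_sock *s)
    {
        s->data_ready = s->saved_data_ready;  /* restore on close */
    }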
+
/*
* Create a client socket given the protocol and peer address.
*/
static struct socket *
-xprt_create_socket(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
+xprt_create_socket(int proto, struct rpc_timeout *to)
{
struct socket *sock;
int type, err;
- dprintk("RPC: xprt_create_socket(%08x, %s %d)\n",
- sap? ntohl(sap->sin_addr.s_addr) : 0,
+ dprintk("RPC: xprt_create_socket(%s %d)\n",
(proto == IPPROTO_UDP)? "udp" : "tcp", proto);
type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
+
if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
printk("RPC: can't create socket (%d).\n", -err);
goto failed;
}
- /* If the caller has root privs, bind to a reserved port */
- if (!current->fsuid && xprt_bindresvport(sock) < 0)
+ /* If the caller has the capability, bind to a reserved port */
+ if (capable(CAP_NET_BIND_SERVICE) && xprt_bindresvport(sock) < 0)
goto failed;
- if (type == SOCK_STREAM && sap) {
- err = sock->ops->connect(sock, (struct sockaddr *) sap,
- sizeof(*sap), 0);
- if (err < 0) {
- printk("RPC: TCP connect failed (%d).\n", -err);
- goto failed;
- }
- }
-
return sock;
failed:
@@ -1509,7 +1607,7 @@
dprintk("RPC: xprt_create_proto called\n");
- if (!(sock = xprt_create_socket(proto, sap, to)))
+ if (!(sock = xprt_create_socket(proto, to)))
return NULL;
if (!(xprt = xprt_setup(sock, proto, sap, to)))
@@ -1529,6 +1627,20 @@
rpc_wake_up(&xprt->pending);
rpc_wake_up(&xprt->backlog);
rpc_wake_up(&xprt->reconn);
+ wake_up(&xprt->cong_wait);
+}
+
+/*
+ * Clear the xprt backlog queue
+ */
+static int
+__xprt_clear_backlog(struct rpc_xprt *xprt)
+{
+ if (RPCXPRT_CONGESTED(xprt))
+ return 0;
+ rpc_wake_up_next(&xprt->backlog);
+ wake_up(&xprt->cong_wait);
+ return 1;
}
/*
@@ -1537,20 +1649,11 @@
int
xprt_destroy(struct rpc_xprt *xprt)
{
-#ifndef SOCK_HAS_USER_DATA
- struct rpc_xprt **q;
-
- for (q = &sock_list; *q && *q != xprt; q = &((*q)->link))
- ;
- if (!*q) {
- printk(KERN_WARNING "xprt_destroy: unknown socket!\n");
- return -EIO; /* why is there no EBUGGYSOFTWARE */
- }
- *q = xprt->link;
-#endif
-
dprintk("RPC: destroying transport %p\n", xprt);
+ xprt_shutdown(xprt);
xprt_close(xprt);
+ xprt_delete_tcp_timer(xprt);
+ xprt_remove_pending(xprt);
kfree(xprt);
return 0;