patch-2.4.19 linux-2.4.19/net/sunrpc/xprt.c
Next file: linux-2.4.19/net/unix/af_unix.c
Previous file: linux-2.4.19/net/sunrpc/stats.c
Back to the patch index
Back to the overall index
- Lines: 702
- Date: Fri Aug 2 17:39:46 2002
- Orig file: linux-2.4.18/net/sunrpc/xprt.c
- Orig date: Fri Dec 21 09:42:06 2001
diff -urN linux-2.4.18/net/sunrpc/xprt.c linux-2.4.19/net/sunrpc/xprt.c
@@ -63,12 +63,10 @@
#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
+#include <net/tcp.h>
#include <asm/uaccess.h>
-/* Following value should be > 32k + RPC overhead */
-#define XPRT_MIN_WRITE_SPACE (35000 + SOCK_MIN_WRITE_SPACE)
-
extern spinlock_t rpc_queue_lock;
/*
@@ -90,7 +88,6 @@
static void xprt_reconn_status(struct rpc_task *task);
static struct socket *xprt_create_socket(int, struct rpc_timeout *);
static int xprt_bind_socket(struct rpc_xprt *, struct socket *);
-static void xprt_remove_pending(struct rpc_xprt *);
#ifdef RPC_DEBUG_DATA
/*
@@ -272,43 +269,6 @@
}
/*
- * Read data from socket
- */
-static int
-xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, unsigned len, unsigned shift)
-{
- struct socket *sock = xprt->sock;
- struct msghdr msg;
- mm_segment_t oldfs;
- struct iovec niv[MAX_IOVEC];
- int result;
-
- if (!sock)
- return -ENOTCONN;
-
- msg.msg_flags = MSG_DONTWAIT|MSG_NOSIGNAL;
- msg.msg_iov = iov;
- msg.msg_iovlen = nr;
- msg.msg_name = NULL;
- msg.msg_namelen = 0;
- msg.msg_control = NULL;
- msg.msg_controllen = 0;
-
- /* Adjust the iovec if we've already filled it */
- if (shift)
- xprt_move_iov(&msg, niv, shift);
-
- oldfs = get_fs(); set_fs(get_ds());
- result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
- set_fs(oldfs);
-
- dprintk("RPC: xprt_recvmsg(iov %p, len %d) = %d\n",
- iov, len, result);
- return result;
-}
-
-
-/*
* Adjust RPC congestion window
* We use a time-smoothed congestion estimator to avoid heavy oscillation.
*/
@@ -425,7 +385,6 @@
{
dprintk("RPC: disconnected transport %p\n", xprt);
xprt_clear_connected(xprt);
- xprt_remove_pending(xprt);
rpc_wake_up_status(&xprt->pending, -ENOTCONN);
}
@@ -439,7 +398,7 @@
{
struct rpc_xprt *xprt = task->tk_xprt;
struct socket *sock = xprt->sock;
- struct sock *inet = xprt->inet;
+ struct sock *inet;
int status;
dprintk("RPC: %4d xprt_reconnect %p connected %d\n",
@@ -460,8 +419,10 @@
if (xprt_connected(xprt))
goto out_write;
+ if (sock && sock->state != SS_UNCONNECTED)
+ xprt_close(xprt);
status = -ENOTCONN;
- if (!inet) {
+ if (!(inet = xprt->inet)) {
/* Create an unconnected socket */
if (!(sock = xprt_create_socket(xprt->prot, &xprt->timeout)))
goto defer;
@@ -469,14 +430,6 @@
inet = sock->sk;
}
- xprt_disconnect(xprt);
-
- /* Reset TCP record info */
- xprt->tcp_offset = 0;
- xprt->tcp_reclen = 0;
- xprt->tcp_copied = 0;
- xprt->tcp_more = 0;
-
/* Now connect it asynchronously. */
dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
@@ -499,17 +452,21 @@
goto defer;
}
+ /* Protect against TCP socket state changes */
+ lock_sock(inet);
dprintk("RPC: %4d connect status %d connected %d\n",
task->tk_pid, status, xprt_connected(xprt));
- spin_lock_bh(&xprt->sock_lock);
- if (!xprt_connected(xprt)) {
+ if (inet->state != TCP_ESTABLISHED) {
task->tk_timeout = xprt->timeout.to_maxval;
+ /* if the socket is already closing, delay 5 secs */
+ if ((1<<inet->state) & ~(TCP_SYN_SENT|TCP_SYN_RECV))
+ task->tk_timeout = 5*HZ;
rpc_sleep_on(&xprt->sending, task, xprt_reconn_status, NULL);
- spin_unlock_bh(&xprt->sock_lock);
+ release_sock(inet);
return;
}
- spin_unlock_bh(&xprt->sock_lock);
+ release_sock(inet);
}
defer:
if (status < 0) {
@@ -728,309 +685,229 @@
wake_up_interruptible(sk->sleep);
}
+typedef struct {
+ struct sk_buff *skb;
+ unsigned offset;
+ size_t count;
+} skb_reader_t;
+
+/*
+ * Copy from an skb into memory and shrink the skb.
+ */
+static inline size_t
+tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
+{
+ if (len > desc->count)
+ len = desc->count;
+ skb_copy_bits(desc->skb, desc->offset, p, len);
+ desc->offset += len;
+ desc->count -= len;
+ return len;
+}
+
/*
* TCP read fragment marker
*/
-static inline int
-tcp_read_fraghdr(struct rpc_xprt *xprt)
+static inline void
+tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
{
- struct iovec riov;
- int want, result;
+ size_t len, used;
+ char *p;
- if (xprt->tcp_offset >= sizeof(xprt->tcp_recm))
- goto done;
-
- want = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
- dprintk("RPC: reading header (%d bytes)\n", want);
- do {
- riov.iov_base = ((u8*) &xprt->tcp_recm) + xprt->tcp_offset;
- riov.iov_len = want;
- result = xprt_recvmsg(xprt, &riov, 1, want, 0);
- if (result < 0)
- return result;
- xprt->tcp_offset += result;
- want -= result;
- } while (want);
-
- /* Get the record length and mask out the last fragment bit */
+ p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
+ len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
+ used = tcp_copy_data(desc, p, len);
+ xprt->tcp_offset += used;
+ if (used != len)
+ return;
xprt->tcp_reclen = ntohl(xprt->tcp_recm);
- xprt->tcp_more = (xprt->tcp_reclen & 0x80000000) ? 0 : 1;
+ if (xprt->tcp_reclen & 0x80000000)
+ xprt->tcp_flags |= XPRT_LAST_FRAG;
+ else
+ xprt->tcp_flags &= ~XPRT_LAST_FRAG;
xprt->tcp_reclen &= 0x7fffffff;
-
- dprintk("RPC: New record reclen %d morefrags %d\n",
- xprt->tcp_reclen, xprt->tcp_more);
- done:
- return xprt->tcp_reclen + sizeof(xprt->tcp_recm) - xprt->tcp_offset;
+ xprt->tcp_flags &= ~XPRT_COPY_RECM;
+ xprt->tcp_offset = 0;
+ /* Sanity check of the record length */
+ if (xprt->tcp_reclen < 4) {
+ printk(KERN_ERR "RPC: Invalid TCP record fragment length\n");
+ xprt_disconnect(xprt);
+ }
+ dprintk("RPC: reading TCP record fragment of length %d\n",
+ xprt->tcp_reclen);
}
-/*
- * TCP read xid
- */
-static inline int
-tcp_read_xid(struct rpc_xprt *xprt, int avail)
+static void
+tcp_check_recm(struct rpc_xprt *xprt)
{
- struct iovec riov;
- int want, result;
-
- if (xprt->tcp_copied >= sizeof(xprt->tcp_xid) || !avail)
- goto done;
- want = min_t(unsigned int, sizeof(xprt->tcp_xid) - xprt->tcp_copied, avail);
- do {
- dprintk("RPC: reading xid (%d bytes)\n", want);
- riov.iov_base = ((u8*) &xprt->tcp_xid) + xprt->tcp_copied;
- riov.iov_len = want;
- result = xprt_recvmsg(xprt, &riov, 1, want, 0);
- if (result < 0)
- return result;
- xprt->tcp_copied += result;
- xprt->tcp_offset += result;
- want -= result;
- avail -= result;
- } while (want);
- done:
- return avail;
+ if (xprt->tcp_offset == xprt->tcp_reclen) {
+ xprt->tcp_flags |= XPRT_COPY_RECM;
+ xprt->tcp_offset = 0;
+ if (xprt->tcp_flags & XPRT_LAST_FRAG) {
+ xprt->tcp_flags &= ~XPRT_COPY_DATA;
+ xprt->tcp_flags |= XPRT_COPY_XID;
+ xprt->tcp_copied = 0;
+ }
+ }
}
/*
- * TCP read and complete request
+ * TCP read xid
*/
-static inline int
-tcp_read_request(struct rpc_xprt *xprt, struct rpc_rqst *req, int avail)
+static inline void
+tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
{
- int want, result;
-
- if (req->rq_rlen <= xprt->tcp_copied || !avail)
- goto done;
- want = min_t(unsigned int, req->rq_rlen - xprt->tcp_copied, avail);
- do {
- dprintk("RPC: %4d TCP receiving %d bytes\n",
- req->rq_task->tk_pid, want);
+ size_t len, used;
+ char *p;
- result = xprt_recvmsg(xprt, req->rq_rvec, req->rq_rnr, want, xprt->tcp_copied);
- if (result < 0)
- return result;
- xprt->tcp_copied += result;
- xprt->tcp_offset += result;
- avail -= result;
- want -= result;
- } while (want);
-
- done:
- if (req->rq_rlen > xprt->tcp_copied && xprt->tcp_more)
- return avail;
- dprintk("RPC: %4d received reply complete\n", req->rq_task->tk_pid);
- xprt_complete_rqst(xprt, req, xprt->tcp_copied);
-
- return avail;
-}
-
-/*
- * TCP discard extra bytes from a short read
- */
-static inline int
-tcp_read_discard(struct rpc_xprt *xprt, int avail)
-{
- struct iovec riov;
- static u8 dummy[64];
- int want, result = 0;
-
- while (avail) {
- want = min_t(unsigned int, avail, sizeof(dummy));
- riov.iov_base = dummy;
- riov.iov_len = want;
- dprintk("RPC: TCP skipping %d bytes\n", want);
- result = xprt_recvmsg(xprt, &riov, 1, want, 0);
- if (result < 0)
- return result;
- xprt->tcp_offset += result;
- avail -= result;
- }
- return avail;
+ len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
+ dprintk("RPC: reading XID (%Zu bytes)\n", len);
+ p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
+ used = tcp_copy_data(desc, p, len);
+ xprt->tcp_offset += used;
+ if (used != len)
+ return;
+ xprt->tcp_flags &= ~XPRT_COPY_XID;
+ xprt->tcp_flags |= XPRT_COPY_DATA;
+ xprt->tcp_copied = 4;
+ dprintk("RPC: reading reply for XID %08x\n", xprt->tcp_xid);
+ tcp_check_recm(xprt);
}
/*
- * TCP record receive routine
- * This is not the most efficient code since we call recvfrom thrice--
- * first receiving the record marker, then the XID, then the data.
- *
- * The optimal solution would be a RPC support in the TCP layer, which
- * would gather all data up to the next record marker and then pass us
- * the list of all TCP segments ready to be copied.
+ * TCP read and complete request
*/
-static int
-tcp_input_record(struct rpc_xprt *xprt)
+static inline void
+tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
{
- struct rpc_rqst *req = NULL;
- struct rpc_task *task = NULL;
- int avail, result;
-
- dprintk("RPC: tcp_input_record\n");
-
- if (xprt->shutdown)
- return -EIO;
- if (!xprt_connected(xprt))
- return -ENOTCONN;
-
- /* Read in a new fragment marker if necessary */
- /* Can we ever really expect to get completely empty fragments? */
- if ((result = tcp_read_fraghdr(xprt)) < 0)
- return result;
- avail = result;
-
- /* Read in the xid if necessary */
- if ((result = tcp_read_xid(xprt, avail)) < 0)
- return result;
- if (!(avail = result))
- goto out_ok;
+ struct rpc_rqst *req;
+ struct iovec *iov;
+ char *p;
+ unsigned long skip;
+ size_t len, used;
+ int n;
/* Find and lock the request corresponding to this xid */
req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
- if (req) {
- task = req->rq_task;
- /* Read in the request data */
- result = tcp_read_request(xprt, req, avail);
- rpc_unlock_task(task);
- if (result < 0)
- return result;
- avail = result;
+ if (!req) {
+ xprt->tcp_flags &= ~XPRT_COPY_DATA;
+ dprintk("RPC: XID %08x request not found!\n",
+ xprt->tcp_xid);
+ return;
}
-
- /* Skip over any trailing bytes on short reads */
- if ((result = tcp_read_discard(xprt, avail)) < 0)
- return result;
-
- out_ok:
- dprintk("RPC: tcp_input_record done (off %d reclen %d copied %d)\n",
- xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_copied);
- result = xprt->tcp_reclen;
- xprt->tcp_reclen = 0;
- xprt->tcp_offset = 0;
- if (!xprt->tcp_more)
- xprt->tcp_copied = 0;
- return result;
-}
-
-/*
- * TCP task queue stuff
- */
-LIST_HEAD(rpc_xprt_pending); /* List of xprts having pending tcp requests */
-
-static inline
-void tcp_rpciod_queue(void)
-{
- rpciod_wake_up();
-}
-
-int xprt_tcp_pending(void)
-{
- int retval;
-
- spin_lock_bh(&rpc_queue_lock);
- retval = !list_empty(&rpc_xprt_pending);
- spin_unlock_bh(&rpc_queue_lock);
- return retval;
-}
-
-static inline
-void xprt_append_pending(struct rpc_xprt *xprt)
-{
- spin_lock_bh(&rpc_queue_lock);
- if (list_empty(&xprt->rx_pending)) {
- list_add(&xprt->rx_pending, rpc_xprt_pending.prev);
- dprintk("RPC: xprt queue %p\n", xprt);
- tcp_rpciod_queue();
+ skip = xprt->tcp_copied;
+ iov = req->rq_rvec;
+ for (n = req->rq_rnr; n != 0; n--, iov++) {
+ if (skip >= iov->iov_len) {
+ skip -= iov->iov_len;
+ continue;
+ }
+ p = iov->iov_base;
+ len = iov->iov_len;
+ if (skip) {
+ p += skip;
+ len -= skip;
+ skip = 0;
+ }
+ if (xprt->tcp_offset + len > xprt->tcp_reclen)
+ len = xprt->tcp_reclen - xprt->tcp_offset;
+ used = tcp_copy_data(desc, p, len);
+ xprt->tcp_copied += used;
+ xprt->tcp_offset += used;
+ if (used != len)
+ break;
+ if (xprt->tcp_copied == req->rq_rlen) {
+ xprt->tcp_flags &= ~XPRT_COPY_DATA;
+ break;
+ }
+ if (xprt->tcp_offset == xprt->tcp_reclen) {
+ if (xprt->tcp_flags & XPRT_LAST_FRAG)
+ xprt->tcp_flags &= ~XPRT_COPY_DATA;
+ break;
+ }
}
- spin_unlock_bh(&rpc_queue_lock);
-}
-static
-void xprt_remove_pending(struct rpc_xprt *xprt)
-{
- spin_lock_bh(&rpc_queue_lock);
- if (!list_empty(&xprt->rx_pending)) {
- list_del(&xprt->rx_pending);
- INIT_LIST_HEAD(&xprt->rx_pending);
+ if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
+ dprintk("RPC: %4d received reply complete\n",
+ req->rq_task->tk_pid);
+ xprt_complete_rqst(xprt, req, xprt->tcp_copied);
}
- spin_unlock_bh(&rpc_queue_lock);
+ rpc_unlock_task(req->rq_task);
+ tcp_check_recm(xprt);
}
-static inline
-struct rpc_xprt *xprt_remove_pending_next(void)
+/*
+ * TCP discard extra bytes from a short read
+ */
+static inline void
+tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
{
- struct rpc_xprt *xprt = NULL;
+ size_t len;
- spin_lock_bh(&rpc_queue_lock);
- if (!list_empty(&rpc_xprt_pending)) {
- xprt = list_entry(rpc_xprt_pending.next, struct rpc_xprt, rx_pending);
- list_del(&xprt->rx_pending);
- INIT_LIST_HEAD(&xprt->rx_pending);
- }
- spin_unlock_bh(&rpc_queue_lock);
- return xprt;
+ len = xprt->tcp_reclen - xprt->tcp_offset;
+ if (len > desc->count)
+ len = desc->count;
+ desc->count -= len;
+ desc->offset += len;
+ xprt->tcp_offset += len;
+ tcp_check_recm(xprt);
}
/*
- * This is protected from tcp_data_ready and the stack as its run
- * inside of the RPC I/O daemon
+ * TCP record receive routine
+ * We first have to grab the record marker, then the XID, then the data.
*/
-void
-__rpciod_tcp_dispatcher(void)
+static int
+tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
+ unsigned int offset, size_t len)
{
- struct rpc_xprt *xprt;
- int safe_retry = 0, result;
-
- dprintk("rpciod_tcp_dispatcher: Queue Running\n");
-
- /*
- * Empty each pending socket
- */
- while ((xprt = xprt_remove_pending_next()) != NULL) {
- dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt);
-
- do {
- result = tcp_input_record(xprt);
- } while (result >= 0);
+ struct rpc_xprt *xprt = (struct rpc_xprt *)rd_desc->buf;
+ skb_reader_t desc = { skb, offset, len };
- if (safe_retry++ > 200) {
- schedule();
- safe_retry = 0;
+ dprintk("RPC: tcp_data_recv\n");
+ do {
+ /* Read in a new fragment marker if necessary */
+ /* Can we ever really expect to get completely empty fragments? */
+ if (xprt->tcp_flags & XPRT_COPY_RECM) {
+ tcp_read_fraghdr(xprt, &desc);
+ continue;
}
- }
+ /* Read in the xid if necessary */
+ if (xprt->tcp_flags & XPRT_COPY_XID) {
+ tcp_read_xid(xprt, &desc);
+ continue;
+ }
+ /* Read in the request data */
+ if (xprt->tcp_flags & XPRT_COPY_DATA) {
+ tcp_read_request(xprt, &desc);
+ continue;
+ }
+ /* Skip over any trailing bytes on short reads */
+ tcp_read_discard(xprt, &desc);
+ } while (desc.count && xprt_connected(xprt));
+ dprintk("RPC: tcp_data_recv done\n");
+ return len - desc.count;
}
-/*
- * data_ready callback for TCP. We can't just jump into the
- * tcp recvmsg functions inside of the network receive bh or
- * bad things occur. We queue it to pick up after networking
- * is done.
- */
-
-static void tcp_data_ready(struct sock *sk, int len)
+static void tcp_data_ready(struct sock *sk, int bytes)
{
- struct rpc_xprt *xprt;
+ struct rpc_xprt *xprt;
+ read_descriptor_t rd_desc;
dprintk("RPC: tcp_data_ready...\n");
- if (!(xprt = xprt_from_sock(sk)))
- {
- printk("Not a socket with xprt %p\n", sk);
- goto out;
+ if (!(xprt = xprt_from_sock(sk))) {
+ printk("RPC: tcp_data_ready socket info not found!\n");
+ return;
}
-
if (xprt->shutdown)
- goto out;
-
- xprt_append_pending(xprt);
+ return;
- dprintk("RPC: tcp_data_ready client %p\n", xprt);
- dprintk("RPC: state %x conn %d dead %d zapped %d\n",
- sk->state, xprt_connected(xprt),
- sk->dead, sk->zapped);
- out:
- if (sk->sleep && waitqueue_active(sk->sleep))
- wake_up_interruptible(sk->sleep);
+ /* We use rd_desc to pass struct xprt to tcp_data_recv */
+ rd_desc.buf = (char *)xprt;
+ rd_desc.count = 65536;
+ tcp_read_sock(sk, &rd_desc, tcp_data_recv);
}
-
static void
tcp_state_change(struct sock *sk)
{
@@ -1047,6 +924,13 @@
case TCP_ESTABLISHED:
if (xprt_test_and_set_connected(xprt))
break;
+
+ /* Reset TCP record info */
+ xprt->tcp_offset = 0;
+ xprt->tcp_reclen = 0;
+ xprt->tcp_copied = 0;
+ xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
+
spin_lock(&xprt->sock_lock);
if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
rpc_wake_up_task(xprt->snd_task);
@@ -1069,7 +953,7 @@
* low.
*/
static void
-tcp_write_space(struct sock *sk)
+xprt_write_space(struct sock *sk)
{
struct rpc_xprt *xprt;
struct socket *sock;
@@ -1098,32 +982,6 @@
}
}
-static void
-udp_write_space(struct sock *sk)
-{
- struct rpc_xprt *xprt;
-
- if (!(xprt = xprt_from_sock(sk)))
- return;
- if (xprt->shutdown)
- return;
-
-
- /* Wait until we have enough socket memory */
- if (sock_wspace(sk) < min_t(int, sk->sndbuf,XPRT_MIN_WRITE_SPACE))
- return;
-
- if (!xprt_test_and_set_wspace(xprt)) {
- spin_lock(&xprt->sock_lock);
- if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
- rpc_wake_up_task(xprt->snd_task);
- spin_unlock(&xprt->sock_lock);
- }
-
- if (sk->sleep && waitqueue_active(sk->sleep))
- wake_up_interruptible(sk->sleep);
-}
-
/*
* RPC receive timeout handler.
*/
@@ -1496,8 +1354,6 @@
req->rq_next = NULL;
xprt->free = xprt->slot;
- INIT_LIST_HEAD(&xprt->rx_pending);
-
dprintk("RPC: created transport %p\n", xprt);
xprt_bind_socket(xprt, sock);
@@ -1542,15 +1398,14 @@
xprt->old_write_space = sk->write_space;
if (xprt->prot == IPPROTO_UDP) {
sk->data_ready = udp_data_ready;
- sk->write_space = udp_write_space;
sk->no_check = UDP_CSUM_NORCV;
xprt_set_connected(xprt);
} else {
sk->data_ready = tcp_data_ready;
sk->state_change = tcp_state_change;
- sk->write_space = tcp_write_space;
xprt_clear_connected(xprt);
}
+ sk->write_space = xprt_write_space;
/* Reset to new socket */
xprt->sock = sock;
FUNET's LINUX-ADM group, linux-adm@nic.funet.fi
TCL-scripts by Sam Shen (who was at: slshen@lbl.gov)