[Devel] [PATCH RHEL8 COMMIT] fs/fuse kio: concurrently work shrink and IO operations

Konstantin Khorenko khorenko at virtuozzo.com
Thu Oct 15 10:37:37 MSK 2020


The commit is pushed to "branch-rh8-4.18.0-193.6.3.vz8.4.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh8-4.18.0-193.6.3.vz8.4.13
------>
commit 5b985004e52ba23a6c0dbec4236b06a0342089c0
Author: Ildar Ismagilov <Ildar.Ismagilov at acronis.com>
Date:   Thu Oct 15 10:37:37 2020 +0300

    fs/fuse kio: concurrently work shrink and IO operations
    
    The problem is that the shrink operation can be performed
    concurrently with IO operations.
    
    How this can happen:
    cpu0:                                      cpu1:
    pcs_fuse_prep_rw()
      spin_lock(&di->lock);
      if (di->size.op == PCS_SIZE_SHRINK) {
        ...
        goto pending;
      }
      ...
      switch (req->in.h.opcode) {
        ...
        case FUSE_WRITE or FUSE_READ: {
          ...
          if (in->offset + in->size > di->fileinfo.attr.size) {
            ...
          }
          ...
          spin_unlock(&di->lock);
          ...
                                               pcs_kio_setattr_handle()
                                                 ...
                                                 spin_lock(&di->lock);
                                                  if (inarg->size < di->fileinfo.attr.size) {
                                                  ...
                                                  di->size.op = PCS_SIZE_SHRINK;
                                                 }
                                                 spin_unlock(&di->lock);
                                                 ...
                                                 if (di->size.op == PCS_SIZE_SHRINK) {
                                                  ...
                                                  fuse_dio_wait(fi); <-- does not wait for the IO operation because
                                                                         the dio counter is incremented later
                                                  ...
                                                 }
          ...
          break;
        }
        ...
      }
      if (!kqueue_insert(di, ff, req))
        return -EIO;
      if (req->in.h.opcode == FUSE_READ)
        fuse_read_dio_begin(fi);  <-- incrementing of dio counter
      else
        fuse_write_dio_begin(fi); <-- incrementing of dio counter
      return 0;
      ...
    }
    
    To fix it, increment the dio counter before unlocking di->lock, so that
    fuse_dio_wait() in the shrink path sees the in-flight IO operation.
    
    Signed-off-by: Ildar Ismagilov <Ildar.Ismagilov at acronis.com>
    Acked-by: Alexey Kuznetsov <Alexey.Kuznetsov at acronis.com>
    Acked-by: Andrey Zaitsev <azaitsev at virtuozzo.com>
---
 fs/fuse/kio/pcs/pcs_fuse_kdirect.c | 33 +++++++++++++++------------------
 1 file changed, 15 insertions(+), 18 deletions(-)

diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index 3205288da404..d4cb1b87fadf 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -803,7 +803,7 @@ static int pcs_fuse_prep_rw(struct pcs_fuse_req *r, struct fuse_file *ff)
 	struct fuse_io_args *ia = container_of(args, typeof(*ia), ap.args);
 	struct pcs_dentry_info *di = get_pcs_inode(args->io_inode);
 	struct fuse_inode *fi = get_fuse_inode(args->io_inode);
-	int ret;
+	int ret = 0;
 
 	spin_lock(&di->lock);
 	/* Deffer all requests if shrink requested to prevent livelock */
@@ -817,7 +817,7 @@ static int pcs_fuse_prep_rw(struct pcs_fuse_req *r, struct fuse_file *ff)
 		}
 		wait_shrink(r, di);
 		ret = 1;
-		goto pending;
+		goto out;
 	}
 
 	switch (req->in.h.opcode) {
@@ -830,11 +830,10 @@ static int pcs_fuse_prep_rw(struct pcs_fuse_req *r, struct fuse_file *ff)
 			if (in->offset >= di->fileinfo.attr.size) {
 				args->out_args[0].size = 0;
 				ret = -EPERM;
-				goto fail;
+				goto out;
 			}
 			size = di->fileinfo.attr.size - in->offset;
 		}
-		spin_unlock(&di->lock);
 
 		pcs_fuse_prep_io(r, PCS_REQ_T_READ, in->offset, size, 0);
 		break;
@@ -846,9 +845,8 @@ static int pcs_fuse_prep_rw(struct pcs_fuse_req *r, struct fuse_file *ff)
 			pcs_fuse_prep_io(r, PCS_REQ_T_WRITE, in->offset,
 					 in->size, 0);
 			ret = req_wait_grow_queue(r, ff, in->offset, in->size);
-			goto pending;
+			goto out;
 		}
-		spin_unlock(&di->lock);
 
 		pcs_fuse_prep_io(r, PCS_REQ_T_WRITE, in->offset, in->size, 0);
 		break;
@@ -865,11 +863,10 @@ static int pcs_fuse_prep_rw(struct pcs_fuse_req *r, struct fuse_file *ff)
 		if (in->fm_start + size > di->fileinfo.attr.size) {
 			if (in->fm_start >= di->fileinfo.attr.size) {
 				ret = -EPERM;
-				goto fail;
+				goto out;
 			}
 			size = di->fileinfo.attr.size - in->fm_start;
 		}
-		spin_unlock(&di->lock);
 
 		pcs_fuse_prep_io(r, PCS_REQ_T_FIEMAP, in->fm_start,
 				 in->fm_extent_count*sizeof(struct fiemap_extent),
@@ -893,14 +890,15 @@ static int pcs_fuse_prep_rw(struct pcs_fuse_req *r, struct fuse_file *ff)
 			else
 				pcs_fuse_prep_fallocate(r);
 			ret = req_wait_grow_queue(r, ff, in->offset, in->length);
-			goto pending;
+			goto out;
 		}
-		spin_unlock(&di->lock);
 
-		if (type < PCS_REQ_T_MAX)
+		if (type < PCS_REQ_T_MAX) {
 			pcs_fuse_prep_io(r, type, in->offset, in->length, 0);
-		else
-			return -EPERM; /* NOPE */
+		} else {
+			ret = -EPERM; /* NOPE */
+			goto out;
+		}
 		break;
 	}
 	default:
@@ -908,14 +906,13 @@ static int pcs_fuse_prep_rw(struct pcs_fuse_req *r, struct fuse_file *ff)
 	}
 
 	if (!kqueue_insert(di, ff, req))
-		return -EIO;
-	if (req->in.h.opcode == FUSE_READ)
+		ret = -EIO;
+	else if (req->in.h.opcode == FUSE_READ)
 		fuse_read_dio_begin(fi);
 	else
 		fuse_write_dio_begin(fi);
-	return 0;
-fail:
-pending:
+
+out:
 	spin_unlock(&di->lock);
 	return ret;
 }


More information about the Devel mailing list