RGWAboutMultipart部分
4、终止Multipart数据上传。
终止Multipart使用的是HTTP DELETE操作,在RGW中使用RGWAbortMultipart_ObjStore_S3类处理。由于RGWAbortMultipart_ObjStore_S3类继承自RGWAbortMultipart_ObjStore,而RGWAbortMultipart_ObjStore类继承自RGWAbortMultipart,而RGWAbortMultipart类继承自RGWOp类。因此处理Mulipart终止操作主要集中在RGWAbortMultipart::execute()函数中。下面对该函数的处理流程进行详细解析。
RGWAbortMultipart::execute()
|__get_multipart_info() 得到meta_oid对象的属性值
|__list_multipart_parts() 遍历Multipart对象所有分片
|__RGWRados::update_gc_chain() 将已经完成的Multipart分片放入到GC队列中等待GC线程做回收操作
|__RGWRados::send_chain_to_gc() 启动GC线程
|__创建RGWRados::Object对象
|__创建RGWRados::Object::Delete对象
|__RGWRados::Object::Delete::delete_obj() 删除meta_oid对象
rgw_op.h中定義:
class RGWAbortMultipart : public RGWOp {
public:
RGWAbortMultipart() {}
int verify_permission();
void pre_exec();
void execute();
virtual void send_response() = 0;
virtual const string name() { return "abort_multipart"; }
virtual RGWOpType get_type() { return RGW_OP_ABORT_MULTIPART; }
virtual uint32_t op_mask() { return RGW_OP_TYPE_DELETE; }
};
void RGWAbortMultipart::execute()
{
op_ret = -EINVAL;
string upload_id;
string meta_oid;
upload_id = s->info.args.get("uploadId");
map<uint32_t, RGWUploadPartInfo> obj_parts;
map<uint32_t, RGWUploadPartInfo>::iterator obj_iter;
map<string, bufferlist> attrs;
rgw_obj meta_obj;
RGWMPObj mp;
if (upload_id.empty() || s->object.empty())
return;
mp.init(s->object.name, upload_id);
meta_oid = mp.get_meta();
op_ret = get_multipart_info(store, s, meta_oid, NULL, attrs);
if (op_ret < 0)
return;
bool truncated;
int marker = 0;
int max_parts = 1000;
RGWObjectCtx *obj_ctx = static_cast<RGWObjectCtx *>(s->obj_ctx);
meta_obj.init_ns(s->bucket, meta_oid, mp_ns);
meta_obj.set_in_extra_data(true);
meta_obj.index_hash_source = s->object.name;
cls_rgw_obj_chain chain;
list<rgw_obj_key> remove_objs;
do {
op_ret = list_multipart_parts(store, s, upload_id, meta_oid, max_parts,
marker, obj_parts, &marker, &truncated);
if (op_ret < 0)
return;
for (obj_iter = obj_parts.begin();
obj_iter != obj_parts.end(); ++obj_iter) {
RGWUploadPartInfo& obj_part = obj_iter->second;
if (obj_part.manifest.empty()) {
string oid = mp.get_part(obj_iter->second.num);
rgw_obj obj;
obj.init_ns(s->bucket, oid, mp_ns);
obj.index_hash_source = s->object.name;
op_ret = store->delete_obj(*obj_ctx, s->bucket_info, obj, 0);
if (op_ret < 0 && op_ret != -ENOENT)
return;
} else {
store->update_gc_chain(meta_obj, obj_part.manifest, &chain);
RGWObjManifest::obj_iterator oiter = obj_part.manifest.obj_begin();
if (oiter != obj_part.manifest.obj_end()) {
rgw_obj head = oiter.get_location();
rgw_obj_key key;
head.get_index_key(&key);
remove_objs.push_back(key);
}
}
}
} while (truncated);
/* use upload id as tag */
op_ret = store->send_chain_to_gc(chain, upload_id , false); // do it async
if (op_ret < 0) {
ldout(store->ctx(), 5) << "gc->send_chain() returned " << op_ret << dendl;
return;
}
RGWRados::Object del_target(store, s->bucket_info, *obj_ctx, meta_obj);
RGWRados::Object::Delete del_op(&del_target);
del_op.params.bucket_owner = s->bucket_info.owner;
del_op.params.versioning_status = 0;
if (!remove_objs.empty()) {
del_op.params.remove_objs = &remove_objs;
}
// and also remove the metadata obj
op_ret = del_op.delete_obj();
if (op_ret == -ENOENT) {
op_ret = -ERR_NO_SUCH_BUCKET;
}
}