Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use endpoint for cs3ref + use parent_id for relative paths in CS3Iface #67

Merged
merged 6 commits into from
Apr 6, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 61 additions & 39 deletions src/core/cs3iface.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Main author: Giuseppe.LoPresti@cern.ch, CERN/IT-ST
'''

import os
import time
import http
import requests
Expand Down Expand Up @@ -48,6 +49,24 @@ def getuseridfromcreds(token, _wopiuser):
return token


def _getcs3reference(endpoint, fileref):
    '''Generates a CS3 reference for a given fileref, covering the following cases:
    absolute path, relative hybrid path, fully opaque fileid.

    The endpoint is used as storage_id of the resource id in the two non-absolute cases,
    and it is not used when fileref is an absolute path. The fileref can be:
    - an absolute path, which begins with `/`
    - a hybrid path in the form `<parent_opaque_id>/<relative_path>`
    - a bare opaque file id, which contains no `/`
    Returns a cs3spr.Reference, raises IOError if fileref is empty.'''
    if not fileref:
        # an empty reference matches none of the supported forms: fail explicitly
        # instead of letting fileref[0] raise an unrelated IndexError
        raise IOError('Invalid empty file reference')
    if fileref.startswith('/'):
        # assume this is a filepath
        return cs3spr.Reference(path=fileref)
    parentid, slash, relpath = fileref.partition('/')
    if slash:
        # assume we have a relative path in the form `<parent_opaque_id>/<base_filename>`,
        # also works if we get `<parent_opaque_id>/<path>/<filename>`
        return cs3spr.Reference(resource_id=cs3spr.ResourceId(storage_id=endpoint, opaque_id=parentid),
                                path='./' + relpath)
    # assume we have an opaque fileid
    return cs3spr.Reference(resource_id=cs3spr.ResourceId(storage_id=endpoint, opaque_id=fileref))


def authenticate_for_test(userid, userpwd):
'''Use basic authentication against Reva for testing purposes'''
authReq = cs3gw.AuthenticateRequest(type='basic', client_id=userid, client_secret=userpwd)
Expand All @@ -62,15 +81,8 @@ def stat(endpoint, fileref, userid, versioninv=1):
'''Stat a file and returns (size, mtime) as well as other extended info using the given userid as access token.
Note that endpoint here means the storage id, and fileref can be either a path (which MUST begin with /),
or an id (which MUST NOT start with a /). The versioninv flag is natively supported by Reva.'''
if endpoint == 'default':
raise IOError('A CS3API-compatible storage endpoint must be identified by a storage UUID')
tstart = time.time()
if fileref[0] == '/':
# assume this is a filepath
ref = cs3spr.Reference(path=fileref)
else:
# assume we have an opaque fileid
ref = cs3spr.Reference(resource_id=cs3spr.ResourceId(storage_id=endpoint, opaque_id=fileref))
ref = _getcs3reference(endpoint, fileref)
statInfo = ctx['cs3gw'].Stat(request=cs3sp.StatRequest(ref=ref), metadata=[('x-access-token', userid)])
tend = time.time()
if statInfo.status.code == cs3code.CODE_OK:
Expand All @@ -81,11 +93,15 @@ def stat(endpoint, fileref, userid, versioninv=1):
log.warning('msg="Invoked stat" fileref="%s" unexpectedtype="%d"' % (fileref, statInfo.info.type))
raise IOError('Unexpected type %d' % statInfo.info.type)
inode = common.encodeinode(statInfo.info.id.storage_id, statInfo.info.id.opaque_id)
# in case we got a relative path, build a hybrid path that can be used to reference the file:
# note that as per specs the parent_id MUST be available in this case
filepath = statInfo.info.path if statInfo.info.path[0] == '/' else \
statInfo.info.parent_id.opaque_id + '/' + os.path.basename(statInfo.info.path)
log.info('msg="Invoked stat" fileref="%s" inode="%s" filepath="%s" elapsedTimems="%.1f"' %
(fileref, inode, statInfo.info.path, (tend-tstart)*1000))
(fileref, inode, filepath, (tend-tstart)*1000))
return {
'inode': inode,
'filepath': statInfo.info.path,
'filepath': filepath,
'ownerid': statInfo.info.owner.opaque_id + '@' + statInfo.info.owner.idp,
'size': statInfo.info.size,
'mtime': statInfo.info.mtime.seconds
Expand All @@ -94,14 +110,14 @@ def stat(endpoint, fileref, userid, versioninv=1):
raise IOError(common.ENOENT_MSG if statInfo.status.code == cs3code.CODE_NOT_FOUND else statInfo.status.message)


def statx(endpoint, fileid, userid, versioninv=0):
def statx(endpoint, fileref, userid, versioninv=0):
'''Get extended stat info (inode, filepath, userid, size, mtime). Equivalent to stat.'''
return stat(endpoint, fileid, userid, versioninv)
return stat(endpoint, fileref, userid, versioninv)


def setxattr(_endpoint, filepath, userid, key, value, lockid):
def setxattr(endpoint, filepath, userid, key, value, lockid):
'''Set the extended attribute <key> to <value> using the given userid as access token'''
reference = cs3spr.Reference(path=filepath)
reference = _getcs3reference(endpoint, filepath)
md = cs3spr.ArbitraryMetadata()
md.metadata.update({key: str(value)}) # pylint: disable=no-member
req = cs3sp.SetArbitraryMetadataRequest(ref=reference, arbitrary_metadata=md, lock_id=lockid)
Expand All @@ -113,10 +129,10 @@ def setxattr(_endpoint, filepath, userid, key, value, lockid):
log.debug('msg="Invoked setxattr" result="%s"' % res)


def getxattr(_endpoint, filepath, userid, key):
def getxattr(endpoint, filepath, userid, key):
'''Get the extended attribute <key> using the given userid as access token'''
tstart = time.time()
reference = cs3spr.Reference(path=filepath)
reference = _getcs3reference(endpoint, filepath)
statInfo = ctx['cs3gw'].Stat(request=cs3sp.StatRequest(ref=reference), metadata=[('x-access-token', userid)])
tend = time.time()
if statInfo.status.code == cs3code.CODE_NOT_FOUND:
Expand All @@ -138,9 +154,9 @@ def getxattr(_endpoint, filepath, userid, key):
return None


def rmxattr(_endpoint, filepath, userid, key, lockid):
def rmxattr(endpoint, filepath, userid, key, lockid):
'''Remove the extended attribute <key> using the given userid as access token'''
reference = cs3spr.Reference(path=filepath)
reference = _getcs3reference(endpoint, filepath)
req = cs3sp.UnsetArbitraryMetadataRequest(ref=reference, arbitrary_metadata_keys=[key], lock_id=lockid)
res = ctx['cs3gw'].UnsetArbitraryMetadata(request=req, metadata=[('x-access-token', userid)])
if res.status.code != cs3code.CODE_OK:
Expand All @@ -149,9 +165,9 @@ def rmxattr(_endpoint, filepath, userid, key, lockid):
log.debug('msg="Invoked rmxattr" result="%s"' % res.status)


def setlock(_endpoint, filepath, userid, appname, value):
def setlock(endpoint, filepath, userid, appname, value):
'''Set a lock to filepath with the given value metadata and appname as holder'''
reference = cs3spr.Reference(path=filepath)
reference = _getcs3reference(endpoint, filepath)
lock = cs3spr.Lock(type=cs3spr.LOCK_TYPE_WRITE, app_name=appname, lock_id=value, \
expiration={'seconds': int(time.time() + ctx['lockexpiration'])})
req = cs3sp.SetLockRequest(ref=reference, lock=lock)
Expand All @@ -167,9 +183,9 @@ def setlock(_endpoint, filepath, userid, appname, value):
log.debug('msg="Invoked setlock" filepath="%s" value="%s" result="%s"' % (filepath, value, res.status))


def getlock(_endpoint, filepath, userid):
def getlock(endpoint, filepath, userid):
'''Get the lock metadata for the given filepath'''
reference = cs3spr.Reference(path=filepath)
reference = _getcs3reference(endpoint, filepath)
req = cs3sp.GetLockRequest(ref=reference)
res = ctx['cs3gw'].GetLock(request=req, metadata=[('x-access-token', userid)])
if res.status.code == cs3code.CODE_NOT_FOUND:
Expand All @@ -187,17 +203,17 @@ def getlock(_endpoint, filepath, userid):
'app_name': res.lock.app_name,
'user': {'opaque_id' : res.lock.user.opaque_id,
'idp': res.lock.user.idp,
'type': 1
'type': res.lock.user.type
} if res.lock.user.opaque_id else {},
'expiration': {
'seconds': res.lock.expiration.seconds
}
}


def refreshlock(_endpoint, filepath, userid, appname, value):
def refreshlock(endpoint, filepath, userid, appname, value):
'''Refresh the lock metadata for the given filepath'''
reference = cs3spr.Reference(path=filepath)
reference = _getcs3reference(endpoint, filepath)
lock = cs3spr.Lock(type=cs3spr.LOCK_TYPE_WRITE, app_name=appname, lock_id=value, \
expiration={'seconds': int(time.time() + ctx['lockexpiration'])})
req = cs3sp.RefreshLockRequest(ref=reference, lock=lock)
Expand All @@ -209,9 +225,9 @@ def refreshlock(_endpoint, filepath, userid, appname, value):
log.debug('msg="Invoked refreshlock" filepath="%s" value="%s" result="%s"' % (filepath, value, res.status))


def unlock(_endpoint, filepath, userid, appname, value):
def unlock(endpoint, filepath, userid, appname, value):
'''Remove the lock for the given filepath'''
reference = cs3spr.Reference(path=filepath)
reference = _getcs3reference(endpoint, filepath)
lock = cs3spr.Lock(type=cs3spr.LOCK_TYPE_WRITE, app_name=appname, lock_id=value)
req = cs3sp.UnlockRequest(ref=reference, lock=lock)
res = ctx['cs3gw'].Unlock(request=req, metadata=[('x-access-token', userid)])
Expand All @@ -222,11 +238,13 @@ def unlock(_endpoint, filepath, userid, appname, value):
log.debug('msg="Invoked unlock" filepath="%s" value="%s" result="%s"' % (filepath, value, res.status))


def readfile(_endpoint, filepath, userid, lockid):
def readfile(endpoint, filepath, userid, lockid):
'''Read a file using the given userid as access token. Note that the function is a generator, managed by Flask.'''
tstart = time.time()
reference = _getcs3reference(endpoint, filepath)

# prepare endpoint
req = cs3sp.InitiateFileDownloadRequest(ref=cs3spr.Reference(path=filepath), lock_id=lockid)
req = cs3sp.InitiateFileDownloadRequest(ref=reference, lock_id=lockid)
initfiledownloadres = ctx['cs3gw'].InitiateFileDownload(request=req, metadata=[('x-access-token', userid)])
if initfiledownloadres.status.code == cs3code.CODE_NOT_FOUND:
log.info('msg="File not found on read" filepath="%s"' % filepath)
Expand All @@ -239,7 +257,7 @@ def readfile(_endpoint, filepath, userid, lockid):

# Download
try:
protocol = [p for p in initfiledownloadres.protocols if p.protocol == "simple"][0]
protocol = [p for p in initfiledownloadres.protocols if p.protocol == "simple" or p.protocol == "spaces"][0]
headers = {
'x-access-token': userid,
'x-reva-transfer': protocol.token # needed if the downloads pass through the data gateway in reva
Expand All @@ -260,20 +278,22 @@ def readfile(_endpoint, filepath, userid, lockid):
yield data[i:i+ctx['chunksize']]


def writefile(_endpoint, filepath, userid, content, lockid, islock=False):
def writefile(endpoint, filepath, userid, content, lockid, islock=False):
'''Write a file using the given userid as access token. The entire content is written
and any pre-existing file is deleted (or moved to the previous version if supported).
The islock flag is currently not supported. The backend should at least support
writing the file with O_CREAT|O_EXCL flags to prevent races.'''
if islock:
log.warning('msg="Lock (no-overwrite) flag not supported, going for standard upload"')
tstart = time.time()

# prepare endpoint
if isinstance(content, str):
content = bytes(content, 'UTF-8')
size = str(len(content))
reference = _getcs3reference(endpoint, filepath)
metadata = types.Opaque(map={"Upload-Length": types.OpaqueEntry(decoder="plain", value=str.encode(size))})
req = cs3sp.InitiateFileUploadRequest(ref=cs3spr.Reference(path=filepath), lock_id=lockid, opaque=metadata)
req = cs3sp.InitiateFileUploadRequest(ref=reference, lock_id=lockid, opaque=metadata)
initfileuploadres = ctx['cs3gw'].InitiateFileUpload(request=req, metadata=[('x-access-token', userid)])
if initfileuploadres.status.code != cs3code.CODE_OK:
log.error('msg="Failed to initiateFileUpload on write" filepath="%s" code="%s" reason="%s"' % \
Expand All @@ -283,8 +303,7 @@ def writefile(_endpoint, filepath, userid, content, lockid, islock=False):

# Upload
try:
# Get the endpoint for simple protocol
protocol = [p for p in initfileuploadres.protocols if p.protocol == "simple"][0]
protocol = [p for p in initfileuploadres.protocols if p.protocol == "simple" or p.protocol == "spaces"][0]
headers = {
'x-access-token': userid,
'Upload-Length': size,
Expand All @@ -305,10 +324,12 @@ def writefile(_endpoint, filepath, userid, content, lockid, islock=False):
(filepath, (tend-tstart)*1000, islock))


def renamefile(_endpoint, filepath, newfilepath, userid, lockid):
def renamefile(endpoint, filepath, newfilepath, userid, lockid):
'''Rename a file from origfilepath to newfilepath using the given userid as access token.'''
req = cs3sp.MoveRequest(source=cs3spr.Reference(path=filepath), \
destination=cs3spr.Reference(path=newfilepath), lock_id=lockid)
reference = _getcs3reference(endpoint, filepath)
newfileref = _getcs3reference(endpoint, newfilepath)

req = cs3sp.MoveRequest(source=reference, destination=newfileref, lock_id=lockid)
res = ctx['cs3gw'].Move(request=req, metadata=[('x-access-token', userid)])
if res.status.code != cs3code.CODE_OK:
log.error('msg="Failed to rename file" filepath="%s" code="%s" reason="%s"' %
Expand All @@ -317,10 +338,11 @@ def renamefile(_endpoint, filepath, newfilepath, userid, lockid):
log.debug('msg="Invoked renamefile" result="%s"' % res)


def removefile(_endpoint, filepath, userid, _force=False):
def removefile(endpoint, filepath, userid, _force=False):
'''Remove a file using the given userid as access token.
The force argument is ignored for now for CS3 storage.'''
req = cs3sp.DeleteRequest(ref=cs3spr.Reference(path=filepath))
reference = _getcs3reference(endpoint, filepath)
req = cs3sp.DeleteRequest(ref=reference)
res = ctx['cs3gw'].Delete(request=req, metadata=[('x-access-token', userid)])
if res.status.code != cs3code.CODE_OK:
if str(res) == common.ENOENT_MSG:
Expand Down