diff --git a/.env_template b/.env_template new file mode 100644 index 0000000..8658dd3 --- /dev/null +++ b/.env_template @@ -0,0 +1,16 @@ +# Marketplace URL +MP_HOST="https://dataspace.reaxpro.eu" +# Access token for the MarketPlace connection. Needs to be renewed from time to time when it expires +MP_ACCESS_TOKEN=.... + +# Environment variables needed for the data-sink connection; otherwise optional +# Client ID of the data-sink application +CLIENT_ID="2c791805-ea52-4446-af97-80c0355a73b4" + +# Optional: only needed if we want to get the access token directly from Keycloak +KEYCLOAK_SERVER_URL="https://dataspace.reaxpro.eu/auth/" +KEYCLOAK_CLIENT_ID=.... +KEYCLOAK_REALM_NAME=.... +KEYCLOAK_CLIENT_SECRET_KEY=.... +MARKETPLACE_USERNAME=.... +MARKETPLACE_PASSWORD=.... diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e8dcedc..61fd032 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -10,7 +10,7 @@ repos: - id: trailing-whitespace - repo: https://github.com/jumanjihouse/pre-commit-hook-yamlfmt - rev: 0.2.2 + rev: 0.2.3 hooks: - id: yamlfmt @@ -20,7 +20,7 @@ repos: - id: check-manifest - repo: https://github.com/psf/black - rev: 22.12.0 + rev: 23.3.0 hooks: - id: black @@ -31,7 +31,7 @@ repos: args: [--count, --show-source, --statistics] - repo: https://github.com/asottile/setup-cfg-fmt - rev: v2.2.0 + rev: v2.3.0 hooks: - id: setup-cfg-fmt @@ -42,7 +42,7 @@ repos: args: [--profile, black, --filter-files] - repo: https://github.com/asottile/pyupgrade - rev: v3.3.1 + rev: v3.6.0 hooks: - id: pyupgrade args: [--py38-plus] diff --git a/MANIFEST.in b/MANIFEST.in index 33a5b43..d5ee960 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -4,8 +4,10 @@ exclude examples recursive-exclude examples * exclude .pre-commit-config.yaml -exclude .gitlab-ci.yml +exclude .flake8 include LICENSE include README.md +include .env_template +recursive-include marketplace *.md recursive-include logos *.png diff --git a/examples/data_sink_client/collection_dcat.py b/examples/data_sink_client/collection_dcat.py new file mode 100644 index 0000000..1786e71 --- /dev/null +++ b/examples/data_sink_client/collection_dcat.py @@ -0,0 +1,5 @@ +from marketplace.data_sink_client.session import MPSession + +with MPSession() as test: + objects = test.get_collection_dcat(collection_name="c1") + print(objects) diff --git a/examples/data_sink_client/dataset_dcat.py b/examples/data_sink_client/dataset_dcat.py new file mode 100644 index 0000000..130fade --- /dev/null +++ b/examples/data_sink_client/dataset_dcat.py @@ -0,0 +1,5 @@ +from marketplace.data_sink_client.session import MPSession + +with MPSession() as test: + objects = test.get_dataset_dcat(collection_name="c1", dataset_name="d1") + print(objects) diff --git a/examples/data_sink_client/delete_collection.py b/examples/data_sink_client/delete_collection.py new file mode 100644 index 0000000..4916ebc --- /dev/null +++ b/examples/data_sink_client/delete_collection.py @@ -0,0 +1,5 @@ +from marketplace.data_sink_client.session import MPSession + +with MPSession() as test: + objects = test.delete_collection(collection_name="c1") + print(objects) diff --git a/examples/data_sink_client/delete_dataset.py b/examples/data_sink_client/delete_dataset.py new file mode 100644 index 0000000..e40eec5 --- /dev/null +++ b/examples/data_sink_client/delete_dataset.py @@ -0,0 +1,5 @@ +from marketplace.data_sink_client.session import MPSession + +with MPSession() as test: + objects = test.delete_dataset(collection_name="c1", dataset_name="d1") + print(objects) diff --git 
a/examples/data_sink_client/download_file.py b/examples/data_sink_client/download_file.py new file mode 100644 index 0000000..47980ab --- /dev/null +++ b/examples/data_sink_client/download_file.py @@ -0,0 +1,10 @@ +from marketplace.data_sink_client.session import MPSession + +with MPSession() as test: + objects = test.download_dataset( + collection_name="upload_file_example", + dataset_name="file3.txt", + targetdir="/root/symphony/reaxpro", + raise_if_directory_not_empty=False, + ) + print(objects) diff --git a/examples/data_sink_client/download_folder.py b/examples/data_sink_client/download_folder.py new file mode 100644 index 0000000..bb97e4f --- /dev/null +++ b/examples/data_sink_client/download_folder.py @@ -0,0 +1,9 @@ +from marketplace.data_sink_client.session import MPSession + +with MPSession() as test: + objects = test.download_datasets_from_collection( + collection_name="Simulation_folder9", + targetdir="/root/symphony/reaxpro", + raise_if_directory_not_empty=False, + ) + print(objects) diff --git a/examples/data_sink_client/list_collections.py b/examples/data_sink_client/list_collections.py new file mode 100644 index 0000000..edee211 --- /dev/null +++ b/examples/data_sink_client/list_collections.py @@ -0,0 +1,5 @@ +from marketplace.data_sink_client.session import MPSession + +with MPSession() as test: + objects = test.list_collections() + print(objects) diff --git a/examples/data_sink_client/list_datasets.py b/examples/data_sink_client/list_datasets.py new file mode 100644 index 0000000..4cc21a0 --- /dev/null +++ b/examples/data_sink_client/list_datasets.py @@ -0,0 +1,5 @@ +from marketplace.data_sink_client.session import MPSession + +with MPSession() as test: + objects = test.list_datasets(collection_name="c1") + print(objects) diff --git a/examples/data_sink_client/query.py b/examples/data_sink_client/query.py new file mode 100644 index 0000000..70bc9fc --- /dev/null +++ b/examples/data_sink_client/query.py @@ -0,0 +1,6 @@ +from marketplace.data_sink_client.session import MPSession + +query = """SELECT ?subject ?predicate ?object WHERE { ?subject ?predicate ?object . } LIMIT 5""" +with MPSession() as test: + objects = test.query(query=query) + print(objects) diff --git a/examples/data_sink_client/query_dataset.py b/examples/data_sink_client/query_dataset.py new file mode 100644 index 0000000..271a72e --- /dev/null +++ b/examples/data_sink_client/query_dataset.py @@ -0,0 +1,6 @@ +from marketplace.data_sink_client.session import MPSession + +query = "SELECT ?subject ?predicate ?object WHERE { ?subject ?predicate ?object . 
}} LIMIT 5" +with MPSession() as test: + objects = test.query_dataset(collection_name="c1", dataset_name="d1", query=query) + print(objects) diff --git a/examples/data_sink_client/upload_files_from_path.py b/examples/data_sink_client/upload_files_from_path.py new file mode 100644 index 0000000..3bf19b4 --- /dev/null +++ b/examples/data_sink_client/upload_files_from_path.py @@ -0,0 +1,8 @@ +from marketplace.data_sink_client.session import MPSession + +with MPSession() as test: + objects = test.create_dataset_from_path( + path="/root/symphony/reaxpro/Simulation_folder9/file3.txt", + collection_name="ams_wrapper9", + ) + print(objects) diff --git a/examples/data_sink_client/upload_folder.py b/examples/data_sink_client/upload_folder.py new file mode 100644 index 0000000..b2e62e0 --- /dev/null +++ b/examples/data_sink_client/upload_folder.py @@ -0,0 +1,8 @@ +from marketplace.data_sink_client.session import MPSession + +with MPSession() as test: + objects = test.create_datasets_from_sourcedir( + sourcedir="/root/symphony/reaxpro/Simulation_folder9", + collection_name="Simulation_folder1", + ) + print(objects) diff --git a/examples/mp_api.py b/examples/mp_api.py index 66d1d64..9fefbac 100644 --- a/examples/mp_api.py +++ b/examples/mp_api.py @@ -5,7 +5,6 @@ from marketplace.app import get_app from marketplace.app.v0 import MarketPlaceApp -from marketplace.app.v0_0_1 import MarketPlaceApp as MarketPlaceApp_v_0_0_1 from marketplace.client import MarketPlaceClient # General MarketPlaceClient for simple requests like user info @@ -30,14 +29,3 @@ def heartbeat(self) -> HTMLResponse: my_mp_app = MyMarketPlaceApp(app_id="") print(my_mp_app.heartbeat()) - - -# To extend the MarketPlaceApp with custom implementations for deprecated api version 0.0.1 -class MyMarketPlaceApp_v_0_0_1(MarketPlaceApp_v_0_0_1): - def heartbeat(self) -> str: - res = super().heartbeat() - return f"heartbeat response: {res}" - - -my_mp_app_v_0_0_1 = MyMarketPlaceApp_v_0_0_1(client_id="") -print(my_mp_app_v_0_0_1.heartbeat()) diff --git a/marketplace/app/__init__.py b/marketplace/app/__init__.py index 8892c5d..c4ba784 100644 --- a/marketplace/app/__init__.py +++ b/marketplace/app/__init__.py @@ -3,7 +3,6 @@ .. currentmodule:: marketplace.app .. moduleauthor:: Pablo de Andres, Pranjali Singh (Fraunhofer IWM) """ -import warnings from typing import Optional from packaging.version import parse @@ -11,16 +10,6 @@ from ..client import MarketPlaceClient from .utils import camel_to_snake from .v0 import MarketPlaceApp as _MarketPlaceApp_v0 -from .v0_0_1 import MarketPlaceApp as _MarketPlaceApp_v0_0_1 - - -class MarketPlaceApp(_MarketPlaceApp_v0_0_1): - def __init__(self, *args, **kwargs): - warnings.warn( - "The MarketPlaceApp class is deprecated as of v0.2.0 and will be " - "removed in v1. Please use the get_app() function instead." 
- ) - super().__init__(*args, **kwargs) def get_app(app_id, client: Optional[MarketPlaceClient] = None, **kwargs): @@ -36,25 +25,13 @@ def get_app(app_id, client: Optional[MarketPlaceClient] = None, **kwargs): # Getting api version and list of capabilities for the application app_service_path = f"api/applications/{app_id}" app_info: dict = client.get(path=app_service_path).json() - app_api_version = parse(app_info.get("api_version", "0.0.1")) + app_api_version = parse(app_info.get("api_version", "1.0.0")) capabilities = [] for capability in app_info["capabilities"]: capabilities.append(camel_to_snake(capability["name"])) - if app_api_version == parse("0.0.1"): - if client is not None: - raise RuntimeError( - "Cannot use existing client for apps with API version 0.0.1." - ) - return _MarketPlaceApp_v0_0_1( - app_id, - marketplace_host_url=kwargs.get("marketplace_host_url"), - access_token=kwargs.get("access_token"), - capabilities=capabilities, - **kwargs, - ) - elif parse("0.0.1") < app_api_version <= parse("0.3.0"): + if app_api_version < parse("1.0.0"): return _MarketPlaceApp_v0(app_id, app_info, client, **kwargs) else: raise RuntimeError(f"App API version ({app_api_version}) not supported.") diff --git a/marketplace/app/v0/object_storage.py b/marketplace/app/v0/object_storage.py index e9e9546..a8bdc06 100644 --- a/marketplace/app/v0/object_storage.py +++ b/marketplace/app/v0/object_storage.py @@ -15,15 +15,17 @@ def list_collections( self, limit: int = 100, offset: int = 0 ) -> object_storage.CollectionResponseModel: response = self._client.get( - self._proxy_path("listCollections"), - params={"limit": limit, "offset": offset}, - ) + self._proxy_path("listCollections"), + params={"limit": limit, "offset": offset}, + ) try: return object_storage.CollectionResponseModel.parse_obj( response.json() ).dict() - except: - return "Error: Server returned {} while fetching collections: {}".format(response.status_code, response.text) + except Exception: + return "Error: Server returned {} while fetching collections: {}".format( + response.status_code, response.text + ) @check_capability_availability def list_datasets( @@ -33,19 +35,19 @@ def list_datasets( offset: int = 0, ) -> object_storage.DatasetResponseModel: response = self._client.get( - self._proxy_path("listDatasets"), - params={ - "collection_name": collection_name, - "limit": limit, - "offset": offset, - }, - ) + self._proxy_path("listDatasets"), + params={ + "collection_name": collection_name, + "limit": limit, + "offset": offset, + }, + ) try: - return object_storage.DatasetResponseModel.parse_obj( - response.json() - ).dict() + return object_storage.DatasetResponseModel.parse_obj(response.json()).dict() except Exception: - return "Error: Server returned {} while fetching datasets: {}".format(response.status_code, response.text) + return "Error: Server returned {} while fetching datasets: {}".format( + response.status_code, response.text + ) @check_capability_availability def create_or_update_collection( @@ -61,10 +63,10 @@ def create_or_update_collection( @check_capability_availability def delete_collection(self, collection_name: object_storage.CollectionName): - self._client.delete( + return self._client.delete( self._proxy_path("deleteCollection"), params={"collection_name": collection_name}, - ) + ).text # NOTE: change to GET for the meeting if proxy doesn't support HEAD requests @check_capability_availability @@ -82,7 +84,7 @@ def create_collection( self, collection_name: object_storage.CollectionName = None, metadata: dict = 
None, - config: dict = None + config: dict = None, ) -> object_storage.CollectionCreateResponse: data = {"collection_name": collection_name} if collection_name else {} if config is not None: @@ -93,9 +95,13 @@ def create_collection( headers=_encode_metadata(metadata) if metadata else {}, ) try: - return object_storage.CollectionCreateResponse.parse_obj(response.json()).dict() - except: - return "Error: Server returned {} while creating collection {}: {}".format(response.status_code, collection_name, response.text) + return object_storage.CollectionCreateResponse.parse_obj( + response.json() + ).dict() + except Exception: + return "Error: Server returned {} while creating collection {}: {}".format( + response.status_code, collection_name, response.text + ) @check_capability_availability def create_dataset( @@ -104,7 +110,7 @@ def create_dataset( dataset_name: object_storage.DatasetName = None, metadata: dict = None, file: UploadFile = None, - config: dict = None + config: dict = None, ) -> object_storage.DatasetCreateResponse: params = {"collection_name": collection_name} data = {} @@ -113,18 +119,20 @@ def create_dataset( if config is not None: data.update(config) response = self._client.put( - self._proxy_path("createDataset"), - data=data, - params=params, - files=file, - headers=_encode_metadata(metadata) if metadata else {}, - ) + self._proxy_path("createDataset"), + data=data, + params=params, + files=file, + headers=_encode_metadata(metadata) if metadata else {}, + ) try: return object_storage.DatasetCreateResponse.parse_obj( response.json() ).dict() - except: - return "Error: Server returned {} while creating dataset {}: {}".format(response.status_code, dataset_name, response.text) + except Exception: + return "Error: Server returned {} while creating dataset {}: {}".format( + response.status_code, dataset_name, response.text + ) @check_capability_availability def create_dataset_metadata( @@ -191,10 +199,10 @@ def delete_dataset( collection_name: object_storage.CollectionName, dataset_name: object_storage.DatasetName, ): - self._client.delete( + return self._client.delete( self._proxy_path("deleteDataset"), params={"collection_name": collection_name, "dataset_name": dataset_name}, - ) + ).text # NOTE: change to GET for the meeting if proxy doesn't support HEAD requests @check_capability_availability @@ -230,25 +238,46 @@ def get_semantic_mapping( params={"semantic_mapping_id": semantic_mapping_id}, ).json() ) - + @check_capability_availability - def get_collection_dcat( + def get_collection_metadata_dcat( self, collection_name: object_storage.CollectionName ) -> Union[Dict, str]: response: dict = self._client.get( - self._proxy_path("getCollectionDcat"), + self._proxy_path("getCollectionMetadataDcat"), params={"collection_name": collection_name}, ).text return response - + @check_capability_availability - def get_dataset_dcat( + def get_dataset_metadata_dcat( self, collection_name: object_storage.CollectionName, dataset_name: object_storage.DatasetName, ) -> Union[Dict, str]: - response: dict = self._client.head( - self._proxy_path("getDatasetDcat"), + response: dict = self._client.get( + self._proxy_path("getDatasetMetadataDcat"), params={"collection_name": collection_name, "dataset_name": dataset_name}, ).text return response + + @check_capability_availability + def query_dataset( + self, + collection_name: object_storage.CollectionName, + dataset_name: object_storage.DatasetName, + query: str, + ) -> Union[Dict, str]: + response: dict = self._client.post( + 
self._proxy_path("queryDataset"), + params={"collection_name": collection_name, "dataset_name": dataset_name}, + json={"query": query}, + ).text + return response + + @check_capability_availability + def query(self, query: str, meta_data: bool = False) -> Union[Dict, str]: + response: dict = self._client.post( + self._proxy_path("query"), json={"query": query, "meta_data": meta_data} + ).text + return response diff --git a/marketplace/app/v0/transformation.py b/marketplace/app/v0/transformation.py index 9c57287..d02c1f2 100644 --- a/marketplace/app/v0/transformation.py +++ b/marketplace/app/v0/transformation.py @@ -54,7 +54,7 @@ def _update_transformation( self._client.patch( self._proxy_path("updateTransformation"), params={"transformation_id": transformation_id}, - json=update, + json=update.dict(), ).json() ) diff --git a/marketplace/app/v0_0_1/__init__.py b/marketplace/app/v0_0_1/__init__.py deleted file mode 100644 index 1f9a469..0000000 --- a/marketplace/app/v0_0_1/__init__.py +++ /dev/null @@ -1,50 +0,0 @@ -"""This module contains all functionality for MarketPlace apps.. - -.. currentmodule:: marketplace.app.marketplace_app -.. moduleauthor:: Pablo de Andres, Pranjali Singh (Fraunhofer IWM) -""" - - -from typing import List -from urllib.parse import urljoin - -from ..utils import camel_to_snake, check_capability_availability -from .data_sink_app import DataSinkApp -from .data_source_app import DataSourceApp -from .transformation_app import TransformationApp - - -class MarketPlaceApp(DataSinkApp, DataSourceApp, TransformationApp): - """Base MarketPlace app. - - Includes the heartbeat capability and extends the MarketPlace class - to use the authentication mechanism. - """ - - def __init__(self, client_id, capabilities: list = None, **kwargs): - super().__init__(**kwargs) - self.client_id = client_id - # Must be run before the marketplace_host_url is updated to include the proxy. - self.capabilities = capabilities or self.get_capabilities() - self.marketplace_host_url = urljoin( - self.marketplace_host_url, f"api/applications/proxy/{self.client_id}/" - ) - - def get_capabilities(self) -> List[str]: - """Query the platform to get the capabilities supported by a certain - app.""" - app_service_path = f"api/applications/{self.client_id}" - response = self.get(path=app_service_path).json() - return [ - camel_to_snake(capability["name"]) - for capability in response["capabilities"] - ] - - @check_capability_availability - def heartbeat(self) -> str: - """Check the heartbeat of the application. - - Returns: - str: heartbeat - """ - return self.get(path="heartbeat").text diff --git a/marketplace/app/v0_0_1/data_sink_app.py b/marketplace/app/v0_0_1/data_sink_app.py deleted file mode 100644 index 8097bf3..0000000 --- a/marketplace/app/v0_0_1/data_sink_app.py +++ /dev/null @@ -1,126 +0,0 @@ -"""This module contains all functionality regarding data sink apps.. - -.. currentmodule:: marketplace.app.data_sink_app -.. moduleauthor:: Pablo de Andres, Pranjali Singh (Fraunhofer IWM) -""" -from typing import Dict, Union - -from marketplace.client import MarketPlaceClient - -from ..utils import check_capability_availability - - -class DataSinkApp(MarketPlaceClient): - """General data sink app with all the supported capabilities.""" - - @check_capability_availability - def create_dataset(self, config: Dict, files: Dict = None) -> str: - """Store a dataset. 
- - Args: - config (Dict): data metadata - files (Dict): binary data content - - Returns: - str: resourceId of the created dataset - """ - return self.post(path="createDataset", data=config, files=files).text - - @check_capability_availability - def create_cuds_dataset(self, config: Dict) -> Union[Dict, str]: - """Store a CUDS dataset - Args: - config (Dict): creation data - - Returns: - Dict: response object - """ - return self.post(path="createCudsDataset", json=config).text - - @check_capability_availability - def create_collection(self, config: Dict) -> str: - """Create a collection (used for workflows). - - Returns: - str: response string (success/error) - """ - return self.post(path="createCollection", data=config).text - - @check_capability_availability - def create_dataset_from_URI(self, uri: str) -> str: - """Store a dataset by fetching the data from a URI. - - Args: - uri (str): URI of the location to fetch data - - Returns: - str: resourceId of the created dataset - """ - return self.post(path="createDatasetFromURI", data=uri).text - - @check_capability_availability - def update_dataset(self, resourceId: str, config: Dict, **kwargs) -> str: - """Upload a new dataset to replace an existing one. - - Args: - resourceId (str): id of the dataset - config (Dict): update data - - Returns: - str: response string (success/error) - """ - params = {"resourceId": resourceId, **kwargs} - return self.put(path="updateDataset", params=params, json=config).text - - @check_capability_availability - def update_cuds_dataset(self, resourceId: str, config: Dict, **kwargs) -> str: - """Upload a new CUDS dataset to replace an existing one. - - Args: - resourceId (str): id of the CUDS dataset - config (Dict): update data - - Returns: - str: response string (success/error) - """ - params = {"resourceId": resourceId, **kwargs} - return self.put(path="updateCudsDatset", params=params, json=config).text - - @check_capability_availability - def update_dataset_from_URI(self, resourceId: str, uri: str, **kwargs) -> str: - """Update a dataset by fetching the data from a URI. - - Args: - resourceId (str): id of the dataset - uri (str): location of the data - - Returns: - str: response string (success/error) - """ - params = {"resourceId": resourceId, **kwargs} - return self.post(path="updateDatasetFromURI", params=params, data=uri).text - - @check_capability_availability - def delete_dataset(self, resourceId: str, **kwargs) -> str: - """Delete a dataset. - - Args: - resourceId (str): id of the dataset - Returns: - str: response string (success/error) - """ - params = {"resourceId": resourceId, **kwargs} - return self.delete(path="deleteDataset", params=params).text - - @check_capability_availability - def delete_cuds_dataset(self, resourceId: str, **kwargs) -> str: - """Delete a CUDS dataset. - - Args: - resourceId (str): id of the CUDS dataset - - Returns: - str: response string (success/error) - """ - params = {"resourceId": resourceId, **kwargs} - return self.delete(path="deleteCudsDataset", params=params).text diff --git a/marketplace/app/v0_0_1/data_source_app.py b/marketplace/app/v0_0_1/data_source_app.py deleted file mode 100644 index 113d63b..0000000 --- a/marketplace/app/v0_0_1/data_source_app.py +++ /dev/null @@ -1,171 +0,0 @@ -"""This module contains all functionality regarding data source apps.. - -.. currentmodule:: marketplace.app.data_source_app -.. 
moduleauthor:: Pablo de Andres, Pranjali Singh (Fraunhofer IWM) -""" - -from typing import Dict, List, Union - -from marketplace.client import MarketPlaceClient - -from ..utils import check_capability_availability - - -class DataSourceApp(MarketPlaceClient): - """General data source app with all the supported capabilities.""" - - @check_capability_availability - def get_collection(self, resource_id: str, **kwargs) -> List: - """Fetches a particular Catalog. - - Returns: - resource_id (str): [id of Catalog] - """ - return self.get(path="getCollection", params={"resourceId": resource_id, **kwargs}).text - - @check_capability_availability - def get_cuds_collection(self) -> Union[Dict, str]: - """Fetches list of CUDS datasets. - - Returns: - str: [description] - """ - return self.get(path="getCudsCollection").text - - @check_capability_availability - def get_dataset(self, resource_id: str, **kwargs) -> Union[Dict, str]: - """Fetches a particular Dataset. - - Args: - resource_id (str): [id of dataset] - - Returns: - Dict: [json response object as Dict] - """ - return self.get( - path="getDataset", params={"resourceId": resource_id, **kwargs} - ).content - - @check_capability_availability - def get_cuds_dataset(self, resource_id: str, **kwargs) -> Union[Dict, str]: - """Fetches a particular CUDS Dataset. - - Args: - resource_id (str): id of CUDS dataset - - Returns: - Dict: json response - """ - params = {"resourceId": resource_id, **kwargs} - return self.get(path="getCudsDataset", params=params).json() - - @check_capability_availability - def get_metadata(self, datatype: str, **kwargs) -> Union[Dict, str]: - """Fetch information about certain sets of data. - - Args: - datatype (str): datatype of metadata - - Returns: json response - """ - params = {"datatype": datatype, **kwargs} - return self.get(path="getMetadata", params=params).json() - - @check_capability_availability - def get_collection_metadata(self, resource_id: str, **kwargs) -> Union[Dict, str]: - """Execute search query on datasets. - - Args: - resource_id (str): id of dataset to query on - - Returns: - Dict: json response object - """ - params = {"resourceId": resource_id, **kwargs} - return self.get(path="getCollectionMetadata", params=params).text - - @check_capability_availability - def query_dataset(self, resource_id: str, query: str, **kwargs) -> Union[Dict, str]: - """Execute search query on datasets. - - Args: - resource_id (str): id of dataset to query on - query (str): query - - Returns: - Dict: json response object - """ - params = {"resourceId": resource_id, "query": query, **kwargs} - return self.get(path="queryDataset", params=params).json() - - @check_capability_availability - def post_query_dataset( - self, query: str, config: Dict, **kwargs - ) -> Union[Dict, str]: - """Query a dataset(Post for GraphQL) - - Args: - query (str): query to post - config (Dict): ? TO BE CONFIRMED ? - - Returns: - Dict: json response - """ - params = {"query": query, **kwargs} - return self.post(path="postQueryDataset", params=params, json=config).text - - @check_capability_availability - def export_dataset_with_attributes( - self, schema_id: str, config: Dict, **kwargs - ) -> Union[Dict, str]: - """Export data with attribute values of datasets. 
- - Args: - schema_id (str): id of schema (similar to datasetId) - config (Dict): Export data request - - Returns: - Dict: json response - """ - params = {"schema_id": schema_id, **kwargs} - return self.post( - path="exportDatasetWithAttributes", params=params, json=config - ).json() - - @check_capability_availability - def get_dataset_attributes(self, schema_id: str, **kwargs) -> Union[Dict, str]: - """List attributes included in specified dataset. - - Args: - schema_id (str): Schema ID (similar to datasetId) - - Returns: - Dict: json response object - """ - params = {"schema_id": schema_id, **kwargs} - return self.get(path="getDatasetAttributes", params=params).json() - - @check_capability_availability - def query_collection(self, query: str, **kwargs) -> Union[Dict, str]: - """Query a collection. - - Args: - query (str): query to execute - """ - params = {"query": query, **kwargs} - return self.get(path="queryCollection", params=params).text - - def post_query_collection( - self, query: str, config: Dict, **kwargs - ) -> Union[Dict, str]: - """Query a collection(Post for GraphQL) - - Args: - query (str): query to post - config (Dict): ? TO BE CONFIRMED ? - - Returns: - Dict: json response - """ - params = {"query": query, **kwargs} - return self.post(path="postQueryCollection", params=params, json=config).text \ No newline at end of file diff --git a/marketplace/app/v0_0_1/transformation_app.py b/marketplace/app/v0_0_1/transformation_app.py deleted file mode 100644 index 19f599b..0000000 --- a/marketplace/app/v0_0_1/transformation_app.py +++ /dev/null @@ -1,89 +0,0 @@ -"""This module contains all functionality regarding transformation apps.. - -.. currentmodule:: marketplace.app.transformation_app -.. moduleauthor:: Pablo de Andres, Pranjali Singh (Fraunhofer IWM) -""" - - -from typing import Dict, List - -from marketplace.client import MarketPlaceClient - -from ..utils import check_capability_availability - - -class TransformationApp(MarketPlaceClient): - """General transformation app with all the supported capabilities.""" - - @check_capability_availability - def new_transformation(self, config: Dict) -> str: - """Set up a new transformation. - - Args: - config (Dict): Set up configuration - - Returns: - str: uuid of the new transformation - """ - return self.post(path="newTransformation", json=config).text - - @check_capability_availability - def start_transformation(self, transformation_id: str, **kwargs) -> str: - """Start a configured transformation. - - Args: - transformation_id (str): id of the transformation to start - - Returns: - str: Success/Fail message - """ - params = {"transformationId": transformation_id, **kwargs} - return self.post(path="startTransformation", params=params).text - - @check_capability_availability - def stop_transformation(self, transformation_id: str, **kwargs) -> str: - """Stop a running transformation. - - Args: - transformation_id (str): id of the transformation to stop - - Returns: - str: Success/Fail message - """ - params = {"transformationId": transformation_id, **kwargs} - return self.post(path="stopTransformation", params=params).text - - @check_capability_availability - def delete_transformation(self, transformation_id: str, **kwargs) -> str: - """Delete a running transformation. 
- - Args: - transformation_id (str): id of the transformation to delete - - Returns: - str: Success/Fail message - """ - params = {"transformationId": transformation_id, **kwargs} - return self.post(path="deleteTransformation", params=params).text - - @check_capability_availability - def get_transformation_status(self, transformation_id: str, **kwargs) -> str: - """Get the status of a certain transformation. - - Args: - transformation_id (str): transformation being queried - - Returns: - str: status of the transformation - """ - params = {"transformationId": transformation_id, **kwargs} - return self.get(path="getTransformationStatus", params=params).text - - @check_capability_availability - def get_transformation_list(self) -> List[str]: - """List all the existing transformations. - - Returns: - List[str]: [description] - """ - return self.get(path="getTransformationList").json() diff --git a/marketplace/client.py b/marketplace/client.py index 65906d3..5a4a25a 100644 --- a/marketplace/client.py +++ b/marketplace/client.py @@ -13,7 +13,7 @@ from .version import __version__ -MP_DEFAULT_HOST = "https://www.materials-marketplace.eu/" +MP_DEFAULT_HOST = "https://materials-marketplace.eu/" class MarketPlaceClient: diff --git a/marketplace/data_sink_client/README.md b/marketplace/data_sink_client/README.md new file mode 100644 index 0000000..0b14d78 --- /dev/null +++ b/marketplace/data_sink_client/README.md @@ -0,0 +1,18 @@ +# reaxpro-marketplace-api + +Development is still in progress. +TODO: +1. Support token renewal for the ReaxPro user in the MarketPlace when the session times out. Currently, the token has to be copied manually into the CLI file and/or any test file from which the execution is initiated. Refer to the test.py and cli.py files in the repositories. +2. Docker support +3. If the datasink design in the MarketPlace changes, some of the logic has to be reimplemented. +4. Update the README file once Docker support is added +5. All the functionalities have to be tested again + +Note: Some of the functionalities from vimpp are not supported. For example, there is no property field supported for digital objects, and currently the digital objects are downloaded only "as-directories". + + +# Some basic commands to test the CLI functionalities 
(change the values based on your use case) +mpsession_search_information_packages --search-query "title=upload_test_dir" +mpsession_ingest_digital_objects_from_paths -t "cli_test" -p "/root/symphony/upload_test_dir/test1.txt" +mpsession_ingest_digital_objects_from_sourcedir -t "cli_test123" -s "/root/symphony/upload_test_dir" +mpsession_download_digital_objects_from_search_query -s "title=upload_test_dir" -d "/root/symphony/download_test" diff --git a/marketplace/data_sink_client/__init__.py b/marketplace/data_sink_client/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/marketplace/data_sink_client/cli.py b/marketplace/data_sink_client/cli.py new file mode 100644 index 0000000..50466b1 --- /dev/null +++ b/marketplace/data_sink_client/cli.py @@ -0,0 +1,421 @@ +import ast +import os +import typing as t +from functools import wraps +from pathlib import Path + +import click + +from marketplace.data_sink_client.session import MPSession + + +class CommaSeparatedListofPythonLiteralValues(click.Option): + def type_cast_value(self, ctx, value): + if value is not None: + if value.rstrip().endswith(","): + raise click.BadParameter(value) + if "," not in value: + values = [ast.literal_eval(value)] + else: + values = [ast.literal_eval(item) for item in value.split(",")] + return values + else: + return None + + +class CommaSeparatedListofFiles(click.Option): + def type_cast_value(self, ctx, value): + if value is not None: + if value.rstrip().endswith(","): + raise click.BadParameter(value) + if "," not in value: + values = [value] + else: + values = value.split(",") + for item in values: + if not Path(item).is_file(): + raise click.BadParameter(f'"{item}" is not a valid file.') + return values + else: + return None + + +class PythonLiteralOption(click.Option): + def type_cast_value(self, ctx, value): + if value is not None: + try: + return ast.literal_eval(value) + except Exception: + raise click.BadParameter(f'"{value}" is not a valid Python literal.') + else: + return None + + +def mpsession(f): + @wraps(f) + # @click.option('--token',type=str,required=True,default='cn389ncoiwuencr', help="Token generated after registration at the MarketPlace") + def wrapper(*args, **kwargs): + kwargs.update({"session": MPSession()}) + + return f(*args, **kwargs) + + return wrapper + + +@click.command() +@mpsession +def list_collections( + session: t.Optional[MPSession] = None, +) -> int: + try: + object = session.list_collections() + click.echo(object) + except Exception as e: + click.echo(f"{e.__class__.__name__}: {e}") + + +@click.command() +@click.option( + "--collection-name", + "-cn", + required=True, + type=click.STRING, + help="The name of the collection", +) +@mpsession +def list_datasets( + collection_name: str, + session: t.Optional[MPSession] = None, +) -> int: + try: + object = session.list_datasets(collection_name=collection_name) + click.echo(object) + except Exception as e: + click.echo(f"{e.__class__.__name__}: {e}") + + +@click.command() +@click.option( + "--collection-name", + "-cn", + required=True, + type=click.STRING, + help="The name of the collection", +) +@mpsession +def get_collection_dcat( + collection_name: str, + session: t.Optional[MPSession] = None, +) -> int: + try: + object = session.get_collection_dcat(collection_name=collection_name) + click.echo(object) + except Exception as e: + click.echo(f"{e.__class__.__name__}: {e}") + + +@click.command() +@click.option( + "--collection-name", + "-cn", + required=True, + type=click.STRING, + help="The name of the 
collection", +) +@click.option( + "--dataset-name", + "-dn", + required=True, + type=click.STRING, + help="The name of the dataset", +) +@mpsession +def get_dataset_dcat( + collection_name: str, + dataset_name: str, + session: t.Optional[MPSession] = None, +) -> int: + try: + object = session.get_dataset_dcat( + collection_name=collection_name, dataset_name=dataset_name + ) + click.echo(object) + except Exception as e: + click.echo(f"{e.__class__.__name__}: {e}") + + +@click.command() +@click.option( + "--collection-name", + "-cn", + required=True, + type=click.STRING, + help="The name of the collection", +) +@mpsession +def delete_collection( + collection_name: str, + session: t.Optional[MPSession] = None, +) -> int: + try: + object = session.delete_collection(collection_name=collection_name) + click.echo(object) + except Exception as e: + click.echo(f"{e.__class__.__name__}: {e}") + + +@click.command() +@click.option( + "--collection-name", + "-cn", + required=True, + type=click.STRING, + help="The name of the collection", +) +@click.option( + "--dataset-name", + "-dn", + required=True, + type=click.STRING, + help="The name of the dataset", +) +@mpsession +def delete_dataset( + collection_name: str, + dataset_name: str, + session: t.Optional[MPSession] = None, +) -> int: + try: + object = session.delete_dataset( + collection_name=collection_name, dataset_name=dataset_name + ) + click.echo(object) + except Exception as e: + click.echo(f"{e.__class__.__name__}: {e}") + + +@click.command() +@click.option( + "--path", + "-p", + required=True, + type=click.STRING, + help="Absolute or relative path of the file", +) +@click.option( + "--collection-name", + "-cn", + required=False, + type=click.STRING, + help="The name of the collection", +) +@click.option( + "--dataset-name", + "-dn", + required=False, + type=click.STRING, + help="The name of the dataset", +) +@mpsession +def upload_file_from_path( + path: str, + collection_name: t.Optional[str] = None, + dataset_name: t.Optional[str] = None, + session: t.Optional[MPSession] = None, +) -> int: + try: + object = session.create_dataset_from_path( + path=path, collection_name=collection_name, dataset_name=dataset_name + ) + click.echo(object) + except Exception as e: + click.echo(f"{e.__class__.__name__}: {e}") + + +@click.command() +@click.option( + "--path", + "-p", + required=True, + type=click.STRING, + help="Absolute or relative path of the file", +) +@click.option( + "--collection-name", + "-cn", + required=False, + type=click.STRING, + help="The name of the collection", +) +@mpsession +def upload_files_from_folder( + path: str, + collection_name: t.Optional[str] = None, + session: t.Optional[MPSession] = None, +) -> int: + try: + object = session.create_datasets_from_sourcedir( + sourcedir=path, collection_name=collection_name + ) + click.echo(object) + except Exception as e: + click.echo(f"{e.__class__.__name__}: {e}") + + +@click.command() +@click.option( + "--collection-name", + "-cn", + required=True, + type=click.STRING, + help="The name of the collection", +) +@click.option( + "--target_dir", + "-td", + required=False, + type=click.STRING, + help="Absolute or relative path to download", +) +@click.option( + "--raise_if_directory_not_empty", + "-r", + required=False, + type=click.STRING, + help="Should the download stop if target dir is not empty", +) +@mpsession +def download_folder( + collection_name, + target_dir: t.Optional[str] = None, + raise_if_directory_not_empty: t.Optional[bool] = False, + session: t.Optional[MPSession] = 
None, +) -> int: + try: + if target_dir is None: + target_dir = os.getcwd() + object = session.download_datasets_from_collection( + targetdir=target_dir, + collection_name=collection_name, + raise_if_directory_not_empty=raise_if_directory_not_empty, + ) + click.echo(object) + except Exception as e: + click.echo(f"{e.__class__.__name__}: {e}") + + +@click.command() +@click.option( + "--collection-name", + "-cn", + required=True, + type=click.STRING, + help="The name of the collection", +) +@click.option( + "--dataset-name", + "-dn", + required=True, + type=click.STRING, + help="The name of the dataset", +) +@click.option( + "--target_dir", + "-td", + required=False, + type=click.STRING, + help="Absolute or relative path to download to", +) +@click.option( + "--raise_if_directory_not_empty", + "-r", + required=False, + type=click.STRING, + help="Should the download stop if the target dir is not empty", +) +@mpsession +def download_file( + collection_name, + dataset_name, + target_dir: t.Optional[str] = None, + raise_if_directory_not_empty: t.Optional[bool] = False, + session: t.Optional[MPSession] = None, +) -> int: + try: + if target_dir is None: + target_dir = os.getcwd() + object = session.download_dataset( + targetdir=target_dir, + collection_name=collection_name, + dataset_name=dataset_name, + raise_if_directory_not_empty=raise_if_directory_not_empty, + ) + click.echo(object) + except Exception as e: + click.echo(f"{e.__class__.__name__}: {e}") + + +@click.command() +@click.option( + "--query", + "-q", + required=True, + type=click.STRING, + help="SPARQL query.", +) +@click.option( + "--meta-data", + "-md", + required=False, + type=click.BOOL, + help="Whether to execute the SPARQL query on the metadata.", +) +@mpsession +def query( + query: str, + meta_data: bool = False, + session: t.Optional[MPSession] = None, +) -> int: + try: + object = session.query(query=query, meta_data=meta_data) + click.echo(object) + except Exception as e: + click.echo(f"{e.__class__.__name__}: {e}") + + +@click.command() +@click.option( + "--query", + "-q", + required=True, + type=click.STRING, + help="SPARQL query.", +) +@click.option( + "--collection-name", + "-cn", + required=True, + type=click.STRING, + help="The name of the collection", +) +@click.option( + "--dataset-name", + "-dn", + required=True, + type=click.STRING, + help="The name of the dataset", +) +@mpsession +def query_dataset( + query: str, + collection_name, + dataset_name, + session: t.Optional[MPSession] = None, +) -> int: + try: + object = session.query_dataset( + query=query, collection_name=collection_name, dataset_name=dataset_name + ) + click.echo(object) + except Exception as e: + click.echo(f"{e.__class__.__name__}: {e}") diff --git a/marketplace/data_sink_client/session.py b/marketplace/data_sink_client/session.py new file mode 100644 index 0000000..8b1f331 --- /dev/null +++ b/marketplace/data_sink_client/session.py @@ -0,0 +1,509 @@ +import os +import os.path +from functools import wraps + +# from dotenv import find_dotenv, load_dotenv +from keycloak import KeycloakOpenID + +from marketplace.app import MarketPlaceClient, get_app +from marketplace.data_sink_client.utils import ( + get_collections_from_catalog, + parse_objects_from_collection, +) + + +def reconfigure_if_expired(func): + @wraps(func) + def func_(self, *arg, **kwargs): + try: + r = func(self, *arg, **kwargs) + return r + except Exception as e: + print( + "API encountered an exception. Please check that all your environment variables are configured properly. 
Error details: ", + str(e), + ) + # Temporary workaround to catch the Unauthorized error + """error = str(e) + if len(error.split("401 Unauthorized")) > 0: + print("Token expired. Reconfiguring again.") + token = configure() + os.environ["MP_ACCESS_TOKEN"] = token + r = func(self, *arg, **kwargs) + return r + else: + raise RuntimeError(e)""" + + return func_ + + +def configure(): + # Configure client + server_url = os.environ.get("KEYCLOAK_SERVER_URL") + client_id = os.environ.get("KEYCLOAK_CLIENT_ID") + realm_name = os.environ.get("KEYCLOAK_REALM_NAME") + client_key = os.environ.get("KEYCLOAK_CLIENT_SECRET_KEY") + user = os.environ.get("MARKETPLACE_USERNAME") + passwd = os.environ.get("MARKETPLACE_PASSWORD") + keycloak_openid = KeycloakOpenID( + server_url=server_url, + client_id=client_id, + realm_name=realm_name, + client_secret_key=client_key, + ) + token = keycloak_openid.token(user, passwd) + token = token["access_token"] + return token + + +class MPSession: + """ReaxPro-MarketPlace Session API Wrapper. + + This session wrapper simplifies generic tasks. + + Keyword arguments: + :param token: necessary token for accessing a MarketPlace-registered DataSink + + """ + + @reconfigure_if_expired + def __init__( + self, marketplace_host_url=None, access_token=None, client_id=None, **kwargs + ): + CLIENT_ID = client_id or os.environ.get("CLIENT_ID") + # configure() + mp_client = MarketPlaceClient( + marketplace_host_url=marketplace_host_url, access_token=access_token + ) + self.marketPlace = get_app(app_id=CLIENT_ID, client=mp_client) + + @reconfigure_if_expired + def create_dataset( + self, collection_name, dataset_name, sub_collection_id, abs_path + ): + """Create a dataset in the MarketPlace DataSink. + + Keyword arguments: + :param collection_name: A string value which indicates the title of the catalog the dataset belongs to. + :param dataset_name: A string value which indicates the title of the dataset. + :param sub_collection_id: A string representing the ID of the sub-catalog (a catalog within a catalog) to which this dataset belongs. + + :return status + """ + try: + with open(abs_path, "rb") as f: + file_content = f.read() + + if not file_content: + raise Exception("Empty file content in ", abs_path) + + config = { + "sub_collection_id": sub_collection_id, + } + + files = { + "file": (dataset_name, file_content), + } + response = self.marketPlace.create_dataset( + collection_name, dataset_name, config=config, file=files + ) + if "dataset_id" not in response: + print(response) + return None + else: + return response["dataset_id"] + + except Exception as e: + print( + f"Something went wrong while uploading the data from {abs_path}. " + + str(e) + ) + return None + + @reconfigure_if_expired + def create_collection(self, collection_name, sub_collection_id): + """Create a collection/catalog in the MarketPlace DataSink. + + Keyword arguments: + :param collection_name: A string value which indicates the title of the collection/catalog. + :param sub_collection_id: A string value which indicates the ID of the collection to add this collection/catalog to. + + :return response + """ + + try: + config = {"sub_collection_id": sub_collection_id} + + response = self.marketPlace.create_collection( + collection_name=collection_name, config=config + ) + if "collection_id" not in response: + print(response) + return None + else: + return response["collection_id"] + + except Exception as e: + print( + f"Something went wrong while uploading the data with title {collection_name}. 
" + + str(e) + ) + return None + + @reconfigure_if_expired + def get_dataset(self, collection_name=None, dataset_name=None): + """Get binary data from a get request + + :param collection_name: Name of the collection. + :param dataset_name: Name of the dataset. + + :returns: binary content data stored within the dataset + """ + response = self.marketPlace.get_dataset( + collection_name=collection_name, dataset_name=dataset_name + ) + + return response + + @reconfigure_if_expired + def get_collection_dcat(self, collection_name=None): + """Get a collection/catalog object from a get request + + :param collection_name: Name of the collection. + + :returns: dcat meta-data for the catalog + """ + response = self.marketPlace.get_collection_metadata_dcat( + collection_name=collection_name + ) + return response + + @reconfigure_if_expired + def get_dataset_dcat(self, collection_name=None, dataset_name=None): + """Get a dataset dcat object from a get request + + :param collection_name: Name of the collection. + :param daatset_name: Name of the dataset. + + :returns: dcat meta-data for the dataset + """ + response = self.marketPlace.get_dataset_metadata_dcat( + collection_name=collection_name, dataset_name=dataset_name + ) + return response + + @reconfigure_if_expired + def list_collections(self): + """Returns list of Collections. + + :returns: Dictionary with list of collections + """ + response = self.marketPlace.list_collections() + return response + + @reconfigure_if_expired + def list_datasets(self, collection_name): + """Returns list of datasets for a specific collection. + + :param collection_name: Name of the collection + + :returns: Dictionary with list of datasets + """ + response = self.marketPlace.list_datasets(collection_name) + return response + + @reconfigure_if_expired + def delete_collection(self, collection_name): + """Delete a collection from datasink. + + :param collection_name: Name of the collection + + :returns: None on success + """ + response = self.marketPlace.delete_collection(collection_name) + return response + + @reconfigure_if_expired + def delete_dataset(self, collection_name, dataset_name): + """Delete a dataset from datasink. + + :param collection_name: Name of the collection + :param dataset_name: Name of the dataset + + :returns: None on success + """ + response = self.marketPlace.delete_dataset(collection_name, dataset_name) + return response + + @reconfigure_if_expired + def query_dataset(self, collection_name, dataset_name, query): + """Execute a aparql query on a dataset stored in datasink. + + :param collection_name: Name of the collection + :param dataset_name: Name of the + :param query: SPARQL query + + :returns: List of data + """ + response = self.marketPlace.query_dataset(collection_name, dataset_name, query) + return response + + @reconfigure_if_expired + def query(self, query, meta_data=False): + """Execute a aparql query on a dataset stored in datasink. + + :param query: SPARQL query + :param meta_data: Query meta_data instead of actual data. + + :returns: List of data + """ + response = self.marketPlace.query(query, meta_data=meta_data) + return response + + @reconfigure_if_expired + def create_dataset_from_path(self, path, collection_name=None, dataset_name=None): + if not os.path.exists(path): + raise Exception("File " + path + " does not exist.") + + if os.path.isdir(path): + raise Exception("File " + path + " is a directory") + + if dataset_name is None: + dataset_name = os.path.basename(path) + + # use name of the file if it is not specified. 
If a collection with this name already exists, this will fail with a duplicate error. + if collection_name is None: + collection_name = os.path.basename(path).split(".")[0] + + response = self.create_datasets_from_paths( + paths=[path], dataset_names=[dataset_name], collection_name=collection_name + ) + return response + + @reconfigure_if_expired + def create_datasets_from_paths(self, paths, collection_name, dataset_names): + """Inject a list of datasets. A single collection will be created. + + :param paths: List of filepaths to inject. + :param collection_name: The title of the collection + :param dataset_names: The titles of the datasets + + :returns: response + """ + assert len(paths) == len(dataset_names) + + response_list = [] + if collection_name is not None: + collection_id = self.create_collection( + collection_name=collection_name, sub_collection_id=None + ) + if collection_id is not None: + response_list.append((collection_name, collection_id)) + else: + return + else: + raise Exception("collection title cannot be empty.") + + for path, dataset_name in zip(paths, dataset_names): + dataset_id = self.create_dataset( + collection_name=collection_name, + dataset_name=dataset_name, + sub_collection_id=None, + abs_path=path, + ) + response_list.append((path, dataset_id)) + + return response_list + + @reconfigure_if_expired + def create_datasets_from_sourcedir( + self, sourcedir: str, collection_name: str = None + ): + """Inject datasets from a directory. A single collection will be created. + + :param sourcedir: The source directory to upload from + :param collection_name: The title of the collection + + :returns: response""" + + assert os.path.isdir(sourcedir), "Source directory doesn't exist." + + if collection_name is None: + collection_name = os.path.basename(sourcedir) + + response_list = [] + collection_id = self.create_collection( + collection_name=collection_name, sub_collection_id=None + ) + if collection_id is None: + return + response_list.append((collection_name, collection_id)) + response_list = self.create_objects_from_sourcedir( + collection_name, + sourcedir, + collection_id=collection_id, + response_list=response_list, + ) + + return response_list + + @reconfigure_if_expired + def create_objects_from_sourcedir( + self, collection_name, sourcedir, collection_id, response_list=[] + ): + for file in os.listdir(sourcedir): + if os.path.isdir(os.path.join(sourcedir, file)): + id = self.create_collection( + collection_name=file, sub_collection_id=collection_id + ) + if id is None: + return + response_list.append((os.path.join(sourcedir, file), id)) + # print("added directory: ", (os.path.join(sourcedir, file), id)) + self.create_objects_from_sourcedir( + collection_name, os.path.join(sourcedir, file), id, response_list + ) + else: + dataset_id = self.create_dataset( + collection_name=collection_name, + dataset_name=file, + sub_collection_id=collection_id, + abs_path=os.path.join(sourcedir, file), + ) + if dataset_id is None: + return + response_list.append((os.path.join(sourcedir, file), dataset_id)) + # print("added file: ", (os.path.join(sourcedir, file), dataset_id)) + return response_list + + @reconfigure_if_expired + def download_dataset( + self, + collection_name, + dataset_name, + targetdir=os.getcwd(), + raise_if_directory_not_empty=True, + ): + """Download the dataset to a local directory""" + result = [] + if not os.path.isdir(targetdir): + raise Exception("Download directory " + 
targetdir + "does not exist.") + + file_path = os.path.join(targetdir, dataset_name) + content = self.get_dataset( + collection_name=collection_name, dataset_name=dataset_name + ) + with open(file_path, "wb") as f: + f.write(content) + # print("created file: ", file_path) + result.append({"download_path": file_path}) + return result + + @reconfigure_if_expired + def download_datasets_from_search_query( + self, + collection_search_query, + targetdir=os.getcwd(), + raise_if_directory_not_empty=True, + raise_if_missing_dataset=True, + download_mode="digital-object-ids", + zipfile_name="datasets.zip", + ): + """Download the datasets from collections to a local + directory. Uses a search query as input. + + :param collection_search_query: The collection search query + :param targetdir: (optional, default=curdir) The directory to download to + :param raise_if_directory_not_empty: (optional) Raise error when True and directories are not empty + :param raise_if_missing_dataset: (optional) Raise Exception when trying to download from an empty collection + :param download_mode: (optional) One of 'digital-object-ids', 'digital-object-titles', 'as-directories', 'as-zipfiles' + :param zipfile_name: (optional) The optional name of the zipfile + + :returns: tuple(List of paths, List of datasets) + + """ + collections = self.search_collections( + collection_search_query, include_dataset=False + ) + + # convert collections to python representation + collections = get_collections_from_catalog(collections) + + result = self.download_datasets_from_collections( + collections=collections, + targetdir=targetdir, + raise_if_directory_not_empty=raise_if_directory_not_empty, + raise_if_missing_dataset=raise_if_missing_dataset, + download_mode=download_mode, + ) + + return result + + @reconfigure_if_expired + def download_datasets_from_collection( + self, + collection_name, + targetdir=os.getcwd(), + raise_if_directory_not_empty=True, + download_mode="digital-object-ids", + ): + """Download the datasets from collection to a local directory. + + :param collection_name: Name of the collection. + :param targetdir: (optional, default=curdit) The directory to download to + :param raise_if_directory_not_empty: (optional) Raise error when True and directories are not empty + :param download_mode: (optional) TODO: One of 'digital-object-ids', 'digital-object-titles', 'as-directories', 'as-zipfiles' + + :returns: List of (title, file_path, file_type) + + .. note:: + + The folder structure as defined in each dataset's `folderPath` will be + preserved with `targetdir/collection_identifier` as the root directory. + + """ + + result = [] + print(targetdir) + if not os.path.isdir(targetdir): + raise Exception("Download Directory" + targetdir + "does not exist.") + + if raise_if_directory_not_empty and any(os.listdir(targetdir)): + raise Exception("Directory " + targetdir + " is not empty.") + + collection = self.get_collection_dcat(collection_name=collection_name) + + # convert to python dictionary. 
There will be at most one collection in the list + packages = parse_objects_from_collection(collection, collection_name, path="") + # print(packages) + + if len(packages) == 0: + raise Exception("Error: " + collection) + + for package in packages: + abs_path = package["path"] + title = package["title"] + file_type = package["type"] + if not os.path.exists(os.path.join(targetdir, abs_path)): + os.makedirs(os.path.join(targetdir, abs_path)) + # print("created directory: ", os.path.join(targetdir, abs_path)) + result.append((title, os.path.join(targetdir, abs_path), file_type)) + + file_path = os.path.join(targetdir, abs_path, title) + if file_type == "file": + # print("sending request for, " , collection_identifier, title) + response = self.get_dataset( + collection_name=collection_name, dataset_name=title + ) + with open(file_path, "wb") as f: + f.write(response) + # print("created file: ", file_path) + result.append((title, file_path, file_type)) + + return result + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + return False diff --git a/marketplace/data_sink_client/utils.py b/marketplace/data_sink_client/utils.py new file mode 100644 index 0000000..9de9792 --- /dev/null +++ b/marketplace/data_sink_client/utils.py @@ -0,0 +1,296 @@ +import os +import re + +from rdflib import Graph + + +def get_datasets_uid_sparql(collections): + """A utility that supports searching for and retrieving the dataset UIDs in a catalog + + Keyword arguments: + :param collections: response.text + + :return List of dataset UIDs + """ + uid_list = [] + + g = Graph() + g.parse(data=collections, format="json-ld") + + qrs = g.query( + """ + SELECT ?asource ?id + WHERE { + ?asource a <http://www.w3.org/ns/dcat#Dataset>. + ?asource <http://purl.org/dc/terms/identifier> ?id. + } + """ + ) + + for row in qrs: + uid_list.append(row.id.value) + + return uid_list + + +def get_collections_uid_sparql(collections): + """A utility that supports searching for and retrieving the catalog UIDs in a catalog + + Keyword arguments: + :param collections: response.text + + :return List of catalog object UIDs + """ + uid_list = [] + + g = Graph() + g.parse(data=collections, format="json-ld") + + qrs = g.query( + """ + SELECT ?asource ?id + WHERE { + ?asource a <http://www.w3.org/ns/dcat#Catalog>. + ?asource <http://purl.org/dc/terms/identifier> ?id. + } + """ + ) + + for row in qrs: + uid_list.append(row.id.value) + + return uid_list + + +def get_collections_from_catalog(collections): + """A utility that converts collections stored as a catalog into a list of dictionaries. + + Keyword arguments: + :param collections: response.text + + :return List of dictionaries representing the collections + """ + inf_packages = [] + g = Graph() + g.parse(data=collections, format="json-ld") + + qrs = g.query( + """ + SELECT ?asource ?identifier ?subject ?title + WHERE { + ?asource a <http://www.w3.org/ns/dcat#Catalog>. + ?asource <http://purl.org/dc/terms/identifier> ?identifier. + ?asource <http://purl.org/dc/terms/title> ?title. + } + """ + ) + + for row in qrs: + inf_packages.append(row.identifier.value) + + return inf_packages + + +def parse_objects_from_collection(collection, collection_name, path=""): + """A utility that converts collections stored as a catalog into a list of dictionaries. 
+ + +def parse_objects_from_collection(collection, collection_name, path=""): + """A utility that converts a collection stored as a catalog to a list of dictionaries. + + Keyword arguments: + :param collection: response.text + :param collection_name: Name of the collection + :param path: relative path from the root collection + + :return: List of dictionaries describing the collection's contents + """ + inf_packages = [] + + g = Graph() + g.parse(data=collection, format="json-ld") + + # get all datasets linked to the current catalog + dataset_query = f""" + SELECT ?catalog_identifier ?catalog_title ?dataset_identifier ?dataset_title + WHERE {{ + ?asource a <http://www.w3.org/ns/dcat#Catalog> . + ?asource <http://purl.org/dc/terms/identifier> ?catalog_identifier. + ?asource <http://purl.org/dc/terms/title> ?catalog_title. + ?asource <http://www.w3.org/ns/dcat#dataset> ?datasets. + ?datasets <http://purl.org/dc/terms/identifier> ?dataset_identifier. + ?datasets <http://www.w3.org/ns/dcat#distribution> ?distribution. + ?distribution <http://purl.org/dc/terms/title> ?dataset_title + FILTER regex(?catalog_title, "{collection_name}") + }} + """ + qrs = g.query(dataset_query) + + for row in qrs: + parent_title = row.catalog_title.value + dataset_title = row.dataset_title.value + abs_path = os.path.join(path, parent_title) + inf_packages.append( + { + "path": abs_path, + "title": dataset_title, + "identifier": row.dataset_identifier.value, + "type": "file", + } + ) + + # recursively fetch information about the connected catalogs + catalog_query = f""" + SELECT ?parent_catalog_identifier ?parent_catalog_title ?catalog_identifier ?catalog_title + WHERE {{ + ?asource a <http://www.w3.org/ns/dcat#Catalog> . + ?asource <http://purl.org/dc/terms/identifier> ?parent_catalog_identifier. + ?asource <http://purl.org/dc/terms/title> ?parent_catalog_title. + ?asource <http://www.w3.org/ns/dcat#catalog> ?catalogs. + ?catalogs <http://purl.org/dc/terms/identifier> ?catalog_identifier. + ?catalogs <http://purl.org/dc/terms/title> ?catalog_title + FILTER regex(?parent_catalog_title, "{collection_name}") + }} + """ + catalogs = g.query(catalog_query) + for row in catalogs: + parent_title = row.parent_catalog_title.value + catalog_title = row.catalog_title.value + abs_path = os.path.join(path, parent_title, catalog_title) + updated_path = os.path.join(path, parent_title) + inf_packages.append( + { + "path": abs_path, + "title": catalog_title, + "identifier": row.catalog_identifier.value, + "type": "dir", + } + ) + # accumulate instead of returning early, so that all sibling + # catalogs are visited rather than only the first one + inf_packages += parse_objects_from_collection( + collection, catalog_title, path=updated_path + ) + + return inf_packages + + +def parse_objects_from_datasets(datasets_response): + """A utility that converts a datasets response to a list of dictionaries. + + Keyword arguments: + :param datasets_response: response.text + + :return: List of dictionaries representing digital objects + """ + datasets = [] + + g = Graph() + g.parse(data=datasets_response, format="json-ld") + + # get all datasets linked to the current catalog + dataset_query = """ + SELECT ?identifier ?title + WHERE { + ?asource a <http://www.w3.org/ns/dcat#Dataset> . + ?asource <http://purl.org/dc/terms/identifier> ?identifier. + ?asource <http://www.w3.org/ns/dcat#distribution> ?distribution. + ?distribution <http://purl.org/dc/terms/title> ?title + } + """ + qrs = g.query(dataset_query) + + for row in qrs: + title = row.title.value + datasets.append({"title": title, "identifier": row.identifier.value}) + return datasets
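To make the recursion above concrete: for a collection "c1" that holds one file plus a sub-catalog "runs" with another file, `parse_objects_from_collection` returns entries shaped like the following (titles and identifiers invented), which `download_datasets_from_collection` then turns into directories and file downloads:

    # Hypothetical return value illustrating the path layout.
    packages = [
        {"path": "c1", "title": "output.txt", "identifier": "id-1", "type": "file"},
        {"path": "c1/runs", "title": "runs", "identifier": "id-2", "type": "dir"},
        {"path": "c1/runs", "title": "log.txt", "identifier": "id-3", "type": "file"},
    ]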
+ } + """ + ) + + for row in qrs: + dataset = {} + dataset["identifier"] = row.identifier.value + dataset["isPartOf"] = row.isPartOf.value + dataset["title"] = row.title.value + dataset["spatial"] = row.spatial.value + dig_objects.append(dataset) + + return dig_objects + + +def get_datasets_uid(collections): + """An utility function to support search for the datasets. + + Keyword arguments: + :param collections: collection + + :return List of datasets + """ + pass + + +def serialize_jsondl(collections): + """An utility that enables serializing the response.text into json-ld + + Keyword arguments: + :param collections: response.text + :return Collection + """ + + g = Graph().parse(data=collections, format="n3") + r = g.serialize(format="json-ld") + + return r + + +def _walk(rootpath, pattern=None, relative_path=".", depth=0, data=[]): + """Walk root directory and return the absolute paths, the paths relative + to rootpath and the filenames, optionally matching a regex pattern. + + :param rootpath: (str) The root directory to walk + :param pattern: (str) the regex pattern for the filenames + :param depth: int indicates the depth of the folder/file from root directory + :param data: contains information about all subdirectories and files in theroot path + + :returns: Generator of (absolute_path, relative_to_rootpath_path, filename) tuples. + + """ + absolute_path = os.path.join(rootpath, relative_path) + for p in os.listdir(absolute_path): + if os.path.isdir(os.path.join(absolute_path, p)): + data.append( + (rootpath, pattern, os.path.join(relative_path, p), depth, True) + ) + _walk(rootpath, pattern, os.path.join(relative_path, p), depth + 1, data) + return data + else: + if not pattern or (pattern and re.match(pattern, p)): + data.append( + ( + os.path.join(rootpath, relative_path, p), + relative_path, + p, + depth, + False, + ) + ) diff --git a/marketplace/version.py b/marketplace/version.py index 29bc6d3..fe50744 100644 --- a/marketplace/version.py +++ b/marketplace/version.py @@ -13,4 +13,4 @@ except RuntimeError: __version__ = get_version("marketplace-sdk").serialize() except ImportError: - __version__ = "v0.3.2" + __version__ = "v0.4.0" diff --git a/setup.cfg b/setup.cfg index a83564c..2cf3c1a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = marketplace_sdk -version = v0.3.2 +version = v0.4.0 description = Software Development Toolkit to communicate with the Materials MarketPlace platform. 
diff --git a/marketplace/version.py b/marketplace/version.py index 29bc6d3..fe50744 100644 --- a/marketplace/version.py +++ b/marketplace/version.py @@ -13,4 +13,4 @@ except RuntimeError: __version__ = get_version("marketplace-sdk").serialize() except ImportError: - __version__ = "v0.3.2" + __version__ = "v0.4.0" diff --git a/setup.cfg b/setup.cfg index a83564c..2cf3c1a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = marketplace_sdk -version = v0.3.2 +version = v0.4.0 description = Software Development Toolkit to communicate with the Materials MarketPlace platform. long_description = file: README.md long_description_content_type = text/markdown @@ -8,7 +8,7 @@ url = https://github.com/materials-marketplace/python-sdk author = Carl Simon Adorf, Pablo de Andres, Pranjali Singh and the AiiDAlab team author_email = simon.adorf@epfl.ch, pablo.de.andres@iwm.fraunhofer.de, pranjali.singh@iwm.fraunhofer.de license = MIT -license_file = LICENSE +license_files = LICENSE classifiers = Development Status :: 2 - Pre-Alpha License :: OSI Approved :: MIT License @@ -19,13 +19,31 @@ classifiers = [options] packages = find: install_requires = - fastapi==0.75.2 - marketplace-standard-app-api~=0.3 - packaging==21.3 + fastapi>0.75,<1.0 + marketplace-standard-app-api~=0.4 + packaging>=21.3,<=23.0 pika~=1.2 - requests~=2.26 + python-keycloak==2.12.0 + rdflib==6.2.0 + rdflib-jsonld==0.6.2 + requests>2.26.0,<3.0 python_requires = >=3.8 +[options.entry_points] +console_scripts = + list_collections = marketplace.data_sink_client.cli:list_collections + list_datasets = marketplace.data_sink_client.cli:list_datasets + get_collection_dcat = marketplace.data_sink_client.cli:get_collection_dcat + get_dataset_dcat = marketplace.data_sink_client.cli:get_dataset_dcat + delete_dataset = marketplace.data_sink_client.cli:delete_dataset + delete_collection = marketplace.data_sink_client.cli:delete_collection + upload_file_from_path = marketplace.data_sink_client.cli:upload_file_from_path + upload_folder = marketplace.data_sink_client.cli:upload_files_from_folder + download_folder = marketplace.data_sink_client.cli:download_folder + download_file = marketplace.data_sink_client.cli:download_file + query = marketplace.data_sink_client.cli:query + query_dataset = marketplace.data_sink_client.cli:query_dataset + [options.extras_require] dev = bumpver==2021.1114 diff --git a/tests/test_app.py b/tests/test_app.py index e229a25..1a9bee7 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -1,6 +1,6 @@ import pytest -from marketplace.app import MarketPlaceApp, get_app +from marketplace.app import get_app @pytest.fixture @@ -8,12 +8,6 @@ def app(): return get_app("test-app") -def test_app_v0_0_1(): - with pytest.warns(UserWarning): - app = MarketPlaceApp(client_id="test-app", capabilities=["heartbeat"]) - assert app.heartbeat() == "OK" - - def test_app_v0(app): assert "heartbeat" in app.capabilities response = app.heartbeat()
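With direct `MarketPlaceApp` construction removed in favor of the factory, a minimal sketch of instantiating an app now looks like this; the client ID is a placeholder, and the host and access token are assumed to be configured in the environment:

    from marketplace.app import get_app

    # "my-app" is a hypothetical client ID; connection details (host, access
    # token) are assumed to come from the environment.
    app = get_app("my-app")
    print(app.capabilities)
    print(app.heartbeat())  # a healthy app is expected to answer "OK"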