{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## GridFTP transfer with Globus\n", "\n", "In order to serve your special requests on pool extensions, we use the [globus](https://www.globus.org) tool kit which allows us to use the GridFTP server. [GridFTP](https://www.dkrz.de/up/services/analysis/data-transfer/gridftp) enables secure, efficient and fault-tolerant large volume data transfer.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The transfer process is initiated with a **request** submitted by you. This request should contain the `dataset_id`s you would like to have replicated. Open an issue in the [gitlab repository](https://gitlab.dkrz.de/data-infrastructure-services/cmip-data-pool/-/issues) and make available a dictionary of the form:\n", "\n", "```\n", "dataset_id : { \"data_node\": ESGFNODE,\n", " \"globus_url\": URL\n", " }\n", "```\n", "\n", "After a review, a CDP manager will start the replication." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this notebook, you learn\n", "- how to generate this request dictionary based on `pyesgf`\n", "- how this dictionary is converted for file based globus transfer\n", "- a scheme of how globus transfer is started\n", "\n", "When using in `appmode`, you can create a replication request yourself. Therefore, all defined functions are executed at the end." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Replication request creation\n", "\n", "1. You set up a serach request with specifications for CMIP6 facets\n", "2. The esgf-index is searched with `pyesgf` for request matches with `retracted=False, latest=True` as default keyword arguments.\n", "3. Either `globus_url`s or `download_url`s are saved in a dictionary which will serve as first step towards globus transfer" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "1. You can specify all valid search facets i.e. CMIP6 attributes for your request in a dictionary `datasets_to_download`. In `appmode`, we allow to set 5 different search facets. Please also provide a username which is used as a prefix for the output files." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import ipywidgets as widgets\n", "user=widgets.Text(description=\"Username\")\n", "user" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tab_contents = ['experiment_id', 'source_id', 'member_id', 'table_id', 'variable_id']\n", "children = [widgets.Text(description=name) for name in tab_contents]\n", "tab = widgets.Tab()\n", "tab.children = children\n", "[tab.set_title(i, title) for i, title in enumerate(tab_contents)]\n", "tab\n", "# \"source_id\":\"E3SM-1-0\",\n", "# \"member_id\":\"r1i1p1f1\",\n", "# \"table_id\":\"fx\",\n", "# \"variable_id\":\"sftlf\",\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "2. We use the Lorence Livermoore ESGF node as the search node as it is the most stable node. We want to search through all ESGF nodes so we set `distrib=True` as a keyword argument." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyesgf.search import SearchConnection\n", "#index_search_url = 'http://esgf-node.llnl.gov/esg-search'\n", "index_search_url = 'http://esgf-data.dkrz.de/esg-search'\n", "#conn = SearchConnection(index_search_url, distrib=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We define globus endpoints that are not useable in a list to sort them out." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "noGoodEndpoints=[\"cmip.bcc.cma.cn\", \n", " \"vesg.ipsl.upmc.fr\",\n", " \"gfdl.noaa.gov\",\n", " \"esgf-data.ucar.edu\"]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "3. We have to do two things:\n", " - `collect_instances` which are identifiers for available datasets in the ESGF federation. These have to be valid (`retracted=False, latest=True`)\n", " - `collect_globusURLs` which is done by searching an ESGF node which provides a globus URL for each instance.\n", " - `save_results_as_json` which is pretty self-explanatory" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def collect_instances(searchOptions):\n", " instances=[]\n", " print(\"Connect to esgf index...\")\n", " conn = SearchConnection(index_search_url, distrib=True)\n", " ctx=conn.new_context(**searchOptions,\n", " retracted=False, latest=True)\n", " recentctx=ctx.search()\n", " for dset in recentctx:\n", " instances.append(dset.json[\"instance_id\"])\n", " instances=list(set(instances))\n", " return instances" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from tqdm import tqdm\n", "\n", "def collect_globusURLs(instances):\n", " globusDict={}\n", " print(\"Connect to esgf index...\")\n", " conn = SearchConnection(index_search_url, distrib=True)\n", " for instance in tqdm(instances):\n", " globusDict.setdefault(instance, {})\n", " try:\n", " ctx = conn.new_context(instance_id=instance)\n", " recentctx=ctx.search()\n", " for dataset in recentctx :\n", " if \"globus_url\" in globusDict[instance] :\n", " continue\n", " if dataset.json[\"data_node\"] in noGoodEndpoints:\n", " continue\n", " try:\n", " files = dataset.file_context().search()\n", " if len(files) == 0 :\n", " globusDict[instance]=\"\"\n", " else:\n", " if files[0].globus_url :\n", " globusDict[instance]={\n", " \"data_node\":files[0].json[\"data_node\"],\n", " \"globus_url\":'/'.join(files[0].globus_url.split('/')[0:-1])\n", " }\n", " else : \n", " downloadUrls=[]\n", " for file in files:\n", " downloadUrls.append(file.download_url)\n", " globusDict[instance] = {\"download_url\": downloadUrls}\n", " except:\n", " print(\"File search failed\")\n", " continue\n", " except:\n", " print(\"Context failed\")\n", " continue\n", " return globusDict" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import json\n", "\n", "def save_results_as_json(globusDict, result_file):\n", " with open(result_file, 'w', encoding='utf-8') as f:\n", " json.dump(globusDict, f, ensure_ascii=False, indent=4)\n", " print(\"You successfully created a request for replication.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Dictionary conversion for globus input\n", "\n", "We aim at file based recursive `gloubus` data transfer on dataset level. This requires the generation of `inputFile`s organized such that each one contains only urls for one globus endpoint i.e. 
] }, { "cell_type": "markdown", "metadata": {}, "source": [
 "### Dictionary conversion for globus input\n",
 "\n",
 "We aim at file-based recursive `globus` data transfer on dataset level. This requires the generation of `inputFile`s organized such that each one contains only URLs for one globus endpoint, i.e. the GridFTP server at another ESGF node. One line in those files corresponds to one dataset.\n",
 "\n",
 "Therefore, by using `pandas` and `re`,\n",
 "1. the dictionary is sorted by *endpoint*s,\n",
 "2. the source and destination are defined in each line,\n",
 "3. special configurations of specific endpoints are considered."
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [
 "import pandas as pd\n",
 "import re\n",
 "\n",
 "def convert_dict_to_globus_input(result_file, result_trunk):\n",
 "    requestDict = pd.read_json(result_file, orient=\"index\")\n",
 "    groups = requestDict.groupby(\"data_node\")\n",
 "\n",
 "    for groupname, group in groups:\n",
 "        if \"dkrz\" in groupname:\n",
 "            print(str(len(group)) + \" dataset(s) already published and available at DKRZ. These are skipped.\")\n",
 "            continue\n",
 "        lines_in_globus_input = []\n",
 "        urls = list(group[\"globus_url\"])\n",
 "        # The endpoint name is the part between 'globus:' and the first '/'\n",
 "        endpoint = urls[0].split('/')[0].split(':')[1]\n",
 "        for url in urls:\n",
 "            globusPath = url.split('/')\n",
 "            try:\n",
 "                startDir = next(i for i, v in enumerate(globusPath) if v.lower() == 'cmip6')\n",
 "            except StopIteration:\n",
 "                print(\"Globus URL cannot be used because 'cmip6' is not within: \" + url)\n",
 "                continue\n",
 "            # endDir is the version directory:\n",
 "            endDirs = [i for i, item in enumerate(globusPath) if re.search(r'v\\\\d', item)]\n",
 "            if len(endDirs) != 1:\n",
 "                print(\"Globus URL cannot be used because a valid version (v<digit>) is not within: \" + url)\n",
 "                continue\n",
 "            endDir = endDirs[0] + 1\n",
 "\n",
 "            # ceda's esgf node is structured differently:\n",
 "            if \"ceda\" in groupname.lower():\n",
 "                endDir -= 1\n",
 "\n",
 "            destTrunk = '/'.join(globusPath[startDir:endDir])\n",
 "            sourceForRecursive = '/'.join(globusPath[1:endDir])\n",
 "            lines_in_globus_input.append(\n",
 "                \"--recursive /\" + sourceForRecursive +\n",
 "                \" /~/mnt/lustre02/work/ik1017/Ingest/requests/\" + result_trunk + \"/\" + destTrunk)\n",
 "        resultDf = pd.DataFrame({'line_in_globus_input': lines_in_globus_input})\n",
 "        resultDfName = result_file + \"_\" + groupname + \"_\" + endpoint + \".csv\"\n",
 "        resultDf[\"line_in_globus_input\"].to_csv(resultDfName, index=False, header=False)"
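] }, { "cell_type": "markdown", "metadata": {}, "source": [
 "For illustration, a single line in such an `inputFile` could look like the following; the dataset path is a hypothetical example, the destination prefix is the one hard-coded above:\n",
 "\n",
 "```\n",
 "--recursive /esg_dataroot/CMIP6/CMIP/MPI-M/MPI-ESM1-2-HR/historical/r1i1p1f1/fx/sftlf/gn/v20190710 /~/mnt/lustre02/work/ik1017/Ingest/requests/myuser_1234/CMIP6/CMIP/MPI-M/MPI-ESM1-2-HR/historical/r1i1p1f1/fx/sftlf/gn/v20190710\n",
 "```"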
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [
 "import uuid\n",
 "\n",
 "button = widgets.Button(description=\"Create replication request!\")\n",
 "buttondisplay = widgets.Output()\n",
 "display(button, buttondisplay)\n",
 "\n",
 "def collect_and_validate_facets_input():\n",
 "    datasets_to_download = {}\n",
 "    for i in range(len(tab_contents)):\n",
 "        if tab.children[i].value != \"\" and tab.children[i].value is not None:\n",
 "            datasets_to_download[tab_contents[i]] = tab.children[i].value\n",
 "    if len(datasets_to_download) < 3:\n",
 "        raise ValueError(\"Please specify at least three search facets\")\n",
 "    return datasets_to_download\n",
 "\n",
 "def set_and_validate_user():\n",
 "    username = user.value\n",
 "    if username == \"\" or username is None:\n",
 "        raise ValueError(\"Please specify a username\")\n",
 "    return username\n",
 "\n",
 "def on_button_clicked(b):\n",
 "    with buttondisplay:\n",
 "        button.disabled = True\n",
 "        datasets_to_download = collect_and_validate_facets_input()\n",
 "        username = set_and_validate_user()\n",
 "        request_id = str(uuid.uuid4())\n",
 "        print(\"Collecting instances...\")\n",
 "        instances = collect_instances(datasets_to_download)\n",
 "        print(\"Collecting globus URLs...\")\n",
 "        globusDict = collect_globusURLs(instances)\n",
 "        result_trunk = username + \"_\" + request_id\n",
 "        result_file = \"/home/dkrz/k204210/\" + result_trunk\n",
 "        print(\"Writing results to \" + result_file + \" ...\")\n",
 "        save_results_as_json(globusDict, result_file)\n",
 "        print(\"Converting the request...\")\n",
 "        convert_dict_to_globus_input(result_file, result_trunk)\n",
 "        print(\"Done!\")\n",
 "        button.disabled = False\n",
 "\n",
 "button.on_click(on_button_clicked)"
] }, { "cell_type": "markdown", "metadata": {}, "source": [
 "### Starting globus transfer\n",
 "\n",
 "1. The globus submission process starts with a **manual login** by a DKRZ Data Pool admin.\n",
 "2. Corresponding globus endpoints are **activated manually** with ESGF credentials.\n",
 "3. The part of the data pool which is affected by the request is **mirrored** so that globus can synchronize all missing files.\n",
 "\n",
 "The globus command for starting a transfer looks like:\n",
 "\n",
 "```\n",
 "dkrz_endpoint=\"ac6870f0-5d5e-11ea-960a-0afc9e7dd773\"\n",
 "!globus transfer -s exists --notify off --batch {endpoint} {dkrz_endpoint} < {inputFile}\n",
 "```"
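] }, { "cell_type": "markdown", "metadata": {}, "source": [
 "The following hedged sketch shows how the generated input files could be submitted in a loop, mirroring the command above. The glob pattern, the file-name parsing, and the call of the `globus` CLI via `subprocess` are assumptions; both endpoints must already be activated:"
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
 "# Hedged sketch: submit one batch transfer per generated input file.\n",
 "# Assumes an installed globus CLI, activated endpoints, and the\n",
 "# <result_file>_<data_node>_<endpoint>.csv naming produced by\n",
 "# convert_dict_to_globus_input.\n",
 "import glob\n",
 "import subprocess\n",
 "\n",
 "dkrz_endpoint = \"ac6870f0-5d5e-11ea-960a-0afc9e7dd773\"\n",
 "for inputFile in glob.glob(\"/home/dkrz/k204210/*_*.csv\"):\n",
 "    # Heuristic: the source endpoint is the last underscore-separated field\n",
 "    endpoint = inputFile.rsplit(\"_\", 1)[-1][:-len(\".csv\")]\n",
 "    with open(inputFile) as batch:\n",
 "        subprocess.run([\"globus\", \"transfer\", \"-s\", \"exists\", \"--notify\", \"off\",\n",
 "                        \"--batch\", endpoint, dkrz_endpoint],\n",
 "                       stdin=batch, check=True)"
] } ], "metadata": { "kernelspec": { "display_name": "python3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.6" }, "nbsphinx": { "execute": "never" } }, "nbformat": 4, "nbformat_minor": 4 }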