GridFTP transfer with Globus#
To serve your special requests for pool extensions, we use the Globus Toolkit, which allows us to operate a GridFTP server. GridFTP enables secure, efficient, and fault-tolerant large-volume data transfer.
The transfer process is initiated with a request submitted by you. This request should contain the dataset_ids you would like to have replicated. Open an issue in the GitLab repository and provide a dictionary of the form:
dataset_id : { "data_node": ESGFNODE,
               "globus_url": URL }
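For example, one entry of such a dictionary could look like this (the instance_id, data node, and endpoint ID are purely illustrative):
"CMIP6.CMIP.E3SM-Project.E3SM-1-0.historical.r1i1p1f1.fx.sftlf.gn.v20190826" : {
    "data_node": "esgf-node.example.org",
    "globus_url": "globus:0123abcd-4567-89ab-cdef-0123456789ab/data/CMIP6/CMIP/E3SM-Project/E3SM-1-0/historical/r1i1p1f1/fx/sftlf/gn/v20190826"
}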
After a review, a CDP manager will start the replication.
In this notebook, you learn
- how to generate this request dictionary based on pyesgf
- how this dictionary is converted for file-based globus transfer
- a scheme of how a globus transfer is started
When using this notebook in appmode, you can create a replication request yourself. For that purpose, all defined functions are executed at the end.
Replication request creation#
- You set up a search request with specifications for CMIP6 facets.
- The ESGF index is searched with pyesgf for request matches, with retracted=False, latest=True as default keyword arguments.
- Either globus_urls or download_urls are saved in a dictionary which serves as the first step towards the globus transfer.
You can specify all valid search facets, i.e. CMIP6 attributes, for your request in a dictionary datasets_to_download. In appmode, we allow setting 5 different search facets. Please also provide a username which is used as a prefix for the output files.
import ipywidgets as widgets
user=widgets.Text(description="Username")
user
tab_contents = ['experiment_id', 'source_id', 'member_id', 'table_id', 'variable_id']
children = [widgets.Text(description=name) for name in tab_contents]
tab = widgets.Tab()
tab.children = children
for i, title in enumerate(tab_contents):
    tab.set_title(i, title)
tab
# "source_id":"E3SM-1-0",
# "member_id":"r1i1p1f1",
# "table_id":"fx",
# "variable_id":"sftlf",
By default, we use the DKRZ ESGF node as the search index; the Lawrence Livermore node, one of the most stable nodes, can be used alternatively. Since we want to search through all ESGF nodes, we set distrib=True as a keyword argument.
from pyesgf.search import SearchConnection
#index_search_url = 'http://esgf-node.llnl.gov/esg-search'
index_search_url = 'http://esgf-data.dkrz.de/esg-search'
#conn = SearchConnection(index_search_url, distrib=True)
We define a list of globus endpoints that are not usable so that we can sort them out.
noGoodEndpoints=["cmip.bcc.cma.cn",
"vesg.ipsl.upmc.fr",
"gfdl.noaa.gov",
"esgf-data.ucar.edu"]
We have to do three things:
- collect_instances, which collects identifiers for available datasets in the ESGF federation. These have to be valid (retracted=False, latest=True).
- collect_globusURLs, which searches an ESGF node that provides a globus URL for each instance.
- save_results_as_json, which is pretty self-explanatory.
def collect_instances(searchOptions):
    instances=[]
    print("Connect to esgf index...")
    conn = SearchConnection(index_search_url, distrib=True)
    # Restrict the search to valid datasets, i.e. neither retracted nor superseded
    ctx=conn.new_context(**searchOptions,
                         retracted=False, latest=True)
    recentctx=ctx.search()
    for dset in recentctx:
        instances.append(dset.json["instance_id"])
    # Remove duplicates which can occur in a distributed search
    instances=list(set(instances))
    return instances
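A minimal usage sketch for collect_instances (the facet values are only illustrative):
example_facets = {
    "experiment_id": "historical",
    "source_id": "E3SM-1-0",
    "variable_id": "sftlf",
}
instances = collect_instances(example_facets)
# e.g. ['CMIP6.CMIP.E3SM-Project.E3SM-1-0.historical.r1i1p1f1.fx.sftlf.gn.v20190826', ...]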
from tqdm import tqdm
def collect_globusURLs(instances):
    globusDict={}
    print("Connect to esgf index...")
    conn = SearchConnection(index_search_url, distrib=True)
    for instance in tqdm(instances):
        globusDict.setdefault(instance, {})
        try:
            ctx = conn.new_context(instance_id=instance)
            recentctx=ctx.search()
            for dataset in recentctx:
                # Keep the first valid globus_url found for this instance
                if "globus_url" in globusDict[instance]:
                    continue
                # Skip data nodes whose globus endpoints are not usable
                if dataset.json["data_node"] in noGoodEndpoints:
                    continue
                try:
                    files = dataset.file_context().search()
                    if len(files) == 0:
                        globusDict[instance]=""
                    else:
                        if files[0].globus_url:
                            # Strip the file name to get the dataset-level URL
                            globusDict[instance]={
                                "data_node":files[0].json["data_node"],
                                "globus_url":'/'.join(files[0].globus_url.split('/')[0:-1])
                            }
                        else:
                            # Fall back to http download URLs if no globus URL is provided
                            downloadUrls=[file.download_url for file in files]
                            globusDict[instance] = {"download_url": downloadUrls}
                except Exception:
                    print("File search failed")
                    continue
        except Exception:
            print("Context failed")
            continue
    return globusDict
import json
def save_results_as_json(globusDict, result_file):
with open(result_file, 'w', encoding='utf-8') as f:
json.dump(globusDict, f, ensure_ascii=False, indent=4)
print("You successfully created a request for replication.")
Dictionary conversion for globus input#
We aim at file-based recursive globus data transfer on dataset level. This requires the generation of inputFiles organized such that each one contains only URLs for one globus endpoint, i.e. the GridFTP server at another ESGF node. One line in those files corresponds to one dataset; an example line is given after the following list.
Therefore, by using pandas and re,
- the dictionary is sorted by endpoints
- the source and destination are defined in each line
- special configurations of specific endpoints are considered
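For illustration, one line of such an inputFile could look like this (the source path is hypothetical; the destination trunk follows the conversion function below):
--recursive /data/CMIP6/CMIP/E3SM-Project/E3SM-1-0/historical/r1i1p1f1/fx/sftlf/gn/v20190826 /~/mnt/lustre02/work/ik1017/Ingest/requests/user_1234/CMIP6/CMIP/E3SM-Project/E3SM-1-0/historical/r1i1p1f1/fx/sftlf/gn/v20190826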
import pandas as pd
import re
def convert_dict_to_globus_input(result_file, result_trunk):
    requestDict = pd.read_json(result_file, orient="index")
    groups=requestDict.groupby("data_node")
    for groupname, group in groups:
        if "dkrz" in groupname:
            print(str(len(group))+" dataset(s) already published and available at DKRZ. These are skipped.")
            continue
        lines_in_globus_input=[]
        urls = list(group["globus_url"])
        # The endpoint ID is the part between 'globus:' and the first '/':
        endpoint=urls[0].split('/')[0].split(':')[1]
        for url in urls:
            globusPath=url.split('/')
            # The source path starts at the 'CMIP6' directory:
            try:
                startDir=next(i for i,v in enumerate(globusPath) if v.lower() == 'cmip6')
            except StopIteration:
                print("Globus URL cannot be used because 'cmip6' is not within: "+url)
                continue
            # The source path ends at the version directory (e.g. 'v20190826'):
            endDirs=[i for i, item in enumerate(globusPath) if re.search(r'v\d', item)]
            if len(endDirs) != 1:
                print("Globus URL cannot be used because a valid version (v<digit>) is not within: "+url)
                continue
            endDir=endDirs[0]+1
            # ceda's esgf node is structured differently:
            if "ceda" in groupname.lower():
                endDir-=1
            destTrunk='/'.join(globusPath[startDir:endDir])
            sourceForRecursive='/'.join(globusPath[1:endDir])
            lines_in_globus_input.append(
                "--recursive /"+sourceForRecursive
                +" /~/mnt/lustre02/work/ik1017/Ingest/requests/"+result_trunk+"/"+destTrunk
            )
        # One input file per data node / globus endpoint:
        resultDfName=result_file+"_"+groupname+"_"+endpoint+".csv"
        with open(resultDfName, 'w') as f:
            f.write('\n'.join(lines_in_globus_input)+'\n')
import uuid
button = widgets.Button(description="Create replication request!")
buttondisplay = widgets.Output()
display(button, buttondisplay)
def collect_and_validate_facets_input():
    datasets_to_download={}
    for i, facet in enumerate(tab_contents):
        value = tab.children[i].value
        if value:
            datasets_to_download[facet] = value
    if len(datasets_to_download) < 3:
        raise ValueError("Please specify at least three search facets")
    return datasets_to_download
def set_and_validate_user():
    username = user.value
    if not username:
        raise ValueError("Please specify a username")
    return username
def on_button_clicked(b):
    with buttondisplay:
        button.disabled=True
        try:
            datasets_to_download=collect_and_validate_facets_input()
            username=set_and_validate_user()
            request_id=str(uuid.uuid4())
            print("Collecting instances...")
            instances=collect_instances(datasets_to_download)
            print("Collecting globus URLs...")
            globusDict=collect_globusURLs(instances)
            result_trunk=username+"_"+request_id
            result_file="/home/dkrz/k204210/"+result_trunk
            print("Writing results to "+result_file+" ...")
            save_results_as_json(globusDict, result_file)
            print("Converting the request...")
            convert_dict_to_globus_input(result_file, result_trunk)
            print("Done!")
        finally:
            # Re-enable the button even if validation or the search fails
            button.disabled=False
button.on_click(on_button_clicked)
Starting globus transfer#
The globus submission process starts with a manual login by a DKRZ Data Pool admin.
The corresponding globus endpoints are activated manually with ESGF credentials.
The part of the data pool that is affected by the request is mirrored so that globus can synchronize all missing files.
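The activation can be done, for example, with the legacy globus CLI command below (a sketch; whether MyProxy activation with ESGF credentials is available depends on the respective endpoint):
!globus endpoint activate --myproxy {endpoint}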
The globus command for starting a transfer looks like:
dkrz_endpoint="ac6870f0-5d5e-11ea-960a-0afc9e7dd773"
!globus transfer -s exists --notify off --batch {endpoint} {dkrz_endpoint} < {inputFile}
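In this command, -s exists sets the sync level such that only files that do not yet exist at the destination are transferred, --notify off disables e-mail notifications, and --batch reads the source/destination pairs line by line from the inputFile generated above. {endpoint} is the source endpoint of the remote ESGF node and {dkrz_endpoint} is the DKRZ destination endpoint.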