5 Star 6 Fork 4

Gitee 极速下载 / GATK

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/broadinstitute/gatk
克隆/下载
gatk 20.60 KB
AI 代码解读
一键复制 编辑 原始数据 按行查看 历史
Chris Norman 提交于 2023-03-02 15:43 . Move to Java 17! (#8035)
#!/usr/bin/env python
#
# Launcher script for GATK tools. Delegates to java -jar, spark-submit, or gcloud as appropriate,
# and sets many important Spark and htsjdk properties before launch.
#
# If running a non-Spark tool, or a Spark tool in local mode, will search for GATK executables
# as follows:
# -If the GATK_LOCAL_JAR environment variable is set, uses that jar
# -Otherwise if the GATK_RUN_SCRIPT created by "gradle installDist" exists, uses that
# -Otherwise uses the newest local jar in the same directory as the script or the BIN_PATH
# (in that order of precedence)
#
# If running a Spark tool, searches for GATK executables as follows:
# -If the GATK_SPARK_JAR environment variable is set, uses that jar
# -Otherwise uses the newest Spark jar in the same directory as the script or the BIN_PATH
# (in that order of precedence)
#
import sys
from subprocess import check_call, CalledProcessError, call
import os
import hashlib
import signal
import re
# Directory that contains this launcher script (symlinks resolved).
script = os.path.dirname(os.path.realpath(__file__))
projectName = "gatk"

# Build output locations, all relative to the script directory.
BUILD_LOCATION = "".join([script, "/build/install/", projectName, "/bin/"])
GATK_RUN_SCRIPT = BUILD_LOCATION + projectName
BIN_PATH = script + "/build/libs"

# Names of the environment variables that may point at a specific jar.
GATK_LOCAL_JAR_ENV_VARIABLE = "GATK_LOCAL_JAR"
GATK_SPARK_JAR_ENV_VARIABLE = "GATK_SPARK_JAR"

# JVM system properties handed to the Spark driver and executors, as a single
# space-separated string (note that every option, including the last one, is
# followed by a space).
EXTRA_JAVA_OPTIONS_SPARK = "".join(option + " " for option in (
    "-DGATK_STACKTRACE_ON_USER_EXCEPTION=true",
    "-Dsamjdk.use_async_io_read_samtools=false",
    "-Dsamjdk.use_async_io_write_samtools=false",
    "-Dsamjdk.use_async_io_write_tribble=false",
    "-Dsamjdk.compression_level=2",
))

# JVM system properties used when launching a local jar with "java -jar".
PACKAGED_LOCAL_JAR_OPTIONS = [
    "-Dsamjdk.use_async_io_read_samtools=false",
    "-Dsamjdk.use_async_io_write_samtools=true",
    "-Dsamjdk.use_async_io_write_tribble=false",
    "-Dsamjdk.compression_level=2",
]

# Flag that precedes each "key=value" Spark property on a spark-submit command line.
DEFAULT_SPARK_ARGS_PREFIX = '--conf'

# Spark properties applied to every Spark launch.
DEFAULT_SPARK_ARGS = {
    "spark.kryoserializer.buffer.max": "512m",
    "spark.driver.maxResultSize": "0",
    "spark.driver.userClassPathFirst": "false",
    "spark.io.compression.codec": "lzf",
    "spark.executor.memoryOverhead": "600",
    "spark.driver.extraJavaOptions": EXTRA_JAVA_OPTIONS_SPARK,
    "spark.executor.extraJavaOptions": EXTRA_JAVA_OPTIONS_SPARK,
}
def createSparkConfArgs(javaOptions):
    """Build the Spark configuration properties for a launch.

    Starts from DEFAULT_SPARK_ARGS and, when javaOptions is given, appends it
    (separated by a space) to both the driver and the executor
    "extraJavaOptions" properties.

    javaOptions: string of extra JVM options, or None to use the defaults only.

    Returns a tuple (prefix, properties) where prefix is
    DEFAULT_SPARK_ARGS_PREFIX and properties is a new dict mapping Spark
    property names to their values.
    """
    # Work on a copy: the defaults are module-level state, and mutating them
    # would make the java options pile up on every subsequent call.
    sparkConfArgs = dict(DEFAULT_SPARK_ARGS)
    if javaOptions is not None:
        for key in ("spark.driver.extraJavaOptions", "spark.executor.extraJavaOptions"):
            sparkConfArgs[key] = sparkConfArgs[key] + ' ' + javaOptions
    return DEFAULT_SPARK_ARGS_PREFIX, sparkConfArgs
class GATKLaunchException(Exception):
    """Error raised by the launcher when a GATK command cannot be assembled or started.

    main() catches it, writes its message to stderr and exits with status 3.
    """
def signal_handler(signal, frame):
    """Signal handler that terminates the launcher with exit status 1.

    Both arguments (the signal number and the current stack frame, as passed
    by the signal module) are ignored.
    """
    raise SystemExit(1)
def _printUsage():
    """Print the launcher's usage/help text to stdout."""
    usageLines = (
        "",
        " Usage template for all tools (uses --spark-runner LOCAL when used with a Spark tool)",
        " gatk AnyTool toolArgs",
        "",
        " Usage template for Spark tools (will NOT work on non-Spark tools)",
        " gatk SparkTool toolArgs [ -- --spark-runner <LOCAL | SPARK | GCS> sparkArgs ]",
        "",
        " Getting help",
        " gatk --list Print the list of available tools",
        "",
        " gatk Tool --help Print help on a particular tool",
        "",
        " Configuration File Specification",
        " --gatk-config-file PATH/TO/GATK/PROPERTIES/FILE",
        "",
        " gatk forwards commands to GATK and adds some sugar for submitting spark jobs",
        "",
        " --spark-runner <target> controls how spark tools are run",
        " valid targets are:",
        " LOCAL: run using the in-memory spark runner",
        " SPARK: run using spark-submit on an existing cluster ",
        " --spark-master must be specified",
        " --spark-submit-command may be specified to control the Spark submit command",
        " arguments to spark-submit may optionally be specified after -- ",
        " GCS: run using Google cloud dataproc",
        " commands after the -- will be passed to dataproc",
        " --cluster <your-cluster> must be specified after the --",
        " spark properties and some common spark-submit parameters will be translated ",
        " to dataproc equivalents",
        "",
        " --dry-run may be specified to output the generated command line without running it",
        " --java-options 'OPTION1[ OPTION2=Y ... ]' optional - pass the given string of options to the ",
        " java JVM at runtime. ",
        " Java options MUST be passed inside a single string with space-separated values.",
        "",
        " --debug-port <number> sets up a Java VM debug agent to listen to debugger connections on a",
        " particular port number. This in turn will add the necessary java VM arguments",
        " so that you don't need to explicitly indicate these using --java-options.",
        " --debug-suspend sets the Java VM debug agent up so that the run gets immediately suspended",
        " waiting for a debugger to connect. By default the port number is 5005 but",
        " can be customized using --debug-port",
    )
    for line in usageLines:
        print(line)


def _popOptionWithValue(args, option):
    """Remove the first occurrence of option and its value from args (in place); return the value or None.

    Raises GATKLaunchException (via getValueForArgument) when option is the
    last element of args and therefore has no value.
    """
    value = getValueForArgument(args, option)
    if value is not None:
        i = args.index(option)
        del args[i:i + 2]  # the option itself and its parameter
    return value


def main(args):
    """Parse the launcher's own options out of args and run the requested GATK tool.

    args: list of command-line arguments (without the program name); it is
          modified in place as launcher options are stripped from it.

    Exits with status 0 after printing usage (no arguments, or a lone
    "--help"/"-h"), with status 3 when a GATKLaunchException is raised, and
    with the child's return code when the launched command fails with
    CalledProcessError.
    """
    #suppress stack trace when killed by keyboard interrupt
    signal.signal(signal.SIGINT, signal_handler)
    try:
        if len(args) == 0 or (len(args) == 1 and args[0] in ("--help", "-h")):
            _printUsage()
            sys.exit(0)

        if len(args) == 1 and args[0] == "--list":
            args[0] = "--help"  # if we're invoked with --list, invoke the GATK with --help

        dryRun = "--dry-run" in args
        if dryRun:
            args.remove("--dry-run")

        debugPort = _popOptionWithValue(args, "--debug-port")

        debugSuspend = "--debug-suspend" in args
        if debugSuspend:
            args.remove("--debug-suspend")
            # Suspending only makes sense with a debug agent, so fall back to the default port.
            if debugPort is None:
                debugPort = 5005

        javaOptions = _popOptionWithValue(args, "--java-options")
        sparkRunner = _popOptionWithValue(args, "--spark-runner")
        sparkSubmitCommand = _popOptionWithValue(args, "--spark-submit-command")

        (gatkArgs, sparkArgs) = getSplitArgs(args)

        # --spark-master is given among the spark arguments but is forwarded to the tool arguments.
        sparkMaster = _popOptionWithValue(sparkArgs, "--spark-master")
        if sparkMaster is not None:
            gatkArgs += ["--spark-master", sparkMaster]

        runGATK(sparkRunner, sparkSubmitCommand, dryRun, gatkArgs, sparkArgs, javaOptions, debugPort, debugSuspend)
    except GATKLaunchException as e:
        sys.stderr.write(str(e)+"\n")
        sys.exit(3)
    except CalledProcessError as e:
        sys.exit(e.returncode)
def getSparkSubmitCommand(sparkSubmitCommand):
    """Return the spark-submit executable to invoke.

    An explicitly supplied sparkSubmitCommand wins. Otherwise the result is
    "$SPARK_HOME/bin/spark-submit" when the SPARK_HOME environment variable is
    set, or plain "spark-submit" when it is not.
    """
    if sparkSubmitCommand is not None:
        return sparkSubmitCommand
    sparkHome = os.environ.get("SPARK_HOME")
    if sparkHome is None:
        return "spark-submit"
    return sparkHome + "/bin/spark-submit"
def getGCloudSubmitCommand(gcloudCmd):
    """Return the gcloud executable to invoke.

    An explicitly supplied gcloudCmd wins. Otherwise the result is
    "$GCLOUD_HOME/gcloud" when the GCLOUD_HOME environment variable is set, or
    plain "gcloud" when it is not.
    """
    if gcloudCmd is not None:
        return gcloudCmd
    gcloudHome = os.environ.get("GCLOUD_HOME")
    if gcloudHome is None:
        return "gcloud"
    return gcloudHome + "/gcloud"
def getLocalGatkRunCommand(javaOptions, debugPort, debugSuspend):
    """Build the command prefix (a list of strings) used to run GATK locally.

    Resolution order:
      1. the jar named by the GATK_LOCAL_JAR environment variable, run with "java -jar";
      2. the wrapper script at GATK_RUN_SCRIPT, if it exists;
      3. the newest local jar found by getLocalJar(), run with "java -jar".

    javaOptions:  space-separated string of extra JVM options, or None.
    debugPort:    port for the JDWP debug agent, or None for no agent.
    debugSuspend: when true, the debug agent is started with suspend=y.

    Raises GATKLaunchException when the environment variable points at a
    missing file or when no way of running GATK is found.

    Side effect: on the wrapper-script path, javaOptions is appended to the
    JAVA_OPTS environment variable of this process.
    """
    localJarFromEnv = getJarFromEnv(GATK_LOCAL_JAR_ENV_VARIABLE)

    # Assemble the JVM options on a per-call copy; extending the module-level
    # list would make options accumulate across calls.
    jvmOptions = list(PACKAGED_LOCAL_JAR_OPTIONS)
    if javaOptions is not None:
        jvmOptions.extend(javaOptions.split())
    if debugPort is not None:
        suspend = "y" if debugSuspend else "n"
        jvmOptions.append("-agentlib:jdwp=transport=dt_socket,server=y,suspend=" + suspend + ",address=" + str(debugPort))

    if localJarFromEnv is not None:
        return formatLocalJarCommand(localJarFromEnv, jvmOptions)

    wrapperScript = getGatkWrapperScript(throwIfNotFound=False)
    if wrapperScript is not None:
        # Add options to JAVA_OPTS environment var for dispatch script
        # NOTE(review): only javaOptions is forwarded on this path; the debug
        # agent option built above is not passed to the wrapper script.
        # Confirm whether that is intended.
        if javaOptions is not None:
            envJavaOpts = os.environ.get('JAVA_OPTS')
            if envJavaOpts is not None:
                envJavaOpts = envJavaOpts + ' ' + javaOptions
            else:
                envJavaOpts = javaOptions
            os.environ['JAVA_OPTS'] = envJavaOpts
        return [wrapperScript]

    return formatLocalJarCommand(getLocalJar(), jvmOptions)  # will throw if local jar not found


def formatLocalJarCommand(localJar, jvmOptions=None):
    """Return the "java <options> -jar <localJar>" command as a list of strings.

    jvmOptions: list of JVM options to place before "-jar"; when None, the
                module-level PACKAGED_LOCAL_JAR_OPTIONS are used.
    """
    if jvmOptions is None:
        jvmOptions = PACKAGED_LOCAL_JAR_OPTIONS
    return ["java"] + list(jvmOptions) + ["-jar", localJar]
def getGatkWrapperScript(throwIfNotFound=True):
    """Return the path of the GATK wrapper script (GATK_RUN_SCRIPT) if it exists.

    When the script exists, a "Using GATK wrapper script ..." notice is written
    to stderr. When it does not, either GATKLaunchException is raised
    (throwIfNotFound true) or None is returned.
    """
    if os.path.exists(GATK_RUN_SCRIPT):
        sys.stderr.write("Using GATK wrapper script " + GATK_RUN_SCRIPT)
        return GATK_RUN_SCRIPT
    if not throwIfNotFound:
        return None
    raise GATKLaunchException("Missing GATK wrapper script: " + GATK_RUN_SCRIPT + "\nTo generate the wrapper run:\n\n " + script + "/gradlew installDist")
def getLocalJar(throwIfNotFound=True):
    """Locate the local GATK jar via findJar (suffix "local.jar", GATK_LOCAL_JAR override).

    Returns the jar path. When no jar is found, raises GATKLaunchException if
    throwIfNotFound is true, otherwise returns None.
    """
    localJar = findJar("local.jar", envVariableOverride=GATK_LOCAL_JAR_ENV_VARIABLE)
    if localJar is not None or not throwIfNotFound:
        return localJar
    message = "".join([
        "No local jar was found, please build one by running\n\n ",
        script,
        "/gradlew localJar\n",
        "or\n",
        " export ",
        GATK_LOCAL_JAR_ENV_VARIABLE,
        "=<path_to_local_jar>",
    ])
    raise GATKLaunchException(message)
def getSparkJar(throwIfNotFound=True):
    """Locate the Spark GATK jar via findJar (suffix "spark.jar", GATK_SPARK_JAR override).

    Returns the jar path. When no jar is found, raises GATKLaunchException if
    throwIfNotFound is true, otherwise returns None.
    """
    sparkJar = findJar("spark.jar", envVariableOverride=GATK_SPARK_JAR_ENV_VARIABLE)
    if sparkJar is not None or not throwIfNotFound:
        return sparkJar
    message = "".join([
        "No spark jar was found, please build one by running\n\n ",
        script,
        "/gradlew sparkJar\n",
        "or\n",
        " export ",
        GATK_SPARK_JAR_ENV_VARIABLE,
        "=<path_to_spark_jar>",
    ])
    raise GATKLaunchException(message)
def findJar(jarSuffix, jarPrefix=projectName, envVariableOverride=None, jarSearchDirs=(script, BIN_PATH)):
    """Find a jar, preferring an environment-variable override over a directory search.

    jarSuffix, jarPrefix: the file name must start with jarPrefix and end with jarSuffix.
    envVariableOverride:  name of an environment variable that may hold the jar
                          path; consulted first when not None.
    jarSearchDirs:        directories searched in order; the newest matching jar
                          of the first directory that has one is used.

    Returns the jar path, or None when nothing was found. A jar found by the
    directory search is announced on stderr.
    """
    jarPathFromEnv = None
    if envVariableOverride is not None:
        jarPathFromEnv = getJarFromEnv(envVariableOverride)
    if jarPathFromEnv is not None:
        return jarPathFromEnv

    # Lazily evaluated so that directories after the first hit are never scanned.
    candidates = (getNewestJarInDir(jarDir, jarSuffix, jarPrefix) for jarDir in jarSearchDirs)
    for candidate in candidates:
        if candidate is not None:
            sys.stderr.write("Using GATK jar " + candidate)
            return candidate
    return None
def getJarFromEnv(envVariableName):
    """Return the jar path stored in the environment variable envVariableName.

    Returns None when the variable is not set. When it is set and the path
    exists, a notice is written to stderr and the path is returned; when the
    path does not exist, GATKLaunchException is raised.
    """
    jarPath = os.environ.get(envVariableName)
    if jarPath is None:
        return None
    if os.path.exists(jarPath):
        sys.stderr.write("Using GATK jar " + jarPath + " defined in environment variable " + envVariableName)
        return jarPath
    raise GATKLaunchException(envVariableName + " was set to: " + jarPath + " but this file doesn't exist. Please fix your environment")
def getNewestJarInDir(dir, jarSuffix, jarPrefix):
    """Return the most recently modified file in dir named "<jarPrefix>...<jarSuffix>".

    dir:       directory to scan (not recursive).
    jarSuffix: literal text the file name must end with.
    jarPrefix: literal text the file name must start with.

    Returns dir + "/" + name of the newest match (by modification time), or
    None when dir is not an existing directory or contains no match.
    """
    if not os.path.isdir(dir):
        return None
    # Escape both parts so they are matched literally; otherwise the "." in a
    # suffix such as "local.jar" would match any character.
    jarPattern = re.compile("^" + re.escape(jarPrefix) + ".*" + re.escape(jarSuffix) + "$")
    jars = [f for f in os.listdir(dir) if jarPattern.match(f)]
    if not jars:
        return None
    newestJar = max(jars, key=lambda name: os.stat(dir + "/" + name).st_mtime)
    return dir + "/" + newestJar
def md5(file):
    """Return the hexadecimal MD5 digest of the contents of the file at path `file`.

    The file is read in binary mode in 4096-byte chunks.
    """
    digest = hashlib.md5()
    with open(file, "rb") as stream:
        while True:
            chunk = stream.read(4096)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()
def cacheJarOnGCS(jar, dryRun):
    """Upload jar to the GCS staging location (if configured) and return the path to use.

    The staging location is read from the GATK_GCS_STAGING environment
    variable; it is normalised to start with "gs://" and end with a single "/".
    The cached object is named "<jar name>_<md5 of jar><extension>", so an
    already uploaded identical jar is reused.

    Returns the gs:// path of the cached jar on success. Returns the local
    `jar` path unchanged when dryRun is True, when GATK_GCS_STAGING is not
    set, when the upload fails, or when gsutil cannot be executed.
    """
    if dryRun is True:
        return jar

    staging = os.environ.get("GATK_GCS_STAGING")
    if staging is None:
        sys.stderr.write("\njar caching is disabled because GATK_GCS_STAGING is not set\n\n"
                         "please set GATK_GCS_STAGING to a bucket you have write access too in order to enable jar caching\n"
                         "add the following line to you .bashrc or equivalent startup script\n\n"
                         " export GATK_GCS_STAGING=gs://<my_bucket>/\n")
        return jar

    staging = staging.strip('/') + '/'
    if not staging.startswith("gs://"):
        staging = "gs://" + staging

    (name, ext) = os.path.splitext(os.path.basename(jar))
    gcsjar = staging + name + "_" + md5(jar) + ext

    try:
        # "gsutil stat" succeeds only when the object already exists.
        if call(["gsutil", "-q", "stat", gcsjar]) == 0:
            sys.stderr.write("\nfound cached jar: " + gcsjar + "\n")
            return gcsjar
        if call(["gsutil", "cp", jar, gcsjar]) == 0:
            sys.stderr.write("\nuploaded " + jar + " -> " + gcsjar + "\n")
            return gcsjar
        sys.stderr.write("\nfailed to upload " + jar + " -> " + gcsjar + "\nThere may be something wrong with your bucket permissions or gsutil installation\n")
        return jar
    except OSError:
        sys.stderr.write("\nTried to execute gsutil to upload the jar but it wasn't available\n "
                         "See https://cloud.google.com/sdk/#Quick_Start for instructions on installing gsutil\n\n")
        return jar
def runGATK(sparkRunner, suppliedSparkSubmitCommand, dryrun, gatkArgs, sparkArgs, javaOptions, debugPort, debugSuspend):
    """Assemble and run the GATK command line for the selected runner.

    sparkRunner: None or "LOCAL" (run locally), "SPARK" (spark-submit) or
                 "GCS" (gcloud dataproc); any other value raises
                 GATKLaunchException.
    suppliedSparkSubmitCommand: explicit submit executable, or None for the default.
    dryrun:      passed to runCommand; when true the command is printed instead of run.
    gatkArgs:    arguments for the GATK tool.
    sparkArgs:   arguments for the Spark submitter.
    javaOptions, debugPort, debugSuspend: JVM settings; the debug settings are
                 only used by the local runner.

    Raises GATKLaunchException when the submit executable cannot be started.
    """
    if sparkRunner is None or sparkRunner == "LOCAL":
        localCmd = getLocalGatkRunCommand(javaOptions, debugPort, debugSuspend)
        runCommand(localCmd + gatkArgs + sparkArgs, dryrun)
        return

    if sparkRunner == "SPARK":
        sparkSubmitCmd = getSparkSubmitCommand(suppliedSparkSubmitCommand)
        confPrefix, confArgs = createSparkConfArgs(javaOptions)
        cmd = [sparkSubmitCmd, "--master", getSparkMasterSpecified(gatkArgs)]
        # Each property becomes a "<prefix> key=value" pair on the command line.
        for key in confArgs:
            cmd.extend([confPrefix, "%s=%s" % (key, confArgs[key])])
        cmd.extend(sparkArgs)
        cmd.append(getSparkJar())
        cmd.extend(gatkArgs)
        try:
            runCommand(cmd, dryrun)
        except OSError:
            raise GATKLaunchException("Tried to run %s but failed.\nMake sure %s is available in your path" % (sparkSubmitCmd, sparkSubmitCmd))
        return

    if sparkRunner == "GCS":
        jarPath = cacheJarOnGCS(getSparkJar(), dryrun)
        gcloudCmd = getGCloudSubmitCommand(suppliedSparkSubmitCommand)
        # Note: For GCS we don't need the prefix for the sparkConfArgs, so only the
        # properties (second return value of createSparkConfArgs) are kept.
        _, confArgs = createSparkConfArgs(javaOptions)
        dataprocargs = convertSparkSubmitToDataprocArgs(confArgs, sparkArgs)
        sys.stderr.write("\nReplacing spark-submit style args with dataproc style args\n\n" + " ".join(sparkArgs) + " -> " + " ".join(dataprocargs) + "\n")
        cmd = [gcloudCmd, "dataproc", "jobs", "submit", "spark"]
        cmd.extend(dataprocargs)
        cmd.extend(["--jar", jarPath, "--"])
        cmd.extend(gatkArgs)
        cmd.extend(["--spark-master", "yarn"])
        try:
            runCommand(cmd, dryrun)
        except OSError:
            raise GATKLaunchException("Tried to run gcloud but failed.\nMake sure gcloud is available in your path and you are properly authenticated")
        return

    raise GATKLaunchException("Value: " + sparkRunner + " is not a valid value for --spark-runner. Choose one of LOCAL, SPARK, GCS")
def runCommand(cmd, dryrun):
    """Run cmd (a list of strings), or just print it when dryrun is true.

    Dry run: prints the current environment variables and the command line to
    stdout and runs nothing.
    Real run: echoes the command line to stderr, then executes it with
    check_call using a copy of the environment in which
    SUPPRESS_GCLOUD_CREDS_WARNING is set to "true". check_call raises
    CalledProcessError when the command exits with a non-zero status.
    """
    if not dryrun:
        sys.stderr.write("\nRunning:\n")
        sys.stderr.write(" " + " ".join(cmd) + "\n")
        gatk_env = dict(os.environ, SUPPRESS_GCLOUD_CREDS_WARNING="true")
        check_call(cmd, env=gatk_env)
        return

    print("\nDry run:\n")
    # Display environment variables for dry run
    if os.environ:
        print(" Env:\n")
        envLines = [' %s = %s' % (name, value) for name, value in os.environ.items()]
        print('\n'.join(envLines))
    print(" Cmd:\n")
    print(" " + " ".join(cmd) + "\n")
def getSplitArgs(args):
    """Split args at the "--" separator.

    Returns a tuple (firstArgs, secondArgs): the arguments before the
    separator and the arguments after it. The separator itself is dropped;
    without a separator everything lands in firstArgs.

    Raises GATKLaunchException when "--" appears more than once.
    """
    groups = ([], [])
    groupIndex = 0  # 0 while before the separator, 1 once it has been seen
    for arg in args:
        if arg != "--":
            groups[groupIndex].append(arg)
        elif groupIndex == 1:
            raise GATKLaunchException("Argument '--' must only be specified once")
        else:
            groupIndex = 1
    return groups
def isDryRun(args):
    """Return True when "--dry-run" is present in args, False otherwise."""
    dryRunRequested = "--dry-run" in args
    return dryRunRequested
def getValueForArgument(args, argument):
    """Return the element that follows the first occurrence of argument in args.

    Returns None when argument is not present. Raises GATKLaunchException when
    argument is the last element and therefore has no value. args is not
    modified.
    """
    if argument not in args:
        return None
    valueIndex = args.index(argument) + 1
    if valueIndex >= len(args):
        raise GATKLaunchException("Argument: " + argument + " requires a parameter")
    return args[valueIndex]
def getSparkMasterSpecified(args):
    """Return the value given for "--spark-master" in args.

    Raises GATKLaunchException when "--spark-master" is absent (or, via
    getValueForArgument, when it has no value).
    """
    sparkMaster = getValueForArgument(args, "--spark-master")
    if sparkMaster is not None:
        return sparkMaster
    raise GATKLaunchException("The argument --spark-master <master url> must be specified")
# translate select spark-submit parameters to their gcloud equivalent
def convertSparkSubmitToDataprocArgs(sparkConfArgs, sparkArgs):
    """Convert spark-submit style arguments into gcloud dataproc style arguments.

    sparkConfArgs: dict of Spark property name -> value; every entry is passed
                   through as a "name=value" property.
    sparkArgs:     list of spark-submit style arguments. In it:
                   - "--conf key=value" becomes a property, except that a
                     "spark.yarn.dist.files" property has its comma-separated
                     value moved to the "--files" argument;
                   - "--driver-memory", "--driver-cores", "--executor-memory",
                     "--executor-cores" and "--num-executors" become the
                     corresponding Spark property;
                   - anything else is passed through unchanged.

    Returns a list: the pass-through arguments, followed by
    "--properties <comma-joined properties>" and "--files <comma-joined files>"
    when there are any.

    Raises GATKLaunchException when an argument that needs a value is the last
    element of sparkArgs.
    """
    replacements = {"--driver-memory": "spark.driver.memory",
                    "--driver-cores": "spark.driver.cores",
                    "--executor-memory": "spark.executor.memory",
                    "--executor-cores": "spark.executor.cores",
                    "--num-executors": "spark.executor.instances"}
    dataprocargs = []
    filesToAdd = []
    # Arguments passed as the sparkConfArgs should be passed through as properties.
    # In practice yarn files will be added through the 'sparkArgs' argument that is parsed below,
    # so we don't need to check for them up here.
    properties = ['%s=%s' % (p, sparkConfArgs[p]) for p in sparkConfArgs]
    arg = None
    try:
        i = 0
        while i < len(sparkArgs):
            arg = sparkArgs[i]
            if arg == "--conf":
                i += 1
                confProperty = sparkArgs[i]
                if "spark.yarn.dist.files" in confProperty:
                    # intercept yarn files and pass them through --files instead;
                    # split on the first "=" only so paths containing "=" survive
                    files = confProperty.split("=", 1)[1]
                    filesToAdd.extend(files.split(","))
                else:
                    properties.append(confProperty)
            elif arg in replacements:
                i += 1
                properties.append(replacements[arg] + "=" + sparkArgs[i])
            else:
                dataprocargs.append(arg)
            i += 1
    except IndexError:
        raise GATKLaunchException("Found an argument: " + arg + " with no matching value.")
    if properties:
        dataprocargs.append("--properties")
        dataprocargs.append(",".join(properties))
    if filesToAdd:
        dataprocargs.append("--files")
        dataprocargs.append(",".join(filesToAdd))
    return dataprocargs
# Script entry point: hand every command-line argument after the program name to main().
if __name__ == "__main__":
    launcherArgs = sys.argv[1:]
    main(launcherArgs)
Java
1
https://gitee.com/mirrors/GATK.git
git@gitee.com:mirrors/GATK.git
mirrors
GATK
GATK
master

搜索帮助