In this stage, we are going to build a simple workflow to align short reads of DNA. This section is specifically modelled from the GATK4 data processing for variant discovery WDL workflow. As discussed before, we've made modifications to simplify the workflow creation, and start with unaligned Fastqs rather than uBAMs.
This is the full workflow we'll build:
Our workflow will consist of the following steps:
- Start with a pair of compressed
FASTQ
files, - Align these reads using
BWA MEM
into an uncompressedSAM
file (the de facto standard for short read alignments), - Compress this into the binary equivalent
BAM
file usingsamtools
, and finally - Mark duplicates using
gatk4 MarkDuplicates
.
These tools already exist within the Janis Tool Registry, you can see their documentation online:
- BWA MEM - Aligning our fastqs to the reference genome
- Samtols View
- GATK4 MarkDuplicates
A Janis workflow is a Python script, so we can start by creating a file called preprocessing.py
and importing Janis.
Use a text editor to edit part1/preprocessing.py
file.
You'll see there already a number of imports for you, Let's go through them:
from janis_core import WorkflowBuilder, String
# Import bioinformatics types
from janis_bioinformatics.data_types import FastqGzPairedEnd, FastaWithIndexes
# Import bioinformatics tools
from janis_bioinformatics.tools.bwa import BwaMemLatest
from janis_bioinformatics.tools.samtools import SamToolsView_1_9
from janis_bioinformatics.tools.gatk4 import (
Gatk4MarkDuplicates_4_1_4,
Gatk4SortSam_4_1_4,
Gatk4SetNmMdAndUqTags_4_1_4,
)
- From
janis_core
library, we've imported theWorkflowBuilder
andString
types - From
janis_bioinformatics.data_types
, we've importedFastqGzPairedEnd
andFastaWithIndexes
- We've imported the four tools:
BwaMemLatest
frombwa
SamToolsView_1_9
fromsamtools
Gatk4MarkDuplicates_4_1_4
fromgatk4
Gatk4SortSam_4_1_4
fromgatk4
Gatk4SetNmMdAndUqTags_4_1_4
fromgatk4
We'll import additional data_type to handle our Bam output. Add the following line to the file:
from janis_bioinformatics.data_types import Bam
We'll create an instance of the WorkflowBuilder
class, this just requires a name for your workflow (can contain alphanumeric characters and underscores).
w = WorkflowBuilder("preprocessingWorkflow")
A workflow has 3 methods for building workflows:
workflow.input
- Used for creating inputs,workflow.step
- Creates a step on a workflow,workflow.output
- Exposes an output on a workflow.
We give each input / step / output a unique identifier, which then becomes a node in our workflow graph. We can refer to the created node using dot-notation (eg: w.input_name
). We'll see how this works in the later sections.
More information about each step will be linked from this page about the Workflow
and WorkflowBuilder
class.
Further reading: Creating an input
To create an input on a workflow, you can use the Workflow.input
method, which has the following structure:
Workflow.input(
identifier: str,
datatype: DataType,
default: any = None,
doc: str = None
)
[NOTE]
This is a little confusing. They're not actually key: value pairs - theyre positional args. Could reword this to
A workflow input must have two items:
- A unique identifier (the name of the input)
- The datatype of the input (a janis DataType)
Additionally, we can give a default, and a doc explaining what the input is.
# a paired-end gz fastq
w.input("fastq", FastqGzPairedEnd)
# a sample name with defaults and an explanation
w.input("sample_name", String, "mysample", "The name of the sample")
An input requires a unique identifier (string) and a DataType (String, FastqGzPair, etc). We have four inputs we want to expose on this workflow.
- Sequencing Reads (
FastqGzPairedEnd
- paired end sequence) - Sample name (
String
) - Read group header (
String
) - Reference files (
FastaWithIndexes
) -Fasta
+ index files)
To add these input definitions to the workflow, add the following lines to your code:
w.input("sample_name", String)
w.input("read_group", String)
w.input("fastq", FastqGzPairedEnd)
w.input("reference", FastaWithIndexes)
Further reading: Creating a step
Similar to exposing inputs, we create steps with the Workflow.step
method. It has the following structure:
Workflow.step(
identifier: str,
tool: janis_core.tool.tool.Tool, # why is this .tool.tool.Tool lol?
scatter: Union[str, List[str], ScatterDescription] = None,
)
[NOTE] scatter is not explained here! better to leave it out for now? touch on it later when it becomes relevant.
We provide a identifier for the step (unique amongst the other nodes in the workflow), and intialise our tool, passing our inputs of the step as parameters.
We can refer to an input (or previous result) using the dot notation. For example, to refer to the fastq
input, we can use w.fastq
.
We use bwa mem's documentation to determine that we need to provide the following inputs:
reads
:FastqGzPair
(connect tow.fastq
)readGroupHeaderLine
:String
(connect tow.read_group
)reference
:FastaWithDict
(connect tow.reference
)markShorterSplits
:True
The -M flag (
markShorterSplits
) causes BWA to mark shorter split hits as secondary (essential for Picard compatibility). (howto) Map and mark duplicates
To add BWA MEM step to your workflow, add the following step definition to your code:
w.step(
"bwamem", # step identifier
BwaMemLatest(
reads=w.fastq,
readGroupHeaderLine=w.read_group,
reference=w.reference,
markShorterSplits=True,
)
)
We'll use a very similar pattern for Samtools View, except this time we'll reference the output of bwamem
. From bwa mem's documentation, there is one output called out
with type Sam
. We'll connect this to SamtoolsView
only input, called sam
.
Add the following step definition to your code:
w.step(
"samtoolsview",
SamToolsView_1_9(
sam=w.bwamem.out
)
)
Now that we have an aligned BAM file, we can use gatk4 MarkDuplicates
. MarkDuplicates requires sorted inputs so the secondary and supplementary reads get marked correctly. The output of BWA is query-grouped, using assumeSortOrder="queryname"
is good for MarkDuplicates [source].
Janis will automatically generate a filename for the metrics file, so we don't need to provide that.
Hence, we'll provide MarkDuplicates the following params:
bam
: Output ofsamtoolsview.out
assumeSortOrder
the value"queryname"
:https://gatkforums.broadinstitute.org/gatk/discussion/2799/howto-map-and-mark-duplicates
Add the following step definition to your code:
w.step(
"markduplicates",
Gatk4MarkDuplicates_4_1_4(
bam=w.samtoolsview.out,
assumeSortOrder="queryname"
),
)
Further reading: Creating an output
Outputs have a very similar syntax to both inputs and steps, they take an identifier
and a named source
parameter. Here is the structure:
Workflow.output(
identifier: str,
datatype: DataType = None,
source: Node = None,
output_folder: List[Union[String, Node]] = None,
output_name: Union[String, Node] = None
)
Often, we don't want to specify the output data type, because we can let Janis do this for us. We'll talk about the output_folder
and output_name
in the next few sections. For now, we just have to specify an output identifier and a source.
We'll add an output called tmp_out_unsortedbam
, and use markduplicates.out
as the source.
To add this output to your workflow, add the following line to your code:
w.output("tmp_out_unsortedbam", Bam, source=w.markduplicates.out)
Now that we've implemented part of our workflow, let's test that it's working so far!
Although we're using CWL to run the tools, because we've written out analysis in Janis, we can also translate to WDL for free! You can translate your workflow to Janis using:
# in bash now
janis translate part1/preprocessing.py wdl
version development
import "tools/bwamem_v0_7_15.wdl" as B
import "tools/SamToolsView_1_9_0.wdl" as S
import "tools/Gatk4MarkDuplicates_4_1_4_0.wdl" as G
workflow preprocessingWorkflow {
input {
String sample_name
String read_group
Array[File] fastq
File reference
File reference_fai
File reference_amb
File reference_ann
File reference_bwt
File reference_pac
File reference_sa
File reference_dict
Boolean? bwamem_markShorterSplits = true
String? markduplicates_assumeSortOrder = "queryname"
}
call B.bwamem as bwamem {
input:
reference=reference,
reference_amb=reference_amb,
reference_ann=reference_ann,
reference_bwt=reference_bwt,
reference_pac=reference_pac,
reference_sa=reference_sa,
reads=fastq,
readGroupHeaderLine=read_group,
markShorterSplits=select_first([bwamem_markShorterSplits, true])
}
call S.SamToolsView as samtoolsview {
input:
sam=bwamem.out
}
call G.Gatk4MarkDuplicates as markduplicates {
input:
bam=[samtoolsview.out],
assumeSortOrder=select_first([markduplicates_assumeSortOrder, "queryname"])
}
output {
File tmp_out_unsortedbam = markduplicates.out
}
}
You can export the CWL translation too!
We're looking good! Now let's run the worfklow using Janis and CWLTool. We'll include the --development
flag, as when we re-run the pipeline, it won't recompute steps we've already ran!
janis run -o part1 --development --keep-intermediate-files \
part1/preprocessing.py \
--fastq data/BRCA1_R*.fastq.gz \
--reference reference/hg38-brca1.fasta \
--sample_name NA12878 \
--read_group "@RG\tID:NA12878\tSM:NA12878\tLB:NA12878\tPL:ILLUMINA"
[NOTE] I hadn't downloaded the data at this stage. When I downloaded the data, the preprocessing.py file was overwritten, which was kinda annoying. Had to copy the workflow out again.
Hopefully we got Task has finished with status: Completed
. We can check out output directory:
$ ls -lgh part1/
# drwxr-xr-x 288B Jul 16 16:50 janis
# -rw-r--r-- 1.4K Jul 16 16:49 preprocessing_solution.py
# -rw-r--r-- 411B Jul 16 13:15 preprocessing.py
# -rw-r--r-- 2.6M Jul 16 16:50 tmp_out_unsortedbam.bam
We see the tmp_out_unsortedbam.bam
which is the BAM output of mark duplicates.
Once you are happy with this task, you can now remove the tmp_out_unsortedbam.bam
output we created in this section.