我有一个DSL2 Nextflow管道,它可以分支到2个FILTER进程。然后在CONCAT过程中,我重用前两个流程输出作为输入。同样在SUMMARIZE过程中,我重用以前的进程输出作为输入。
我发现当我用两个或更多对fastq样本运行管道时,输入是混合的。
例如,在CONCAT步骤中,我最后将一对fastq样本的bwa_2_ch输出与另一对fastq样本的filter_1_ch连接起来,而不是使用相同的pair_id样本。
我相信我没有完全正确地编写workflow { }通道和输入,工作流在没有混合样本的情况下正确地通过步骤。但我不知道如何定义输入,这样就不会混淆。
//trimmomatic read trimming
process TRIM {
tag "trim ${pair_id}"
publishDir "${params.outdir}/$pair_id/trim_results"
input:
tuple val(pair_id), path(reads)
output:
tuple val(pair_id), path("trimmed_${pair_id}_...")
script:
"""
"""
}
//bwa alignment
process BWA_1 {
tag "align-1 ${pair_id}f"
publishDir "${params.outdir}/$pair_id/..."
input:
tuple val(pair_id), path(reads)
path index
output:
tuple val(pair_id), path("${pair_id}_...}")
script:
"""
"""
}
process FILTER_1 {
tag "filter ${pair_id}"
publishDir "${params.outdir}/$pair_id/filter_results"
input:
tuple val(pair_id), path(reads)
output:
tuple val(pair_id),
path("${pair_id}_...")
script:
"""
"""
}
process FILTER_2 {
tag "filter ${pair_id}"
publishDir "${params.outdir}/$pair_id/filter_results"
input:
tuple val(pair_id), path(reads)
output:
tuple val(pair_id),
path("${pair_id}_...")
script:
"""
"""
}
//bwa alignment
process BWA_2 {
tag "align-2 ${pair_id}"
publishDir "${params.outdir}/$pair_id/bwa_2_results"
input:
tuple val(pair_id), path(reads)
path index
output:
tuple val(pair_id), path("${pair_id}_...}")
script:
"""
"""
}
//concatenate pf and non_human reads
process CONCAT{
tag "concat ${pair_id}"
publishDir "${params.outdir}/$pair_id"
input:
tuple val(pair_id), path(program_reads)
tuple val(pair_id), path(pf_reads)
output:
tuple val(pair_id), path("${pair_id}_...")
script:
"""
"""
}
//summary
process SUMMARY{
tag "summary ${pair_id}"
publishDir "${params.outdir}/$pair_id"
input:
tuple val(pair_id), path(trim_reads)
tuple val(pair_id), path(non_human_reads)
output:
file("summary_${pair_id}.csv")
script:
"""
"""
}
workflow {
Channel
.fromFilePairs(params.reads, checkIfExists: true)
.set {read_pairs_ch}
// trim reads
trim_ch = TRIM(read_pairs_ch)
// map to pf genome
bwa_1_ch = BWA_1(trim_ch, params.pf_index)
// filter mapped reads
filter_1_ch = FILTER_1(bwa_1_ch)
filter_2_ch = FILTER_2(bwa_1_ch)
// map to pf and human genome
bwa_2_ch = BWA_2(filter_2_ch, params.index)
// concatenate non human reads
concat_ch = CONCAT(bwa_2_ch,filter_1_ch)
// summarize
summary_ch = SUMMARY(trim_ch,concat_ch)
}发布于 2022-11-23 12:43:52
这样的混淆通常发生在进程错误地接收两个或多个队列通道时。在大多数情况下,当您需要价值时,您需要的是一个队列通道和一个或多个多输入通道通道。在这里,我不确定pair_id的确切目标是什么,但可能不是您所期望的那样:
input:
tuple val(pair_id), path(program_reads)
tuple val(pair_id), path(pf_reads)您要做的是将上面的内容替换为:
input:
tuple val(pair_id), path(program_reads), path(pf_reads)然后使用加入操作符创建所需的输入。例如:
workflow {
Channel
.fromFilePairs( params.reads, checkIfExists: true )
.set { read_pairs_ch }
pf_index = file( params.pf_index )
bwa_index = file( params.bwa_index )
// trim reads
trim_ch = TRIM( read_pairs_ch )
// map to pf genome
bwa_1_ch = BWA_1( trim_ch, pf_index)
// filter mapped reads
filter_1_ch = FILTER_1(bwa_1_ch)
filter_2_ch = FILTER_2(bwa_1_ch)
// map to pf and human genome
bwa_2_ch = BWA_2(filter_2_ch, bwa_index)
// concatenate non human reads
concat_ch = bwa_2_ch \
| join( filter_1_ch ) \
| CONCAT
// summarize
summary_ch = trim_ch \
| join( concat_ch ) \
| SUMMARY
}https://stackoverflow.com/questions/74537223
复制相似问题