## ----setup, include = FALSE-------------------------------------------------------------------------------------------
knitr::opts_chunk$set(
    collapse = TRUE,
    comment = "#>",
    crop = NULL # Related to 
    # https://stat.ethz.ch/pipermail/bioc-devel/2020-April/016656.html
)

## ----vignetteSetup, echo=FALSE, message=FALSE, warning = FALSE--------------------------------------------------------
## Bib setup
library("RefManageR")

## Write bibliography information
bib_packages <- c(
    R = citation(),
    BiocStyle = citation("BiocStyle")[1],
    knitr = citation("knitr")[1],
    RefManageR = citation("RefManageR")[1],
    rmarkdown = citation("rmarkdown")[1],
    sessioninfo = citation("sessioninfo")[1],
    testthat = citation("testthat")[1]
)

bib <- ReadBib("../inst/REFERENCES.bib")

# for (pkg in names(bib_packages)) {
#   bib[[pkg]] <- bib_packages[[pkg]]
# }


BibOptions(
    check.entries = FALSE, style = "markdown", cite.style = "numeric",
    bib.style = "numeric"
)

## ----structure, echo = FALSE, out.width = "100%", fig.cap = "Components of the streaming API"-------------------------
knitr::include_graphics("streaming_structure.svg")

## ----default_callbacks------------------------------------------------------------------------------------------------
library(posDemux)
input_fastq <- system.file("extdata",
    "PETRI-seq_forward_reads.fq.gz",
    package = "posDemux"
)
output_barcode_table <- tempfile(
    pattern = "barcode_table",
    fileext = ".txt"
)
default_callbacks <- streaming_callbacks(
    input_file = input_fastq,
    output_table_file = output_barcode_table,
    chunk_size = 1e+4,
    verbose = TRUE
)

## ----run_streaming----------------------------------------------------------------------------------------------------
suppressPackageStartupMessages({
    library(purrr)
    library(Biostrings)
})
barcode_files <- system.file(
    "extdata/PETRI-seq_barcodes",
    c(bc1 = "bc1.fa", bc2 = "bc2.fa", bc3 = "bc3.fa"),
    package = "posDemux"
)
names(barcode_files) <- paste0("bc", 1L:3L)
barcode_index <- map(barcode_files, readDNAStringSet)
barcodes <- barcode_index[c("bc3", "bc2", "bc1")]
sequence_annotation <- c(UMI = "P", "B", "A", "B", "A", "B", "A")
segment_lengths <- c(7L, 7L, 15L, 7L, 14L, 7L, NA_integer_)
streaming_summary_res <- streaming_demultiplex(
    state_init = default_callbacks$state_init,
    loader = default_callbacks$loader,
    archiver = default_callbacks$archiver,
    barcodes = barcodes,
    allowed_mismatches = 1L,
    segments = sequence_annotation,
    segment_lengths = segment_lengths
)

## ----streaming_freq_table---------------------------------------------------------------------------------------------
head(streaming_summary_res$freq_table)

## ----streaming_summary_res--------------------------------------------------------------------------------------------
streaming_summary_res$summary_res

## ----freq_table_extract-----------------------------------------------------------------------------------------------
freq_table <- streaming_summary_res$freq_table
bc_cutoff <- 500L
selected_freq_table <- freq_table[seq_len(bc_cutoff), ]

## ----database_setup---------------------------------------------------------------------------------------------------
suppressPackageStartupMessages({
    library(DBI)
})
output_database_path <- tempfile(
    pattern = "selected_barcode_table",
    fileext = ".sqlite"
)
output_db <- dbConnect(RSQLite::SQLite(), output_database_path)

## ----database_write---------------------------------------------------------------------------------------------------
chunk_size <- 1e4
suppressPackageStartupMessages({
    library(chunked)
    library(magrittr)
})
read_table_chunkwise(
    file = output_barcode_table,
    header = TRUE,
    sep = "\t",
    chunk_size = chunk_size
) %>%
    # For use within a pipeline, it is more convenient to use inner_join() 
    # than posDemux::row_match() and it achieves the save result
    inner_join(selected_freq_table) %>%
    mutate(
        celltag = do.call(
            paste,
            c(barcodes %>% names() %>% all_of() %>% across(), sep = "_")
        )
    ) %>%
    select(read, UMI, celltag) %>%
    write_chunkwise(dbplyr::src_dbi(output_db), "selected_barcodes")

## ----database_close---------------------------------------------------------------------------------------------------
dbExecute(output_db, "CREATE INDEX idx ON selected_barcodes(read)") %>%
    invisible()
dbDisconnect(output_db)

## ----state_init-------------------------------------------------------------------------------------------------------
custom_state_init <- list(
    total_reads = 0L, demultiplexed_reads = 0L,
    output_table_initialized = FALSE
)

## ----loader-----------------------------------------------------------------------------------------------------------
input_fasta <- system.file("extdata", "PETRI-seq_forward_reads.fa.gz",
    package = "posDemux"
)

loader <- function(state) {
    n_reads_per_chunk <- 1e4
    if (!state$output_table_initialized) {
        message("Starting to load sequences")
    }
    chunk <- Biostrings::readDNAStringSet(
        filepath = input_fasta,
        format = "fasta",
        nrec = n_reads_per_chunk,
        skip = state$total_reads
    )
    n_reads_in_chunk <- length(chunk)
    if (n_reads_in_chunk == 0L && state$output_table_initialized) {
        # The case when the initial chunk is empty is given
        # special treatment since
        # we want to create the table regardless
        # No more reads to process, make the outer framework return
        final_res <- list(
            state = state,
            sequences = NULL,
            should_terminate = TRUE
        )
        message("Done demultiplexing")
        return(final_res)
    }
    state$total_reads <- state$total_reads + n_reads_in_chunk
    list(
        state = state,
        sequences = chunk,
        should_terminate = FALSE
    )
}

## ----archiver---------------------------------------------------------------------------------------------------------
output_umi_file <- tempfile(pattern = "umi.fa", fileext = ".txt")
custom_output_table <- tempfile(
    pattern = "barcode_table_custom",
    fileext = ".txt"
)
archiver <- function(state, filtered_res) {
    barcode_matrix <- filtered_res$demultiplex_res$assigned_barcodes
    barcode_names <- colnames(barcode_matrix)
    read_names <- rownames(barcode_matrix)
    # If the table has no rows, we may risk getting a NULL value
    if (is.null(read_names)) {
        read_names <- character()
    }
    barcode_table <- as.data.frame(barcode_matrix)
    read_name_table <- data.frame(read = read_names)
    umis <- filtered_res$demultiplex_res$payload$UMI

    chunk_table <- cbind(read_name_table, barcode_table)
    if (!state$output_table_initialized) {
        append <- FALSE
        state$output_table_initialized <- TRUE
    } else {
        append <- TRUE
    }

    Biostrings::writeXStringSet(
        umis,
        filepath = output_umi_file, append = TRUE
    )
    readr::write_tsv(
        x = chunk_table,
        file = custom_output_table,
        append = append,
        col_names = !append,
        eol = "\n"
    )
    state <- within(state, {
        demultiplexed_reads <- demultiplexed_reads + nrow(barcode_matrix)
        paste0(
            "Processed {total_reads} reads,", " ",
            "successfully demultiplexed {demultiplexed_reads} reads so far..."
        ) %>%
            glue::glue() %>%
            message()
    })
    state
}

## ----combining_custom_callbacks---------------------------------------------------------------------------------------
custom_summary_res <- streaming_demultiplex(
    state_init = custom_state_init,
    loader = loader,
    archiver = archiver,
    barcodes = barcodes,
    allowed_mismatches = 1L,
    segments = sequence_annotation,
    segment_lengths = segment_lengths
)

## ----verify_barcode_table---------------------------------------------------------------------------------------------
readr::read_tsv(custom_output_table, show_col_types = FALSE)

## ----verify_umi-------------------------------------------------------------------------------------------------------
Biostrings::readDNAStringSet(output_umi_file, format = "fasta")

## ----reproduce3, echo=FALSE-------------------------------------------------------------------------------------------
## Session info
library("sessioninfo")
options(width = 120)
session_info()

