Using dask with Scanpy

Danger

Please access this document in its canonical location as the currently accessed page may not be rendered correctly: Using dask with Scanpy

Using dask with Scanpy#

Warning

🔪 Beware sharp edges! 🔪

dask support in scanpy is new and highly experimental!

Many functions in scanpy do not support dask and may exhibit unexpected behaviour if dask arrays are passed to them. Stick to what’s outlined in this tutorial and you should be fine!

Please report any issues you run into over on the issue tracker.

dask is a popular out-of-core, distributed array processing library that scanpy is beginning to support. Here we walk through a quick tutorial of using dask in a simple analysis task.

This notebook relies on optional dependencies in dask, sklearn-ann and annoy. Install them with:

pip install -U "scanpy[dask,leiden]" "dask[distributed,diagnostics]" sklearn-ann annoy

(scanpy[dask] means to install scanpy together with dask[array], but will also always make sure that a compatible dask version is selected)

from pathlib import Path

import dask.distributed as dd
import scanpy as sc
import anndata as ad
import h5py

sc.logging.print_header()
scanpy==1.11.0.dev144+g3d220a93 anndata==0.11.0rc3 umap==0.5.6 numpy==2.0.2 scipy==1.14.1 pandas==2.2.3 scikit-learn==1.5.2 statsmodels==0.14.4 pynndescent==0.5.13

Here, we’ll be working with a moderately large dataset of 1.4 million cells taken from: COVID-19 immune features revealed by a large-scale single-cell transcriptome atlas

def download(url: str, path: Path) -> None:
    """Download a file from `url` and save it to `path`, showing a progress bar."""
    from tqdm.autonotebook import tqdm
    from urllib.request import urlretrieve

    pb = tqdm(unit="B", unit_scale=True, unit_divisor=1024)

    def update(b: int = 1, bsize: int = 1, tsize: int | None = None):
        if tsize is not None:
            pb.total = tsize
        return pb.update(b * bsize - pb.n)

    try:
        with pb:
            urlretrieve(url, path, reporthook=update)
    except BaseException:
        path.unlink(missing_ok=True)
        raise
cell_atlas_path = Path("data/cell_atlas.h5ad")
cell_atlas_path.parent.mkdir(exist_ok=True)
if not cell_atlas_path.exists():
    download(
        "https://datasets.cellxgene.cziscience.com/82eac9c1-485f-4e21-ab21-8510823d4f6e.h5ad",
        cell_atlas_path,
    )

For more information on using distributed computing via dask, please see their documentation. In short, one needs to define both a cluster and a client to have some degree of control over the compute resources dask will use. It’s very likely you will have to tune the number of workers and amount of memory per worker along with your chunk sizes.

cluster = dd.LocalCluster(n_workers=3)
client = dd.Client(cluster)

Note

In this notebook we will be demonstrating some computations in scanpy that use scipy.sparse classes within each dask chunk. Be aware that this is currently poorly supported by dask, and that if you want to interact with the dask arrays in any way other than though the anndata and scanpy libraries you will likely need to densify each chunk.

All operations in scanpy and anndata that work with sparse chunks also work with dense chunks.

The advantage of using sparse chunks are:

  • The ability to work with fewer, larger chunks

  • Accelerated computations per chunk (e.g. don’t need to sum all those extra zeros)

You can convert from sparse to dense chunks via:

X = X.map_blocks(lambda x: x.toarray(), dtype=X.dtype, meta=np.array([]))

And in reverse:

X = X.map_blocks(sparse.csr_matrix)

Note that you will likely have to work with smaller chunks when doing this, via a rechunking operation. We suggest using a factor of the larger chunk size to achieve the most efficient rechunking.

SPARSE_CHUNK_SIZE = 100_000

Dask provides extensive tooling for monitoring your computation. You can access that via the dashboard started when using any of their distributed clusters.

client

Client

Client-4fd47eed-9200-11ef-9f8e-00001107fe80

Connection method: Cluster object Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Cluster Info

We’ll convert the X representation to dask using anndata.experimental.read_elem_as_dask.

The file we’ve retrieved from cellxgene has already been processed. Since this tutorial is demonstrating processing from counts, we’re just going to access the counts matrix and annotations.

%%time
with h5py.File(cell_atlas_path, "r") as f:
    adata = ad.AnnData(
        obs=ad.io.read_elem(f["obs"]),
        var=ad.io.read_elem(f["var"]),
    )
    adata.X = ad.experimental.read_elem_as_dask(
        f["raw/X"], chunks=(SPARSE_CHUNK_SIZE, adata.shape[1])
    )
CPU times: user 4.31 s, sys: 835 ms, total: 5.14 s
Wall time: 14 s

We’ve optimized a number of scanpy functions to be completely lazy. That means it will look like nothing is computed when you call an operation on a dask array, but only later when you hit compute.

In some cases it’s currently unavoidable to skip all computation, and these cases will kick off compute for all the delayed operations immediately.

%%time
adata.layers["counts"] = adata.X.copy()  # Making sure we keep access to the raw counts
sc.pp.normalize_total(adata, target_sum=1e4)
CPU times: user 10.6 ms, sys: 2.92 ms, total: 13.5 ms
Wall time: 13.4 ms
%%time
sc.pp.log1p(adata)
CPU times: user 2.21 ms, sys: 1.86 ms, total: 4.07 ms
Wall time: 58.5 ms

Highly variable genes needs to add entries into obs, which currently does not support lazy column. So computation will occur immediately on call.

%%time
sc.pp.highly_variable_genes(adata)
CPU times: user 4.68 s, sys: 418 ms, total: 5.1 s
Wall time: 43.1 s

Scanpy 1.11+ supports computing PCA on dask arrays with sparse chunks.

%%time
sc.pp.pca(adata)
CPU times: user 7.97 s, sys: 474 ms, total: 8.45 s
Wall time: 47.4 s

While most of the PCA computation runs immediately, the last step (computing the observation loadings) is lazy, so must be triggered manually to avoid recomputation.

%%time
adata.obsm["X_pca"] = adata.obsm["X_pca"].compute()
CPU times: user 3.52 s, sys: 803 ms, total: 4.32 s
Wall time: 38.5 s
adata
AnnData object with n_obs × n_vars = 1462702 × 27714
    obs: 'celltype', 'majorType', 'City', 'sampleID', 'donor_id', 'Sample type', 'CoVID-19 severity', 'Sample time', 'Sampling day (Days after symptom onset)', 'BCR single cell sequencing', 'TCR single cell sequencing', 'Outcome', 'Comorbidities', 'COVID-19-related medication and anti-microbials', 'Leukocytes [G over L]', 'Neutrophils [G over L]', 'Lymphocytes [G over L]', 'Unpublished', 'disease_ontology_term_id', 'cell_type_ontology_term_id', 'tissue_ontology_term_id', 'development_stage_ontology_term_id', 'self_reported_ethnicity_ontology_term_id', 'assay_ontology_term_id', 'sex_ontology_term_id', 'is_primary_data', 'organism_ontology_term_id', 'suspension_type', 'tissue_type', 'cell_type', 'assay', 'disease', 'organism', 'sex', 'tissue', 'self_reported_ethnicity', 'development_stage', 'observation_joinid'
    var: 'feature_is_filtered', 'feature_name', 'feature_reference', 'feature_biotype', 'feature_length', 'highly_variable', 'means', 'dispersions', 'dispersions_norm'
    uns: 'log1p', 'hvg', 'pca'
    obsm: 'X_pca'
    varm: 'PCs'
    layers: 'counts'

Now that we’ve computed our PCA let’s take a look at it:

sc.pl.pca(adata, color="majorType")
_images/f4c24a5d838e696160214939f05a7e909d83a8b69664ef37567882535eb0e97a.png

Further support for dask is a work in progress. However, many operations past this point can work with the dimensionality reduction directly in memory. With scanpy 1.10 many of these operations can be accelerated to make working with large datasets significantly easier. For example:

%%time
from sklearn_ann.kneighbors.annoy import AnnoyTransformer  # noqa: E402

transformer = AnnoyTransformer(n_neighbors=15)
sc.pp.neighbors(adata, transformer=transformer)
CPU times: user 1min 26s, sys: 2.09 s, total: 1min 28s
Wall time: 1min 10s
%%time
sc.tl.leiden(adata, flavor="igraph", n_iterations=2)
CPU times: user 1min 42s, sys: 6.3 s, total: 1min 49s
Wall time: 1min 49s

UMAP computation can still be rather slow, taking longer than the rest of this notebook combined:

%%time
sc.tl.umap(adata)
sc.pl.umap(adata, color=["leiden", "majorType"])
_images/18f10d98f7b0d26476b4fd0d77cf726419d9db9db0ac6c051c7fccd57ebddc83.png
CPU times: user 3h 13min 5s, sys: 32.2 s, total: 3h 13min 37s
Wall time: 37min 55s