Using dask with Scanpy
Warning
🔪 Beware sharp edges! 🔪
dask support in scanpy is new and highly experimental! Many functions in scanpy do not support dask and may exhibit unexpected behaviour if dask arrays are passed to them. Stick to what’s outlined in this tutorial and you should be fine!
Please report any issues you run into over on the issue tracker.
dask is a popular out-of-core, distributed array processing library that scanpy is beginning to support. Here we walk through a quick tutorial of using dask in a simple analysis task.
This notebook relies on optional dependencies of dask, as well as sklearn-ann and annoy. Install them with:
pip install -U "scanpy[dask,leiden]" "dask[distributed,diagnostics]" sklearn-ann annoy
(scanpy[dask] installs scanpy together with dask[array], and also ensures that a compatible dask version is selected.)
from pathlib import Path
import dask.distributed as dd
import scanpy as sc
import anndata as ad
import h5py
sc.logging.print_header()
scanpy==1.11.0.dev144+g3d220a93 anndata==0.11.0rc3 umap==0.5.6 numpy==2.0.2 scipy==1.14.1 pandas==2.2.3 scikit-learn==1.5.2 statsmodels==0.14.4 pynndescent==0.5.13
Here, we’ll be working with a moderately large dataset of 1.4 million cells taken from: COVID-19 immune features revealed by a large-scale single-cell transcriptome atlas
def download(url: str, path: Path) -> None:
    """Download a file from `url` and save it to `path`, showing a progress bar."""
    from tqdm.autonotebook import tqdm
    from urllib.request import urlretrieve

    pb = tqdm(unit="B", unit_scale=True, unit_divisor=1024)

    def update(b: int = 1, bsize: int = 1, tsize: int | None = None):
        if tsize is not None:
            pb.total = tsize
        return pb.update(b * bsize - pb.n)

    try:
        with pb:
            urlretrieve(url, path, reporthook=update)
    except BaseException:
        path.unlink(missing_ok=True)
        raise
cell_atlas_path = Path("data/cell_atlas.h5ad")
cell_atlas_path.parent.mkdir(exist_ok=True)

if not cell_atlas_path.exists():
    download(
        "https://datasets.cellxgene.cziscience.com/82eac9c1-485f-4e21-ab21-8510823d4f6e.h5ad",
        cell_atlas_path,
    )
For more information on using distributed computing via dask, please see their documentation. In short, one needs to define both a cluster and a client to have some degree of control over the compute resources dask will use. It’s very likely you will have to tune the number of workers and amount of memory per worker along with your chunk sizes.
cluster = dd.LocalCluster(n_workers=3)
client = dd.Client(cluster)
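If you need tighter control over resources, LocalCluster accepts explicit limits. A minimal sketch with illustrative values (these are assumptions, not the settings used for this run):

cluster = dd.LocalCluster(
    n_workers=3,           # number of worker processes
    threads_per_worker=4,  # threads within each worker
    memory_limit="16GiB",  # per-worker memory cap; size it so a chunk fits comfortably
)
client = dd.Client(cluster)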
Note
In this notebook we will be demonstrating some computations in scanpy that use scipy.sparse classes within each dask chunk. Be aware that this is currently poorly supported by dask, and that if you want to interact with the dask arrays in any way other than through the anndata and scanpy libraries, you will likely need to densify each chunk.
All operations in scanpy and anndata that work with sparse chunks also work with dense chunks. The advantages of using sparse chunks are:
The ability to work with fewer, larger chunks
Accelerated computations per chunk (e.g. no need to sum all those extra zeros)
You can convert from sparse to dense chunks via (assuming numpy is imported as np):
X = X.map_blocks(lambda x: x.toarray(), dtype=X.dtype, meta=np.array([]))
And in reverse (with from scipy import sparse):
X = X.map_blocks(sparse.csr_matrix)
Note that you will likely have to work with smaller chunks when doing this, via a rechunking operation. We suggest using a factor of the larger chunk size to achieve the most efficient rechunking.
SPARSE_CHUNK_SIZE = 100_000
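For example, here is a minimal sketch of moving from large sparse chunks to smaller dense ones. The 10× divisor and the rechunk-before-densify order are illustrative choices, and X stands for any sparse-chunked dask array (such as adata.X created below); rechunking sparse chunks is only loosely supported by dask, so check this against your dask/scipy versions:

import numpy as np

DENSE_CHUNK_SIZE = SPARSE_CHUNK_SIZE // 10  # a factor of the sparse chunk size

X = X.rechunk((DENSE_CHUNK_SIZE, -1))  # smaller row chunks, all genes in one chunk
X = X.map_blocks(lambda x: x.toarray(), dtype=X.dtype, meta=np.array([]))  # densify each chunk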
Dask provides extensive tooling for monitoring your computation. You can access that via the dashboard started when using any of their distributed clusters.
client
Client: Client-4fd47eed-9200-11ef-9f8e-00001107fe80 (connected to a LocalCluster)
Workers: 3 | Total threads: 21 | Total memory: 128.00 GiB
Dashboard: http://127.0.0.1:8787/status
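You can also retrieve the dashboard URL programmatically from the client:

print(client.dashboard_link)  # e.g. http://127.0.0.1:8787/status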
We’ll convert the X representation to dask using anndata.experimental.read_elem_as_dask.
The file we’ve retrieved from cellxgene has already been processed. Since this tutorial is demonstrating processing from counts, we’re just going to access the counts matrix and annotations.
%%time
with h5py.File(cell_atlas_path, "r") as f:
    adata = ad.AnnData(
        obs=ad.io.read_elem(f["obs"]),
        var=ad.io.read_elem(f["var"]),
    )
    adata.X = ad.experimental.read_elem_as_dask(
        f["raw/X"], chunks=(SPARSE_CHUNK_SIZE, adata.shape[1])
    )
CPU times: user 4.31 s, sys: 835 ms, total: 5.14 s
Wall time: 14 s
We’ve optimized a number of scanpy functions to be completely lazy. That means it will look like nothing is computed when you call an operation on a dask array; the work only happens later, when you hit compute.
In some cases computing immediately is currently unavoidable; calling those functions will kick off computation of all the pending delayed operations right away.
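As a quick illustration (not part of the original analysis), the loaded matrix is a dask array whose shape, dtype and chunk layout can be inspected without reading any data:

print(type(adata.X))  # dask.array.core.Array, backed by sparse chunks
print(adata.X)        # repr shows shape, dtype and chunking; nothing is computed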
%%time
adata.layers["counts"] = adata.X.copy() # Making sure we keep access to the raw counts
sc.pp.normalize_total(adata, target_sum=1e4)
CPU times: user 10.6 ms, sys: 2.92 ms, total: 13.5 ms
Wall time: 13.4 ms
%%time
sc.pp.log1p(adata)
CPU times: user 2.21 ms, sys: 1.86 ms, total: 4.07 ms
Wall time: 58.5 ms
Highly variable gene selection needs to add columns to var, which currently does not support lazy columns, so computation will occur immediately on call.
%%time
sc.pp.highly_variable_genes(adata)
CPU times: user 4.68 s, sys: 418 ms, total: 5.1 s
Wall time: 43.1 s
Scanpy 1.11+ supports computing PCA on dask arrays with sparse chunks.
%%time
sc.pp.pca(adata)
CPU times: user 7.97 s, sys: 474 ms, total: 8.45 s
Wall time: 47.4 s
While most of the PCA computation runs immediately, the last step (computing the observation loadings stored in X_pca) is lazy, so we trigger it manually here and store the result to avoid recomputing it downstream.
%%time
adata.obsm["X_pca"] = adata.obsm["X_pca"].compute()
CPU times: user 3.52 s, sys: 803 ms, total: 4.32 s
Wall time: 38.5 s
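As a quick sanity check (not from the original notebook), the embedding is now a plain in-memory array, so downstream steps such as neighbors, clustering and UMAP can use it directly:

print(type(adata.obsm["X_pca"]))  # numpy.ndarray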
adata
AnnData object with n_obs × n_vars = 1462702 × 27714
obs: 'celltype', 'majorType', 'City', 'sampleID', 'donor_id', 'Sample type', 'CoVID-19 severity', 'Sample time', 'Sampling day (Days after symptom onset)', 'BCR single cell sequencing', 'TCR single cell sequencing', 'Outcome', 'Comorbidities', 'COVID-19-related medication and anti-microbials', 'Leukocytes [G over L]', 'Neutrophils [G over L]', 'Lymphocytes [G over L]', 'Unpublished', 'disease_ontology_term_id', 'cell_type_ontology_term_id', 'tissue_ontology_term_id', 'development_stage_ontology_term_id', 'self_reported_ethnicity_ontology_term_id', 'assay_ontology_term_id', 'sex_ontology_term_id', 'is_primary_data', 'organism_ontology_term_id', 'suspension_type', 'tissue_type', 'cell_type', 'assay', 'disease', 'organism', 'sex', 'tissue', 'self_reported_ethnicity', 'development_stage', 'observation_joinid'
var: 'feature_is_filtered', 'feature_name', 'feature_reference', 'feature_biotype', 'feature_length', 'highly_variable', 'means', 'dispersions', 'dispersions_norm'
uns: 'log1p', 'hvg', 'pca'
obsm: 'X_pca'
varm: 'PCs'
layers: 'counts'
Now that we’ve computed our PCA let’s take a look at it:
sc.pl.pca(adata, color="majorType")
Further support for dask is a work in progress. However, many operations past this point can work with the dimensionality reduction directly in memory. With scanpy 1.10, many of these operations can be accelerated to make working with large datasets significantly easier. For example:
Using alternative kNN backends for faster neighbor calculation (see Using other kNN libraries in Scanpy)
Using the igraph backend for clustering
%%time
from sklearn_ann.kneighbors.annoy import AnnoyTransformer # noqa: E402
transformer = AnnoyTransformer(n_neighbors=15)
sc.pp.neighbors(adata, transformer=transformer)
CPU times: user 1min 26s, sys: 2.09 s, total: 1min 28s
Wall time: 1min 10s
%%time
sc.tl.leiden(adata, flavor="igraph", n_iterations=2)
CPU times: user 1min 42s, sys: 6.3 s, total: 1min 49s
Wall time: 1min 49s
UMAP computation can still be rather slow, taking longer than the rest of this notebook combined:
%%time
sc.tl.umap(adata)
sc.pl.umap(adata, color=["leiden", "majorType"])
CPU times: user 3h 13min 5s, sys: 32.2 s, total: 3h 13min 37s
Wall time: 37min 55s