LMDBCache
⚠️ Important Warning
LMDBCache is problematic with fork-based multiprocessing!
Before using LMDBCache, please consider using joblib.Memory instead. joblib.Memory provides similar caching functionality but is more user-friendly and works reliably in multiprocessing scenarios. It offers:
- Better multiprocessing support
- Automatic cache invalidation
- More intuitive API
Use LMDBCache only if you have specific requirements that joblib.Memory cannot fulfill.
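For illustration, here is a minimal, hedged sketch of the joblib.Memory pattern; the cache directory and the cached function are placeholders and are not part of LMDBCache:

from joblib import Memory

# Hypothetical cache directory; any writable path works.
memory = Memory("./joblib_cache", verbose=0)

@memory.cache
def expensive_computation(key: str) -> str:
    # Placeholder for real work; joblib caches the result on disk, keyed by the arguments.
    return f"Compute result for {key}"

print(expensive_computation("task_1"))  # computed and cached
print(expensive_computation("task_1"))  # served from the on-disk cache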
What is LMDB?
LMDB (Lightning Memory-Mapped Database) is a high-performance, memory-mapped key-value store. It provides:
- Fast read/write operations: Memory-mapped files for optimal performance
- ACID compliance: Reliable transactions with consistency guarantees
- Compact storage: Efficient data organization with minimal overhead
- Cross-platform: Works on Linux, macOS, and Windows
LMDBCache wraps LMDB to provide a simple Python caching interface with automatic serialization using pickle.
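As a quick sketch of that interface (single process, no forking), the put/get calls used throughout the examples below look roughly like this; the map_size value here is arbitrary:

from alpenstock.lmdb_cache import LMDBCache
import tempfile

# Single-process usage only; see the caveats in the next section before
# combining this with multiprocessing.
with LMDBCache(path=tempfile.mkdtemp(), map_size=(1024**2) * 64) as cache:
    cache.put("answer", {"value": 42})  # values are pickled automatically
    print(cache.get("answer"))          # -> {'value': 42}
    print(cache.get("missing"))         # -> None when the key is absent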
Key Points to Use it Correctly
The lmdb documentation states clearly:
Restrictions/caveats:
- Use an MDB_env* in the process which opened it, without fork()ing.
This means that an LMDB connection (or environment, as it is called in LMDB) must not be shared across processes.
However, the default start method on Linux is fork, which shares the memory space of the parent process and thus the LMDB environment. Therefore, an LMDBCache instance should not be created before any possible fork; instead, create one in each process separately.
The correct way to use LMDBCache:
- Pass cache configuration to each process
- Create the process-local LMDBCache instance in each process
Note: if you use the spawn start method, the LMDB environment will not be shared, because child processes do not inherit the parent's memory.
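For example, a brief sketch of opting into the spawn start method with the standard multiprocessing module (nothing here is specific to LMDBCache):

import multiprocessing as mp

if __name__ == "__main__":
    # With spawn, child processes start from a fresh interpreter and do not
    # inherit the parent's memory, so no LMDB environment is shared implicitly.
    # Each worker should still create its own LMDBCache instance.
    mp.set_start_method("spawn")
    # ... create the pool and dispatch work here ...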
The following sections give examples that demonstrate both correct and incorrect usage patterns.
✅ Correct Usage
This example shows how to properly use LMDBCache with multiprocessing by creating separate cache instances within each worker process, avoiding shared state issues.
from alpenstock.lmdb_cache import LMDBCache
from pydantic import BaseModel
import multiprocessing as mp
import time
from random import random
import tempfile


class TaskDescriptor(BaseModel):
    key: str  # identifier for the computation task
    result: str | None = None  # result of the computation task


def worker(task_list: list[TaskDescriptor], cache_params) -> list[TaskDescriptor]:
    print(f"Worker started with {len(task_list)} tasks.")
    with LMDBCache(**cache_params) as cache:
        for task in task_list:
            cached_result = cache.get(task.key)
            if cached_result is not None:
                # If the result is already cached, skip computation
                print(f"Cache hit for {task.key}, skipping computation.")
                task.result = cached_result.result
            else:
                # Simulate some computation
                result = f"Compute result for {task.key}"
                task.result = result
                time.sleep(0.1 + random() * 0.4)  # Simulate variable computation time

                # Store the result in the cache
                print(f"Caching result for {task.key}")
                cache.put(task.key, task)
    return task_list


def main():
    # Create tasks
    tasks = [TaskDescriptor(key=f"task_{i}") for i in range(8)]
    tasks.extend([TaskDescriptor(key=f"task_{i}") for i in range(3, 8)])  # Some tasks overlap
    tasks.extend([TaskDescriptor(key=f"task_{i}") for i in range(10)])  # More overlapping tasks

    # Split tasks into batches for parallel processing
    batch_size = 3
    batches = [tasks[i:i + batch_size] for i in range(0, len(tasks), batch_size)]

    # Start processing
    cache_params = {"path": tempfile.mkdtemp(), "map_size": (1024**2) * 512}  # 512 MB cache
    with mp.Pool(processes=4) as pool:
        results = pool.starmap(worker, [(batch, cache_params) for batch in batches])

    # Flatten the results
    results = [task for batch in results for task in batch]
    print("\n\nAll tasks processed. Results:")
    for task in results:
        print(f"{task.key}: {task.result}")


if __name__ == "__main__":
    main()
❌ Incorrect Usage
Deliberately Closing a Shared LMDB Environment
This example demonstrates the problems that arise when a child process deliberately closes a shared LMDB environment, leading to lmdb.BadRslotError in the parent process.
import lmdb
from alpenstock.lmdb_cache import LMDBCache
import multiprocessing as mp
import tempfile

global_cache = None


### Failure case 1: Deliberately closing the shared LMDB environment in a child process
def deliberately_close_shared_lmdb_in_child():
    # Initialize the global_cache in the main process, which
    # is then shared with the forked child processes.
    temp_dir = tempfile.mkdtemp()
    global global_cache
    global_cache = LMDBCache(path=temp_dir)
    global_cache.put("key1", "value1")
    print(f"Global cache: key1 = {global_cache.get('key1')}")

    # Make sure that fork is used for process creation
    mp.set_start_method('fork', force=True)

    # Fork a new process
    process = mp.Process(target=child_proc_close_cache_intentionally)
    process.start()
    process.join()

    # Check if the global_cache is still accessible in the main process
    try:
        print(f"Main process after child: key1 = {global_cache.get('key1')}")
    except lmdb.BadRslotError as e:
        print("The global cache is no longer accessible in the main process after the child process closed it.")
        print(f"A `lmdb.BadRslotError` is raised: {e}")


def child_proc_close_cache_intentionally():
    # Close the global cache in the child process
    global global_cache
    global_cache.env.close()


if __name__ == "__main__":
    deliberately_close_shared_lmdb_in_child()
The root of the problem is that when the child process closes the LMDB environment it inherited via fork, it also releases shared resources (such as the reader lock table slots) that the parent process is still using, leaving the parent's environment in a broken state.
Unintentionally Closing a Shared LMDB Environment
Even when we take special care to avoid touching the shared LMDB environment, it can still be unintentionally closed through garbage collection in a child process, breaking the cache in the parent process.
import lmdb
from alpenstock.lmdb_cache import LMDBCache
import multiprocessing as mp
import tempfile


### Failure case 2:
### Unintentionally closing the shared LMDB environment in a child process
global_cache = None


def unintentionally_close_shared_lmdb_in_child():
    # Initialize the global_cache in the main process, which
    # is then shared with the forked child processes.
    temp_dir = tempfile.mkdtemp()
    global global_cache
    global_cache = LMDBCache(path=temp_dir)
    global_cache.put("key1", "value1")
    print(f"Global cache: key1 = {global_cache.get('key1')}")

    # Make sure that fork is used for process creation
    mp.set_start_method("fork", force=True)

    # Fork a new process
    process = mp.Process(target=child_proc_unintentionally_close_cache)
    process.start()
    process.join()

    # Check if the global_cache is still accessible in the main process
    try:
        print(f"Main process after child: key1 = {global_cache.get('key1')}")
    except lmdb.BadRslotError as e:
        print(
            "The global cache is no longer accessible in the main process after the child process closed it."
        )
        print(f"A `lmdb.BadRslotError` is raised: {e}")


def child_proc_unintentionally_close_cache():
    # The child process does not intend to break the global cache; it merely
    # sets global_cache to None so that the cache is no longer accessible.
    global global_cache
    global_cache = None
    # However, this causes the GC to close the LMDB environment, which
    # eventually raises a `lmdb.BadRslotError` in the main process. Here, we
    # trigger a GC manually to emulate the behavior.
    import gc
    gc.collect()


if __name__ == "__main__":
    unintentionally_close_shared_lmdb_in_child()
The problem arises because py-lmdb closes the LMDB environment in __del__ (see lmdb.Environment.__del__). Since we cannot modify the py-lmdb library, we cannot prevent this behavior.