Using LoguruInitializer with multiprocessing
Introduction
This notebook demonstrates how to use the LoguruInitializer properly with multiprocessing tasks. We'll cover two main approaches:
- Creating sub-processes manually, where each worker process initializes its own logger
- Working with a process pool, where a pool initializer sets up logging for all worker processes
The key concepts:
- Each worker process needs its own logger initialization
- Use logger.complete() to ensure all log messages are flushed
- Be aware of platform-specific considerations (Linux vs Windows) and start methods (fork vs spawn/forkserver)
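A minimal sketch of this per-worker pattern, using the same alpenstock API as the full examples below (the `worker` name here is just illustrative):

```python
# Minimal per-worker pattern; the examples below add per-process
# log files and a process-pool variant.
from alpenstock.logging import LoguruInitalizer, logger

def worker():
    # Each process configures its own logger exactly once
    LoguruInitalizer().preset_brief().initialize(on_reinitialize="abort")
    logger.info("work happens here")
    logger.complete()  # flush pending messages before the process exits
```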
Example 1: Creating Sub-processes Manually
In this example, we create worker processes manually using multiprocessing.Process. Each worker process initializes its own logger instance.
Key Points:
- Each worker process calls LoguruInitalizer() independently
- Use logger.complete() to ensure all log messages are flushed before the process terminates
- Each process can have its own log file (using the PID in the filename)
Compatibility:
- Works seamlessly with all start methods (spawn, fork, forkserver)
- However, because Jupyter notebooks require fork (functions defined in a notebook cannot be re-imported by spawned processes), copy the code to a separate Python script and run it outside of Jupyter to try the other start methods; a sketch of such a script follows.
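Here is a minimal sketch of such a standalone script. It assumes only the alpenstock API already used in this notebook; the `if __name__ == "__main__":` guard is required under spawn/forkserver because child processes re-import the script:

```python
# standalone_demo.py -- run with `python standalone_demo.py` outside Jupyter
import multiprocessing as mp
from alpenstock.logging import LoguruInitalizer, logger

def worker():
    LoguruInitalizer().preset_brief().initialize(on_reinitialize="abort")
    logger.info("hello from a spawned worker")
    logger.complete()  # flush before the process exits

if __name__ == "__main__":
    mp.set_start_method("spawn")  # or "forkserver" / "fork"
    LoguruInitalizer().preset_brief().initialize(on_reinitialize="abort")
    p = mp.Process(target=worker)
    p.start()
    p.join()
    logger.info("worker joined")
```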
In [1]:
# Import required libraries
from alpenstock.logging import LoguruInitalizer, logger
import multiprocessing as mp
from pathlib import Path
import os
import tempfile
import time
from random import random
print("Libraries imported successfully!")
Libraries imported successfully!
In [2]:
def sub_proc_task(tmp_dir: str):
"""
Worker function that will be executed in a separate process.
Each worker initializes its own logger and logs to a separate file.
"""
tmp_dir = Path(tmp_dir)
pid = os.getpid()
# Initialize logger once per worker process
(
LoguruInitalizer()
.preset_full() # Use preset_full to be distinct from the main process
.serialize_to_file(tmp_dir / f"log-{pid}.log")
.initialize(on_reinitialize="abort")
# If the logger is already initialized in a process, `abort` will raise
# an error to let you know.
)
logger.info(f"Worker {pid} started")
time.sleep(random() * 0.05)
logger.debug(f"Worker {pid} is doing some work")
time.sleep(random() * 0.05)
logger.info(f"Worker {pid} finished work")
# Without this line, some log messages may be missing
logger.complete()
In [3]:
def create_sub_proc_manually():
"""
Main function that creates and manages worker processes manually.
"""
# Initialize logger in the main process
LoguruInitalizer().preset_brief().initialize(on_reinitialize="abort")
tmp_dir = tempfile.mkdtemp(prefix="alpenstock-")
logger.info(f"Temporary directory created: {tmp_dir}")
# Compatible with spawn, fork and forkserver methods
mp.set_start_method("fork", force=True)
processes = []
for _ in range(4): # Create 4 worker processes
p = mp.Process(target=sub_proc_task, args=(tmp_dir,))
p.start()
processes.append(p)
for p in processes:
p.join() # Wait for all processes to finish
logger.info("All worker processes have completed.")
# Show the log files that were created
log_files = list(Path(tmp_dir).glob("*"))
logger.info(f"Log files created: {[f.name for f in log_files]}")
return tmp_dir
In [4]:
# Run Example 1: Manual process creation
create_sub_proc_manually()
07-07 13:58:32|INFO |Temporary directory created: /tmp/alpenstock-dcjjoufa
07-07 13:58:32|INFO |2509924454:sub_proc_task:19|MainThread|Worker 134364 started
07-07 13:58:32|INFO |2509924454:sub_proc_task:19|MainThread|Worker 134365 started
07-07 13:58:32|INFO |2509924454:sub_proc_task:19|MainThread|Worker 134366 started
07-07 13:58:32|INFO |2509924454:sub_proc_task:19|MainThread|Worker 134368 started
07-07 13:58:32|DEBUG |2509924454:sub_proc_task:21|MainThread|Worker 134364 is doing some work
07-07 13:58:32|DEBUG |2509924454:sub_proc_task:21|MainThread|Worker 134365 is doing some work
07-07 13:58:32|DEBUG |2509924454:sub_proc_task:21|MainThread|Worker 134368 is doing some work
07-07 13:58:32|DEBUG |2509924454:sub_proc_task:21|MainThread|Worker 134366 is doing some work
07-07 13:58:32|INFO |2509924454:sub_proc_task:23|MainThread|Worker 134365 finished work
07-07 13:58:32|INFO |2509924454:sub_proc_task:23|MainThread|Worker 134364 finished work
07-07 13:58:32|INFO |2509924454:sub_proc_task:23|MainThread|Worker 134368 finished work
07-07 13:58:32|INFO |2509924454:sub_proc_task:23|MainThread|Worker 134366 finished work
07-07 13:58:32|INFO |All worker processes have completed.
07-07 13:58:32|INFO |Log files created: ['log-134364.log', 'log-134365.log', 'log-134366.log', 'log-134368.log']
Out[4]:
'/tmp/alpenstock-dcjjoufa'
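If you want to inspect the serialized per-worker files, a sketch like the following works, assuming serialize_to_file writes Loguru's standard one-JSON-object-per-line serialization (the path below is the directory returned above):

```python
import json
from pathlib import Path

tmp_dir = Path("/tmp/alpenstock-dcjjoufa")  # directory returned above
for log_file in sorted(tmp_dir.glob("log-*.log")):
    with log_file.open() as f:
        for line in f:
            # Loguru's serialized records keep the parsed fields under "record"
            record = json.loads(line)["record"]
            print(log_file.name, record["level"]["name"], record["message"])
```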
Example 2: Working with a Process Pool
In this approach, we use multiprocessing.Pool to manage a pool of worker processes. This is more efficient for many small tasks.
Key Points:
- Use a pool initializer function to set up logging for all worker processes
- DO NOT initialize the logger in the task function itself, as pool processes may be reused
- Be aware of platform-specific considerations:
  - Linux + spawn/forkserver + mp.Pool: may require disabling enqueue mode to avoid semaphore leaks (see the sketch below)
  - Windows + spawn + mp.Pool: works without issues
  - Linux + fork + mp.Pool: works without issues (fork is preferred on Linux)
Compatibility:
The combination of Linux + spawn/forkserver + mp.Pool can lead to internal semaphore leaks inside loguru. Luckily, this is uncommon in practice, since fork is the preferred start method on Linux.
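One portable way to handle this is to disable enqueue only when the problematic combination is detected. A sketch follows; the platform check and the `safe_worker_initializer` name are our own additions, while `set_enqueue` is the same method shown commented out in the initializer below:

```python
import sys
import multiprocessing as mp
from alpenstock.logging import LoguruInitalizer

def safe_worker_initializer():
    builder = LoguruInitalizer().preset_brief()
    # Only Linux + spawn/forkserver + mp.Pool is known to leak semaphores
    if sys.platform.startswith("linux") and mp.get_start_method() in ("spawn", "forkserver"):
        builder = builder.set_enqueue(False)  # avoid leaked-semaphore warnings at shutdown
    builder.initialize(on_reinitialize="abort")
```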
In [5]:
def pool_task(x):
"""
Task function that will be executed by worker processes in the pool.
IMPORTANT: DO NOT initialize logger here, as a process pool may reuse
the same worker process for multiple tasks.
"""
time.sleep(0.05 + 0.05 * random()) # Simulate some work
pid = os.getpid()
logger.info(f"Worker {pid} processing {x}")
logger.complete() # Ensure all log messages are flushed
return x * x # Example computation
In [6]:
def worker_initializer():
"""
Initializer function that sets up logging for each worker process in the pool.
This is called once per worker process when the pool is created.
"""
# Initialize logger here (once per worker process)
(
LoguruInitalizer()
.preset_brief()
# Disable enqueue for the combination of `Linux` + `spawn`/`forkserver`
# + `mp.Pool`, or the internal semaphores of Loguru will not be cleaned
# up properly, and a warning message will be printed at shutdown like
# this:
#
# resource_tracker.py:301: UserWarning: resource_tracker: There appear
# to be 8 leaked semaphore objects to clean up at shutdown: ...
#
# .set_enqueue(False)
.initialize(on_reinitialize="abort")
)
In [7]:
def demo_with_proc_pool():
"""
Main function that demonstrates using LoguruInitializer with a process pool.
"""
# Initialize logger in the main process
(
LoguruInitalizer()
.preset_brief()
.initialize(on_reinitialize="overwrite")
        # Because we initialized the logger in the previous example,
        # we can use `overwrite` here to allow reinitialization.
)
# You can force a specific start method for testing
mp.set_start_method("fork", force=True)
with mp.Pool(
processes=4,
initializer=worker_initializer, # Register the initializer function here
) as pool:
# Process a range of numbers
results = pool.map(pool_task, range(10))
logger.info(f"Results: {results}")
logger.info("Process pool has completed.")
In [8]:
# Run Example 2: Process pool demo
demo_with_proc_pool()
07-07 13:58:32|INFO |Worker 134381 processing 3
07-07 13:58:32|INFO |Worker 134378 processing 1
07-07 13:58:32|INFO |Worker 134377 processing 0
07-07 13:58:32|INFO |Worker 134379 processing 2
07-07 13:58:32|INFO |Worker 134378 processing 5
07-07 13:58:32|INFO |Worker 134377 processing 6
07-07 13:58:32|INFO |Worker 134381 processing 4
07-07 13:58:32|INFO |Worker 134379 processing 7
07-07 13:58:32|INFO |Worker 134378 processing 8
07-07 13:58:32|INFO |Worker 134377 processing 9
07-07 13:58:32|INFO |Results: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
07-07 13:58:32|INFO |Process pool has completed.
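As a closing aside, the same initializer pattern also applies to concurrent.futures.ProcessPoolExecutor, which accepts the same per-worker initializer argument as mp.Pool. A sketch reusing worker_initializer and pool_task from above (the demo_with_executor name is our own):

```python
from concurrent.futures import ProcessPoolExecutor

def demo_with_executor():
    # Same main-process setup as in Example 2
    LoguruInitalizer().preset_brief().initialize(on_reinitialize="overwrite")
    with ProcessPoolExecutor(max_workers=4, initializer=worker_initializer) as executor:
        results = list(executor.map(pool_task, range(10)))
    logger.info(f"Results: {results}")
```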