Hello everyone, I'm your Python friend. Recently while working on projects, I noticed many developers encountering various issues when handling asynchronous tasks. Some report task pile-ups, others complain about high memory usage, and some find error tracking particularly challenging... Today, let's discuss Python asynchronous tasks and explore how to properly use asynchronous programming.
Speaking of asynchronous programming, you might ask: Why do we need it? Is it really necessary?
Let me give you a real-life example. Imagine you're at a restaurant - in synchronous mode, you'd have to wait doing nothing after ordering until all dishes arrive before you can start eating. But in asynchronous mode, you can start with appetizers while hot dishes arrive one by one, eating and chatting - much more comfortable.
It's the same in the programming world. When our program needs to handle lots of IO operations (like file reading/writing, network requests), using synchronous methods would waste time waiting, just like sitting idle in the restaurant. With asynchronous programming, we can process other tasks while waiting for one task to complete, greatly improving program efficiency.
Before we start coding, let's get the key concepts down with a minimal example:
```python
import asyncio

async def hello():
    print("Starting execution")
    await asyncio.sleep(1)  # Simulating a time-consuming operation
    print("Execution complete")

async def main():
    await hello()

asyncio.run(main())
```
This code demonstrates the most basic asynchronous programming structure: `async def` defines a coroutine, and `await` is used to wait for a coroutine to complete.
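To see the payoff of "processing other tasks while waiting," compare running two such coroutines concurrently. This is a minimal sketch: `asyncio.gather` schedules both coroutines on the event loop, so their sleeps overlap and the total time is roughly one second rather than two.

```python
import asyncio
import time

async def task(name: str) -> None:
    print(f"{name} starting")
    await asyncio.sleep(1)  # Both sleeps overlap on the event loop
    print(f"{name} done")

async def main() -> None:
    start = time.time()
    # gather() runs both coroutines concurrently
    await asyncio.gather(task("A"), task("B"))
    print(f"Elapsed: {time.time() - start:.2f}s")  # ~1.00s, not ~2.00s

asyncio.run(main())
```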
Let's start with a simple asynchronous downloader:
```python
import asyncio
import aiohttp
import time

async def download_file(session, url, file_name):
    try:
        async with session.get(url) as response:
            content = await response.read()
            with open(file_name, 'wb') as f:
                f.write(content)
            return len(content)
    except Exception as e:
        print(f"Error downloading {url}: {e}")
        return 0

async def main():
    urls = [
        "http://example.com/file1.txt",
        "http://example.com/file2.txt",
        "http://example.com/file3.txt",
    ]
    start_time = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = []
        for i, url in enumerate(urls):
            task = download_file(session, url, f"file_{i}.txt")
            tasks.append(task)
        results = await asyncio.gather(*tasks)
    end_time = time.time()
    print(f"Total downloaded {sum(results)} bytes")
    print(f"Time taken: {end_time - start_time:.2f} seconds")

asyncio.run(main())
```
While this version works, there's room for improvement: it has no concurrency limit, no retry mechanism, and only the most basic error handling.
```python
import asyncio
import aiohttp
import aiofiles  # Needed for the non-blocking file writes below
import logging
from typing import List, Dict

class DownloadManager:
    def __init__(self, max_concurrency: int = 5, max_retries: int = 3):
        self.semaphore = asyncio.Semaphore(max_concurrency)
        self.max_retries = max_retries
        self.session = None
        self.download_stats: Dict[str, int] = {}
        # Configure logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )

    async def download_with_retry(self, url: str, file_name: str) -> int:
        for attempt in range(self.max_retries):
            try:
                async with self.semaphore:  # Limit concurrency
                    logging.info(f"Starting download {url} (Attempt {attempt + 1})")
                    async with self.session.get(url) as response:
                        response.raise_for_status()
                        content = await response.read()
                    # Write the file without blocking the event loop
                    async with aiofiles.open(file_name, 'wb') as f:
                        await f.write(content)
                    size = len(content)
                    self.download_stats[url] = size
                    logging.info(f"Successfully downloaded {url}, size: {size} bytes")
                    return size
            except Exception as e:
                logging.error(f"Failed to download {url}: {str(e)}")
                if attempt == self.max_retries - 1:
                    logging.error(f"Maximum retry attempts reached for {url}")
                    self.download_stats[url] = 0
                    return 0
                await asyncio.sleep(2 ** attempt)  # Exponential backoff

    async def download_files(self, urls: List[str]) -> Dict[str, int]:
        async with aiohttp.ClientSession() as session:
            self.session = session
            tasks = []
            for i, url in enumerate(urls):
                file_name = f"download_{i}.file"
                tasks.append(self.download_with_retry(url, file_name))
            await asyncio.gather(*tasks)
        return self.download_stats

async def main():
    urls = [
        "http://example.com/file1",
        "http://example.com/file2",
        "http://example.com/file3",
        # ... more URLs
    ]
    manager = DownloadManager(max_concurrency=3, max_retries=3)
    stats = await manager.download_files(urls)
    # Print statistics
    total_size = sum(stats.values())
    successful_downloads = len([s for s in stats.values() if s > 0])
    print("\nDownload Statistics:")
    print(f"Total download size: {total_size} bytes")
    print(f"Successful downloads: {successful_downloads}/{len(urls)}")

asyncio.run(main())
```
This advanced version adds many practical features:

- A semaphore caps how many downloads run at once.
- Failed downloads are retried with exponential backoff.
- Progress and errors go through the logging module instead of bare prints.
- Files are written with aiofiles, so disk writes don't block the event loop.
- Per-URL statistics are collected for a final report.

We can push this one step further into a general-purpose task manager with status tracking, result caching, and persistent logs:
```python
import asyncio
import aiohttp
import json
import logging
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, Optional

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class TaskInfo:
    id: str
    status: TaskStatus
    start_time: datetime
    end_time: Optional[datetime] = None
    result: Any = None
    error: Optional[str] = None
    retries: int = 0

class AsyncTaskManager:
    def __init__(
        self,
        max_concurrency: int = 5,
        max_retries: int = 3,
        cache_enabled: bool = True,
        cache_file: str = "task_cache.json"
    ):
        self.semaphore = asyncio.Semaphore(max_concurrency)
        self.max_retries = max_retries
        self.tasks: Dict[str, TaskInfo] = {}
        self.cache_enabled = cache_enabled
        self.cache_file = cache_file
        self.result_cache = {}
        # Log to both a file and the console
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('async_tasks.log'),
                logging.StreamHandler()
            ]
        )
        if cache_enabled:
            self._load_cache()

    def _load_cache(self):
        try:
            with open(self.cache_file, 'r') as f:
                self.result_cache = json.load(f)
        except FileNotFoundError:
            self.result_cache = {}

    def _save_cache(self):
        with open(self.cache_file, 'w') as f:
            json.dump(self.result_cache, f)

    async def execute_task(
        self,
        task_id: str,
        func: Callable,
        *args,
        **kwargs
    ) -> Any:
        # Check cache first
        cache_key = f"{task_id}_{str(args)}_{str(kwargs)}"
        if self.cache_enabled and cache_key in self.result_cache:
            logging.info(f"Task {task_id} hit cache")
            return self.result_cache[cache_key]
        # Create task info
        self.tasks[task_id] = TaskInfo(
            id=task_id,
            status=TaskStatus.PENDING,
            start_time=datetime.now()
        )
        for attempt in range(self.max_retries):
            try:
                async with self.semaphore:
                    self.tasks[task_id].status = TaskStatus.RUNNING
                    self.tasks[task_id].retries = attempt
                    # Execute the task
                    result = await func(*args, **kwargs)
                    # Update task info
                    self.tasks[task_id].status = TaskStatus.COMPLETED
                    self.tasks[task_id].end_time = datetime.now()
                    self.tasks[task_id].result = result
                    # Cache the result (note: it must be JSON-serializable)
                    if self.cache_enabled:
                        self.result_cache[cache_key] = result
                        self._save_cache()
                    return result
            except Exception as e:
                logging.error(f"Task {task_id} failed: {str(e)}")
                if attempt == self.max_retries - 1:
                    self.tasks[task_id].status = TaskStatus.FAILED
                    self.tasks[task_id].end_time = datetime.now()
                    self.tasks[task_id].error = str(e)
                    raise
                await asyncio.sleep(2 ** attempt)  # Exponential backoff

    def get_task_status(self, task_id: str) -> Optional[TaskInfo]:
        return self.tasks.get(task_id)

    def get_all_tasks(self) -> Dict[str, TaskInfo]:
        return self.tasks

    async def cleanup(self):
        # Clean up expired cache entries and stale task info (left as a stub)
        pass

async def example_usage():
    async def download_file(url: str) -> int:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                content = await response.read()
                return len(content)

    manager = AsyncTaskManager(max_concurrency=3)
    # Create multiple tasks
    urls = [
        "http://example.com/file1",
        "http://example.com/file2",
        "http://example.com/file3"
    ]
    tasks = []
    for i, url in enumerate(urls):
        tasks.append(manager.execute_task(f"download_{i}", download_file, url))
    # Wait for all tasks to complete
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # Print task status
    for task_id, task_info in manager.get_all_tasks().items():
        print(f"\nTask {task_id}:")
        print(f"Status: {task_info.status}")
        print(f"Start time: {task_info.start_time}")
        print(f"End time: {task_info.end_time}")
        print(f"Result: {task_info.result}")
        print(f"Error: {task_info.error}")
        print(f"Retries: {task_info.retries}")

if __name__ == "__main__":
    asyncio.run(example_usage())
```
This ultimate version adds more enterprise-level features:

- Each task gets a TaskInfo record tracking status, start/end time, result, error, and retry count.
- Results are cached to disk (task_cache.json), so re-running the same task can return instantly.
- Logs are written to both async_tasks.log and the console.
- `asyncio.gather(..., return_exceptions=True)` collects failures without cancelling the remaining tasks.
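One thing the code above leaves open is the cleanup method, which is just a stub. Here's a minimal sketch of what its body might look like, assuming a time-to-live policy (the 24-hour default is my assumption, not part of the original design):

```python
import logging
from datetime import datetime, timedelta

# Hypothetical body for AsyncTaskManager.cleanup (assumed 24h TTL)
async def cleanup(self, max_age: timedelta = timedelta(hours=24)):
    cutoff = datetime.now() - max_age
    # Collect finished tasks older than the cutoff
    expired = [
        task_id for task_id, info in self.tasks.items()
        if info.end_time is not None and info.end_time < cutoff
    ]
    for task_id in expired:
        del self.tasks[task_id]
    logging.info(f"Cleaned up {len(expired)} finished task records")
```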
Memory management is crucial when handling many asynchronous tasks. Here are some suggestions:

- Cap concurrency with a semaphore (as we did above), so you never hold hundreds of responses in memory at once.
- For large files, stream the response in chunks instead of pulling the whole body into memory with `response.read()` (see the sketch below).
- Periodically clean up finished task records and cached results instead of letting them grow without bound.
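As an illustration of the streaming point, here's a minimal sketch using aiohttp's `iter_chunked`; the URL, file name, and chunk size are placeholders, but the pattern keeps peak memory near the chunk size rather than the file size:

```python
import asyncio
import aiohttp
import aiofiles

async def stream_download(url: str, file_name: str, chunk_size: int = 64 * 1024) -> int:
    """Download in chunks so only ~chunk_size bytes are held in memory."""
    total = 0
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            response.raise_for_status()
            async with aiofiles.open(file_name, 'wb') as f:
                # iter_chunked yields the response body piece by piece
                async for chunk in response.content.iter_chunked(chunk_size):
                    await f.write(chunk)
                    total += len(chunk)
    return total

asyncio.run(stream_download("http://example.com/bigfile", "bigfile.bin"))
```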
Proper task scheduling can also greatly improve system performance. Rather than blocking on `gather()` until every task is done, you can start handling each result the moment its task finishes, which keeps downstream work (saving, parsing, reporting) flowing.
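Here's a small sketch of that pattern with `asyncio.as_completed`; the fetch coroutine is just a stand-in for any real awaitable:

```python
import asyncio
import random

async def fetch(task_id: int) -> str:
    # Simulate work of varying duration
    await asyncio.sleep(random.uniform(0.1, 1.0))
    return f"result-{task_id}"

async def main():
    coros = [fetch(i) for i in range(5)]
    # as_completed yields awaitables in the order they finish,
    # so fast results aren't stuck waiting behind slow ones
    for finished in asyncio.as_completed(coros):
        result = await finished
        print(f"Got {result}")

asyncio.run(main())
```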
Well, that wraps up our tour of Python asynchronous tasks. What do you think - did it help with problems you've run into in real development?
Remember, asynchronous programming isn't a silver bullet. When deciding whether to use it, consider your specific scenario. If your application is primarily CPU-intensive, multiprocessing might be a better choice.
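If you only have a little CPU-bound work inside an otherwise IO-bound program, one option (a sketch of a common pattern, not the only way) is to offload it to a process pool with `run_in_executor`, so the event loop stays responsive:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def crunch(n: int) -> int:
    # CPU-bound work: this would stall the event loop if run inline
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # The heavy computation runs in a worker process
        result = await loop.run_in_executor(pool, crunch, 10_000_000)
    print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())
```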
If you encounter other issues in practice, feel free to discuss in the comments. Let's learn and improve together.
Finally, I hope this article has been helpful. Remember, programming is a practical art - write more code, think more, and you'll definitely become better and better.