Python with 语句¶
概述¶
Python 的 with 语句是上下文管理器(Context Manager)的语法糖,用于简化资源管理代码。它确保在使用资源后正确地进行清理工作,即使在使用过程中发生了异常。
为什么需要 with 语句?¶
传统资源管理的问题¶
在 Python 中,许多操作需要在使用后进行清理,比如: - 文件操作后需要关闭文件 - 数据库连接后需要关闭连接 - 线程锁使用后需要释放锁
传统的方式需要手动管理:
# 传统文件操作方式
file = None
try:
file = open('example.txt', 'r')
content = file.read()
# 处理文件内容
except IOError:
print("文件读取错误")
finally:
if file:
file.close() # 必须手动关闭文件
with 语句的优势¶
- 代码简洁:减少样板代码
- 异常安全:即使发生异常也能正确清理资源
- 可读性强:代码意图更清晰
- 减少错误:避免忘记关闭资源的问题
with 语句的基本语法¶
基本格式¶
多个上下文管理器¶
with 语句的工作原理¶
上下文管理器协议¶
with 语句依赖于上下文管理器对象,该对象必须实现以下两个方法:
__enter__(self):进入上下文时调用,返回值赋给as后面的变量__exit__(self, exc_type, exc_value, traceback):退出上下文时调用
执行流程¶
# with 语句的执行过程相当于:
manager = expression # 获取上下文管理器
variable = manager.__enter__() # 调用 __enter__ 方法
try:
# 执行代码块
...
finally:
# 调用 __exit__ 方法,确保资源被清理
manager.__exit__(exc_type, exc_value, traceback)
实际应用场景¶
1. 文件操作¶
基本文件操作¶
# 读取文件
with open('example.txt', 'r') as file:
content = file.read()
print(content)
# 文件自动关闭
# 写入文件
with open('output.txt', 'w') as file:
file.write('Hello, World!')
# 文件自动关闭
同时打开多个文件¶
# 同时打开多个文件进行读写操作
with open('input.txt', 'r') as infile, open('output.txt', 'w') as outfile:
content = infile.read()
outfile.write(content.upper())
# 两个文件都会自动关闭
处理二进制文件¶
# 处理二进制文件
with open('image.jpg', 'rb') as image_file:
image_data = image_file.read()
# 处理图像数据
# 写入二进制数据
with open('copy.jpg', 'wb') as copy_file:
copy_file.write(image_data)
文件操作的异常处理¶
try:
with open('nonexistent.txt', 'r') as file:
content = file.read()
except FileNotFoundError:
print("文件不存在")
except IOError:
print("文件读取错误")
# 即使发生异常,文件也会被正确关闭
2. 数据库连接¶
SQLite 数据库操作¶
import sqlite3
# 基本的数据库操作
with sqlite3.connect('database.db') as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM users')
results = cursor.fetchall()
for row in results:
print(row)
# 连接自动关闭
数据库事务管理¶
import sqlite3
# 使用 with 语句管理事务
def update_user_age(user_id, new_age):
with sqlite3.connect('database.db') as conn:
cursor = conn.cursor()
try:
cursor.execute('UPDATE users SET age = ? WHERE id = ?', (new_age, user_id))
# 如果发生异常,事务会自动回滚
conn.commit()
print("更新成功")
except sqlite3.Error as e:
print(f"更新失败: {e}")
conn.rollback()
多个数据库操作¶
import sqlite3
# 复杂的数据库操作
with sqlite3.connect('database.db') as conn:
cursor = conn.cursor()
# 创建表
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
age INTEGER
)
''')
# 插入数据
cursor.execute('INSERT INTO users (name, age) VALUES (?, ?)', ('Alice', 25))
cursor.execute('INSERT INTO users (name, age) VALUES (?, ?)', ('Bob', 30))
# 查询数据
cursor.execute('SELECT * FROM users')
users = cursor.fetchall()
conn.commit()
for user in users:
print(f"ID: {user[0]}, Name: {user[1]}, Age: {user[2]}")
3. 线程锁¶
基本锁操作¶
import threading
# 创建锁对象
lock = threading.Lock()
# 使用 with 语句管理锁
shared_resource = 0
def increment():
global shared_resource
with lock:
# 临界区代码 - 线程安全
shared_resource += 1
print(f"当前值: {shared_resource}")
# 创建多个线程
threads = []
for i in range(5):
thread = threading.Thread(target=increment)
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
print(f"最终结果: {shared_resource}")
可重入锁(RLock)¶
import threading
# 可重入锁允许同一线程多次获取锁
rlock = threading.RLock()
def recursive_function(count):
with rlock:
print(f"进入递归函数,计数: {count}")
if count > 0:
recursive_function(count - 1)
print(f"退出递归函数,计数: {count}")
# 同一线程可以多次获取可重入锁
thread = threading.Thread(target=recursive_function, args=(3,))
thread.start()
thread.join()
条件变量¶
import threading
import time
# 使用条件变量进行线程间通信
condition = threading.Condition()
shared_list = []
def producer():
with condition:
print("生产者开始工作")
time.sleep(1) # 模拟生产时间
shared_list.append("新产品")
print("生产者生产完成,通知消费者")
condition.notify() # 通知等待的消费者
def consumer():
with condition:
print("消费者等待产品")
condition.wait() # 等待生产者通知
if shared_list:
item = shared_list.pop()
print(f"消费者消费: {item}")
# 创建生产者和消费者线程
prod_thread = threading.Thread(target=producer)
cons_thread = threading.Thread(target=consumer)
cons_thread.start()
prod_thread.start()
prod_thread.join()
cons_thread.join()
4. 临时修改系统状态¶
decimal 模块的高精度计算¶
import decimal
# 正常精度计算
result1 = decimal.Decimal('1') / decimal.Decimal('3')
print(f"正常精度: {result1}") # 0.3333333333333333333333333333
# 临时设置高精度
with decimal.localcontext() as ctx:
ctx.prec = 50 # 临时设置50位精度
result2 = decimal.Decimal('1') / decimal.Decimal('3')
print(f"高精度: {result2}") # 0.33333333333333333333333333333333333333333333333333
# 精度恢复原设置
result3 = decimal.Decimal('1') / decimal.Decimal('3')
print(f"恢复后精度: {result3}") # 0.3333333333333333333333333333
临时修改环境变量¶
import os
class TemporaryEnvironment:
"""临时修改环境变量的上下文管理器"""
def __init__(self, **kwargs):
self.new_env = kwargs
self.old_env = {}
def __enter__(self):
# 保存旧值并设置新值
for key, value in self.new_env.items():
self.old_env[key] = os.environ.get(key)
os.environ[key] = str(value)
return self
def __exit__(self, exc_type, exc_value, traceback):
# 恢复旧值
for key, old_value in self.old_env.items():
if old_value is None:
if key in os.environ:
del os.environ[key]
else:
os.environ[key] = old_value
# 使用临时环境变量
print("原始PATH:", os.environ.get('PATH', '未设置'))
with TemporaryEnvironment(TEMP_VAR="临时值", ANOTHER_VAR="另一个值"):
print("临时TEMP_VAR:", os.environ.get('TEMP_VAR'))
print("临时ANOTHER_VAR:", os.environ.get('ANOTHER_VAR'))
print("恢复后TEMP_VAR:", os.environ.get('TEMP_VAR', '未设置'))
临时修改工作目录¶
import os
class TemporaryDirectory:
"""临时切换工作目录的上下文管理器"""
def __init__(self, new_dir):
self.new_dir = new_dir
self.old_dir = None
def __enter__(self):
self.old_dir = os.getcwd()
os.chdir(self.new_dir)
return self
def __exit__(self, exc_type, exc_value, traceback):
os.chdir(self.old_dir)
# 使用临时目录
print("当前目录:", os.getcwd())
with TemporaryDirectory('/tmp'):
print("临时目录:", os.getcwd())
# 在这里执行需要特定目录的操作
print("恢复后目录:", os.getcwd())
创建自定义的上下文管理器¶
方法一:使用类实现¶
基本自定义上下文管理器¶
class FileManager:
"""自定义文件管理器"""
def __init__(self, filename, mode):
self.filename = filename
self.mode = mode
self.file = None
def __enter__(self):
self.file = open(self.filename, self.mode)
return self.file
def __exit__(self, exc_type, exc_value, traceback):
if self.file:
self.file.close()
# 如果返回True,异常会被抑制
# 如果返回False或None,异常会继续传播
return False
# 使用自定义文件管理器
with FileManager('example.txt', 'w') as f:
f.write('Hello, Custom Context Manager!')
带异常处理的自定义管理器¶
class DatabaseConnection:
"""自定义数据库连接管理器"""
def __init__(self, db_url):
self.db_url = db_url
self.connection = None
def __enter__(self):
try:
self.connection = sqlite3.connect(self.db_url)
print("数据库连接成功")
return self.connection
except sqlite3.Error as e:
print(f"数据库连接失败: {e}")
raise
def __exit__(self, exc_type, exc_value, traceback):
if self.connection:
self.connection.close()
print("数据库连接已关闭")
# 处理特定异常
if exc_type is sqlite3.IntegrityError:
print("完整性约束错误,已处理")
return True # 抑制异常
return False # 其他异常继续传播
# 使用自定义数据库管理器
try:
with DatabaseConnection('test.db') as conn:
cursor = conn.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS test (id INTEGER PRIMARY KEY, name TEXT)")
cursor.execute("INSERT INTO test (name) VALUES ('Alice')")
conn.commit()
except sqlite3.Error as e:
print(f"发生错误: {e}")
带资源统计的上下文管理器¶
import time
class Timer:
"""计时器上下文管理器"""
def __enter__(self):
self.start_time = time.time()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.end_time = time.time()
self.elapsed = self.end_time - self.start_time
print(f"代码执行时间: {self.elapsed:.4f}秒")
def get_elapsed_time(self):
return time.time() - self.start_time
# 使用计时器
with Timer() as timer:
# 模拟耗时操作
time.sleep(2)
current_time = timer.get_elapsed_time()
print(f"当前已执行: {current_time:.2f}秒")
方法二:使用 contextlib 模块¶
使用 contextlib.contextmanager 装饰器¶
from contextlib import contextmanager
@contextmanager
def file_manager(filename, mode):
"""使用生成器创建上下文管理器"""
try:
file = open(filename, mode)
print(f"文件 {filename} 已打开")
yield file # 这是 __enter__ 返回的值
except Exception as e:
print(f"发生错误: {e}")
raise
finally:
file.close()
print(f"文件 {filename} 已关闭")
# 使用生成器上下文管理器
with file_manager('example.txt', 'w') as f:
f.write('Hello from contextmanager!')
带参数和异常处理的生成器管理器¶
from contextlib import contextmanager
@contextmanager
def temporary_file_change(filename, new_content):
"""临时修改文件内容,完成后恢复"""
# 备份原内容
try:
with open(filename, 'r') as f:
original_content = f.read()
except FileNotFoundError:
original_content = None
# 写入新内容
try:
with open(filename, 'w') as f:
f.write(new_content)
print(f"文件 {filename} 内容已临时修改")
yield # 执行代码块
except Exception as e:
print(f"执行过程中发生错误: {e}")
raise
finally:
# 恢复原内容
if original_content is not None:
with open(filename, 'w') as f:
f.write(original_content)
print(f"文件 {filename} 内容已恢复")
else:
# 如果文件原本不存在,则删除创建的文件
import os
if os.path.exists(filename):
os.remove(filename)
print(f"临时文件 {filename} 已删除")
# 使用临时文件修改管理器
with temporary_file_change('temp.txt', '这是临时内容'):
with open('temp.txt', 'r') as f:
content = f.read()
print(f"临时内容: {content}")
# 在这里执行需要临时文件内容的操作
使用 contextlib 的其他工具¶
from contextlib import suppress, redirect_stdout, redirect_stderr
import io
# suppress - 抑制特定异常
with suppress(FileNotFoundError):
with open('nonexistent.txt') as f:
content = f.read()
print("如果文件不存在,这行不会执行")
print("程序继续执行")
# redirect_stdout - 重定向标准输出
output = io.StringIO()
with redirect_stdout(output):
print("这行输出会被重定向")
print("不会显示在控制台")
print("重定向的输出:", output.getvalue())
# redirect_stderr - 重定向标准错误
error_output = io.StringIO()
with redirect_stderr(error_output):
import sys
print("这是一个错误消息", file=sys.stderr)
print("重定向的错误输出:", error_output.getvalue())
高级用法和最佳实践¶
嵌套上下文管理器¶
# 嵌套使用多个上下文管理器
with open('input.txt', 'r') as infile:
with open('output.txt', 'w') as outfile:
with open('log.txt', 'a') as logfile:
content = infile.read()
outfile.write(content.upper())
logfile.write(f"文件处理完成: {len(content)} 字符\n")
# 等效的简洁写法
with open('input.txt', 'r') as infile, \
open('output.txt', 'w') as outfile, \
open('log.txt', 'a') as logfile:
content = infile.read()
outfile.write(content.upper())
logfile.write(f"文件处理完成: {len(content)} 字符\n")
上下文管理器的组合使用¶
from contextlib import ExitStack
# 使用 ExitStack 管理动态数量的上下文管理器
def process_files(file_list):
with ExitStack() as stack:
files = []
for filename in file_list:
file = stack.enter_context(open(filename, 'r'))
files.append(file)
# 处理所有文件
for file in files:
content = file.read()
print(f"{file.name}: {len(content)} 字符")
# 所有文件会自动关闭
# 使用动态文件列表
file_list = ['file1.txt', 'file2.txt', 'file3.txt']
process_files(file_list)
错误处理和资源清理¶
class SafeResourceManager:
"""安全的资源管理器,确保资源被正确清理"""
def __init__(self, resource_name):
self.resource_name = resource_name
self.resource = None
self.acquired = False
def __enter__(self):
try:
# 模拟资源获取
print(f"获取资源: {self.resource_name}")
self.resource = f"Resource_{self.resource_name}"
self.acquired = True
return self.resource
except Exception as e:
print(f"获取资源失败: {e}")
self.cleanup()
raise
def __exit__(self, exc_type, exc_value, traceback):
self.cleanup()
# 记录异常信息但不抑制
if exc_type is not None:
print(f"在资源使用过程中发生异常: {exc_type.__name__}: {exc_value}")
return False # 异常继续传播
def cleanup(self):
if self.acquired and self.resource:
print(f"释放资源: {self.resource_name}")
self.resource = None
self.acquired = False
# 测试安全资源管理器
try:
with SafeResourceManager("Database") as db:
print(f"使用资源: {db}")
# 模拟异常
raise ValueError("测试异常")
except ValueError as e:
print(f"捕获到异常: {e}")
print("程序继续执行")