跳转至

Python with 语句

概述

Python 的 with 语句是上下文管理器(Context Manager)的语法糖,用于简化资源管理代码。它确保在使用资源后正确地进行清理工作,即使在使用过程中发生了异常。

为什么需要 with 语句?

传统资源管理的问题

在 Python 中,许多操作需要在使用后进行清理,比如: - 文件操作后需要关闭文件 - 数据库连接后需要关闭连接 - 线程锁使用后需要释放锁

传统的方式需要手动管理:

# 传统文件操作方式
file = None
try:
    file = open('example.txt', 'r')
    content = file.read()
    # 处理文件内容
except IOError:
    print("文件读取错误")
finally:
    if file:
        file.close()  # 必须手动关闭文件

with 语句的优势

  1. 代码简洁:减少样板代码
  2. 异常安全:即使发生异常也能正确清理资源
  3. 可读性强:代码意图更清晰
  4. 减少错误:避免忘记关闭资源的问题

with 语句的基本语法

基本格式

with expression [as variable]:
    # 代码块
    # 资源在这里使用
# 离开 with 块时自动清理资源

多个上下文管理器

with expression1 as var1, expression2 as var2:
    # 同时使用多个资源

with 语句的工作原理

上下文管理器协议

with 语句依赖于上下文管理器对象,该对象必须实现以下两个方法:

  1. __enter__(self):进入上下文时调用,返回值赋给 as 后面的变量
  2. __exit__(self, exc_type, exc_value, traceback):退出上下文时调用

执行流程

# with 语句的执行过程相当于:
manager = expression  # 获取上下文管理器
variable = manager.__enter__()  # 调用 __enter__ 方法
try:
    # 执行代码块
    ...
finally:
    # 调用 __exit__ 方法,确保资源被清理
    manager.__exit__(exc_type, exc_value, traceback)

实际应用场景

1. 文件操作

基本文件操作

# 读取文件
with open('example.txt', 'r') as file:
    content = file.read()
    print(content)
# 文件自动关闭

# 写入文件
with open('output.txt', 'w') as file:
    file.write('Hello, World!')
# 文件自动关闭

同时打开多个文件

# 同时打开多个文件进行读写操作
with open('input.txt', 'r') as infile, open('output.txt', 'w') as outfile:
    content = infile.read()
    outfile.write(content.upper())
# 两个文件都会自动关闭

处理二进制文件

# 处理二进制文件
with open('image.jpg', 'rb') as image_file:
    image_data = image_file.read()
    # 处理图像数据

# 写入二进制数据
with open('copy.jpg', 'wb') as copy_file:
    copy_file.write(image_data)

文件操作的异常处理

try:
    with open('nonexistent.txt', 'r') as file:
        content = file.read()
except FileNotFoundError:
    print("文件不存在")
except IOError:
    print("文件读取错误")
# 即使发生异常,文件也会被正确关闭

2. 数据库连接

SQLite 数据库操作

import sqlite3

# 基本的数据库操作
with sqlite3.connect('database.db') as conn:
    cursor = conn.cursor()
    cursor.execute('SELECT * FROM users')
    results = cursor.fetchall()
    for row in results:
        print(row)
# 连接自动关闭

数据库事务管理

import sqlite3

# 使用 with 语句管理事务
def update_user_age(user_id, new_age):
    with sqlite3.connect('database.db') as conn:
        cursor = conn.cursor()
        try:
            cursor.execute('UPDATE users SET age = ? WHERE id = ?', (new_age, user_id))
            # 如果发生异常,事务会自动回滚
            conn.commit()
            print("更新成功")
        except sqlite3.Error as e:
            print(f"更新失败: {e}")
            conn.rollback()

多个数据库操作

import sqlite3

# 复杂的数据库操作
with sqlite3.connect('database.db') as conn:
    cursor = conn.cursor()

    # 创建表
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            age INTEGER
        )
    ''')

    # 插入数据
    cursor.execute('INSERT INTO users (name, age) VALUES (?, ?)', ('Alice', 25))
    cursor.execute('INSERT INTO users (name, age) VALUES (?, ?)', ('Bob', 30))

    # 查询数据
    cursor.execute('SELECT * FROM users')
    users = cursor.fetchall()

    conn.commit()

    for user in users:
        print(f"ID: {user[0]}, Name: {user[1]}, Age: {user[2]}")

3. 线程锁

基本锁操作

import threading

# 创建锁对象
lock = threading.Lock()

# 使用 with 语句管理锁
shared_resource = 0

def increment():
    global shared_resource
    with lock:
        # 临界区代码 - 线程安全
        shared_resource += 1
        print(f"当前值: {shared_resource}")

# 创建多个线程
threads = []
for i in range(5):
    thread = threading.Thread(target=increment)
    threads.append(thread)
    thread.start()

# 等待所有线程完成
for thread in threads:
    thread.join()

print(f"最终结果: {shared_resource}")

可重入锁(RLock)

import threading

# 可重入锁允许同一线程多次获取锁
rlock = threading.RLock()

def recursive_function(count):
    with rlock:
        print(f"进入递归函数,计数: {count}")
        if count > 0:
            recursive_function(count - 1)
        print(f"退出递归函数,计数: {count}")

# 同一线程可以多次获取可重入锁
thread = threading.Thread(target=recursive_function, args=(3,))
thread.start()
thread.join()

条件变量

import threading
import time

# 使用条件变量进行线程间通信
condition = threading.Condition()
shared_list = []

def producer():
    with condition:
        print("生产者开始工作")
        time.sleep(1)  # 模拟生产时间
        shared_list.append("新产品")
        print("生产者生产完成,通知消费者")
        condition.notify()  # 通知等待的消费者

def consumer():
    with condition:
        print("消费者等待产品")
        condition.wait()  # 等待生产者通知
        if shared_list:
            item = shared_list.pop()
            print(f"消费者消费: {item}")

# 创建生产者和消费者线程
prod_thread = threading.Thread(target=producer)
cons_thread = threading.Thread(target=consumer)

cons_thread.start()
prod_thread.start()

prod_thread.join()
cons_thread.join()

4. 临时修改系统状态

decimal 模块的高精度计算

import decimal

# 正常精度计算
result1 = decimal.Decimal('1') / decimal.Decimal('3')
print(f"正常精度: {result1}")  # 0.3333333333333333333333333333

# 临时设置高精度
with decimal.localcontext() as ctx:
    ctx.prec = 50  # 临时设置50位精度
    result2 = decimal.Decimal('1') / decimal.Decimal('3')
    print(f"高精度: {result2}")  # 0.33333333333333333333333333333333333333333333333333

# 精度恢复原设置
result3 = decimal.Decimal('1') / decimal.Decimal('3')
print(f"恢复后精度: {result3}")  # 0.3333333333333333333333333333

临时修改环境变量

import os

class TemporaryEnvironment:
    """临时修改环境变量的上下文管理器"""

    def __init__(self, **kwargs):
        self.new_env = kwargs
        self.old_env = {}

    def __enter__(self):
        # 保存旧值并设置新值
        for key, value in self.new_env.items():
            self.old_env[key] = os.environ.get(key)
            os.environ[key] = str(value)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # 恢复旧值
        for key, old_value in self.old_env.items():
            if old_value is None:
                if key in os.environ:
                    del os.environ[key]
            else:
                os.environ[key] = old_value

# 使用临时环境变量
print("原始PATH:", os.environ.get('PATH', '未设置'))

with TemporaryEnvironment(TEMP_VAR="临时值", ANOTHER_VAR="另一个值"):
    print("临时TEMP_VAR:", os.environ.get('TEMP_VAR'))
    print("临时ANOTHER_VAR:", os.environ.get('ANOTHER_VAR'))

print("恢复后TEMP_VAR:", os.environ.get('TEMP_VAR', '未设置'))

临时修改工作目录

import os

class TemporaryDirectory:
    """临时切换工作目录的上下文管理器"""

    def __init__(self, new_dir):
        self.new_dir = new_dir
        self.old_dir = None

    def __enter__(self):
        self.old_dir = os.getcwd()
        os.chdir(self.new_dir)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        os.chdir(self.old_dir)

# 使用临时目录
print("当前目录:", os.getcwd())

with TemporaryDirectory('/tmp'):
    print("临时目录:", os.getcwd())
    # 在这里执行需要特定目录的操作

print("恢复后目录:", os.getcwd())

创建自定义的上下文管理器

方法一:使用类实现

基本自定义上下文管理器

class FileManager:
    """自定义文件管理器"""

    def __init__(self, filename, mode):
        self.filename = filename
        self.mode = mode
        self.file = None

    def __enter__(self):
        self.file = open(self.filename, self.mode)
        return self.file

    def __exit__(self, exc_type, exc_value, traceback):
        if self.file:
            self.file.close()
        # 如果返回True,异常会被抑制
        # 如果返回False或None,异常会继续传播
        return False

# 使用自定义文件管理器
with FileManager('example.txt', 'w') as f:
    f.write('Hello, Custom Context Manager!')

带异常处理的自定义管理器

class DatabaseConnection:
    """自定义数据库连接管理器"""

    def __init__(self, db_url):
        self.db_url = db_url
        self.connection = None

    def __enter__(self):
        try:
            self.connection = sqlite3.connect(self.db_url)
            print("数据库连接成功")
            return self.connection
        except sqlite3.Error as e:
            print(f"数据库连接失败: {e}")
            raise

    def __exit__(self, exc_type, exc_value, traceback):
        if self.connection:
            self.connection.close()
            print("数据库连接已关闭")

        # 处理特定异常
        if exc_type is sqlite3.IntegrityError:
            print("完整性约束错误,已处理")
            return True  # 抑制异常

        return False  # 其他异常继续传播

# 使用自定义数据库管理器
try:
    with DatabaseConnection('test.db') as conn:
        cursor = conn.cursor()
        cursor.execute("CREATE TABLE IF NOT EXISTS test (id INTEGER PRIMARY KEY, name TEXT)")
        cursor.execute("INSERT INTO test (name) VALUES ('Alice')")
        conn.commit()
except sqlite3.Error as e:
    print(f"发生错误: {e}")

带资源统计的上下文管理器

import time

class Timer:
    """计时器上下文管理器"""

    def __enter__(self):
        self.start_time = time.time()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.end_time = time.time()
        self.elapsed = self.end_time - self.start_time
        print(f"代码执行时间: {self.elapsed:.4f}秒")

    def get_elapsed_time(self):
        return time.time() - self.start_time

# 使用计时器
with Timer() as timer:
    # 模拟耗时操作
    time.sleep(2)
    current_time = timer.get_elapsed_time()
    print(f"当前已执行: {current_time:.2f}秒")

方法二:使用 contextlib 模块

使用 contextlib.contextmanager 装饰器

from contextlib import contextmanager

@contextmanager
def file_manager(filename, mode):
    """使用生成器创建上下文管理器"""
    try:
        file = open(filename, mode)
        print(f"文件 {filename} 已打开")
        yield file  # 这是 __enter__ 返回的值
    except Exception as e:
        print(f"发生错误: {e}")
        raise
    finally:
        file.close()
        print(f"文件 {filename} 已关闭")

# 使用生成器上下文管理器
with file_manager('example.txt', 'w') as f:
    f.write('Hello from contextmanager!')

带参数和异常处理的生成器管理器

from contextlib import contextmanager

@contextmanager
def temporary_file_change(filename, new_content):
    """临时修改文件内容,完成后恢复"""

    # 备份原内容
    try:
        with open(filename, 'r') as f:
            original_content = f.read()
    except FileNotFoundError:
        original_content = None

    # 写入新内容
    try:
        with open(filename, 'w') as f:
            f.write(new_content)
        print(f"文件 {filename} 内容已临时修改")
        yield  # 执行代码块
    except Exception as e:
        print(f"执行过程中发生错误: {e}")
        raise
    finally:
        # 恢复原内容
        if original_content is not None:
            with open(filename, 'w') as f:
                f.write(original_content)
            print(f"文件 {filename} 内容已恢复")
        else:
            # 如果文件原本不存在,则删除创建的文件
            import os
            if os.path.exists(filename):
                os.remove(filename)
                print(f"临时文件 {filename} 已删除")

# 使用临时文件修改管理器
with temporary_file_change('temp.txt', '这是临时内容'):
    with open('temp.txt', 'r') as f:
        content = f.read()
        print(f"临时内容: {content}")
    # 在这里执行需要临时文件内容的操作

使用 contextlib 的其他工具

from contextlib import suppress, redirect_stdout, redirect_stderr
import io

# suppress - 抑制特定异常
with suppress(FileNotFoundError):
    with open('nonexistent.txt') as f:
        content = f.read()
    print("如果文件不存在,这行不会执行")
print("程序继续执行")

# redirect_stdout - 重定向标准输出
output = io.StringIO()
with redirect_stdout(output):
    print("这行输出会被重定向")
    print("不会显示在控制台")

print("重定向的输出:", output.getvalue())

# redirect_stderr - 重定向标准错误
error_output = io.StringIO()
with redirect_stderr(error_output):
    import sys
    print("这是一个错误消息", file=sys.stderr)

print("重定向的错误输出:", error_output.getvalue())

高级用法和最佳实践

嵌套上下文管理器

# 嵌套使用多个上下文管理器
with open('input.txt', 'r') as infile:
    with open('output.txt', 'w') as outfile:
        with open('log.txt', 'a') as logfile:
            content = infile.read()
            outfile.write(content.upper())
            logfile.write(f"文件处理完成: {len(content)} 字符\n")

# 等效的简洁写法
with open('input.txt', 'r') as infile, \
     open('output.txt', 'w') as outfile, \
     open('log.txt', 'a') as logfile:

    content = infile.read()
    outfile.write(content.upper())
    logfile.write(f"文件处理完成: {len(content)} 字符\n")

上下文管理器的组合使用

from contextlib import ExitStack

# 使用 ExitStack 管理动态数量的上下文管理器
def process_files(file_list):
    with ExitStack() as stack:
        files = []
        for filename in file_list:
            file = stack.enter_context(open(filename, 'r'))
            files.append(file)

        # 处理所有文件
        for file in files:
            content = file.read()
            print(f"{file.name}: {len(content)} 字符")

        # 所有文件会自动关闭

# 使用动态文件列表
file_list = ['file1.txt', 'file2.txt', 'file3.txt']
process_files(file_list)

错误处理和资源清理

class SafeResourceManager:
    """安全的资源管理器,确保资源被正确清理"""

    def __init__(self, resource_name):
        self.resource_name = resource_name
        self.resource = None
        self.acquired = False

    def __enter__(self):
        try:
            # 模拟资源获取
            print(f"获取资源: {self.resource_name}")
            self.resource = f"Resource_{self.resource_name}"
            self.acquired = True
            return self.resource
        except Exception as e:
            print(f"获取资源失败: {e}")
            self.cleanup()
            raise

    def __exit__(self, exc_type, exc_value, traceback):
        self.cleanup()

        # 记录异常信息但不抑制
        if exc_type is not None:
            print(f"在资源使用过程中发生异常: {exc_type.__name__}: {exc_value}")

        return False  # 异常继续传播

    def cleanup(self):
        if self.acquired and self.resource:
            print(f"释放资源: {self.resource_name}")
            self.resource = None
            self.acquired = False

# 测试安全资源管理器
try:
    with SafeResourceManager("Database") as db:
        print(f"使用资源: {db}")
        # 模拟异常
        raise ValueError("测试异常")
except ValueError as e:
    print(f"捕获到异常: {e}")

print("程序继续执行")