
创建一个测试数据库并创建一个简单的表。
mysql> create database mytest;
Query OK, 1 row affected (0.02 sec)
mysql> use mytest;
Database changed
mysql> show tables;
Empty set (0.00 sec)
mysql> create table test(
-> id int auto_increment primary key,
-> name varchar(50) not null,
-> email varchar(50) not null unique
-> );
Query OK, 0 rows affected (0.04 sec)
mysql> show tables;
+------------------+
| Tables_in_mytest |
+------------------+
| test |
+------------------+
1 row in set (0.00 sec)接下来我们使用python和sqlalchemy来演示不同的隔离级别的作用和影响。
SQLAlchemy 是一个用于 Python 的 SQL 工具包和对象关系映射(ORM)库。它提供了一种高效且灵活的方式来与数据库进行交互,支持多种数据库后端(如 SQLite、PostgreSQL、MySQL、Oracle 等)。
可以通过多线程的方式来看一下 read uncommitted的作用。
import time
import threading
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, text
from sqlalchemy.orm import sessionmaker
# 连接数据库
engine = create_engine('mysql+pymysql://user:passwd@localhost/mytest', isolation_level="READ UNCOMMITTED")
metadata = MetaData()
# 确保表存在
test_table = Table('test', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(50)),
Column('email', String(50)))
# 创建会话
Session = sessionmaker(bind=engine)
def insert_data():
session = Session()
try:
session.begin()
session.execute(test_table.insert().values(name="tom", email="xxx@qq.com"))
session.commit()
print("Insert successful!")
except Exception as e:
session.rollback()
print(f"Insert failed: {e}")
finally:
session.close()
def updata_data():
session = Session() # 每个线程创建自己的会话
try:
session.begin()
session.execute(test_table.update().values(email="eee@qq.com").where(test_table.c.id == 1))
time.sleep(5) # 模拟长时间操作
session.commit()
print("Update successful!")
except Exception as e:
session.rollback()
print(f"Update failed: {e}")
finally:
session.close()
def read_data():
session = Session() # 每个线程创建自己的会话
try:
session.begin()
result = session.execute(test_table.select()).fetchall()
print("Read Uncommitted:", result)
except Exception as e:
session.rollback()
print(f"Read failed: {e}")
finally:
session.close()
def main():
# 插入一条数据
insert_data()
th1 = threading.Thread(target=updata_data)
th1.start()
time.sleep(1) # 确保更新线程先启动
th2 = threading.Thread(target=read_data)
th2.start()
th1.join()
th2.join()
if __name__ == "__main__":
main()注意在创建数据库引擎的时候需要配置好隔离级别,这样在接下来创建的会话就都是同一隔离级别等级的了。
读数据的函数会在update完成commit之前就读取要更新的数据(id==1),因为我们设置了可以读取其他事务未提交的数据,所以这里应该可以看到读取的数据应该是eee@qq.com,随后update才完成。
(venv) yidaas@TOMMWWU-MC3 Scripts % python mytest.py
Insert successful!
Read Uncommitted: (1, 'tom', 'eee@qq.com')
Update successful!如果我将上面的代码修改一下隔离级别就可以看到我们不能再读取未commit的数据。
import time
import threading
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, text
from sqlalchemy.orm import sessionmaker
# 连接数据库
engine = create_engine('mysql+pymysql://user:passwd@localhost/mytest', isolation_level="READ COMMITTED")
metadata = MetaData()
# 确保表存在
test_table = Table('test', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(50)),
Column('email', String(50)))
# 创建会话
Session = sessionmaker(bind=engine)
def updata_data():
session = Session() # 每个线程创建自己的会话
try:
session.begin()
session.execute(test_table.update().values(email="ert@qq.com").where(test_table.c.id == 1))
time.sleep(5) # 模拟长时间操作
session.commit()
print("Update successful!")
except Exception as e:
session.rollback()
print(f"Update failed: {e}")
finally:
session.close()
def read_data():
session = Session() # 每个线程创建自己的会话
try:
session.begin()
result = session.execute(test_table.select().where(test_table.c.id == 1)).first()
print("Read Uncommitted:", result)
except Exception as e:
session.rollback()
print(f"Read failed: {e}")
finally:
session.close()
def main():
th1 = threading.Thread(target=updata_data)
th1.start()
time.sleep(1) # 确保更新线程先启动
th2 = threading.Thread(target=read_data)
th2.start()
th1.join()
th2.join()
if __name__ == "__main__":
main()
输出如下:可以看到我们读取的是update之前的数据,而不是要update的数据。
python mytest.py
Read Uncommitted: (1, 'tom', 'eee@qq.com')
Update successful!MySQL 的默认隔离级别。在在可重复读事务开启的那一刻,该事务读取的都是那一刻的表的快照,也就是说,重复读读取的数据一直是一致的,它不像前两种,在事务(读未提交和读已提交)开启后读取的数据会受其他事务的影响。而可重复读。读取的是事务开启那一刻的数据快照(副本),所以在事务内无论何时读取到的都是一样的数据。
实验如下,我们在数据更新前,开启可重复读事务,并在事务内等待数据更新完成,观察数据是否变化,并在可重复事务结束后再次读取更新的数据,来验证结果的正确性。
import time
import threading
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, text
from sqlalchemy.orm import sessionmaker
# 连接数据库
engine = create_engine('mysql+pymysql://user:passwa@ip:3306/mytest', isolation_level="Repeatable Read")
metadata = MetaData()
# 确保表存在
test_table = Table('test', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(50)),
Column('email', String(50)))
# 创建会话
Session = sessionmaker(bind=engine)
def updata_data():
session = Session() # 每个线程创建自己的会话
try:
session.begin()
session.execute(test_table.update().values(email="dsdfasf@qq.com").where(test_table.c.id == 1))
session.commit()
print("Update successful!")
except Exception as e:
session.rollback()
print(f"Update failed: {e}")
finally:
session.close()
def read_data():
session = Session() # 每个线程创建自己的会话
try:
session.begin()
result = session.execute(test_table.select().where(test_table.c.id == 1)).first()
print("Read 1:", result)
time.sleep(6)
result = session.execute(test_table.select().where(test_table.c.id == 1)).first()
print("Read 2:", result)
except Exception as e:
session.rollback()
print(f"Read failed: {e}")
finally:
session.close()
# 可重复读事务结束后再次读取数据,看是否真的变化。
result = session.execute(test_table.select().where(test_table.c.id == 1)).first()
print("Read 3:", result)
def main():
# 插入一条数据
# insert_data()
#先读取更新之前的数据,然后在事务内等待数据更新之后,再次读取数据,观察是否存在变化
th1 = threading.Thread(target=read_data)
th1.start()
time.sleep(1) # 确保读线程先启动
th2 = threading.Thread(target=updata_data)
th2.start()
th1.join()
th2.join()
if __name__ == "__main__":
main()输出结果如下:
Read 1: (1, 'tom', 'qq@qq.com')
Update successful!
Read 2: (1, 'tom', 'qq@qq.com')
Read 3: (1, 'tom', 'dsdfasf@qq.com')串行化的逻辑就很简单了,跟编程器中的串行化一样,保证事务都是按顺序执行,因此也就不可能存在脏读,幻读等数据问题了,不过会带来明显的性能下降,因为它会导致更多的锁定和等待。
一般情况我们使用mysql默认的隔离级别就可以了,但是还是要根据自己的业务场景进行选择,下面给出一些不同隔离级别适用的可能场景:
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。