import pymysql
# 创建连接
conn = pymysql.connect(host='localhost', user='root', password='password', database='test', port=3306)
# 创建游标
cursor = conn.cursor()
# 执行SQL语句
cursor.execute('select * from user')
# 获取所有数据
datas = cursor.fetchall()
for data in datas:
print(data)
# 关闭游标和连接
cursor.close()
conn.close()
在这个例子中,我们使用了`pymysql`库来连接和操作MySQL数据库。首先导入`pymysql`模块,然后创建了一个到MySQL服务器的连接。通过创建游标对象,我们可以执行SQL语句并获取结果。最后,记得关闭游标和连接以释放资源。
以下是介绍如何使用Python监听MySQL数据库的相关信息:
| 功能 | 库/工具 | 代码示例 |
|----------------------|-------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 连接到MySQL数据库 | `pymysql` | `import pymysql
connection = pymysql.connect(host='localhost', user='root', password='password', database='mydb')` |
| 监听数据库变更 | `MySQLdb` 或 `pymysql` + `SQLAlchemy` | 使用`MySQLdb`:
`import MySQLdb
db = MySQLdb.connect(host, user, password, database)
cursor = db.cursor()
cursor.execute("SELECT * FROM my_table")
for row in cursor.fetchall():
print(row)`
使用`pymysql` + `SQLAlchemy`:
`from sqlalchemy import create_engine, event
from sqlalchemy.engine import Engine
@event.listens_for(Engine, "connect")
def set_sql_mode(dbapi_connection, connection_record):
cursor = dbapi_connection.cursor()
cursor.execute("SET sql_mode='STRICT_TRANS_TABLES'")
engine = create_engine('mysql+pymysql://user:password@localhost/mydb')` |
| 使用触发器监听数据变更 | MySQL数据库触发器 + Python轮询 | 在MySQL中创建触发器:
`CREATE TRIGGER my_trigger AFTER UPDATE ON my_table FOR EACH ROW BEGIN
触发器逻辑
END;`
Python轮询:
`import time
while True:
# 检查触发器或表以获取变更
time.sleep(10)` |
| 监听binlog | `pythonmysqlreplication` | `from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent
stream = BinLogStreamReader(connection_settings={'host': 'localhost', 'port': 3306, 'user': 'root', 'password': 'password'})
for event in stream:
if isinstance(event, QueryEvent):
print(event.query)` |
请注意,以上代码只是示例,实际使用时需要根据具体场景进行调整。为了安全性考虑,不建议在代码中直接暴露数据库用户名和密码,可以考虑使用环境变量或其他安全方式存储敏感信息。
若有任何问题,请随时评论,关注,点赞和感谢观看。
本文链接:https://www.24zzc.com/news/171887401787053.html