sqlalchemy实现时间列自动更新教程

(编辑:jimmy 日期: 2025/1/13 浏览:2)

一、使用场景需求

1、在实际项目开发过程中,用户可以操作的数据,我们往往会新增一个字段,来保存用户最后一次修改时间

2、一些系统中,我们需要存储用户最后一次登录时间,来统计用户的活跃度

二、 在sqlalchemy中常规的做法

1、数据模型

import datetime
from uuid import uuid4
from sqlalchemy import Column, Integer, String, DateTime, Boolean
from sqlalchemy_demo.connect import Base
class UserModule(Base):
  """
  创建一个用户的数据模型
  """
  __tablename__ = 'user'

  uuid = Column(String(36), unique=True, nullable=False, default=lambda: str(uuid4()), comment='uuid')
  id = Column(Integer, primary_key=True, autoincrement=True, comment='用户id')
  user_name = Column(String(30), nullable=False, unique=True, comment='用户名')
  password = Column(String(64), nullable=False, comment='用户密码')
  createtime = Column(DateTime, default=datetime.datetime.now, comment='创建时间')
  updatetime = Column(DateTime, default=datetime.datetime.now, comment='修改时间')
  is_lock = Column(Boolean, default=False, nullable=False, comment='是否锁住用户')

2、每次更新数据的时候,需要手动插入时间字段,来确保updatetime这个时间字段才会更新

三、使用自动更新数据

基于上面手动插入时间字段,在开发过程中很不方便,我们需要的是类似django中修改数据,该列会自动更新

1、导包

from uuid import uuid4
from sqlalchemy import Column, Integer, String, DateTime, Boolean, TIMESTAMP, func
from sqlalchemy.orm import relationship

from sqlalchemy_demo.connect import Base

2、定义数据模型

class UserModule(Base):
  """
  创建一个用户的数据模型
  """
  __tablename__ = 'user'

  uuid = Column(String(36), unique=True, nullable=False, default=lambda: str(uuid4()), comment='uuid')
  id = Column(Integer, primary_key=True, autoincrement=True, comment='用户id')
  user_name = Column(String(30), nullable=False, unique=True, comment='用户名')
  password = Column(String(64), nullable=False, comment='用户密码')
  createtime = Column(DateTime, server_default=func.now(), comment='创建时间')
  # onupdate设置自动更改
  updatetime = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='修改时间')
  is_lock = Column(Boolean, default=False, nullable=False, comment='是否锁住用户')

3、接下来的创建表与增删改查都一样的。

补充知识:Flask-SQLALchemy对表中数据按时间进行统计

例如表的结构如下:

class Status(db.Model):
  id = db.Column(db.Integer, primary_key=True)
  submit_time = db.Column(db.DateTime, default=datetime.now())

其中,Status表接受用户的提交,现在想对用户的提交情况按时间进行统计。例如过去24小时,每小时的提交次数;过去12个月,每个月的提交次数。

python代码实现查询如下:

from datetime import datetime, timedelta
 
NOW = datetime.utcnow()
last_24h_submits_count = []
for h in xrange(1,25):
  count = session.query(Status).filter(Status.submit_time.between(NOW - timedelta(seconds=h*3600-1), NOW - timedelta(hours=h-1))).count()
  last_24h_submits_count.append(count)

以上这篇sqlalchemy实现时间列自动更新教程就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。