百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

数据开发工具dbt手拉手教程-03.定义数据源模型

liuian 2025-09-18 22:49 2 浏览

本章节介绍在dbt项目中,如何定义数据源模型。

定义并引入数据源

通过Extract和Load方式加载到仓库中的数据,可以使用dbt中的sources组件进行定义和描述。通过在dbt中将这些数据集(表)声明为sources,可以实现如下功能:

  • 在dbt模型models中,通过使用{{ source }}函数来查询数据源的数据集(表),这样就可以实现数据间的血缘关系。
  • 可以对数据源进行测试验证。
  • 能够计算数据源的数据抽取刷新情况。

dbt项目中的数据源定义在.yml文件中的sources关键字下。如下所示,在models目录中,数据源yml文件的内容为:

version: 2

sources:
  - name: jaffle_shop
    database: raw  
    schema: jaffle_shop  
    tables:
      - name: orders
      - name: customers

  - name: stripe
    tables:
      - name: payments

数据源配置文件中的 schema 通常与 name 相同,除非使用了不同于现有 schema 的数据源名称,此时需要添加 schema 关键字。

一旦数据源被定义完毕,在模型model中就可以通过使用source函数调用数据源。例如在models文件目录下定义一个order模型,文件名称为:models/order.sql

select
  ...

from {{ source('jaffle_shop', 'orders') }}

left join {{ source('jaffle_shop', 'customers') }} using (customer_id)

dbt将把它编译成完整的表名,编译后的文件在target/compiled/目录下:

select
  ...

from raw.jaffle_shop.orders

left join raw.jaffle_shop.customers using (customer_id)

在models的代码中使用{{ source() }}函数可以创建模型model和数据源间的依赖关系,并在文档中以图的形式显示出来:



创建完成数据源后,可以针对数据源中的数据集(表)进行测试,同时对数据源和数据集(表)添加注释后,dbt会将注释添加到 生成的web文档中。

version: 2

sources:
  - name: jaffle_shop
    description: This is a replica of the Postgres database used by our app
    tables:
      - name: orders
        description: >
          每条记录为一个订单. 包括取消和删除订单操作.
        columns:
          - name: id
            description: 订单表的主键
            tests:
              - unique # 添加对表中字段唯一测试
              - not_null # 添加对表中字段非空测试
          - name: status
            description: 超时后订单状态会改变

      - name: ...

  - name: ...

校验数据源的刷新

数据源通常是由数据同步工具定期加载到数据仓库中,供下层数仓模型进行消费。如果数据同步异常,可能会导致上层数据模型业务逻辑出现问题。dbt通过使用sources freshness属性配置选项,对数据源或表的限制,来验证数据源的加载是否符合同步预期,以确认数据处理是否处于正确状态。这对于数据处理的准确性非常有用。

要配置数据源刷新状态信息,在yml配置文件中添加一个freshness属性代码块,并在表声明中添加loadd_at_field属性关键字:

version: 2

sources:
  - name: jaffle_shop
    database: raw
    freshness: # 针对整个数据源级别的数据刷新情况
      warn_after: {count: 12, period: hour} # dbt source freshness命令执行时间与loaded_at_field属性定义的时间差值需要小于count属性值,否则执行freshness命令时报警
      error_after: {count: 24, period: hour}
# 同上定义
    loaded_at_field: _etl_loaded_at

    tables:
      - name: orders
        freshness: # 针对orders表定义数据刷新情况,表级别的定义
          warn_after: {count: 6, period: hour}
          error_after: {count: 12, period: hour}

      - name: customers # 如果表级别没有定义数据源的刷新,则默认数据源级别


      - name: product_skus
        freshness: null # do not check freshness for this table

在freshness属性代码块中,可以提供warn_after和error_after中的一个或两个告警级别。如果两者都没有提供,那么dbt将不会为这个数据源中的表计算刷新快照。

此外,还需要loadd_at_field关键子来计算表的刷新情况。如果没有提供loadd_at_field关键字,那么dbt将不会计算表的刷新情况。

这些配置存在层级关系,因此为一个数据源指定的刷新校验规则和和loadd_at_field值,将传递到该源下一层级定义的所有表中。如果一个源中的所有表都具有相同的loadd_at_field,只需要在上级源定义中指定一次即可。

使用dbt source freshness命令进行对数据源刷新情况进行校验。

$ dbt source freshness

在后台,dbt使用freshness属性来构造一个select语句查询,如下所示,您可以在查询执行日志中找到此执行语句。

select
  max(_etl_loaded_at) as max_loaded_at,
  convert_timezone('UTC', current_timestamp()) as snapshotted_at
from raw.jaffle_shop.orders

此查询的结果用于确定数据源源是否符合数据刷新规则:


sources数据源的yml配置文件中,通过使用freshness属性代码块、test属性代码块,实现了对数据源刷新频率的校验和数据质量的检查,避免由于数据异常引起后续的数据处理逻辑错误。因此,通过使用dbt工具对数据项目进行构建,很容易帮助数据开发人员提高工作效率,保证项目交付质量。

相关推荐

eino v0.4.5版本深度解析:接口类型处理优化与错误机制全面升级

近日,eino框架发布了v0.4.5版本,该版本在错误处理、类型安全、流处理机制以及代理配置注释等方面进行了多项优化与修复。本次更新共包含6个提交,涉及10个文件的修改,由2位贡献者共同完成。本文将详...

SpringBoot异常处理_springboot异常注解

在SpringBoot中,异常处理是构建健壮、可维护Web应用的关键部分。良好的异常处理机制可以统一返回格式、提升用户体验、便于调试和监控。以下是SpringBoot中处理异常的完整指...

Jenkins运维之路(Jenkins流水线改造Day02-1-容器项目)

这回对线上容器服务器的流水线进行了一定的改造来满足目前线上的需求,还是会将所有的自动化脚本都放置到代码库中统一管理,我感觉一章不一定写的完,所以先给标题加了个-1,话不多说开干1.本次流水线的流程设计...

告别宕机!零基础搭建服务器监控告警系统!小白也能学会!

前言本文将带你从零开始,一步步搭建一个完整的服务器指标监控与邮件告警系统,使用的技术栈均为业界主流、稳定可靠的开源工具:Prometheus:云原生时代的监控王者,擅长指标采集与告警规则定义Node_...

httprunner实战接口测试笔记,拿走不谢

每天进步一点点,关注我们哦,每天分享测试技术文章本文章出自【码同学软件测试】码同学公众号:自动化软件测试码同学抖音号:小码哥聊软件测试01开始安装跟创建项目pipinstallhttprunne...

基于JMeter的性能压测平台实现_jmeter压测方案

这篇文章已经是两年前写的,短短两年时间,JMeter开源应用技术的发展已经是翻天覆地,最初由github开源项目zyanycall/stressTestPlatform形成的这款测试工具也开始慢...

12K+ Star!新一代的开源持续测试工具!

大家好,我是Java陈序员。在企业软件研发的持续交付流程中,测试环节往往是影响效率的关键瓶颈,用例管理混乱、接口调试复杂、团队协作不畅、与DevOps流程脱节等问题都能影响软件交付。今天,给大家...

Spring Boot3 中分库分表之后如何合并查询

在当今互联网应用飞速发展的时代,数据量呈爆发式增长。对于互联网软件开发人员而言,如何高效管理和查询海量数据成为了一项关键挑战。分库分表技术应运而生,它能有效缓解单库单表数据量过大带来的性能瓶颈。而在...

离线在docker镜像方式部署ragflow0.17.2

经常项目上会出现不能连外网的情况,要怎么使用ragflow镜像部署呢,这里提供详细的步骤。1、下载基础镜像根据docker-compose-base.yml及docker-compose.yml中的i...

看,教你手写一个最简单的SpringBoot Starter

何为Starter?想必大家都使用过SpringBoot,在SpringBoot项目中,使用最多的无非就是各种各样的Starter了。那何为Starter呢?你可以理解为一个可拔插式...

《群星stellaris》军事基地跳出怎么办?解决方法一览

《群星stellaris》军事基地跳出情况有些小伙伴出现过这种情况,究竟该怎么解决呢?玩家“gmjdadk”分享的自己的解决方法,看看能不能解决。我用英文原版、德语、法语和俄语四个版本对比了一下,结果...

数据开发工具dbt手拉手教程-03.定义数据源模型

本章节介绍在dbt项目中,如何定义数据源模型。定义并引入数据源通过Extract和Load方式加载到仓库中的数据,可以使用dbt中的sources组件进行定义和描述。通过在dbt中将这些数据集(表)声...

docker compose 常用命令手册_docker-compose init

以下是DockerCompose常用命令手册,按生命周期管理、服务运维、构建配置、扩缩容、调试工具分类,附带参数解析、示例和关键说明,覆盖多容器编排核心场景:一、生命周期管理(核心命令...

RagFlow与DeepSeek R1本地知识库搭建详细步骤及代码实现

一、环境准备硬件要求独立显卡(建议NVIDIAGPU,8GB显存以上)内存16GB以上,推荐32GB(处理大规模文档时更高效)SSD硬盘(加速文档解析与检索)软件安装bash#必装组件Docker...

Docker Compose 配置更新指南_docker-compose配置

高效管理容器配置变更的最佳实践方法重启范围保留数据卷适用场景docker-composeup-d变更的服务常规配置更新--force-recreate指定/所有服务强制重建down→up流程...