数据格式

TorchEasyRec作为阿里云PAI的推荐算法包,可以无缝对接MaxCompute的数据表,也可以读取OSS、NAS或Local环境中的CSV, Parquet文件,同时支持Kafka和Datahub的流式消息。 TorchEasyRec的OdpsDataset, ParquetDataset, KafkaDataset进一步可支持训练断点续训功能,当训练中断后可以从上次的数据位置继续训练,避免重复处理已消费的数据。

一个最简单的data config的配置

这个配置里面,读取MaxCompute的表作为输入数据(OdpsDataset),并且输入数据已经编码好,每个worker上以8192的batch_size,并行度为8来读取数据

data_config {
    batch_size: 8192
    dataset_type: OdpsDataset
    fg_mode: FG_NONE
    label_fields: "clk"
    num_workers: 8
}

目前支持以下几种dataset_type:

OdpsDataset

输入数据为MaxCompute表

  • 前置条件:

    • MaxCompute控制台的「租户管理」->「租户属性」页面打开开放存储(Storage API)开关

    • 「租户管理」->「新增成员」给相应用户授予「admin」权限;或参考租户权限文档,精细授予用户Quota的使用权限

  • input_path: 按如下格式设置

    • odps://{project}/tables/{table_name}/{partition},多表按逗号分隔

    • 如果单表需要设置多个分区,可以用&简写,来分隔多个分区,odps://{project}/tables/{table_name}/{partition1}&{partition2}

  • 运行训练/评估/导出/预测等命令时

    • 本地环境

      • 需要准备一个odps_conf文件,并在启动命令中设置在ODPS_CONFIG_FILE_PATH环境变量中

      cat << EOF >> odps_conf
      project_name=${PROJECT_NAME}
      access_id=${ACCESS_ID}
      access_key=${ACCESS_KEY}
      end_point=http://service.{region}-vpc.maxcompute.aliyun-inc.com/api
      EOF
      
      ODPS_CONFIG_FILE_PATH=odps_conf \
      torchrun --master_addr=$MASTER_ADDR --master_port=$MASTER_PORT \
      --nnodes=$WORLD_SIZE --nproc-per-node=$NPROC_PER_NODE --node_rank=$RANK \
      -m tzrec.train_eval \
      --pipeline_config_path ${PIPELINE_CONFIG}
      
    • PAI-DLC/PAI-DSW环境

      • 需要设置ODPS_ENDPOINT的环境变量,并新建任务时,「角色信息」选择PAI默认角色

      ODPS_ENDPOINT=http://service.{region}-vpc.maxcompute.aliyun-inc.com/api \
      torchrun --master_addr=$MASTER_ADDR --master_port=$MASTER_PORT \
      --nnodes=$WORLD_SIZE --nproc-per-node=$NPROC_PER_NODE --node_rank=$RANK \
      -m tzrec.train_eval \
      --pipeline_config_path ${PIPELINE_CONFIG}
      
  • 如果是预付费Quota,参考独享数据传输服务文档购买和授权,可通过odps_data_quota_name传入购买的Quota名

  • 如果CPU/GPU利用率都不高,可能是网络传输带宽瓶颈,可以尝试设置odps_data_compressionZSTD来增大数据的压缩率,减少数据网络传输带宽

ParquetDataset

输入数据为parquet格式

  • input_path: 按如下格式设置

    • ${PATH_TO_DATA_DIR}/*.parquet

  • 注意: 如果每个parquet文件中的数据量不相等或文件数据小于worker数,ParquetDataset会自动重分配数据,来保证每个worker读取的数据量相等。但仍建议parquet文件数是 nproc-per-node * nnodes * num_workers的倍数,并且每个parquet文件的数据量基本相等,减少数据自动重分配的IO开销。

CsvDataset

  • input_path: 按如下格式设置

    • ${PATH_TO_DATA_DIR}/*.csv

  • 需设置data_config.delimiter来指名列分隔符,默认为,

  • 需设置 data_config.with_header来指定是否有header行,默认为false

  • 按需设置 data_config.input_fields 来指定schema,详见下文input_fields参数说明

  • 注意:

    • 训练和评估时需要csv文件数是 nproc-per-node * nnodes * num_workers的倍数,并且每个csv文件的数据量相等

    • csv格式数据读性能有瓶颈

KafkaDataset

输入数据为Kafka 或 Datahub 消息流

  • 输入消息流的内容是序列化的ArrowRecordBatch,支持两种序列化格式:

    • schema-less格式 (record_batch.serialize()): 需设置data_config.input_fieldsdata_config.input_fields_str来指定数据的schema

    • 带schema的格式 (Arrow IPC Stream): 无需设置input_fields,schema从消息中自动推断,但schema会占用消息体大小

  • input_path: 按如下格式设置

    • kafka://broker:9092/topic?group.id=consumer_group&auto.offset.reset=earliest

    • 需以&分隔符来分隔kafka的参数,group.id是必选参数,其余参数参考Kafka配置文档

    • enable.auto.commit默认设置为false,KafkaDataset使用自身的checkpoint机制管理消费位点,不依赖Kafka broker端的offset提交。如需启用自动提交,可在URI中显式设置enable.auto.commit=true

    • 支持start.timestamp.ms参数,指定从某个时间戳(毫秒)开始消费,消费者会从各分区中时间戳 >= 该值的最早offset开始读取。当同时存在checkpoint时,checkpoint优先级更高。示例:

      • kafka://broker:9092/topic?group.id=consumer_group&auto.offset.reset=earliest&start.timestamp.ms=1711929600000

  • 注意:

    • Kafka 分片数需是 nproc-per-node * nnodes * num_workers 的倍数,否则会导致数据倾斜

    • 当输入数据为Datahub时,Datahub需设置为Kafka兼容模式的Datahub, input_path按如下格式设置

      • kafka://{dh_endpoint}/{dh_project.dh_topic}?group.id={dh_project.dh_group}&security.protocol=SASL_SSL&sasl.mechanism=PLAIN&sasl.username={access_id}&sasl.password={access_secrect}

    • 当使用 Arrow IPC Stream 格式 (带 schema) 时,每个 Kafka消息应只包含一个 record batch。如果单个消息包含多个 record batches,只有第一个 batch 会被读取,后续 batches 将被忽略。如需发送多个 batches,请将其作为多个独立的 Kafka消息发送。

data_config配置

fg_mode

  • FG(Feature Generator) 的运行模式,支持FG_DAG, FG_NONE, FG_BUCKETIZE, FG_NORMAL

    • FG是进入模型推理前的一层特征变换,可以保证离在线特征变换的一致性,特征变换包含Combo/Lookup/Match/Expr等类型,详见特征章节。以LookupFeature的一种配置为例,特征变换为从cate_map中用cate_key查出值后,用boundaries进行分箱再进入模型推理

    feature_configs {
        lookup_feature {
            feature_name: "lookup_feat"
            map: "user:cate_map"
            key: "item:cate_key"
            embedding_dim: 16
            boundaries: [0, 1, 2, 3, 4]
        }
    }
    
    • 特征输入的side一共支持五种 [user, item, context, feature, const],上述lookup_feat中的cate_map则是属于userside

      • user: 用户侧特征输入,线上推理时从请求中传入

      • item: 物品侧特征输入,线上推理时会从实时缓存在内存中的特征表里获取

      • context: 由上下文产生物品侧特征输入,线上推理时从请求中传入,如recall_name

      • feature: 来自其他特征FG的输出,如下述lookup_age_feat的输入age_binning来自于RawFeature age的分箱结果

      • const: 输入为常量

      feature_configs {
          raw_feature {
              feature_name: "age_binning"
              expression: "user:age"
              embedding_dim: 16
              boundaries: [18, 25, 30]
          }
      }
      feature_configs {
          lookup_feature {
              feature_name: "lookup_age_feat"
              map: "item:age_map"
              key: "feature:age_binning"
              embedding_dim: 16
              boundaries: [0, 1, 2, 3, 4]
          }
      }
      feature_configs {
          lookup_feature {
              feature_name: "lookup_age_feat"
              map: "item:age_map"
              key: "const:age1"
              embedding_dim: 16
              boundaries: [0, 1, 2, 3, 4]
          }
      }
      

fg_mode=FG_DAG

  • 训练时会在Dataset中执行FG,数据列名与各特征的FG所依赖字段来源同名,详见特征,Dataset会自动分析所有特征依赖的字段来源来读取数据。

    • 以上文LookupFeature为例,特征FG所依赖字段来源[cate_map, cate_key],Dataset会从输入表中读取名为cate_mapcate_key的列来做FG得到lookup_feat

  • 该模式可以帮忙我们快速验证FG的训练效果,调优FG的配置,但由于训练时多了FG的过程,训练速度会受到一定程度的影响

fg_mode=FG_NORMAL

  • 训练时会在Dataset中执行FG,但不是以DAG方式运行。因此特征的输入中如果有feature,constside的输入,也需要在输入表中。目前更建议使用FG_DAG模式

fg_mode=FG_NONE

  • 训练时不会在Dataset中执行FG,输入数据为Fg编码后的数据,数据列名与特征名(feature_name)同名,Dataset会自动分析所有特征的特征名来读取数据

    • 以上文LookupFeature为例,特征名lookup_feat,Dataset会从输入表中直接读取编码后的lookup_feat列直接进行模型训练和推理

  • 该模式训练速度最佳,但需提前对数据提前进行FG编码,目前仅提供MaxCompute方式,步骤如下:

    • 在DLC/DSW/Local环境中生成fg json配置,上传至DataWorks的资源中,如果fg_output_dir中有vocab_file等其他文件,也需要上传至资源中

      cat <<EOF>> odps_conf
      access_id=${ACCESS_ID}
      access_key=${ACCESS_KEY}
      end_point=http://service.${region}-vpc.maxcompute.aliyun-inc.com/api
      EOF
      
      ODPS_CONFIG_FILE_PATH=odps_conf \
      python -m tzrec.tools.create_fg_json \
          --pipeline_config_path ${PIPELINE_CONFIG_PATH} \
          --fg_output_dir fg_output \
          --reserves ${COLS_YOU_WANT_RESERVE} \
          --fg_resource_name ${FG_RESOURCE_NAME} \
          --odps_project_name ${PROJECT_NAME}
      
      • –pipeline_config_path: 模型配置文件。

      • –fg_output_dir: fg json的输出文件夹。

      • –reserves: 需要透传到输出表的列,列名用逗号分隔。一般需要保留Label列,也可以保留request_id,user_id,item_id列,注意:如果模型的feature_config中有user_id,item_id作为特征,feature_name需避免与样本中的user_id,item_id列名冲突。

      • –fg_resource_name: 可选,fg json在MaxCompute中的资源名,默认为fg.json

      • –odps_project_name: 可选,将fg json文件上传到MaxCompute项目名,该参数必须配合参数fg_resource_name和环境变量ODPS_CONFIG_FILE_PATH一起使用

      • –ODPS_CONFIG_FILE_PATH: 该环境变量指向的是odpscmd的配置文件

    • DataWorks的独享资源组中安装pyfg,「资源组列表」- 在一个调度资源组的「操作」栏 点「运维助手」-「创建命令」(选手动输入)-「运行命令」

      /home/tops/bin/pip3 install http://tzrec.oss-accelerate.aliyuncs.com/third_party/pyfg104-1.0.4-cp37-cp37m-linux_x86_64.whl --index-url=https://mirrors.aliyun.com/pypi/simple/ --trusted-host=mirrors.cloud.aliyuncs.com
      
    • 在DataWorks中建立PyODPS 3节点运行FG,节点调度参数中配置好bizdate参数

      from pyfg104 import offline_pyfg
      offline_pyfg.run(
        o,
        input_table="YOU_PROJECT.TABLE_NAME",
        output_table="YOU_PROJECT.TABLE_NAME",
        fg_json_file="YOU_FG_FILE_NAME",
        partition_value=args['bizdate']
      )
      

参数

默认值

说明

input_table

输入表

output_table

输出表,会自动创建

fg_json_file

FG 配置文件,json 格式

partition_value

指定输入表的分区作为 FG 的输入,可支持多分区表,以逗号分隔

force_delete_output_table

False

是否删除输出表,设置为 True 时会先自动删除输出表, 再运行任务

force_update_resource

False

是否更新资源,设置为 True 时会先自动更新资源, 再运行任务

set_sql

任务执行的flag,如set odps.stage.mapper.split.size=32;,注意需要以分号结尾

fg_mode=FG_BUCKETIZE

  • 训练时在Dataset中执行FG的Bucketize部分,输入数据为Fg编码但未进行Bucketize的数据,Bucketize配置包含hash_bucket_size,boundaries,vocab_dict,vocab_list,num_buckets

    • 数据列名与特征名(feature_name)同名,Dataset会自动分析所有特征的特征名来读取数据

    • 以上文LookupFeature为例,特征名lookup_feat,Dataset会从输入表中直接读取编码后的lookup_feat列直接进行模型训练和推理

  • 该模式训练速度介于FG_DAGFG_NONE之间,适用于在需要统计Bucketize前的数据分布来设置合适的Bucketize参数的情况,可以避免离线提前跑两次Fg编码

  • 注意:在这种模式下,对数据提前进行FG编码时,使用的fg json配置不应该包含Bucketize配置,可以在create_fg_json时增加--remove_bucketizer参数来去除fg json配置中的Bucketize配置

    python -m tzrec.tools.create_fg_json \
        --pipeline_config_path ${PIPELINE_CONFIG_PATH} \
        --fg_output_dir fg_output \
        --reserves ${COLS_YOU_WANT_RESERVE} \
        --remove_bucketizer
    

fg_threads

  • 每个dataloader worker上fg的运行线程数,默认为1,nproc-per-node * num_workers * fg_threads建议小于单机CPU核数

label_fields

  • label相关的列名,至少设置一个,可以根据算法需要设置多个,如多目标算法

      label_fields: "click"
      label_fields: "buy"
    

sample_weight_fields

  • 训练样本的样本权重列名

drop_remainder

  • 是否丢弃掉最后一个不足batch_size的batch数据,默认为false

batch_cost_size

  • 用于限制一个batch数据的最大cost,主要适用于变长序列的模型(如DlrmHSTU)同步训练时,不同worker间由于序列长度差异过大,导致workload不一样导致的

  • 注:需结合data_config.sample_cost_field使用,sample_cost_field用于指定样本表中的表示样本cost的列名,dataset会根据样本中的cost列,裁切出batch_cost_size限制下的动态Batch。

    • 对于DlrmHSTU模型,cost列一般可以考虑设置为进入模型的token数 (contextual feature num + uih seq length + cand seq length)

  • 注:需将data_config.batch_size设置得较大一些,来保证有足够的数据裁切够batch_cost_size

num_workers

  • 每个proc上的读数据并发度,nproc-per-node * num_workers建议小于单机CPU核数

  • 如果num_workers==0,数据进程和训练进程将会在一个进程中,便于调试

shuffle

  • 是否训练时打散数据,默认为false

shuffle_buffer_size

  • 最多缓存多少个batch用于打散数据,默认为32

fg_encoded_multival_sep

  • fg_mode=FG_NONE 数据已经被FG编码好 时,数据的多值分割符,默认为chr(3)

input_fields

input_fields {
    input_name: "input1"
}
input_fields {
    input_name: "input2"
    input_type: DOUBLE
}
  • 当使用CsvDataset,如果出现以下情况,需要按如下方式指定input_fields,其余Dataset可以自动推理字段类型

    • 情况一:csv文件没有header行时 => 需至少设置input_name

    • 情况二:csv文件中存在某列的整列为空值时,或遇到column [xxx] with dtype null is not supported now报错时 => 需进一步设置input_type,目前input_type支持设置 INT32 | INT64 | FLOAT | DOUBLE | STRING

  • 当使用KafkaDataset:

    • input_type 支持设置 INT32 | INT64 | FLOAT | DOUBLE | STRING | ARRAY_INT32 | ARRAY_INT64 | ARRAY_FLOAT | ARRAY_DOUBLE | ARRAY_STRING | ARRAY_ARRAY_INT32 | ARRAY_ARRAY_INT64 | ARRAY_ARRAY_FLOAT | ARRAY_ARRAY_DOUBLE | ARRAY_ARRAY_STRING | MAP_STRING_INT32 | MAP_STRING_INT64 | MAP_STRING_FLOAT | MAP_STRING_DOUBLE | MAP_STRING_STRING | MAP_INT64_INT32 | MAP_INT64_INT64 | MAP_INT64_FLOAT | MAP_INT64_DOUBLE | MAP_INT64_STRING | MAP_INT32_INT32 | MAP_INT32_INT64 | MAP_INT32_FLOAT | MAP_INT32_DOUBLE | MAP_INT32_STRING

input_fields_str

input_fields_strinput_fields的简化配置格式,格式为: field_name1:field_type1;field_name2:field_type2;

示例:

data_config {
    input_fields_str: "user_id:BIGINT;item_id:BIGINT;label:FLOAT;features:ARRAY<FLOAT>;"
}

支持的类型名:

  • 基本类型: INT, INT32, BIGINT, INT64, STRING, FLOAT, DOUBLE

    • 类型别名: BIGINT=INT64, INT=INT32 (在基本类型、ARRAY、MAP中均可使用)

  • 数组类型: ARRAY, ARRAY, ARRAY, ARRAY, ARRAY

  • 嵌套数组: ARRAY<ARRAY<INT>>, ARRAY<ARRAY<BIGINT>>, ARRAY<ARRAY<FLOAT>>, ARRAY<ARRAY<DOUBLE>>, ARRAY<ARRAY<STRING>>

  • Map类型: MAP<STRING,INT>, MAP<STRING,BIGINT>, MAP<STRING,FLOAT>, MAP<STRING,DOUBLE>, MAP<STRING,STRING>, MAP<INT,INT>, MAP<INT,BIGINT>, MAP<INT,FLOAT>, MAP<INT,DOUBLE>, MAP<INT,STRING>, MAP<BIGINT,INT>, MAP<BIGINT,BIGINT>, MAP<BIGINT,FLOAT>, MAP<BIGINT,DOUBLE>, MAP<BIGINT,STRING>

  • 逗号周围允许空格: MAP<STRING, BIGINT> 是有效的

注意: 如果同时设置了input_fields_strinput_fieldsinput_fields_str优先级更高。

更多配置