FileIngestionTask

The FileIngestionTask is responsible for converting files into segments. These files can originate from various sources, such as Amazon S3, Google Cloud Storage (GCS), Azure Data Lake Storage (ADLS), or a local file system. Other file systems can also be supported by implementing the PinotFS interface.
By default, the FileIngestionTask is designed for OFFLINE tables during the ingestion stage. However, it can also be configured to backfill data into REALTIME tables when necessary.
This task can be customized to suit various use cases. For instance, you can configure it to allow the addition of new files or enable updates to existing files, depending on your specific requirements.
The task supports the following properties:

| Property Name | Required | Description |
|---|---|---|
| inputDirURI | Yes | The input folder. |
| inputFormat | Yes | The input file format. |
| includeFileNamePattern | No | Filters the target input files, e.g. to exclude marker files like _SUCCESS. Empty by default, which allows all files. The syntax is defined by FileSystem.getPathMatcher. |
| input.fs.className | No | The class name used to read the input files. It is inferred per URI if not set. |
| input.fs.prop.<> | No | Properties passed to the configured fs.className. |
|  | No | Enable a sanity check on whether a file URI is a directory. Deprecated: the sanity check has been optimized significantly and is now always enabled. |
| skipFileIfFailToInitReader | No | Skip an input file if the record reader fails to initialize for it. Enabled by default; the names and count of skipped files are tracked in the task metadata for debugging. |
| push.mode | No | Tar (default) or Metadata. Use Metadata push mode if the controller becomes a bottleneck when tasks upload segments. |
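As a rough sketch of how these properties sit under the table config's task section; the bucket, region, and file pattern below are placeholder assumptions for an S3 input folder of CSV files:

```json
"task": {
  "taskTypeConfigsMap": {
    "FileIngestionTask": {
      "inputDirURI": "s3://my-bucket/events/",
      "inputFormat": "csv",
      "includeFileNamePattern": "glob:**/*.csv",
      "input.fs.className": "org.apache.pinot.plugin.filesystem.S3PinotFS",
      "input.fs.prop.region": "us-west-2",
      "push.mode": "Tar"
    }
  }
}
```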
The following properties control task generation:

| Property Name | Required | Description |
|---|---|---|
| tableMaxNumTasks | No | The max number of parallel tasks a table can run at any time. It's 10,000 by default. If explicitly set to -1, as many tasks as needed are generated to ingest all files in one round. |
| taskMaxDataSize | No | The max number of bytes one task can process, to spread the workload among parallel tasks. It's 1G by default. If set, taskMaxNumFiles is ignored. |
| taskMaxNumFiles | No | The max number of files one task can process, to spread the workload among parallel tasks. It's 32 by default. This config is ignored if taskMaxDataSize is set explicitly. |
| maxNumRecordsPerSegment | No | The number of records to put in an output segment, to control the size of output segments. It's 5M by default. |
| desiredSegmentSize | No | The desired segment size. Default is 500M (K for kilobyte, M for megabyte, G for gigabyte). With this configured, neither maxNumRecordsPerSegment nor taskMaxDataSize needs to be configured. |
| schedule | No | A CRON expression, per Quartz cron syntax, for when the job is routinely triggered. If not set, the task is not cron scheduled but can still be triggered via the endpoint /tasks/schedule. |
If tableMaxNumTasks is set to -1, as many tasks as needed are generated to ingest all files in one round, with taskMaxDataSize controlling the workload of each subtask. In this one-shot mode, the user should still check the subtask completion states to decide whether another round is needed to retry failed subtasks, simply by triggering another task generation.
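For instance, a one-shot backfill could be sketched like this inside the FileIngestionTask configs (the URI and sizes are placeholders):

```json
{
  "inputDirURI": "s3://my-bucket/backfill/2022/",
  "inputFormat": "parquet",
  "tableMaxNumTasks": "-1",
  "taskMaxDataSize": "1G"
}
```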
The following pathColumn properties derive column values from the source file paths:

| Property Name | Required | Description |
|---|---|---|
| pathColumn.<seq>.name | Yes | The name of the value that is extracted from the file path or transformed from other values. The name should be unique in the list. If pathColumn.<seq>.output (the next property) is true, the name must match a column defined in the schema, and the value must match the dataType and format defined in the schema. |
| pathColumn.<seq>.output | No | Whether the value should be output to the Pinot table. |
| pathColumn.<seq>.pathComponentStartIndex | No | The start path component index used to extract a sub-part of the file path. The scheme part is omitted when counting path components; for example, s3://bucket-name/year=2022/month=01/day=01/city=sunnyvale/file.csv has 6 components. The index is inclusive. If the value is non-negative, it is 0-based. The value can also be negative, where the last path component has index -1 and the first path component has index -(the total number of path components). Defaults to 0. |
| pathColumn.<seq>.numPathComponentsToExtract | No | The number of path components to extract, starting from pathColumn.<seq>.pathComponentStartIndex. If not specified, path components are extracted from pathColumn.<seq>.pathComponentStartIndex through the last one. |
| pathColumn.<seq>.charStartIndex | No | The start character index used to further extract a sub-part from the part determined by pathColumn.<seq>.pathComponentStartIndex and pathColumn.<seq>.numPathComponentsToExtract. The index is inclusive. If the value is non-negative, it is 0-based. The value can also be negative, where the last character has index -1 and the first character has index -(the total number of characters). Defaults to 0. |
| pathColumn.<seq>.numCharsToExtract | No | The number of characters to extract, starting from pathColumn.<seq>.charStartIndex. If not specified, characters are extracted from pathColumn.<seq>.charStartIndex through the last one. |
| pathColumn.<seq>.regex | No | A regex to further extract one or more groups from the part determined by pathColumn.<seq>.pathComponentStartIndex, pathColumn.<seq>.numPathComponentsToExtract, pathColumn.<seq>.charStartIndex and pathColumn.<seq>.numCharsToExtract. |
| pathColumn.<seq>.transform | No | A function or an expression used to transform the values referred to by their names into a new value. Currently, only string concatenation is supported. Values are referred to by their names (the first property) in the format ${another value's name}. This field cannot be used together with the 3rd through 5th properties. |
<seq> is an identifier; properties with the same <seq> form a group and are used together to extract or transform a value.
To help readers understand how it works, here is an example. Say the file path is s3://bucket/year=2022/month=05/day=20/file.csv, and the table schema and pathColumn configurations are set up to extract values from that path.
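A minimal sketch of what such pathColumn settings could look like, assuming the schema defines STRING columns named year, month, and day, and that each regex capture group provides the extracted value (component indexes follow the counting described above, where the scheme is dropped and the bucket is component 0):

```json
{
  "pathColumn.0.name": "year",
  "pathColumn.0.output": "true",
  "pathColumn.0.pathComponentStartIndex": "1",
  "pathColumn.0.numPathComponentsToExtract": "1",
  "pathColumn.0.regex": "year=(.*)",
  "pathColumn.1.name": "month",
  "pathColumn.1.output": "true",
  "pathColumn.1.pathComponentStartIndex": "2",
  "pathColumn.1.numPathComponentsToExtract": "1",
  "pathColumn.1.regex": "month=(.*)",
  "pathColumn.2.name": "day",
  "pathColumn.2.output": "true",
  "pathColumn.2.pathComponentStartIndex": "3",
  "pathColumn.2.numPathComponentsToExtract": "1",
  "pathColumn.2.regex": "day=(.*)"
}
```

With the example path above, such a setup would yield year=2022, month=05, and day=20.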
Data can also be grouped into segments by sub-directory:

| Property Name | Description |
|---|---|
| subDirLevelForDataPartition | Treating inputDirURI as the root directory (sub-directory level = 0), subDirLevelForDataPartition defines the level of sub-directory used to group/partition data. Files within the same sub-directory at that level are grouped into the same segment or set of segments. As a special case, files whose sub-directory level is less than subDirLevelForDataPartition are treated as a special group and are also grouped into the same segment or set of segments. |
taskMaxNumFiles is still respected when subDirLevelForDataPartition is configured.
To help readers understand how it works, here is an example.
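Suppose (purely as an illustration; these paths are hypothetical) inputDirURI is s3://my-bucket/input/ and the folder contains 2022/05/20/a.csv, 2022/05/20/b.csv, and 2022/05/21/c.csv. With a configuration like the sketch below, a.csv and b.csv are grouped into the same segment (or set of segments) and c.csv into another, because the day folders sit at sub-directory level 3:

```json
{
  "inputDirURI": "s3://my-bucket/input/",
  "inputFormat": "csv",
  "subDirLevelForDataPartition": "3"
}
```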
Combined with Derive Columns from The Source File Paths, one can achieve the goal of partitioning data by columns extracted from file paths.

The task also supports merge/roll-up related properties such as mergeType, <metricName>.aggregationType, windowStartMs, windowEndMs, etc.
You can find some examples in the Pinot documentation for MergeRollupTask and RealtimeToOfflineTask.
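As a sketch of what such properties could look like inside the FileIngestionTask configs (the metric column metricCol and the time window values are assumptions for illustration):

```json
{
  "mergeType": "rollup",
  "metricCol.aggregationType": "sum",
  "windowStartMs": "1652400000000",
  "windowEndMs": "1652486400000"
}
```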
To enable updates to existing files (sync mode), set enableSync to true. The segmentNameGeneratorType should be fixed in sync mode, but it is set automatically if omitted.
Below is a sample task configuration:
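A minimal sketch of such a configuration, assuming CSV files in an S3 folder and an hourly schedule (segmentNameGeneratorType is shown explicitly, although it is set automatically if omitted):

```json
"task": {
  "taskTypeConfigsMap": {
    "FileIngestionTask": {
      "inputDirURI": "s3://my-bucket/snapshot/",
      "inputFormat": "csv",
      "enableSync": "true",
      "segmentNameGeneratorType": "fixed",
      "schedule": "0 0 * * * ?"
    }
  }
}
```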
To use consistent data push in sync mode, set both enableSync and consistentPushEnabled to true.
Below is a sample task configuration using the consistentPushEnabled flag:
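A similar sketch with the flag enabled (the URI, format, and schedule are placeholders):

```json
"task": {
  "taskTypeConfigsMap": {
    "FileIngestionTask": {
      "inputDirURI": "s3://my-bucket/snapshot/",
      "inputFormat": "csv",
      "enableSync": "true",
      "consistentPushEnabled": "true",
      "schedule": "0 0 * * * ?"
    }
  }
}
```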
When new files are continuously added to the input folder, add them in order: <input>/2022/05/11/ should be added before <input>/2022/05/12/, and <input>/2022/05/11/000000.csv should be added before <input>/2022/05/11/000001.csv.
This ordering comes naturally when folders are named by date, e.g. 2022/05 with / or - as separator, or with a prefix like year=2022/month=05. Set checkpointTTLSec to enable incremental ingestion. With this config, the checkpoints are kept around for as long as the TTL so that files added within this time window can be detected.
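A sketch of an incremental setup along these lines, assuming hourly scheduling and a two-day checkpoint TTL (all values are illustrative):

```json
{
  "inputDirURI": "s3://my-bucket/events/",
  "inputFormat": "csv",
  "checkpointTTLSec": "172800",
  "schedule": "0 0 * * * ?"
}
```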
Set tableMaxNumTasks so that enough tasks are generated to ingest all input files in the folder. It is also fine if more tasks are needed, because the endpoint can be called again and again based on the ingestion progress.

If some subtasks fail, call /tasks/execute again with the same task configs but a different taskName. Don't worry about duplicating or missing any files: internal progress metadata makes sure files are ingested exactly once, even when the same input folder is ingested again and again.

If all subtasks are completed, also call /tasks/execute again with the same task configs but a different taskName, just to make sure no files are left behind. The response will say that no subtasks were generated once all files in the folder are ingested.
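As a sketch, such an ad-hoc run is triggered by POSTing a body along these lines to the controller's /tasks/execute endpoint (the table name, task name, and config values are placeholders; the field names follow Pinot's AdhocTaskConfig, so double-check them against your Pinot version):

```json
{
  "taskType": "FileIngestionTask",
  "tableName": "myTable_OFFLINE",
  "taskName": "adhoc-file-ingestion-1",
  "taskConfigs": {
    "inputDirURI": "s3://my-bucket/input/",
    "inputFormat": "csv",
    "tableMaxNumTasks": "-1"
  }
}
```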