HDFS file formats and how to choose them

The default file format for Spark is Parquet, but as we discussed above, there are use cases where other formats are better suited, including:

  • SequenceFiles: A binary key/value pair format that is a good choice for blob storage when the overhead of rich schema support is not required
  • Parquet: Supports efficient queries, strongly typed schemas, and has a number of other benefits not covered in this article
  • Avro: Ideal for large binary data or when downstream consumers read records in their entirety; it also supports random seek access to records and strongly typed schemas
  • JSON: Ideal when records are stored across a number of small files

By choosing the optimal HDFS file format for your Spark jobs, you can ensure they will efficiently utilize data center resources and best meet the needs of downstream consumers.
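
As a quick reference, the following Scala sketch shows one way a Spark job might write the same dataset in each of these formats. The paths and column names are hypothetical, and the "avro" format assumes the spark-avro module is on the classpath.

import org.apache.spark.sql.SparkSession

object FormatWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("format-write-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical dataset: an image id plus a binary payload.
    val df = Seq(("img-001", Array[Byte](1, 2, 3))).toDF("id", "payload")

    df.write.parquet("hdfs:///tmp/sketch/parquet")          // default format, query-friendly
    df.write.format("avro").save("hdfs:///tmp/sketch/avro") // row-oriented, good for whole-record reads
    df.write.json("hdfs:///tmp/sketch/json")                // formatted text, easy to inspect

    // SequenceFiles are written through the RDD API as key/value pairs;
    // the DataFrame writer has no SequenceFile format.
    df.rdd.map(r => (r.getString(0), r.getAs[Array[Byte]]("payload")))
      .saveAsSequenceFile("hdfs:///tmp/sketch/seq")
  }
}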

Imagery

Parquet is not optimal for storing large binary data such as imagery, since arranging large binary payloads in a columnar format is resource-intensive. On the other hand, Avro works quite well for storing imagery. As discussed in the previous section, however, Avro is not optimal for queries.

In order to support queries, two columns are added to the imagery metadata Parquet files to serve as a foreign key to the imagery. This allows clients to query the imagery via the metadata.

Two key details need to be covered to understand how the cross-reference is implemented:

  1. Part File: Spark subdivides data into partitions, and when the data is written to Avro, each partition is written to a separate part file.
  2. Record Offset: The Avro API can return the offset into a file at which a specific record is stored. Given an offset, the Avro API can efficiently seek to that location and read the record. This functionality is available via the native Avro API, not via the Spark wrapper API for Avro.

The two columns appended to the imagery metadata used as the cross-reference to the imagery are the name of the part file in which the image record is stored and the file offset of the record within the part file. Since the Spark wrapper API for Avro files does not expose the record offset, the native Avro API must be used to write the imagery.
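
To make the cross-reference concrete, one row of the companion Parquet data could be shaped like the hypothetical case class below; the last two fields are the appended cross-reference columns, and the others stand in for whatever metadata the pipeline extracts from each image.

// Hypothetical shape of one row in the imagery metadata Parquet files.
case class ImageMetadata(
  imageId: String,        // metadata extracted from the image
  capturedAtMillis: Long, // metadata extracted from the image
  avroPartFile: String,   // cross-reference: name of the Avro part file holding the image record
  avroOffset: Long)       // cross-reference: record offset within that part file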

Imagery is written within a Spark map. The implementation will vary depending on the version of Spark and whether the DataFrame or Resilient Distributed Dataset (RDD) APIs are used, but the concept is the same: execute a method on each partition via a Spark map call and use the native Avro API to write a single part file containing all of the imagery in that partition. The general steps, sketched in code after this list, are as follows:

  1. Read the ingest SequenceFile.
  2. Map each partition of the ingest SequenceFile and pass the partition id to the map function. For an RDD, call rdd.mapPartitionsWithIndex(). For a DataFrame, you can obtain the partition id via spark_partition_id(), group by partition id via df.groupByKey(), and then call df.flatMapGroups().
  3. Within the map function do the following:
    1. Create a standard Avro Writer (not Spark) and include the partition id within the file name.
    2. Iterate through each record of the ingest SequenceFile and write records to the Avro file.
    3. Call DataFileWriter.sync() via the native Avro API before appending each record. This flushes the previous block to disk and returns the offset at which the record will be written; that offset can later be passed to DataFileReader.seek(long).
    4. Pass both the file name and record offset via the return value of the map function along with any additional metadata you would like to extract from the imagery.
  4. Save the resulting DataFrame or RDD to Parquet format.
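
Putting the steps together, here is a minimal Scala sketch of the write path using the RDD API. The ingest path, output locations, Avro schema, and extracted metadata (just the payload size here) are assumptions for illustration rather than the actual pipeline.

import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

object ImageryWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("imagery-write-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical Avro schema: an image id plus the raw image bytes.
    // Shipped to the executors as a JSON string and parsed inside the closure.
    val schemaJson =
      """{"type":"record","name":"Image","fields":[
        |  {"name":"id","type":"string"},
        |  {"name":"payload","type":"bytes"}]}""".stripMargin

    // Step 1: read the ingest SequenceFile (hypothetical path; key = image id, value = raw bytes).
    val ingest = spark.sparkContext.sequenceFile[String, Array[Byte]]("hdfs:///ingest/imagery.seq")
    val avroDir = "hdfs:///warehouse/imagery/avro"

    // Steps 2-3: map each partition, writing one Avro part file per partition via the native Avro API.
    val crossRef = ingest.mapPartitionsWithIndex { (partitionId, records) =>
      val schema = new Schema.Parser().parse(schemaJson)
      val partFile = new Path(f"$avroDir/part-$partitionId%05d.avro")
      val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
      writer.create(schema, FileSystem.get(new Configuration()).create(partFile))

      val rows = records.map { case (id, bytes) =>
        // Step 3.3: sync() before appending returns an offset that DataFileReader.seek(offset)
        // can later use to land on the block holding this record.
        val offset = writer.sync()
        writer.append(new GenericRecordBuilder(schema)
          .set("id", id)
          .set("payload", java.nio.ByteBuffer.wrap(bytes))
          .build())
        // Step 3.4: emit the cross-reference plus any extracted metadata (payload size as a stand-in).
        (id, partFile.toString, offset, bytes.length.toLong)
      }.toList // materialize all records before closing the writer
      writer.close()
      rows.iterator
    }

    // Step 4: save the cross-reference and metadata as the companion Parquet files.
    crossRef.toDF("image_id", "avro_part_file", "avro_offset", "size_bytes")
      .write.parquet("hdfs:///warehouse/imagery/metadata.parquet")
  }
}

Because the schema travels as a plain string and the DataFileWriter is created inside the partition closure, nothing non-serializable is captured by the Spark task, and each partition produces exactly one part file whose name embeds the partition id.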

The results are an Avro file and a companion Parquet file: the Avro file contains the imagery, and the Parquet file contains the Avro file path and record offset needed to efficiently seek to a given image record within the Avro file. The general pattern for querying and then reading the imagery records, sketched in code after these steps, is to:

  1. Query the Parquet files.
  2. Include the file path and offset in the results.
  3. Optionally repartition the results to tune the degree of parallelism for reading the image records.
  4. Map each partition of the query results.
  5. Within the map function do the following:
    1. Create a standard Avro reader for the Avro part file that contains the image record.
    2. Call DataFileReader.seek(long) to seek to the specified offset, then read the image record.
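
A matching Scala sketch of the read path follows, using the same hypothetical paths and column names as the write sketch above. Opening a DataFileReader per record keeps the example short; in practice you would likely reuse one reader per part file within a partition.

import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.mapred.FsInput
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

object ImageryReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("imagery-read-sketch").getOrCreate()
    import spark.implicits._

    // Steps 1-2: query the companion Parquet files and keep the cross-reference columns.
    val matches = spark.read.parquet("hdfs:///warehouse/imagery/metadata.parquet")
      .where($"size_bytes" > 1000000) // hypothetical predicate
      .select($"image_id", $"avro_part_file", $"avro_offset")
      .as[(String, String, Long)]

    // Steps 3-5: optionally repartition, then read each image record with the native Avro API.
    val images = matches.rdd
      .repartition(64) // tune read parallelism independently of the query
      .mapPartitions { rows =>
        val conf = new Configuration()
        rows.map { case (id, partFile, offset) =>
          val reader = new DataFileReader[GenericRecord](
            new FsInput(new Path(partFile), conf),
            new GenericDatumReader[GenericRecord]())
          reader.seek(offset) // jump straight to the block recorded at write time
          val record = reader.next()
          reader.close()
          val buf = record.get("payload").asInstanceOf[java.nio.ByteBuffer]
          val bytes = new Array[Byte](buf.remaining())
          buf.get(bytes)
          (id, bytes)
        }
      }

    images.take(1).foreach { case (id, bytes) => println(s"$id: ${bytes.length} bytes") }
  }
}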

Aggregated data

In addition to metadata for a given image, it is useful for us to store aggregated metadata about the entire set of imagery stored in a given Avro and Parquet file pair. For Uber’s use case, examples of aggregated metadata include:

  • The version of the pipeline used to process a given set of imagery. If a bug is found in the pipeline, this data is used to efficiently identify the imagery that needs to be reprocessed.
  • The geographic area in which the imagery was collected. This allows clients to identify which Avro and Parquet file pairs to include in geospatial searches.

Aggregated data is stored in JSON files for the following reasons:

  1. Debuggability: Since the JSON files are formatted text and typically contain a small number of records, they can be easily displayed without code or special tooling.
  2. “Efficient Enough” Reads: In many cases, the JSON file contains a single record for a given Avro and Parquet pair. Parquet and Avro files carry header overhead that is disproportionate for such small files; JSON has no header, so reading these small aggregate files is efficient enough.
  3. Referential Integrity: An alternative would be to store the aggregated records in a database. However, if the JSON, Avro, and Parquet files for a given set of imagery are stored in a single parent directory, then the imagery, imagery metadata, and aggregated metadata can be archived by moving the parent directory with a single atomic HDFS operation (see the sketch below).
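
To illustrate the third point, the sketch below assumes a hypothetical parent-directory layout for one imagery set and archives it with a single FileSystem.rename(), which HDFS executes as an atomic metadata operation on the NameNode.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ArchiveSketch {
  // Hypothetical layout for one imagery set, all under a single parent directory:
  //   /warehouse/imagery/set-0042/imagery.avro/       image records
  //   /warehouse/imagery/set-0042/metadata.parquet/   per-image metadata plus the cross-reference columns
  //   /warehouse/imagery/set-0042/aggregate.json      pipeline version, geographic area, ...
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    fs.mkdirs(new Path("/archive/imagery")) // rename requires an existing destination parent
    // Archiving the whole set is a single atomic HDFS directory rename.
    fs.rename(new Path("/warehouse/imagery/set-0042"), new Path("/archive/imagery/set-0042"))
  }
}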
