SEG-Y to Parquet: How to make large geophysical data files suitable for distributed processing

Cyril Chirkunov
6 min read · Dec 16, 2020

SEG-Y is a convenient format for recording seismic exploration data, and there are plenty of open-source Python/C++ packages for working with it. However, a gap exists between those packages and popular distributed frameworks such as Apache Spark/SparkSQL/Hive and Python Dask: these cluster-computing technologies support only a few common ways to arrange big data and, unfortunately, know nothing about how to read SEG-Y. To address this issue, I will walk through one possible way to convert a SEG-Y binary into the ubiquitous Parquet format with a MapReduce job (Java).

Customizing MapReduce FileInputFormat

Seismic images usually take up gigabytes to terabytes, so it is reasonable to arrange HDFS support for the parallel workload. First of all, we need the code that splits a large binary into chunks, i.e. a custom FileInputFormat and RecordReader. To make the partitions, we should take into consideration:

  • the number of cluster nodes,
  • the trace records,
  • the requirements of the processing algorithm that will transform the data in later steps.

Given the large size of a SEG-Y file, splitting it into 64 MB chunks is not a good idea: it would produce thousands of parts per seismic image and thus increase the load on HDFS during further processing. It is better to start with hundreds of seismic records per file and adjust the final chunk size experimentally.

So, the first thing to do is to extract the number of traces per record (bytes 3212–3213 of the binary file header), the number of data samples per trace (bytes 3220–3221), and the data sample format code (bytes 3224–3225), which determines how many bytes one sample occupies. Knowing the trace header size, we can compute the amount of space one trace requires. Multiplying it by the number of traces per record gives the initial size of a partition.

Using a special coefficient PARTITIONS_MULTIPLIER (e.g. 10 looks reasonable in most cases), we can adjust the final partition size, as in the code below:

private int adjustSplitLength(Path file, JobContext job) {
    // ... read tracesPerRecord, nSamples and nBytes from the binary file header ...
    int traceNBytes = TRACE_HEADER_SIZE + nSamples * nBytes; // space occupied by one trace
    return traceNBytes * tracesPerRecord * PARTITIONS_MULTIPLIER;
}
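
The elided part of adjustSplitLength is just a read of the three binary-header fields mentioned above. A minimal sketch of how that could look is below; the readBinaryHeaderFields helper and its return convention are my illustration rather than the original code, and only the sample formats handled later by readFrom are mapped to byte sizes:

// Illustration only: a hypothetical helper that reads the three binary-header fields
// used above. Offsets are zero-based and values big-endian, as in SEG-Y rev 1.
private int[] readBinaryHeaderFields(Path file, JobContext job) throws IOException {
    FileSystem fs = file.getFileSystem(job.getConfiguration());
    try (FSDataInputStream in = fs.open(file)) {
        byte[] header = new byte[FILE_HEADER_SIZE];             // 3600 bytes
        in.readFully(0, header);
        ByteBuffer bb = ByteBuffer.wrap(header);                // big-endian by default
        int tracesPerRecord = bb.getShort(3212) & 0xFFFF;       // data traces per record
        int nSamples = bb.getShort(3220) & 0xFFFF;              // samples per trace
        int format = bb.getShort(3224) & 0xFFFF;                // data sample format code
        int nBytes = (format == 3) ? 2 : (format == 8) ? 1 : 4; // bytes per sample
        return new int[]{tracesPerRecord, nSamples, nBytes};
    }
}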

Next, we implement the FileInputFormat.getSplits method in the SEGYInputFormat class, skipping the 3600-byte SEG-Y file header (FILE_HEADER_SIZE) since we have already extracted everything we need from it:

public class SEGYInputFormat extends FileInputFormat<TraceHeaderWritable, TraceWritable> {

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<FileStatus> files = listStatus(job);
        // ...
        for (FileStatus file : files) {
            Path path = file.getPath();
            long length = file.getLen();
            FileSystem fs = path.getFileSystem(job.getConfiguration());
            BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
            long splitSize = adjustSplitLength(path, job);
            // skip the file header and cut the rest into trace-aligned splits
            long bytesRemaining = length - FILE_HEADER_SIZE;
            while (((double) bytesRemaining) / splitSize > 1) {
                int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
                bytesRemaining -= splitSize;
            }
            if (bytesRemaining != 0) {
                int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            }
        }
        // ...
        return splits;
    }
    // ...
}

To complete this intermediate part, it remains to create a custom RecordReader<key, value>. Let's put the trace data into a Writable value and the trace header info into a WritableComparable key, respectively.
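
Inside SEGYInputFormat, wiring such a reader in is a one-line override. A sketch, assuming the reader class sketched further below is called SEGYRecordReader:

// Hand each split to the custom reader (class name assumed for illustration).
@Override
public RecordReader<TraceHeaderWritable, TraceWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
    return new SEGYRecordReader();
}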

A tricky point is that data samples (the essential part of the value) can be stored in SEG-Y using different types: int, short, float, or byte. For the sake of simplicity, we will treat the final type of each sample as double. The example below illustrates how to read one data point from a byte stream of trace data:

public static double readFrom(int format, DataInputStream dis) throws IOException, IllegalArgumentException {
    switch (format) {
        case (1):
            return floatFromBytes(dis);  // 4-byte IBM hexadecimal floating point
        case (2):
            return dis.readInt();        // 4-byte two's complement integer
        case (3):
            return dis.readShort();      // 2-byte two's complement integer
        case (4):
            throw new IllegalArgumentException("Not supported"); // fixed-point with gain (obsolete)
        case (5):
            return dis.readFloat();      // 4-byte IEEE floating point
        case (6):
            throw new IllegalArgumentException("Not supported");
        case (7):
            throw new IllegalArgumentException("Not supported");
        case (8):
            return dis.readByte();       // 1-byte two's complement integer
        default:
            return dis.readInt();
    }
}
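
The floatFromBytes helper (IBM hexadecimal floating point to double) is not shown above. A minimal sketch of what such a conversion could look like, assuming the stream delivers the four bytes in big-endian order as SEG-Y prescribes:

// Sketch: decode a 4-byte IBM hexadecimal float (sign, excess-64 base-16 exponent,
// 24-bit fraction with no hidden bit) into a Java double.
public static double floatFromBytes(DataInputStream dis) throws IOException {
    int bits = dis.readInt();             // four big-endian bytes
    if (bits == 0) {
        return 0.0;
    }
    int sign = (bits >>> 31) & 0x01;      // 1 sign bit
    int exponent = (bits >>> 24) & 0x7F;  // 7-bit exponent, excess-64, base 16
    int mantissa = bits & 0x00FFFFFF;     // 24-bit hex fraction
    double value = (mantissa / (double) 0x1000000) * Math.pow(16.0, exponent - 64);
    return sign == 0 ? value : -value;
}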

So, in the end, the value boils down to a serializable array of DoubleWritable data samples.
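
The value class itself is not listed in the post. Here is a minimal sketch of what such a TraceWritable might look like if it simply wraps the decoded samples in an ArrayWritable of DoubleWritable; the getTraceDataDouble accessor matches the call made in the mapper later on, while the rest is an assumption (the actual class in the repository may differ):

// Sketch of the value class: an ArrayWritable of DoubleWritable holding the decoded samples.
public class TraceWritable extends ArrayWritable {

    public TraceWritable() {
        super(DoubleWritable.class);
    }

    public TraceWritable(double[] samples) {
        super(DoubleWritable.class);
        DoubleWritable[] values = new DoubleWritable[samples.length];
        for (int i = 0; i < samples.length; i++) {
            values[i] = new DoubleWritable(samples[i]);
        }
        set(values);
    }

    // Convenience accessor used by the mapper shown later.
    public double[] getTraceDataDouble() {
        Writable[] values = get();
        double[] samples = new double[values.length];
        for (int i = 0; i < values.length; i++) {
            samples[i] = ((DoubleWritable) values[i]).get();
        }
        return samples;
    }
}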

Regarding the custom key implementation, keep in mind that it essentially corresponds to a row in a Parquet file. We have picked just a few fields from the original trace header: a unique trace identifier within the SEG-Y file (bytes 4–7 of the trace header, zero-based), the field record number (bytes 8–11), the distance between the signal source and the receiver (bytes 36–39), the source coordinates (bytes 72–75 and 76–79 respectively), the in-line and cross-line identifiers (bytes 188–191 and 192–195 respectively), and the sample interval (bytes 116–117, in microseconds).

public class TraceHeaderWritable implements WritableComparable<TraceHeaderWritable> {

    // Zero-based byte offsets of the selected fields (see the list above)
    private static final int TRACEID_OFFSET = 4, FRN_OFFSET = 8, DST_SRG_OFFSET = 36,
            SRCX_OFFSET = 72, SRCY_OFFSET = 76, SI_OFFSET = 116, IL_OFFSET = 188, XL_OFFSET = 192;

    private int traceID, fieldRecordNumberID, distSRG, srcX, srcY, ilineID, xlineID;
    private short sI;

    public void fromBytes(byte[] traceHeaderBytes) {
        ByteBuffer bb = ByteBuffer.wrap(traceHeaderBytes); // big-endian by default, as in SEG-Y
        bb.position(TRACEID_OFFSET);
        traceID = bb.getInt();
        bb.position(FRN_OFFSET);
        fieldRecordNumberID = bb.getInt();
        bb.position(DST_SRG_OFFSET);
        distSRG = bb.getInt();
        bb.position(SRCX_OFFSET);
        srcX = bb.getInt();
        bb.position(SRCY_OFFSET);
        srcY = bb.getInt();
        bb.position(SI_OFFSET);
        sI = bb.getShort();
        bb.position(IL_OFFSET);
        ilineID = bb.getInt();
        bb.position(XL_OFFSET);
        xlineID = bb.getInt();
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(traceID);
        dataOutput.writeInt(fieldRecordNumberID);
        dataOutput.writeInt(distSRG);
        dataOutput.writeInt(srcX);
        dataOutput.writeInt(srcY);
        dataOutput.writeShort(sI);
        dataOutput.writeInt(ilineID);
        dataOutput.writeInt(xlineID);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        traceID = dataInput.readInt();
        fieldRecordNumberID = dataInput.readInt();
        distSRG = dataInput.readInt();
        srcX = dataInput.readInt();
        srcY = dataInput.readInt();
        sI = dataInput.readShort();
        ilineID = dataInput.readInt();
        xlineID = dataInput.readInt();
    }

    @Override
    public int compareTo(TraceHeaderWritable tw) {
        return Integer.compare(traceID, tw.traceID); // avoids overflow of plain subtraction
    }

    // ... getters omitted ...
}

With the custom key and value classes in place, writing the RecordReader itself is not a difficult task; a rough skeleton follows below, and you can check the final version of the code via the link at the end of the post.
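
The sketch below is only an illustration of how such a reader might be organized: TRACE_HEADER_SIZE, readFrom and the TraceWritable constructor are the pieces used earlier in this post, while the class name, fields and the way header values are cached are assumptions.

// Sketch: reads one trace per nextKeyValue() call from the assigned split.
public class SEGYRecordReader extends RecordReader<TraceHeaderWritable, TraceWritable> {

    private FSDataInputStream in;
    private long start, end, pos;
    private int nSamples, format, nBytes;              // taken from the binary file header
    private TraceHeaderWritable key = new TraceHeaderWritable();
    private TraceWritable value;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
        FileSplit fileSplit = (FileSplit) split;
        start = fileSplit.getStart();
        end = start + fileSplit.getLength();
        FileSystem fs = fileSplit.getPath().getFileSystem(context.getConfiguration());
        in = fs.open(fileSplit.getPath());
        // ... read nSamples, format and nBytes from the 3600-byte file header ...
        in.seek(start);
        pos = start;
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        if (pos + TRACE_HEADER_SIZE + (long) nSamples * nBytes > end) {
            return false;                              // no complete trace left in this split
        }
        byte[] header = new byte[TRACE_HEADER_SIZE];
        in.readFully(header);
        key.fromBytes(header);
        double[] samples = new double[nSamples];
        for (int i = 0; i < nSamples; i++) {
            samples[i] = readFrom(format, in);         // the sample decoder shown earlier
        }
        value = new TraceWritable(samples);
        pos += TRACE_HEADER_SIZE + (long) nSamples * nBytes;
        return true;
    }

    @Override
    public TraceHeaderWritable getCurrentKey() { return key; }

    @Override
    public TraceWritable getCurrentValue() { return value; }

    @Override
    public float getProgress() { return end == start ? 1.0f : (pos - start) / (float) (end - start); }

    @Override
    public void close() throws IOException {
        if (in != null) { in.close(); }
    }
}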

Parquet records from Protobuf messages

As for composing custom Parquet files with MapReduce, there are only a few hints in the Cloudera docs and on GitHub, and they say almost nothing about how to work with custom Group objects. We will try to fill that gap below.

Each Parquet record can be described by a message schema written in a Protobuf-like syntax. In our case, we get the following schema:

private static String writeSchema = "message Trace {\n" +
"required int32 traceID = 1;\n" +
"required int32 fieldRecordNumberID = 2;\n" +
"required int32 distSRG = 3;\n" +
"required int32 srcX = 4;\n" +
"required int32 srcY = 5;\n" +
"required int32 sI = 6;\n" +
"required int32 ilineID = 7;\n" +
"required int32 xlineID = 8;\n" +
"repeated double traceData = 9;\n" +
"}";

As you may notice, each message carries scalar fields from the trace header plus a repeated double field with the data samples.

Given the defined schema, implementation of WriteSupport looks as follows:

public class TraceGroupWriteSupport extends WriteSupport<Group> {

    private static String writeSchema = // ... (the schema string shown above)

    private MessageType schema;
    private GroupWriter groupWriter;
    private Map<String, String> extraMetaData;

    public TraceGroupWriteSupport() {
        this.schema = MessageTypeParser.parseMessageType(writeSchema);
        this.extraMetaData = new HashMap<String, String>();
    }

    public static MessageType getSchema() {
        return MessageTypeParser.parseMessageType(writeSchema);
    }

    @Override
    public String getName() {
        return "trace";
    }

    @Override
    public WriteContext init(Configuration configuration) {
        return new WriteContext(MessageTypeParser.parseMessageType(writeSchema), this.extraMetaData);
    }

    @Override
    public void prepareForWrite(RecordConsumer recordConsumer) {
        groupWriter = new GroupWriter(recordConsumer, schema);
    }

    @Override
    public void write(Group record) {
        groupWriter.write(record);
    }
}

Now let's look at how the mapping from SEGYInputFormat to ParquetOutputFormat can be arranged. Following the Cloudera example, each (TraceHeaderWritable, TraceWritable) pair should be transformed into a (Void, Group) pair. This boils down to manually creating Group objects according to the schema above.

public static class MapClass extends Mapper<TraceHeaderWritable, TraceWritable, Void, Group> {

    @Override
    protected void map(TraceHeaderWritable key, TraceWritable tw, Context context) throws IOException, InterruptedException {
        Group group = new SimpleGroup(TraceGroupWriteSupport.getSchema());
        TraceHeaderWritable thw = key;
        // schema field ids map to zero-based group indexes: 1 -> 0, 2 -> 1, 3 -> 2, ...
        group.add(0, thw.getTraceID());
        group.add(1, thw.getFieldRecordNumberID());
        group.add(2, thw.getDistSRG());
        group.add(3, thw.getSrcX());
        group.add(4, thw.getSrcY());
        group.add(5, (int) thw.getSI());
        group.add(6, thw.getILineID());
        group.add(7, thw.getXLineID());
        // write the array of data samples into the repeated field
        for (double item : tw.getTraceDataDouble()) {
            group.add(8, item);
        }
        context.write(null, group);
    }
}

Getting the job done

The final step is the driver job that takes a folder with SEG-Y files as input, reads the geophysical data using the custom input format, and saves it into an output folder as a set of Parquet files.

Configuration conf = getConf();

// set a 512 MB block size for Parquet (and DFS)
conf.setInt("dfs.blocksize", 512 * 1024 * 1024);
conf.setInt("parquet.block.size", 512 * 1024 * 1024);

Job job = Job.getInstance(conf, "Converting SEGY to Parquet");
Path in = new Path(args[0]);
Path out = new Path(args[1]);
Path outSub = new Path(UUID.randomUUID().toString());
out = Path.mergePaths(out, outSub);
job.setJarByClass(ConverterJob.class);
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setMapperClass(ConverterJob.MapClass.class);
// Default Parquet mapper maps (k,v) to (Void, Group) pair
job.setMapOutputKeyClass(Void.class);
job.setMapOutputValueClass(Group.class);

job.setNumReduceTasks(0);
job.setInputFormatClass(SEGYInputFormat.class);
job.setOutputFormatClass(ParquetOutputFormat.class);

// Enable SNAPPY compression to make result parquet files more compact
ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
ParquetOutputFormat.setCompressOutput(job, true);
ParquetOutputFormat.setWriteSupportClass(job, TraceGroupWriteSupport.class);

// submit the job and wait for completion
job.waitForCompletion(true);

Note that, to match the larger partitions, the file system and Parquet block sizes were increased to 512 MB. Additionally, we set SNAPPY compression, reducing the size of the resulting Parquet files by approximately 10%.
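
Once the job has finished, the output folder can be consumed directly by the frameworks mentioned in the introduction. As a quick, illustrative sanity check with Spark's Java API (the HDFS path below is a placeholder):

// Sketch: read the converted traces back with SparkSQL and take a quick look at them.
SparkSession spark = SparkSession.builder()
        .appName("SEGYParquetCheck")
        .getOrCreate();

Dataset<Row> traces = spark.read().parquet("hdfs:///path/to/output/folder");
traces.printSchema();                                   // traceID, srcX, srcY, traceData, ...
traces.select("traceID", "ilineID", "xlineID").show(5); // a handful of header fields
System.out.println("Total traces: " + traces.count());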

Conclusion

We walked through an end-to-end process of converting the SEG-Y (rev 1) format to Parquet with a MapReduce job, developing a custom input format and a map function along the way.

I hope this post was helpful.
The full version of the code is located here.
