Skip to content

Commit

Permalink
wip implementing callback
Browse files Browse the repository at this point in the history
try more stuff

Revert "try more stuff"

This reverts commit 394143b.

Revert "wip implementing callback"

This reverts commit 9a1146b.

implement callback

add action to callback class, but it didn't seem to make any difference

invoke callback in onClose method

a thought

create UpdateObserver in OrcFileSystemStorage

cleanup

concatenate an update observer to the one we passed in
  • Loading branch information
adeet1 committed Jun 3, 2024
1 parent f2300d6 commit 9ad9332
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction
import org.locationtech.geomesa.fs.storage.api.StorageMetadata._
import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.{FileSystemPathReader, MetadataObserver, WriterConfig}
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.CompositeObserver
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.{CompositeObserver, NoOpObserver}
import org.locationtech.geomesa.fs.storage.common.observer.{BoundsObserver, FileSystemObserver, FileSystemObserverFactory}
import org.locationtech.geomesa.fs.storage.common.utils.StorageUtils.FileType
import org.locationtech.geomesa.fs.storage.common.utils.StorageUtils.FileType.FileType
Expand Down Expand Up @@ -67,11 +67,13 @@ abstract class AbstractFileSystemStorage(
/**
* Create a writer for the given file
*
* @param partition the partition that the file belongs to
* @param action whether to append or modify
* @param file file to write to
* @param observer observer to report stats on the data written
* @return a writer for the given file
*/
protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter
protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: FileSystemObserver): FileSystemWriter

/**
* Create a path reader with the given filter and transform
Expand Down Expand Up @@ -234,11 +236,11 @@ abstract class AbstractFileSystemStorage(
def pathAndObserver: WriterConfig = {
val path = StorageUtils.nextFile(context.root, partition, metadata.leafStorage, extension, fileType)
PathCache.register(context.fc, path)
val updateObserver = new UpdateObserver(partition, path, action)
val observer = if (observers.isEmpty) { updateObserver.asInstanceOf[BoundsObserver] } else {
new CompositeObserver(observers.map(_.apply(path)).+:(updateObserver)).asInstanceOf[BoundsObserver]
val noopObserver = NoOpObserver
val observer = if (observers.isEmpty) { noopObserver } else {
new CompositeObserver(observers.map(_.apply(path)).+:(noopObserver)).asInstanceOf[BoundsObserver]
}
WriterConfig(path, observer)
WriterConfig(partition, action, path, observer)
}

targetSize(targetFileSize) match {
Expand All @@ -247,7 +249,7 @@ abstract class AbstractFileSystemStorage(
}
}

private def createWriter(config: WriterConfig): FileSystemWriter = createWriter(config.path, config.observer)
private def createWriter(config: WriterConfig): FileSystemWriter = createWriter(config.partition, config.action, config.path, config.observer)

/**
* Writes files up to a given size, then starts a new file
Expand Down Expand Up @@ -396,5 +398,5 @@ object AbstractFileSystemStorage {
protected def onClose(bounds: Envelope, count: Long): Unit
}

private case class WriterConfig(path: Path, observer: BoundsObserver)
private case class WriterConfig(partition: String, action: StorageFileAction, path: Path, observer: BoundsObserver)
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.geotools.api.feature.simple.SimpleFeatureType
import org.geotools.api.filter.Filter
import org.locationtech.geomesa.convert2.SimpleFeatureConverter
import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter
import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction.StorageFileAction
import org.locationtech.geomesa.fs.storage.api.StorageMetadata.{StorageFile, StorageFilePath}
import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage
Expand All @@ -30,7 +31,7 @@ class ConverterStorage(context: FileSystemContext, metadata: StorageMetadata, co
// actually need to be closed, and since they will only open a single connection per converter, the
// impact should be low

override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter =
override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: FileSystemObserver): FileSystemWriter =
throw new NotImplementedError()

override protected def createReader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import org.geotools.api.feature.simple.SimpleFeatureType
import org.geotools.api.filter.Filter
import org.locationtech.geomesa.filter.factory.FastFilterFactory
import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter
import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction.StorageFileAction
import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.CompositeObserver
import org.locationtech.geomesa.fs.storage.common.observer.{BoundsObserver, FileSystemObserver}
import org.locationtech.geomesa.utils.geotools.ObjectType
import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType
import org.locationtech.jts.geom.Geometry
Expand All @@ -32,8 +34,11 @@ import org.locationtech.jts.geom.Geometry
class OrcFileSystemStorage(context: FileSystemContext, metadata: StorageMetadata)
extends AbstractFileSystemStorage(context, metadata, OrcFileSystemStorage.FileExtension) {

override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter =
new OrcFileSystemWriter(metadata.sft, context.conf, file, observer)
/**
 * Create an ORC writer for the given file.
 *
 * The supplied observer is combined with a newly created `UpdateObserver` for this
 * partition/file/action, and the combined observer is handed to the writer.
 *
 * @param partition the partition that the file belongs to
 * @param action whether to append or modify
 * @param file file to write to
 * @param observer observer to report stats on the data written
 * @return a writer for the given file
 */
override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: FileSystemObserver): FileSystemWriter = {
  // the UpdateObserver is listed first, ahead of the caller's observer
  val chained = new CompositeObserver(Seq(new UpdateObserver(partition, file, action), observer))
  new OrcFileSystemWriter(metadata.sft, context.conf, file, chained)
}

override protected def createReader(
filter: Option[Filter],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.geotools.api.filter.Filter
import org.locationtech.geomesa.filter.factory.FastFilterFactory
import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter
import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction.StorageFileAction
import org.locationtech.geomesa.fs.storage.api.StorageMetadata.{PartitionBounds, PartitionMetadata, StorageFile, StorageFileAction}
import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader
import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
Expand All @@ -26,6 +28,7 @@ import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFac
import org.locationtech.geomesa.fs.storage.common.{AbstractFileSystemStorage, FileValidationEnabled}
import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.ParquetFileSystemWriter
import org.locationtech.geomesa.utils.io.CloseQuietly
import org.locationtech.jts.geom.Envelope

/**
*
Expand All @@ -35,10 +38,18 @@ import org.locationtech.geomesa.utils.io.CloseQuietly
class ParquetFileSystemStorage(context: FileSystemContext, metadata: StorageMetadata)
extends AbstractFileSystemStorage(context, metadata, ParquetFileSystemStorage.FileExtension) {

override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter = {
/**
 * Callback that registers a single written file with the storage metadata.
 *
 * When applied, it builds a one-element file list for `file` (stamped with the current
 * system time and the given action) and adds a partition entry carrying the supplied
 * envelope and count.
 *
 * @param partition the partition that the file belongs to
 * @param action the action recorded for the file
 * @param file the file that was written
 */
class StorageMetadataCallback(partition: String, action: StorageFileAction, file: Path) extends ((Envelope, Long) => Unit) {
  override def apply(env: Envelope, count: Long): Unit = {
    val written = StorageFile(file.getName, System.currentTimeMillis(), action)
    val files = Seq(written)
    println(s"[ParquetFileSystemStorage] Adding metadata to the partition containing these files: $files")
    val update = PartitionMetadata(partition, files, PartitionBounds(env), count)
    metadata.addPartition(update)
  }
}

override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: FileSystemObserver): FileSystemWriter = {
val sftConf = new Configuration(context.conf)
StorageConfiguration.setSft(sftConf, metadata.sft)
new ParquetFileSystemWriter(metadata.sft, file, sftConf, observer)
new ParquetFileSystemWriter(metadata.sft, file, sftConf, observer, new StorageMetadataCallback(partition, action, file))
}

override protected def createReader(
Expand Down Expand Up @@ -74,10 +85,11 @@ object ParquetFileSystemStorage extends LazyLogging {
sft: SimpleFeatureType,
file: Path,
conf: Configuration,
observer: FileSystemObserver = NoOpObserver
observer: FileSystemObserver = NoOpObserver,
callback: (Envelope, Long) => Unit
) extends FileSystemWriter {

private val writer = SimpleFeatureParquetWriter.builder(file, conf).build()
private val writer = SimpleFeatureParquetWriter.builder(file, conf, callback).build()

override def write(f: SimpleFeature): Unit = {
writer.write(f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}
import org.geotools.api.feature.simple.SimpleFeature
import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureWriteSupport
import org.locationtech.jts.geom.Envelope

object SimpleFeatureParquetWriter extends LazyLogging {

def builder(file: Path, conf: Configuration): Builder = {
def builder(file: Path, conf: Configuration, callback: (Envelope, Long) => Unit = ((_, _) => {})): Builder = {
val codec = CompressionCodecName.fromConf(conf.get("parquet.compression", "SNAPPY"))
logger.debug(s"Using Parquet Compression codec ${codec.name()}")
new Builder(file)
new Builder(file, callback)
.withConf(conf)
.withCompressionCodec(codec)
.withDictionaryEncoding(true)
Expand All @@ -36,10 +37,10 @@ object SimpleFeatureParquetWriter extends LazyLogging {
.withRowGroupSize(8*1024*1024)
}

class Builder private [SimpleFeatureParquetWriter] (file: Path)
class Builder private [SimpleFeatureParquetWriter] (file: Path, callback: (Envelope, Long) => Unit)
extends ParquetWriter.Builder[SimpleFeature, Builder](file) {
override def self(): Builder = this
override protected def getWriteSupport(conf: Configuration): WriteSupport[SimpleFeature] =
new SimpleFeatureWriteSupport
new SimpleFeatureWriteSupport(callback: (Envelope, Long) => Unit)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import java.nio.ByteBuffer
import java.util.{Date, UUID}
import scala.collection.JavaConverters._

class SimpleFeatureWriteSupport extends WriteSupport[SimpleFeature] {
class SimpleFeatureWriteSupport(callback: (Envelope, Long) => Unit) extends WriteSupport[SimpleFeature] {

private class MultipleGeometriesObserver extends MetadataObserver {
private var count: Long = 0L
Expand Down Expand Up @@ -57,7 +57,13 @@ class SimpleFeatureWriteSupport extends WriteSupport[SimpleFeature] {

def getBoundingBoxes: Array[Envelope] = bounds

override protected def onClose(bounds: Envelope, count: Long): Unit = {}
// Accessor for this observer's private `count` field
def getCount: Long = count

// Forwards the bounds and count received on close to the callback supplied to the
// enclosing SimpleFeatureWriteSupport's constructor.
// NOTE(review): the callback is presumably what adds the bounds to the storage metadata
// partition - confirm against the code that constructs it.
// NOTE(review): the println below looks like leftover debug output - consider a logger.
override protected def onClose(bounds: Envelope, count: Long): Unit = {
println("MultipleGeometriesObserver onClose")
callback(bounds, count)
}
}

private val observer = new MultipleGeometriesObserver
Expand All @@ -82,6 +88,7 @@ class SimpleFeatureWriteSupport extends WriteSupport[SimpleFeature] {
override def finalizeWrite(): FinalizedWriteContext = {
// Get the bounding boxes that span each geometry type
val bboxes = observer.getBoundingBoxes
observer.close()

// If the SFT has no geometries, then there's no need to create GeoParquet metadata
if (bboxes.isEmpty) {
Expand Down

0 comments on commit 9ad9332

Please sign in to comment.