Skip to content

Commit

Permalink
start reverting my changes
Browse files Browse the repository at this point in the history
  • Loading branch information
adeet1 committed Mar 26, 2024
1 parent 9770a3a commit 664e0ef
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,24 @@ case class SimpleFeatureParquetSchema(sft: SimpleFeatureType, schema: MessageTyp
StorageConfiguration.SftNameKey -> sft.getTypeName,
StorageConfiguration.SftSpecKey -> SimpleFeatureTypes.encodeType(sft, includeUserData = true),
SchemaVersionKey -> version.toString,
GeoParquetSchemaKey -> null
// GeoParquetSchemaKey -> null
).asJava

/**
 * Builds a new metadata map that carries over the existing entries and sets the GeoParquet
 * entry to the GeoParquet metadata string for this feature type with a bounding box appended.
 *
 * The existing `metadata` map is not modified; a fresh map is returned instead.
 *
 * @param bbox envelope whose min/max x and y values are written as the bounding box
 * @return a new map with the feature type name, feature type spec, schema version and
 *         GeoParquet entries
 */
def addBoundingBoxMetadata(bbox: Envelope): java.util.Map[String, String] = {
  // GeoParquet defines a 2D bbox as [xmin, ymin, xmax, ymax] - all minimums first, then all maximums
  val bboxString = s"[${bbox.getMinX}, ${bbox.getMinY}, ${bbox.getMaxX}, ${bbox.getMaxY}]"
  // NOTE(review): this appends `,"bbox":[...]}}}` followed by a literal double quote (the fourth
  // quote of the closing delimiter is part of the string) - confirm that this matches the way
  // geoParquetMetadata leaves its output open
  val result = SimpleFeatureParquetSchema.geoParquetMetadata(sft) + s""","bbox":${bboxString}}}}""""

  // TODO: find a cleaner way to do this
  // the map is rebuilt instead of mutated because calling metadata.put(GeoParquetSchemaKey, result)
  // was observed to cause empty parquet files to be written
  val newMetadata: java.util.Map[String, String] = Map(
    StorageConfiguration.SftNameKey -> metadata.get(StorageConfiguration.SftNameKey),
    StorageConfiguration.SftSpecKey -> metadata.get(StorageConfiguration.SftSpecKey),
    SchemaVersionKey -> metadata.get(SchemaVersionKey),
    GeoParquetSchemaKey -> result
  ).asJava

  newMetadata
}
// def addBoundingBoxMetadata(bbox: Envelope): java.util.Map[String, String] = {
// val bboxString = s"[${bbox.getMinX}, ${bbox.getMaxX}, ${bbox.getMinY}, ${bbox.getMaxY}]"
// val result = SimpleFeatureParquetSchema.geoParquetMetadata(sft) + s""","bbox":${bboxString}}}}""""
//
// // TODO: not an elegant way to do it
// // somehow trying to mutate the map, e.g. by calling metadata.put(GeoParquetSchemaKey, result), causes empty parquet files to be written
// val newMetadata: java.util.Map[String, String] = Map(
// StorageConfiguration.SftNameKey -> metadata.get(StorageConfiguration.SftNameKey),
// StorageConfiguration.SftSpecKey -> metadata.get(StorageConfiguration.SftSpecKey),
// SchemaVersionKey -> metadata.get(SchemaVersionKey),
// GeoParquetSchemaKey -> result
// ).asJava
//
// newMetadata
// }

/**
* Gets the name of the parquet field for the given simple feature type attribute
Expand Down Expand Up @@ -149,8 +149,8 @@ object SimpleFeatureParquetSchema {
Seq(
StorageConfiguration.SftNameKey,
StorageConfiguration.SftSpecKey,
SchemaVersionKey,
GeoParquetSchemaKey).foreach { key =>
SchemaVersionKey).foreach { key =>
// GeoParquetSchemaKey).foreach { key =>
val value = conf.get(key)
if (value != null) {
metadata.put(key, value)
Expand Down Expand Up @@ -201,7 +201,7 @@ object SimpleFeatureParquetSchema {

val schemaVersion = Option(metadata.get(SchemaVersionKey)).map(_.toInt).getOrElse(0)
val messageType = schemaVersion match {
case 2 => schema(sft)
case 2 => SimpleFeatureParquetSchemaV1(sft)
case 1 => SimpleFeatureParquetSchemaV1(sft)
case 0 => SimpleFeatureParquetSchemaV0(sft)
case v => throw new IllegalArgumentException(s"Unknown SimpleFeatureParquetSchema version: $v")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ object SimpleFeatureReadSupport {

private def geometry(schemaVersion: Int, binding: ObjectType, i: Int, callback: Settable): Converter = {
schemaVersion match {
case 2 => new GeometryWkbConverter(i, callback)
// case 2 => new GeometryWkbConverter(i, callback)
case 2 => geometryV0V1(binding, i, callback)
case 1 => geometryV0V1(binding, i, callback)
case 0 => geometryV0V1(binding, i, callback)
case v => throw new IllegalArgumentException(s"Unknown SimpleFeatureParquetSchema version: $v")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class SimpleFeatureWriteSupport(observer: BoundsObserver) extends WriteSupport[S
case _ => None
}
val bbox = envelope.getOrElse(throw new RuntimeException("Unable to get bounding box from observer"))
val newMetadata = schema.addBoundingBoxMetadata(bbox)
val newMetadata = schema.metadata//.addBoundingBoxMetadata(bbox)

new FinalizedWriteContext(newMetadata)
}
Expand Down Expand Up @@ -96,7 +96,7 @@ object SimpleFeatureWriteSupport {

def attribute(name: String, index: Int, bindings: Seq[ObjectType]): AttributeWriter[_] = {
bindings.head match {
case ObjectType.GEOMETRY => new GeometryWkbAttributeWriter(name, index) // TODO support z/m
case ObjectType.GEOMETRY => geometry(name, index, bindings.last) // TODO support z/m
case ObjectType.DATE => new DateWriter(name, index)
case ObjectType.STRING => new StringWriter(name, index)
case ObjectType.INT => new IntegerWriter(name, index)
Expand All @@ -112,6 +112,212 @@ object SimpleFeatureWriteSupport {
}
}

// TODO support z/m
/**
 * Picks the attribute writer for a geometry attribute according to its geometry binding
 *
 * @param name attribute name
 * @param index attribute index
 * @param binding geometry sub-type of the attribute
 * @return writer for the given binding
 * @throws IllegalArgumentException if the binding is not one of the handled geometry types
 */
private def geometry(name: String, index: Int, binding: ObjectType): AttributeWriter[_] = binding match {
  case ObjectType.POINT           => new PointAttributeWriter(name, index)
  case ObjectType.LINESTRING      => new LineStringAttributeWriter(name, index)
  case ObjectType.POLYGON         => new PolygonAttributeWriter(name, index)
  case ObjectType.MULTIPOINT      => new MultiPointAttributeWriter(name, index)
  case ObjectType.MULTILINESTRING => new MultiLineStringAttributeWriter(name, index)
  case ObjectType.MULTIPOLYGON    => new MultiPolygonAttributeWriter(name, index)
  case ObjectType.GEOMETRY        => new GeometryWkbAttributeWriter(name, index)
  case _ =>
    throw new IllegalArgumentException(s"Can't serialize field '$name' of type $binding")
}

/**
 * Writes a point as a group holding a single x double (field 0) and a single y double (field 1)
 */
class PointAttributeWriter(name: String, index: Int) extends AttributeWriter[Point](name, index) {
  override def write(consumer: RecordConsumer, value: Point): Unit = {
    val xField = SimpleFeatureParquetSchemaV1.GeometryColumnX
    val yField = SimpleFeatureParquetSchemaV1.GeometryColumnY

    consumer.startGroup()

    // x ordinate
    consumer.startField(xField, 0)
    consumer.addDouble(value.getX)
    consumer.endField(xField, 0)

    // y ordinate
    consumer.startField(yField, 1)
    consumer.addDouble(value.getY)
    consumer.endField(yField, 1)

    consumer.endGroup()
  }
}

/**
 * Writes a line string as a group with two fields: all of the x ordinates (field 0) followed by
 * all of the y ordinates (field 1), each in coordinate order
 */
class LineStringAttributeWriter(name: String, index: Int) extends AttributeWriter[LineString](name, index) {
  override def write(consumer: RecordConsumer, value: LineString): Unit = {
    val xField = SimpleFeatureParquetSchemaV1.GeometryColumnX
    val yField = SimpleFeatureParquetSchemaV1.GeometryColumnY

    consumer.startGroup()

    consumer.startField(xField, 0)
    for (n <- 0 until value.getNumPoints) {
      consumer.addDouble(value.getCoordinateN(n).x)
    }
    consumer.endField(xField, 0)

    consumer.startField(yField, 1)
    for (n <- 0 until value.getNumPoints) {
      consumer.addDouble(value.getCoordinateN(n).y)
    }
    consumer.endField(yField, 1)

    consumer.endGroup()
  }
}

/**
 * Writes a multi-point as a group with two fields: the x ordinate of each member point (field 0)
 * followed by the y ordinate of each member point (field 1)
 */
class MultiPointAttributeWriter(name: String, index: Int) extends AttributeWriter[MultiPoint](name, index) {
  override def write(consumer: RecordConsumer, value: MultiPoint): Unit = {
    val xField = SimpleFeatureParquetSchemaV1.GeometryColumnX
    val yField = SimpleFeatureParquetSchemaV1.GeometryColumnY

    consumer.startGroup()

    consumer.startField(xField, 0)
    for (n <- 0 until value.getNumPoints) {
      val point = value.getGeometryN(n).asInstanceOf[Point]
      consumer.addDouble(point.getX)
    }
    consumer.endField(xField, 0)

    consumer.startField(yField, 1)
    for (n <- 0 until value.getNumPoints) {
      val point = value.getGeometryN(n).asInstanceOf[Point]
      consumer.addDouble(point.getY)
    }
    consumer.endField(yField, 1)

    consumer.endGroup()
  }
}

/**
 * Base writer for geometries that are written as a sequence of line strings. The x ordinates
 * (field 0) and the y ordinates (field 1) are each written as a "list" of groups, one group per
 * line string.
 *
 * @tparam T geometry type handled by the writer
 */
abstract class AbstractLinesWriter[T <: Geometry](name: String, index: Int)
    extends AttributeWriter[T](name, index) {

  /**
   * Extracts the line strings to write for a geometry
   *
   * @param value geometry being written
   * @return line strings, in the order they should be written
   */
  protected def lines(value: T): Seq[LineString]

  override def write(consumer: RecordConsumer, value: T): Unit = {
    val rings = lines(value)

    // writes one ordinate (x or y) of every line string under the given field
    def writeOrdinate(field: String, position: Int)(writeLine: (RecordConsumer, LineString) => Unit): Unit = {
      consumer.startField(field, position)
      consumer.startGroup()
      consumer.startField("list", 0)
      rings.foreach { ring =>
        consumer.startGroup()
        writeLine(consumer, ring)
        consumer.endGroup()
      }
      consumer.endField("list", 0)
      consumer.endGroup()
      consumer.endField(field, position)
    }

    consumer.startGroup()
    writeOrdinate(SimpleFeatureParquetSchemaV1.GeometryColumnX, 0)(writeLineStringX)
    writeOrdinate(SimpleFeatureParquetSchemaV1.GeometryColumnY, 1)(writeLineStringY)
    consumer.endGroup()
  }
}

/**
 * Writes a polygon as its rings: the exterior ring first, followed by each interior ring in order
 */
class PolygonAttributeWriter(name: String, index: Int) extends AbstractLinesWriter[Polygon](name, index) {
  override protected def lines(value: Polygon): Seq[LineString] = {
    val shell: LineString = value.getExteriorRing
    val holes: Seq[LineString] = Seq.tabulate(value.getNumInteriorRing)(n => value.getInteriorRingN(n))
    shell +: holes
  }
}

/**
 * Writes a multi-line string as its member line strings, in geometry order
 */
class MultiLineStringAttributeWriter(name: String, index: Int)
    extends AbstractLinesWriter[MultiLineString](name, index) {

  override protected def lines(value: MultiLineString): Seq[LineString] = {
    val count = value.getNumGeometries
    Seq.tabulate(count) { n =>
      value.getGeometryN(n).asInstanceOf[LineString]
    }
  }
}

/**
 * Writes a multi-polygon as nested lists: a "list" of polygons, each holding an "element" that is
 * itself a "list" of rings. All x ordinates are written under field 0, then all y ordinates under
 * field 1.
 */
class MultiPolygonAttributeWriter(name: String, index: Int) extends AttributeWriter[MultiPolygon](name, index) {
  override def write(consumer: RecordConsumer, value: MultiPolygon): Unit = {
    // rings of each member polygon: the exterior ring first, then the interior rings in order
    val ringsByPolygon: Seq[Seq[LineString]] = Seq.tabulate(value.getNumGeometries) { p =>
      val polygon = value.getGeometryN(p).asInstanceOf[Polygon]
      Seq.tabulate(polygon.getNumInteriorRing + 1) { r =>
        if (r == 0) polygon.getExteriorRing else polygon.getInteriorRingN(r - 1)
      }
    }

    // writes one ordinate (x or y) of every ring of every polygon under the given field
    def writeOrdinate(field: String, position: Int)(writeLine: (RecordConsumer, LineString) => Unit): Unit = {
      consumer.startField(field, position)
      consumer.startGroup()
      consumer.startField("list", 0)
      ringsByPolygon.foreach { rings =>
        consumer.startGroup()
        consumer.startField("element", 0)
        consumer.startGroup()
        consumer.startField("list", 0)
        rings.foreach { ring =>
          consumer.startGroup()
          writeLine(consumer, ring)
          consumer.endGroup()
        }
        consumer.endField("list", 0)
        consumer.endGroup()
        consumer.endField("element", 0)
        consumer.endGroup()
      }
      consumer.endField("list", 0)
      consumer.endGroup()
      consumer.endField(field, position)
    }

    consumer.startGroup()
    writeOrdinate(SimpleFeatureParquetSchemaV1.GeometryColumnX, 0)(writeLineStringX)
    writeOrdinate(SimpleFeatureParquetSchemaV1.GeometryColumnY, 1)(writeLineStringY)
    consumer.endGroup()
  }
}

/**
 * Writes the x ordinate of each coordinate of a line string as the values of an "element" field
 *
 * @param consumer record consumer to write to
 * @param ring line string to write
 */
private def writeLineStringX(consumer: RecordConsumer, ring: LineString): Unit = {
  consumer.startField("element", 0)
  (0 until ring.getNumPoints).foreach { n =>
    consumer.addDouble(ring.getCoordinateN(n).x)
  }
  consumer.endField("element", 0)
}

/**
 * Writes the y ordinate of each coordinate of a line string as the values of an "element" field
 *
 * @param consumer record consumer to write to
 * @param ring line string to write
 */
private def writeLineStringY(consumer: RecordConsumer, ring: LineString): Unit = {
  consumer.startField("element", 0)
  (0 until ring.getNumPoints).foreach { n =>
    consumer.addDouble(ring.getCoordinateN(n).y)
  }
  consumer.endField("element", 0)
}


/**
* Writes a simple feature attribute to a Parquet file
*/
Expand Down

0 comments on commit 664e0ef

Please sign in to comment.