Skip to content

Commit 724b6aa

Browse files
authored
Improved support for structures (flow-php#587)
1 parent 37d509d commit 724b6aa

File tree

26 files changed

+606
-218
lines changed

26 files changed

+606
-218
lines changed

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
},
4040
"require-dev": {
4141
"aeon-php/calendar": "^1.0",
42+
"fakerphp/faker": "^1.23",
4243
"fig/log-test": "^1.1",
4344
"jawira/case-converter": "^3.4",
4445
"laravel/serializable-closure": "^1.1",

composer.lock

Lines changed: 69 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/adapter/etl-adapter-avro/src/Flow/ETL/Adapter/Avro/FlixTech/AvroLoader.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public function closure(Rows $rows, FlowContext $context) : void
5656
}
5757

5858
$context->streams()->close($this->path);
59+
$this->writer = null;
5960
}
6061

6162
public function destination() : Path
@@ -88,6 +89,7 @@ public function load(Rows $rows, FlowContext $context) : void
8889
$rowData[$entry->name()] = match (\get_class($entry)) {
8990
Row\Entry\ListEntry::class => $this->listEntryToValues($entry),
9091
DateTimeEntry::class => (int) $entry->value()->format('Uu'),
92+
Row\Entry\UuidEntry::class => $entry->value()->toString(),
9193
Row\Entry\EnumEntry::class => $entry->value()->name,
9294
default => $entry->value(),
9395
};
@@ -102,6 +104,22 @@ private function listEntryToValues(Row\Entry\ListEntry $entry) : array
102104
$listType = $entry->definition()->metadata()->get(Schema\FlowMetadata::METADATA_LIST_ENTRY_TYPE);
103105

104106
if ($listType instanceof ObjectType) {
107+
if (\is_a($listType->class, Row\Entry\Type\Uuid::class, true)) {
108+
/** @var array<string> $data */
109+
$data = [];
110+
111+
/**
112+
* @psalm-suppress MixedAssignment
113+
* @psalm-suppress MixedMethodCall
114+
*/
115+
foreach ($entry->value() as $value) {
116+
/** @phpstan-ignore-next-line */
117+
$data[] = $value->toString();
118+
}
119+
120+
return $data;
121+
}
122+
105123
if (\is_a($listType->class, \DateTimeInterface::class, true)) {
106124
/** @var array<int> $data */
107125
$data = [];

src/adapter/etl-adapter-avro/src/Flow/ETL/Adapter/Avro/FlixTech/SchemaConverter.php

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@
1616
use Flow\ETL\Row\Entry\ListEntry;
1717
use Flow\ETL\Row\Entry\NullEntry;
1818
use Flow\ETL\Row\Entry\StringEntry;
19+
use Flow\ETL\Row\Entry\StructureEntry;
1920
use Flow\ETL\Row\Entry\TypedCollection\ObjectType;
2021
use Flow\ETL\Row\Entry\TypedCollection\ScalarType;
22+
use Flow\ETL\Row\Entry\UuidEntry;
2123
use Flow\ETL\Row\Schema;
2224
use Flow\ETL\Row\Schema\Definition;
25+
use Flow\ETL\Row\Schema\FlowMetadata;
2326

2427
final class SchemaConverter
2528
{
@@ -34,20 +37,7 @@ public function toAvroJsonSchema(Schema $schema) : string
3437
);
3538
}
3639

37-
if (\count($definition->types()) === 2 && $definition->isNullable()) {
38-
/** @var class-string<Entry> $type */
39-
$type = \current(\array_diff($definition->types(), [NullEntry::class]));
40-
$fields[] = $this->convertType($type, $definition);
41-
}
42-
43-
if (\count($definition->types()) === 1) {
44-
$type = \current($definition->types());
45-
$fields[] = $this->convertType($type, $definition);
46-
}
47-
48-
if ((\count($definition->types()) === 2 && !$definition->isNullable()) || \count($definition->types()) > 2) {
49-
throw new RuntimeException('Union types are not supported yet. Invalid type: ' . $definition->entry()->name());
50-
}
40+
$fields[] = $this->convert($definition);
5141
}
5242

5343
return \json_encode([
@@ -63,8 +53,10 @@ public function toAvroJsonSchema(Schema $schema) : string
6353
*
6454
* @return array{name: string, type: string}
6555
*/
66-
private function convertType(string $type, Definition $definition) : array
56+
private function convert(Definition $definition) : array
6757
{
58+
$type = $this->typeFromDefinition($definition);
59+
6860
if ($type === ListEntry::class) {
6961
$listType = $definition->metadata()->get(Schema\FlowMetadata::METADATA_LIST_ENTRY_TYPE);
7062

@@ -86,8 +78,30 @@ private function convertType(string $type, Definition $definition) : array
8678
}
8779
}
8880

81+
if ($type === StructureEntry::class) {
82+
/** @var array<string, Definition> $structureDefinitions */
83+
$structureDefinitions = $definition->metadata()->get(FlowMetadata::METADATA_STRUCTURE_DEFINITIONS);
84+
85+
$structConverter = function (array $definitions) use (&$structConverter) : array {
86+
$structureFields = [];
87+
88+
/** @var Definition $definition */
89+
foreach ($definitions as $name => $definition) {
90+
if (!\is_array($definition)) {
91+
$structureFields[] = $this->convert($definition);
92+
} else {
93+
$structureFields[] = ['name' => $name, 'type' => ['name' => \ucfirst($name), 'type' => \AvroSchema::RECORD_SCHEMA, 'fields' => $structConverter($definition)]];
94+
}
95+
}
96+
97+
return $structureFields;
98+
};
99+
100+
return ['name' => $definition->entry()->name(), 'type' => ['name' => \ucfirst($definition->entry()->name()), 'type' => \AvroSchema::RECORD_SCHEMA, 'fields' => $structConverter($structureDefinitions)]];
101+
}
102+
89103
return match ($type) {
90-
StringEntry::class, JsonEntry::class => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::STRING_TYPE],
104+
StringEntry::class, JsonEntry::class, UuidEntry::class => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::STRING_TYPE],
91105
EnumEntry::class => [
92106
'name' => $definition->entry()->name(),
93107
'type' => [
@@ -107,4 +121,18 @@ private function convertType(string $type, Definition $definition) : array
107121
default => throw new RuntimeException($type . ' is not yet supported.')
108122
};
109123
}
124+
125+
/**
 * Resolves the single entry type of a schema definition for conversion.
 *
 * A definition that is nullable and lists exactly two types yields the first
 * type left after removing NullEntry::class; a definition listing exactly one
 * type yields that type. Every other combination is rejected.
 *
 * NOTE(review): assumes $definition->types() returns a list of entry class
 * names (suggested by the array_diff against NullEntry::class) - confirm
 * against the Definition class.
 *
 * @param Definition $definition schema definition to inspect
 *
 * @throws RuntimeException when the definition does not reduce to exactly one non-null type
 *
 * @return string the resolved entry type
 */
private function typeFromDefinition(Definition $definition) : string
126+
{
127+
// Nullable with exactly two types: drop NullEntry and use whatever remains.
if ($definition->isNullable() && \count($definition->types()) === 2) {
128+
/** @var class-string<Entry> $type */
129+
$type = \current(\array_diff($definition->types(), [NullEntry::class]));
130+
// Exactly one declared type: use it as-is.
} elseif (\count($definition->types()) === 1) {
131+
$type = \current($definition->types());
132+
// Anything else is treated as an unsupported union and reported with the entry name.
} else {
133+
throw new RuntimeException('Union types are not supported by Avro file format. Invalid type: ' . $definition->entry()->name());
134+
}
135+
136+
return $type;
137+
}
110138
}

src/adapter/etl-adapter-avro/tests/Flow/ETL/Adapter/Avro/FlixTech/Tests/Integration/AvroTest.php

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,19 @@ public function test_writing_and_reading_avro_with_all_supported_types() : void
113113
Entry::json_object('json_object', ['id' => 1, 'name' => 'test']),
114114
Entry::json('json', [['id' => 1, 'name' => 'test'], ['id' => 2, 'name' => 'test']]),
115115
Entry::list_of_string('list_of_strings', ['a', 'b', 'c']),
116-
Entry::list_of_datetime('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()])
116+
Entry::list_of_datetime('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()]),
117+
Entry::structure(
118+
'address',
119+
Entry::string('street', 'street_' . $i),
120+
Entry::string('city', 'city_' . $i),
121+
Entry::string('zip', 'zip_' . $i),
122+
Entry::string('country', 'country_' . $i),
123+
Entry::structure(
124+
'location',
125+
Entry::float('lat', 1.5),
126+
Entry::float('lon', 1.5)
127+
)
128+
),
117129
);
118130
}, \range(1, 100))
119131
)

src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/JSONMachine/JsonExtractor.php

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Flow\ETL\FlowContext;
1212
use Flow\ETL\Row;
1313
use JsonMachine\Items;
14+
use JsonMachine\JsonDecoder\ExtJsonDecoder;
1415

1516
final class JsonExtractor implements Extractor
1617
{
@@ -51,11 +52,13 @@ public function extract(FlowContext $context) : \Generator
5152
}
5253

5354
/**
54-
* @return array{pointer?: string}
55+
* @return array{pointer?: string, decoder: ExtJsonDecoder}
5556
*/
5657
private function readerOptions() : array
5758
{
58-
$options = [];
59+
$options = [
60+
'decoder' => new ExtJsonDecoder(true),
61+
];
5962

6063
if ($this->pointer !== null) {
6164
$options['pointer'] = $this->pointer;

src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/JsonLoader.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ public function __unserialize(array $data) : void
4646
public function closure(Rows $rows, FlowContext $context) : void
4747
{
4848
foreach ($context->streams() as $stream) {
49-
$this->close($stream);
49+
if ($stream->path()->extension() === 'json') {
50+
$this->close($stream);
51+
}
5052
}
5153

5254
$context->streams()->close($this->path);

0 commit comments

Comments
 (0)