001/**
002 * Copyright (C) 2006-2024 Talend Inc. - www.talend.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.talend.sdk.component.api.record;
017
018import java.io.StringReader;
019import java.math.BigDecimal;
020import java.nio.charset.Charset;
021import java.nio.charset.CharsetEncoder;
022import java.nio.charset.StandardCharsets;
023import java.time.temporal.Temporal;
024import java.util.Arrays;
025import java.util.Base64;
026import java.util.Collection;
027import java.util.Comparator;
028import java.util.Date;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.Objects;
033import java.util.Optional;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.function.BiConsumer;
036import java.util.function.Function;
037import java.util.function.Supplier;
038import java.util.stream.Collectors;
039import java.util.stream.Stream;
040
041import javax.json.Json;
042import javax.json.JsonValue;
043import javax.json.bind.annotation.JsonbTransient;
044import javax.json.stream.JsonParser;
045
046import lombok.EqualsAndHashCode;
047import lombok.RequiredArgsConstructor;
048import lombok.ToString;
049
050public interface Schema {
051
    /**
     * @return the type of this schema.
     */
    Type getType();

    /**
     * @return the nested element schema for arrays.
     */
    Schema getElementSchema();

    /**
     * @return the data entries for records (metadata entries are not included).
     */
    List<Entry> getEntries();

    /**
     * @return the metadata entries for records (ordinary data entries are not included).
     */
    List<Entry> getMetadata();

    /**
     * @return all entries of this schema, data and metadata alike.
     */
    Stream<Entry> getAllEntries();
076
    /**
     * Get the entries of this schema indexed by a {@link String} key.
     * <p>
     * {@link #getEntry(String)} looks its argument up in this map, so the key is presumably the entry name
     * (to be confirmed against the implementations).
     * <p>
     * This default implementation is only a placeholder that always fails.
     *
     * @return the entries indexed by key.
     *
     * @throws UnsupportedOperationException always, unless the method is overridden.
     */
    default Map<String, Entry> getEntryMap() {
        throw new UnsupportedOperationException("#getEntryMap is not implemented");
    }
080
    /**
     * Get a Builder from the current schema.
     * <p>
     * This default implementation is only a placeholder that always fails.
     *
     * @return a {@link Schema.Builder}
     *
     * @throws UnsupportedOperationException always, unless the method is overridden.
     */
    default Schema.Builder toBuilder() {
        throw new UnsupportedOperationException("#toBuilder is not implemented");
    }
089
090    /**
091     * Get all entries sorted by schema designed order.
092     *
093     * @return all entries ordered
094     */
095    default List<Entry> getEntriesOrdered() {
096        return getEntriesOrdered(naturalOrder());
097    }
098
099    /**
100     * Get all entries sorted using a custom comparator.
101     *
102     * @param comparator the comparator
103     *
104     * @return all entries ordered with provided comparator
105     */
106    @JsonbTransient
107    default List<Entry> getEntriesOrdered(final Comparator<Entry> comparator) {
108        return getAllEntries().sorted(comparator).collect(Collectors.toList());
109    }
110
    /**
     * Get the EntriesOrder defined with Builder.
     * <p>
     * {@link #getEntriesOrdered()} sorts with the value returned here. This default implementation is only a
     * placeholder that always fails.
     *
     * @return the EntriesOrder
     *
     * @throws UnsupportedOperationException always, unless the method is overridden.
     */

    default EntriesOrder naturalOrder() {
        throw new UnsupportedOperationException("#naturalOrder is not implemented");
    }
120
121    default Entry getEntry(final String name) {
122        return getEntryMap().get(name);
123    }
124
    /**
     * @return the metadata properties of this schema.
     */
    Map<String, String> getProps();

    /**
     * @param property the property name.
     *
     * @return the requested metadata property.
     */
    String getProp(String property);
136
137    /**
138     * Get a property values from schema with its name.
139     *
140     * @param name : property's name.
141     *
142     * @return property's value.
143     */
144    default JsonValue getJsonProp(final String name) {
145        final String prop = this.getProp(name);
146        if (prop == null) {
147            return null;
148        }
149        try (final StringReader reader = new StringReader(prop);
150                final JsonParser parser = Json.createParser(reader)) {
151            return parser.getValue();
152        } catch (RuntimeException ex) {
153            return Json.createValue(prop);
154        }
155    }
156
157    enum Type {
158
159        RECORD(new Class<?>[] { Record.class }),
160        ARRAY(new Class<?>[] { Collection.class }),
161        STRING(new Class<?>[] { String.class, Object.class }),
162        BYTES(new Class<?>[] { byte[].class, Byte[].class }),
163        INT(new Class<?>[] { Integer.class }),
164        LONG(new Class<?>[] { Long.class }),
165        FLOAT(new Class<?>[] { Float.class }),
166        DOUBLE(new Class<?>[] { Double.class }),
167        BOOLEAN(new Class<?>[] { Boolean.class }),
168        DATETIME(new Class<?>[] { Long.class, Date.class, Temporal.class }),
169        DECIMAL(new Class<?>[] { BigDecimal.class });
170
171        /**
172         * All compatibles Java classes
173         */
174        private final Class<?>[] classes;
175
176        Type(final Class<?>[] classes) {
177            this.classes = classes;
178        }
179
180        /**
181         * Check if input can be affected to an entry of this type.
182         *
183         * @param input : object.
184         *
185         * @return true if input is null or ok.
186         */
187        public boolean isCompatible(final Object input) {
188            if (input == null) {
189                return true;
190            }
191            for (final Class<?> clazz : classes) {
192                if (clazz.isInstance(input)) {
193                    return true;
194                }
195            }
196            return false;
197        }
198    }
199
200    interface Entry {
201
202        /**
203         * @return The name of this entry.
204         */
205        String getName();
206
207        /**
208         * @return The raw name of this entry.
209         */
210        String getRawName();
211
212        /**
213         * @return the raw name of this entry if exists, else return name.
214         */
215        String getOriginalFieldName();
216
217        /**
218         * @return Type of the entry, this determine which other fields are populated.
219         */
220        Type getType();
221
222        /**
223         * @return Is this entry nullable or always valued.
224         */
225        boolean isNullable();
226
227        /**
228         * @return true if this entry is for metadata; false for ordinary data.
229         */
230        boolean isMetadata();
231
232        /**
233         * @param <T> the default value type.
234         *
235         * @return Default value for this entry.
236         */
237        <T> T getDefaultValue();
238
239        /**
240         * @return For type == record, the element type.
241         */
242        Schema getElementSchema();
243
244        /**
245         * @return Allows to associate to this field a comment - for doc purposes, no use in the runtime.
246         */
247        String getComment();
248
249        /**
250         * @return the metadata props
251         */
252        Map<String, String> getProps();
253
254        /**
255         * @param property : property name.
256         *
257         * @return the requested metadata prop
258         */
259        String getProp(String property);
260
261        /**
262         * Get a property values from entry with its name.
263         *
264         * @param name : property's name.
265         *
266         * @return property's value.
267         */
268        default JsonValue getJsonProp(final String name) {
269            final String prop = this.getProp(name);
270            if (prop == null) {
271                return null;
272            }
273            try (final StringReader reader = new StringReader(prop);
274                    final JsonParser parser = Json.createParser(reader)) {
275                return parser.getValue();
276            } catch (RuntimeException ex) {
277                return Json.createValue(prop);
278            }
279        }
280
281        /**
282         *
283         * @return the logical type property
284         */
285        default String getLogicalType() {
286            return this.getProp(SchemaProperty.LOGICAL_TYPE);
287        }
288
289        /**
290         * @return an {@link Entry.Builder} from this entry.
291         */
292        default Entry.Builder toBuilder() {
293            throw new UnsupportedOperationException("#toBuilder is not implemented");
294        }
295
296        /**
297         * Plain builder matching {@link Entry} structure.
298         */
299        interface Builder {
300
301            Builder withName(String name);
302
303            Builder withRawName(String rawName);
304
305            Builder withType(Type type);
306
307            default Builder withLogicalType(SchemaProperty.LogicalType logicalType) {
308                throw new UnsupportedOperationException("#withLogicalType is not implemented");
309            }
310
311            default Builder withLogicalType(String logicalType) {
312                throw new UnsupportedOperationException("#withLogicalType is not implemented");
313            }
314
315            Builder withNullable(boolean nullable);
316
317            Builder withMetadata(boolean metadata);
318
319            <T> Builder withDefaultValue(T value);
320
321            Builder withElementSchema(Schema schema);
322
323            Builder withComment(String comment);
324
325            Builder withProps(Map<String, String> props);
326
327            Builder withProp(String key, String value);
328
329            Entry build();
330        }
331    }
332
    /**
     * Allows to build a {@link Schema}.
     * <p>
     * The entry insertion, removal and reordering operations, as well as {@link #build(Comparator)}, are optional:
     * their default implementations always throw {@link UnsupportedOperationException}.
     */
    interface Builder {

        /**
         * @param type schema type.
         *
         * @return this builder.
         */
        Builder withType(Type type);

        /**
         * @param entry element for either an array or record type.
         *
         * @return this builder.
         */
        Builder withEntry(Entry entry);

        /**
         * Insert the entry after the specified entry.
         *
         * @param after the entry name reference
         * @param entry the entry to insert
         *
         * @return this builder
         */
        default Builder withEntryAfter(String after, Entry entry) {
            throw new UnsupportedOperationException("#withEntryAfter is not implemented");
        }

        /**
         * Insert the entry before the specified entry.
         *
         * @param before the entry name reference
         * @param entry the entry to insert
         *
         * @return this builder
         */
        default Builder withEntryBefore(String before, Entry entry) {
            throw new UnsupportedOperationException("#withEntryBefore is not implemented");
        }

        /**
         * Remove entry from builder.
         *
         * @param name the entry name
         *
         * @return this builder
         */
        default Builder remove(String name) {
            throw new UnsupportedOperationException("#remove is not implemented");
        }

        /**
         * Remove entry from builder.
         *
         * @param entry the entry
         *
         * @return this builder
         */
        default Builder remove(Entry entry) {
            throw new UnsupportedOperationException("#remove is not implemented");
        }

        /**
         * Move an entry after another one.
         *
         * @param after the entry name reference
         * @param name the entry name
         *
         * @return this builder
         */
        default Builder moveAfter(final String after, final String name) {
            throw new UnsupportedOperationException("#moveAfter is not implemented");
        }

        /**
         * Move an entry before another one.
         *
         * @param before the entry name reference
         * @param name the entry name
         *
         * @return this builder
         */
        default Builder moveBefore(final String before, final String name) {
            throw new UnsupportedOperationException("#moveBefore is not implemented");
        }

        /**
         * Swap two entries.
         *
         * @param name the entry name
         * @param with the other entry name
         *
         * @return this builder
         */
        default Builder swap(final String name, final String with) {
            throw new UnsupportedOperationException("#swap is not implemented");
        }

        /**
         * @param schema nested element schema.
         *
         * @return this builder.
         */
        Builder withElementSchema(Schema schema);

        /**
         * @param props schema properties
         *
         * @return this builder
         */
        Builder withProps(Map<String, String> props);

        /**
         * @param key the prop key name
         * @param value the prop value
         *
         * @return this builder
         */
        Builder withProp(String key, String value);

        /**
         * @return the described schema.
         */
        Schema build();

        /**
         * Same as {@link Builder#build()} but entries order is specified by {@code order}. This takes precedence
         * over any order previously defined with the builder and may void it.
         *
         * @param order the wanted order for entries.
         *
         * @return the described schema.
         */
        default Schema build(Comparator<Entry> order) {
            throw new UnsupportedOperationException("#build(EntriesOrder) is not implemented");
        }
    }
466
467    /**
468     * Sanitize name to be avro compatible.
469     *
470     * @param name : original name.
471     *
472     * @return avro compatible name.
473     */
474    static String sanitizeConnectionName(final String name) {
475        if (name == null || name.isEmpty()) {
476            return name;
477        }
478
479        char current = name.charAt(0);
480        final CharsetEncoder ascii = Charset.forName(StandardCharsets.US_ASCII.name()).newEncoder();
481        final boolean skipFirstChar = ((!ascii.canEncode(current)) || (!Character.isLetter(current) && current != '_'))
482                && name.length() > 1 && (!Character.isDigit(name.charAt(1)));
483
484        final StringBuilder sanitizedBuilder = new StringBuilder();
485
486        if (!skipFirstChar) {
487            if (((!Character.isLetter(current)) && current != '_') || (!ascii.canEncode(current))) {
488                sanitizedBuilder.append('_');
489            } else {
490                sanitizedBuilder.append(current);
491            }
492        }
493        for (int i = 1; i < name.length(); i++) {
494            current = name.charAt(i);
495            if (!ascii.canEncode(current)) {
496                if (Character.isLowerCase(current) || Character.isUpperCase(current)) {
497                    sanitizedBuilder.append('_');
498                } else {
499                    final byte[] encoded =
500                            Base64.getEncoder().encode(name.substring(i, i + 1).getBytes(StandardCharsets.UTF_8));
501                    final String enc = new String(encoded);
502                    if (sanitizedBuilder.length() == 0 && Character.isDigit(enc.charAt(0))) {
503                        sanitizedBuilder.append('_');
504                    }
505                    for (int iter = 0; iter < enc.length(); iter++) {
506                        if (Character.isLetterOrDigit(enc.charAt(iter))) {
507                            sanitizedBuilder.append(enc.charAt(iter));
508                        } else {
509                            sanitizedBuilder.append('_');
510                        }
511                    }
512                }
513            } else if (Character.isLetterOrDigit(current)) {
514                sanitizedBuilder.append(current);
515            } else {
516                sanitizedBuilder.append('_');
517            }
518
519        }
520        return sanitizedBuilder.toString();
521    }
522
523    @RequiredArgsConstructor
524    @ToString
525    @EqualsAndHashCode
526    class EntriesOrder implements Comparator<Entry> {
527
528        private final OrderedMap<String> fieldsOrder;
529
530        // Keep comparator while no change occurs in fieldsOrder.
531        private Comparator<Entry> currentComparator = null;
532
533        /**
534         * Build an EntriesOrder according fields.
535         *
536         * @param fields the fields ordering. Each field should be {@code ,} separated.
537         *
538         * @return the order EntriesOrder
539         */
540        public static EntriesOrder of(final String fields) {
541            return new EntriesOrder(fields);
542        }
543
544        /**
545         * Build an EntriesOrder according fields.
546         *
547         * @param fields the fields ordering.
548         *
549         * @return the order EntriesOrder
550         */
551        public static EntriesOrder of(final Iterable<String> fields) {
552            final OrderedMap<String> orders = new OrderedMap<>(Function.identity(), fields);
553            return new EntriesOrder(orders);
554        }
555
556        public EntriesOrder(final String fields) {
557            if (fields == null || fields.isEmpty()) {
558                fieldsOrder = new OrderedMap<>(Function.identity());
559            } else {
560                final List<String> fieldList = Arrays.stream(fields.split(",")).collect(Collectors.toList());
561                fieldsOrder = new OrderedMap<>(Function.identity(), fieldList);
562            }
563        }
564
565        public EntriesOrder(final Iterable<String> fields) {
566            this(new OrderedMap<>(Function.identity(), fields));
567        }
568
569        public Stream<String> getFieldsOrder() {
570            return this.fieldsOrder.streams();
571        }
572
573        /**
574         * Move a field after another one.
575         *
576         * @param after the field name reference
577         * @param name the field name
578         *
579         * @return this EntriesOrder
580         */
581        public EntriesOrder moveAfter(final String after, final String name) {
582            this.currentComparator = null;
583            this.fieldsOrder.moveAfter(after, name);
584            return this;
585        }
586
587        /**
588         * Move a field before another one.
589         *
590         * @param before the field name reference
591         * @param name the field name
592         *
593         * @return this EntriesOrder
594         */
595        public EntriesOrder moveBefore(final String before, final String name) {
596            this.currentComparator = null;
597            this.fieldsOrder.moveBefore(before, name);
598            return this;
599        }
600
601        /**
602         * Swap two fields.
603         *
604         * @param name the field name
605         * @param with the other field
606         *
607         * @return this EntriesOrder
608         */
609        public EntriesOrder swap(final String name, final String with) {
610            this.currentComparator = null;
611            this.fieldsOrder.swap(name, with);
612            return this;
613        }
614
615        public String toFields() {
616            return this.fieldsOrder.streams().collect(Collectors.joining(","));
617        }
618
619        public Comparator<Entry> getComparator() {
620            if (this.currentComparator == null) {
621                final Map<String, Integer> entryPositions = new HashMap<>();
622                final AtomicInteger index = new AtomicInteger(1);
623                this.fieldsOrder.streams()
624                        .forEach(
625                                (final String name) -> entryPositions.put(name, index.getAndIncrement()));
626                this.currentComparator = new EntryComparator(entryPositions);
627            }
628            return this.currentComparator;
629        }
630
631        @Override
632        public int compare(final Entry e1, final Entry e2) {
633            return this.getComparator().compare(e1, e2);
634        }
635
636        @RequiredArgsConstructor
637        static class EntryComparator implements Comparator<Entry> {
638
639            private final Map<String, Integer> entryPositions;
640
641            @Override
642            public int compare(final Entry e1, final Entry e2) {
643                final int index1 = this.entryPositions.getOrDefault(e1.getName(), Integer.MAX_VALUE);
644                final int index2 = this.entryPositions.getOrDefault(e2.getName(), Integer.MAX_VALUE);
645                if (index1 >= 0 && index2 >= 0) {
646                    return index1 - index2;
647                }
648                if (index1 >= 0) {
649                    return -1;
650                }
651                if (index2 >= 0) {
652                    return 1;
653                }
654                return 0;
655            }
656        }
657    }
658
659    // use new avoid collision with entry getter.
660    @Deprecated
661    static Schema.Entry avoidCollision(final Schema.Entry newEntry,
662            final Supplier<Stream<Schema.Entry>> allEntriesSupplier,
663            final BiConsumer<String, Entry> replaceFunction) {
664        final Function<String, Entry> entryGetter = (String name) -> allEntriesSupplier //
665                .get() //
666                .filter((final Entry field) -> field.getName().equals(name))
667                .findFirst()
668                .orElse(null);
669        return avoidCollision(newEntry, entryGetter, replaceFunction);
670    }
671
672    static Schema.Entry avoidCollision(final Schema.Entry newEntry,
673            final Function<String, Entry> entryGetter,
674            final BiConsumer<String, Entry> replaceFunction) {
675        final Optional<Entry> collisionedEntry = Optional.ofNullable(entryGetter //
676                .apply(newEntry.getName())) //
677                .filter((final Entry field) -> !Objects.equals(field, newEntry));
678        if (!collisionedEntry.isPresent()) {
679            // No collision, return new entry.
680            return newEntry;
681        }
682        final Entry matchedEntry = collisionedEntry.get();
683        final boolean matchedToChange = matchedEntry.getRawName() != null && !(matchedEntry.getRawName().isEmpty());
684        if (matchedToChange) {
685            // the rename has to be applied on entry already inside schema, so replace.
686            replaceFunction.accept(matchedEntry.getName(), newEntry);
687        } else if (newEntry.getRawName() == null || newEntry.getRawName().isEmpty()) {
688            // try to add exactly same raw, skip the add here.
689            return null;
690        }
691        final Entry fieldToChange = matchedToChange ? matchedEntry : newEntry;
692        int indexForAnticollision = 1;
693        final String baseName = Schema.sanitizeConnectionName(fieldToChange.getRawName()); // recalc primiti name.
694
695        String newName = baseName + "_" + indexForAnticollision;
696        while (entryGetter.apply(newName) != null) {
697            indexForAnticollision++;
698            newName = baseName + "_" + indexForAnticollision;
699        }
700        final Entry newFieldToAdd = fieldToChange.toBuilder().withName(newName).build();
701
702        return newFieldToAdd; // matchedToChange ? newFieldToAdd : newEntry;
703    }
704}