001/**
002 * Copyright (C) 2006-2025 Talend Inc. - www.talend.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.talend.sdk.component.api.record;
017
018import java.io.StringReader;
019import java.math.BigDecimal;
020import java.nio.charset.Charset;
021import java.nio.charset.CharsetEncoder;
022import java.nio.charset.StandardCharsets;
023import java.time.temporal.Temporal;
024import java.util.Arrays;
025import java.util.Base64;
026import java.util.Collection;
027import java.util.Comparator;
028import java.util.Date;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.Objects;
033import java.util.Optional;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.function.BiConsumer;
036import java.util.function.Function;
037import java.util.stream.Collectors;
038import java.util.stream.Stream;
039
040import javax.json.Json;
041import javax.json.JsonValue;
042import javax.json.bind.annotation.JsonbTransient;
043import javax.json.stream.JsonParser;
044
045import lombok.EqualsAndHashCode;
046import lombok.RequiredArgsConstructor;
047import lombok.ToString;
048
049public interface Schema {
050
    /**
     * Name of the JVM system property read to decide whether name sanitization is skipped.
     */
    String SKIP_SANITIZE_PROPERTY = "talend.component.record.skip.sanitize";

    /**
     * Value of the {@link #SKIP_SANITIZE_PROPERTY} system property, read with
     * {@link Boolean#getBoolean(String)} when this constant is initialized.
     * When {@code true}, {@link #sanitizeConnectionName(String)} returns its input unchanged.
     */
    boolean SKIP_SANITIZE = Boolean.getBoolean(SKIP_SANITIZE_PROPERTY);
054
    /**
     * @return the type of this schema.
     */
    Type getType();

    /**
     * @return the nested element schema for arrays.
     */
    Schema getElementSchema();

    /**
     * @return the data entries for records (metadata entries are not included).
     */
    List<Entry> getEntries();

    /**
     * @return the metadata entries for records (ordinary data entries are not included).
     */
    List<Entry> getMetadata();

    /**
     * @return all entries of this schema, including both data and metadata entries.
     */
    Stream<Entry> getAllEntries();
079
    /**
     * Get the map used by {@link #getEntry(String)} to look up an entry from its name.
     * The default implementation is not supported and always throws.
     *
     * @return the entries map
     *
     * @throws UnsupportedOperationException always, unless the implementation overrides this method.
     */
    default Map<String, Entry> getEntryMap() {
        throw new UnsupportedOperationException("#getEntryMap is not implemented");
    }
083
    /**
     * Get a Builder from the current schema.
     * The default implementation is not supported and always throws.
     *
     * @return a {@link Schema.Builder}
     *
     * @throws UnsupportedOperationException always, unless the implementation overrides this method.
     */
    default Schema.Builder toBuilder() {
        throw new UnsupportedOperationException("#toBuilder is not implemented");
    }
092
    /**
     * Get all entries sorted by the schema designed order, i.e. sorted with the comparator returned by
     * {@link #naturalOrder()}.
     *
     * @return all entries ordered
     */
    default List<Entry> getEntriesOrdered() {
        // Delegates to the comparator based overload with the schema's own order.
        return getEntriesOrdered(naturalOrder());
    }
101
102    /**
103     * Get all entries sorted using a custom comparator.
104     *
105     * @param comparator the comparator
106     *
107     * @return all entries ordered with provided comparator
108     */
109    @JsonbTransient
110    default List<Entry> getEntriesOrdered(final Comparator<Entry> comparator) {
111        return getAllEntries().sorted(comparator).collect(Collectors.toList());
112    }
113
    /**
     * Get the EntriesOrder defined with Builder.
     * The default implementation is not supported and always throws.
     *
     * @return the EntriesOrder
     *
     * @throws UnsupportedOperationException always, unless the implementation overrides this method.
     */
    default EntriesOrder naturalOrder() {
        throw new UnsupportedOperationException("#naturalOrder is not implemented");
    }
123
    /**
     * Look up an entry by name in {@link #getEntryMap()}.
     *
     * @param name the name used as key in the entries map.
     *
     * @return the value the entries map holds for {@code name} (for standard maps, {@code null} when absent).
     */
    default Entry getEntry(final String name) {
        return getEntryMap().get(name);
    }
127
    /**
     * @return the metadata props of this schema.
     */
    Map<String, String> getProps();

    /**
     * @param property the property name.
     *
     * @return the requested metadata prop.
     */
    String getProp(String property);
139
140    /**
141     * Get a property values from schema with its name.
142     *
143     * @param name : property's name.
144     *
145     * @return property's value.
146     */
147    default JsonValue getJsonProp(final String name) {
148        final String prop = this.getProp(name);
149        if (prop == null) {
150            return null;
151        }
152        try (final StringReader reader = new StringReader(prop);
153                final JsonParser parser = Json.createParser(reader)) {
154            return parser.getValue();
155        } catch (RuntimeException ex) {
156            return Json.createValue(prop);
157        }
158    }
159
160    enum Type {
161
162        RECORD(new Class<?>[] { Record.class }),
163        ARRAY(new Class<?>[] { Collection.class }),
164        STRING(new Class<?>[] { String.class, Object.class }),
165        BYTES(new Class<?>[] { byte[].class, Byte[].class }),
166        INT(new Class<?>[] { Integer.class }),
167        LONG(new Class<?>[] { Long.class }),
168        FLOAT(new Class<?>[] { Float.class }),
169        DOUBLE(new Class<?>[] { Double.class }),
170        BOOLEAN(new Class<?>[] { Boolean.class }),
171        DATETIME(new Class<?>[] { Long.class, Date.class, Temporal.class }),
172        DECIMAL(new Class<?>[] { BigDecimal.class });
173
174        /**
175         * All compatibles Java classes
176         */
177        private final Class<?>[] classes;
178
179        Type(final Class<?>[] classes) {
180            this.classes = classes;
181        }
182
183        /**
184         * Check if input can be affected to an entry of this type.
185         *
186         * @param input : object.
187         *
188         * @return true if input is null or ok.
189         */
190        public boolean isCompatible(final Object input) {
191            if (input == null) {
192                return true;
193            }
194            for (final Class<?> clazz : classes) {
195                if (clazz.isInstance(input)) {
196                    return true;
197                }
198            }
199            return false;
200        }
201    }
202
203    interface Entry {
204
        /**
         * @return the name of this entry.
         */
        String getName();

        /**
         * @return the raw name of this entry.
         */
        String getRawName();

        /**
         * @return the raw name of this entry if it exists, otherwise the name.
         */
        String getOriginalFieldName();

        /**
         * @return the type of the entry; this determines which other fields are populated.
         */
        Type getType();

        /**
         * @return true if this entry is nullable, false if it is always valued.
         */
        boolean isNullable();

        /**
         * @return true if this entry is for metadata; false for ordinary data.
         */
        boolean isMetadata();

        /**
         * @return true if this entry can be in error.
         */
        boolean isErrorCapable();

        /**
         * @return true if the value of this entry is valid; false for an invalid value.
         */
        boolean isValid();

        /**
         * @param <T> the default value type.
         *
         * @return the default value for this entry.
         */
        <T> T getDefaultValue();

        /**
         * @return for type == record, the element type.
         */
        Schema getElementSchema();

        /**
         * @return a comment associated with this field, for documentation purposes only (not used at runtime).
         */
        String getComment();

        /**
         * @return the metadata props of this entry.
         */
        Map<String, String> getProps();

        /**
         * @param property the property name.
         *
         * @return the requested metadata prop.
         */
        String getProp(String property);
273
274        /**
275         * Get a property values from entry with its name.
276         *
277         * @param name : property's name.
278         *
279         * @return property's value.
280         */
281        default JsonValue getJsonProp(final String name) {
282            final String prop = this.getProp(name);
283            if (prop == null) {
284                return null;
285            }
286            try (final StringReader reader = new StringReader(prop);
287                    final JsonParser parser = Json.createParser(reader)) {
288                return parser.getValue();
289            } catch (RuntimeException ex) {
290                return Json.createValue(prop);
291            }
292        }
293
        /**
         * Read the logical type, stored as the {@code SchemaProperty.LOGICAL_TYPE} prop.
         *
         * @return the logical type property
         */
        default String getLogicalType() {
            return this.getProp(SchemaProperty.LOGICAL_TYPE);
        }

        /**
         * The default implementation is not supported and always throws.
         *
         * @return an {@link Entry.Builder} from this entry.
         *
         * @throws UnsupportedOperationException always, unless the implementation overrides this method.
         */
        default Entry.Builder toBuilder() {
            throw new UnsupportedOperationException("#toBuilder is not implemented");
        }

        /**
         * @return the value of the {@code SchemaProperty.ENTRY_ERROR_MESSAGE} prop.
         */
        default String getErrorMessage() {
            return getProp(SchemaProperty.ENTRY_ERROR_MESSAGE);
        }

        /**
         * @return the value of the {@code SchemaProperty.ENTRY_ERROR_FALLBACK_VALUE} prop.
         */
        default String getErrorFallbackValue() {
            return getProp(SchemaProperty.ENTRY_ERROR_FALLBACK_VALUE);
        }
316
        /**
         * Plain builder matching {@link Entry} structure.
         * Every {@code withXxx} method returns a {@link Builder} so that calls can be chained, and
         * {@link #build()} produces the {@link Entry}.
         */
        interface Builder {

            /**
             * @param name the entry name.
             */
            Builder withName(String name);

            /**
             * @param rawName the entry raw name.
             */
            Builder withRawName(String rawName);

            /**
             * @param type the entry type.
             */
            Builder withType(Type type);

            /**
             * The default implementation is not supported and always throws.
             *
             * @param logicalType the logical type.
             *
             * @throws UnsupportedOperationException always, unless the implementation overrides this method.
             */
            default Builder withLogicalType(SchemaProperty.LogicalType logicalType) {
                throw new UnsupportedOperationException("#withLogicalType is not implemented");
            }

            /**
             * The default implementation is not supported and always throws.
             *
             * @param logicalType the logical type, as a string.
             *
             * @throws UnsupportedOperationException always, unless the implementation overrides this method.
             */
            default Builder withLogicalType(String logicalType) {
                throw new UnsupportedOperationException("#withLogicalType is not implemented");
            }

            /**
             * @param nullable whether the entry is nullable.
             */
            Builder withNullable(boolean nullable);

            /**
             * @param errorCapable whether the entry can be in error.
             */
            Builder withErrorCapable(boolean errorCapable);

            /**
             * @param metadata whether the entry is a metadata entry.
             */
            Builder withMetadata(boolean metadata);

            /**
             * @param value the default value.
             * @param <T> the default value type.
             */
            <T> Builder withDefaultValue(T value);

            /**
             * @param schema the element schema.
             */
            Builder withElementSchema(Schema schema);

            /**
             * @param comment the entry comment.
             */
            Builder withComment(String comment);

            /**
             * @param props the entry properties.
             */
            Builder withProps(Map<String, String> props);

            /**
             * @param key the prop key name.
             * @param value the prop value.
             */
            Builder withProp(String key, String value);

            /**
             * @return the described entry.
             */
            Entry build();
        }
354    }
355
    /**
     * Allows to build a {@link Schema}.
     * Methods declared as {@code default} here are optional: unless overridden, they throw
     * {@link UnsupportedOperationException}.
     */
    interface Builder {

        /**
         * @param type schema type.
         *
         * @return this builder.
         */
        Builder withType(Type type);

        /**
         * @param entry element for either an array or record type.
         *
         * @return this builder.
         */
        Builder withEntry(Entry entry);

        /**
         * Insert the entry after the specified entry.
         *
         * @param after the entry name reference
         * @param entry the entry to insert
         *
         * @return this builder
         *
         * @throws UnsupportedOperationException always, unless the implementation overrides this method.
         */
        default Builder withEntryAfter(String after, Entry entry) {
            throw new UnsupportedOperationException("#withEntryAfter is not implemented");
        }

        /**
         * Insert the entry before the specified entry.
         *
         * @param before the entry name reference
         * @param entry the entry to insert
         *
         * @return this builder
         *
         * @throws UnsupportedOperationException always, unless the implementation overrides this method.
         */
        default Builder withEntryBefore(String before, Entry entry) {
            throw new UnsupportedOperationException("#withEntryBefore is not implemented");
        }

        /**
         * Remove entry from builder.
         *
         * @param name the entry name
         *
         * @return this builder
         *
         * @throws UnsupportedOperationException always, unless the implementation overrides this method.
         */
        default Builder remove(String name) {
            throw new UnsupportedOperationException("#remove is not implemented");
        }

        /**
         * Remove entry from builder.
         *
         * @param entry the entry
         *
         * @return this builder
         *
         * @throws UnsupportedOperationException always, unless the implementation overrides this method.
         */
        default Builder remove(Entry entry) {
            throw new UnsupportedOperationException("#remove is not implemented");
        }

        /**
         * Move an entry after another one.
         *
         * @param after the entry name reference
         * @param name the entry name
         *
         * @return this builder
         *
         * @throws UnsupportedOperationException always, unless the implementation overrides this method.
         */
        default Builder moveAfter(final String after, final String name) {
            throw new UnsupportedOperationException("#moveAfter is not implemented");
        }

        /**
         * Move an entry before another one.
         *
         * @param before the entry name reference
         * @param name the entry name
         *
         * @return this builder
         *
         * @throws UnsupportedOperationException always, unless the implementation overrides this method.
         */
        default Builder moveBefore(final String before, final String name) {
            throw new UnsupportedOperationException("#moveBefore is not implemented");
        }

        /**
         * Swap two entries.
         *
         * @param name the entry name
         * @param with the other entry name
         *
         * @return this builder
         *
         * @throws UnsupportedOperationException always, unless the implementation overrides this method.
         */
        default Builder swap(final String name, final String with) {
            throw new UnsupportedOperationException("#swap is not implemented");
        }

        /**
         * @param schema nested element schema.
         *
         * @return this builder.
         */
        Builder withElementSchema(Schema schema);

        /**
         * @param props schema properties
         *
         * @return this builder
         */
        Builder withProps(Map<String, String> props);

        /**
         * @param key the prop key name
         * @param value the prop value
         *
         * @return this builder
         */
        Builder withProp(String key, String value);

        /**
         * @return the described schema.
         */
        Schema build();

        /**
         * Same as {@link Builder#build()} but entries order is specified by {@code order}. This takes precedence
         * over any order previously defined with the builder and may void it.
         *
         * @param order the wanted order for entries.
         *
         * @return the described schema.
         *
         * @throws UnsupportedOperationException always, unless the implementation overrides this method.
         */
        default Schema build(Comparator<Entry> order) {
            throw new UnsupportedOperationException("#build(EntriesOrder) is not implemented");
        }
    }
489
490    /**
491     * Sanitize name to be avro compatible.
492     *
493     * @param name : original name.
494     *
495     * @return avro compatible name.
496     */
497    static String sanitizeConnectionName(final String name) {
498        if (SKIP_SANITIZE || name == null || name.isEmpty()) {
499            return name;
500        }
501
502        char current = name.charAt(0);
503        final CharsetEncoder ascii = Charset.forName(StandardCharsets.US_ASCII.name()).newEncoder();
504        final boolean skipFirstChar = ((!ascii.canEncode(current)) || (!Character.isLetter(current) && current != '_'))
505                && name.length() > 1 && (!Character.isDigit(name.charAt(1)));
506
507        final StringBuilder sanitizedBuilder = new StringBuilder();
508
509        if (!skipFirstChar) {
510            if (((!Character.isLetter(current)) && current != '_') || (!ascii.canEncode(current))) {
511                sanitizedBuilder.append('_');
512            } else {
513                sanitizedBuilder.append(current);
514            }
515        }
516        for (int i = 1; i < name.length(); i++) {
517            current = name.charAt(i);
518            if (!ascii.canEncode(current)) {
519                if (Character.isLowerCase(current) || Character.isUpperCase(current)) {
520                    sanitizedBuilder.append('_');
521                } else {
522                    final byte[] encoded =
523                            Base64.getEncoder().encode(name.substring(i, i + 1).getBytes(StandardCharsets.UTF_8));
524                    final String enc = new String(encoded);
525                    if (sanitizedBuilder.length() == 0 && Character.isDigit(enc.charAt(0))) {
526                        sanitizedBuilder.append('_');
527                    }
528                    for (int iter = 0; iter < enc.length(); iter++) {
529                        if (Character.isLetterOrDigit(enc.charAt(iter))) {
530                            sanitizedBuilder.append(enc.charAt(iter));
531                        } else {
532                            sanitizedBuilder.append('_');
533                        }
534                    }
535                }
536            } else if (Character.isLetterOrDigit(current)) {
537                sanitizedBuilder.append(current);
538            } else {
539                sanitizedBuilder.append('_');
540            }
541
542        }
543        return sanitizedBuilder.toString();
544    }
545
546    @RequiredArgsConstructor
547    @ToString
548    @EqualsAndHashCode
549    class EntriesOrder implements Comparator<Entry> {
550
551        private final OrderedMap<String> fieldsOrder;
552
553        // Keep comparator while no change occurs in fieldsOrder.
554        private Comparator<Entry> currentComparator = null;
555
556        /**
557         * Build an EntriesOrder according fields.
558         *
559         * @param fields the fields ordering. Each field should be {@code ,} separated.
560         *
561         * @return the order EntriesOrder
562         */
563        public static EntriesOrder of(final String fields) {
564            return new EntriesOrder(fields);
565        }
566
567        /**
568         * Build an EntriesOrder according fields.
569         *
570         * @param fields the fields ordering.
571         *
572         * @return the order EntriesOrder
573         */
574        public static EntriesOrder of(final Iterable<String> fields) {
575            final OrderedMap<String> orders = new OrderedMap<>(Function.identity(), fields);
576            return new EntriesOrder(orders);
577        }
578
579        public EntriesOrder(final String fields) {
580            if (fields == null || fields.isEmpty()) {
581                fieldsOrder = new OrderedMap<>(Function.identity());
582            } else {
583                final List<String> fieldList = Arrays.stream(fields.split(",")).collect(Collectors.toList());
584                fieldsOrder = new OrderedMap<>(Function.identity(), fieldList);
585            }
586        }
587
588        public EntriesOrder(final Iterable<String> fields) {
589            this(new OrderedMap<>(Function.identity(), fields));
590        }
591
592        public Stream<String> getFieldsOrder() {
593            return this.fieldsOrder.streams();
594        }
595
596        /**
597         * Move a field after another one.
598         *
599         * @param after the field name reference
600         * @param name the field name
601         *
602         * @return this EntriesOrder
603         */
604        public EntriesOrder moveAfter(final String after, final String name) {
605            this.currentComparator = null;
606            this.fieldsOrder.moveAfter(after, name);
607            return this;
608        }
609
610        /**
611         * Move a field before another one.
612         *
613         * @param before the field name reference
614         * @param name the field name
615         *
616         * @return this EntriesOrder
617         */
618        public EntriesOrder moveBefore(final String before, final String name) {
619            this.currentComparator = null;
620            this.fieldsOrder.moveBefore(before, name);
621            return this;
622        }
623
624        /**
625         * Swap two fields.
626         *
627         * @param name the field name
628         * @param with the other field
629         *
630         * @return this EntriesOrder
631         */
632        public EntriesOrder swap(final String name, final String with) {
633            this.currentComparator = null;
634            this.fieldsOrder.swap(name, with);
635            return this;
636        }
637
638        public String toFields() {
639            return this.fieldsOrder.streams().collect(Collectors.joining(","));
640        }
641
642        public Comparator<Entry> getComparator() {
643            if (this.currentComparator == null) {
644                final Map<String, Integer> entryPositions = new HashMap<>();
645                final AtomicInteger index = new AtomicInteger(1);
646                this.fieldsOrder.streams()
647                        .forEach(
648                                (final String name) -> entryPositions.put(name, index.getAndIncrement()));
649                this.currentComparator = new EntryComparator(entryPositions);
650            }
651            return this.currentComparator;
652        }
653
654        @Override
655        public int compare(final Entry e1, final Entry e2) {
656            return this.getComparator().compare(e1, e2);
657        }
658
659        @RequiredArgsConstructor
660        static class EntryComparator implements Comparator<Entry> {
661
662            private final Map<String, Integer> entryPositions;
663
664            @Override
665            public int compare(final Entry e1, final Entry e2) {
666                final int index1 = this.entryPositions.getOrDefault(e1.getName(), Integer.MAX_VALUE);
667                final int index2 = this.entryPositions.getOrDefault(e2.getName(), Integer.MAX_VALUE);
668                if (index1 >= 0 && index2 >= 0) {
669                    return index1 - index2;
670                }
671                if (index1 >= 0) {
672                    return -1;
673                }
674                if (index2 >= 0) {
675                    return 1;
676                }
677                return 0;
678            }
679        }
680    }
681
682    static Schema.Entry avoidCollision(final Schema.Entry newEntry,
683            final Function<String, Entry> entryGetter,
684            final BiConsumer<String, Entry> replaceFunction) {
685        if (SKIP_SANITIZE) {
686            return newEntry;
687        }
688        final Optional<Entry> collisionedEntry = Optional.ofNullable(entryGetter //
689                .apply(newEntry.getName())) //
690                .filter((final Entry field) -> !Objects.equals(field, newEntry));
691        if (!collisionedEntry.isPresent()) {
692            // No collision, return new entry.
693            return newEntry;
694        }
695        final Entry matchedEntry = collisionedEntry.get();
696        final boolean matchedToChange = matchedEntry.getRawName() != null && !(matchedEntry.getRawName().isEmpty());
697        if (matchedToChange) {
698            // the rename has to be applied on entry already inside schema, so replace.
699            replaceFunction.accept(matchedEntry.getName(), newEntry);
700        } else if (newEntry.getRawName() == null || newEntry.getRawName().isEmpty()) {
701            // try to add exactly same raw, skip the add here.
702            return null;
703        }
704        final Entry fieldToChange = matchedToChange ? matchedEntry : newEntry;
705        int indexForAnticollision = 1;
706        final String baseName = Schema.sanitizeConnectionName(fieldToChange.getRawName()); // recalc primiti name.
707
708        String newName = baseName + "_" + indexForAnticollision;
709        while (entryGetter.apply(newName) != null) {
710            indexForAnticollision++;
711            newName = baseName + "_" + indexForAnticollision;
712        }
713        final Entry newFieldToAdd = fieldToChange.toBuilder().withName(newName).build();
714
715        return newFieldToAdd; // matchedToChange ? newFieldToAdd : newEntry;
716    }
717}