001/**
002 * Copyright (C) 2006-2022 Talend Inc. - www.talend.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.talend.sdk.component.api.record;
017
018import java.io.StringReader;
019import java.math.BigDecimal;
020import java.nio.charset.Charset;
021import java.nio.charset.CharsetEncoder;
022import java.nio.charset.StandardCharsets;
023import java.time.temporal.Temporal;
024import java.util.Arrays;
025import java.util.Base64;
026import java.util.Collection;
027import java.util.Comparator;
028import java.util.Date;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.Objects;
033import java.util.Optional;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.function.BiConsumer;
036import java.util.function.Function;
037import java.util.function.Supplier;
038import java.util.stream.Collectors;
039import java.util.stream.Stream;
040
041import javax.json.Json;
042import javax.json.JsonValue;
043import javax.json.bind.annotation.JsonbTransient;
044import javax.json.stream.JsonParser;
045
046import lombok.EqualsAndHashCode;
047import lombok.RequiredArgsConstructor;
048import lombok.ToString;
049
050public interface Schema {
051
052    /**
053     * @return the type of this schema.
054     */
055    Type getType();
056
057    /**
058     * @return the nested element schema for arrays.
059     */
060    Schema getElementSchema();
061
062    /**
063     * @return the data entries for records (not contains meta data entries).
064     */
065    List<Entry> getEntries();
066
067    /**
068     * @return the metadata entries for records (not contains ordinary data entries).
069     */
070    List<Entry> getMetadata();
071
072    /**
073     * @return All entries, including data and metadata, of this schema.
074     */
075    Stream<Entry> getAllEntries();
076
077    /**
078     * Get a Builder from the current schema.
079     *
080     * @return a {@link Schema.Builder}
081     */
082    default Schema.Builder toBuilder() {
083        throw new UnsupportedOperationException("#toBuilder is not implemented");
084    }
085
086    /**
087     * Get all entries sorted by schema designed order.
088     *
089     * @return all entries ordered
090     */
091    default List<Entry> getEntriesOrdered() {
092        return getEntriesOrdered(naturalOrder());
093    }
094
095    /**
096     * Get all entries sorted using a custom comparator.
097     *
098     * @param comparator the comparator
099     *
100     * @return all entries ordered with provided comparator
101     */
102    @JsonbTransient
103    default List<Entry> getEntriesOrdered(final Comparator<Entry> comparator) {
104        return getAllEntries().sorted(comparator).collect(Collectors.toList());
105    }
106
107    /**
108     * Get the EntriesOrder defined with Builder.
109     *
110     * @return the EntriesOrder
111     */
112
113    default EntriesOrder naturalOrder() {
114        throw new UnsupportedOperationException("#naturalOrder is not implemented");
115    }
116
117    default Entry getEntry(final String name) {
118        return getAllEntries() //
119                .filter((Entry e) -> Objects.equals(e.getName(), name)) //
120                .findFirst() //
121                .orElse(null);
122    }
123
124    /**
125     * @return the metadata props
126     */
127    Map<String, String> getProps();
128
129    /**
130     * @param property : property name.
131     *
132     * @return the requested metadata prop
133     */
134    String getProp(String property);
135
136    /**
137     * Get a property values from schema with its name.
138     *
139     * @param name : property's name.
140     *
141     * @return property's value.
142     */
143    default JsonValue getJsonProp(final String name) {
144        final String prop = this.getProp(name);
145        if (prop == null) {
146            return null;
147        }
148        try (final StringReader reader = new StringReader(prop);
149                final JsonParser parser = Json.createParser(reader)) {
150            return parser.getValue();
151        } catch (RuntimeException ex) {
152            return Json.createValue(prop);
153        }
154    }
155
156    enum Type {
157
158        RECORD(new Class<?>[] { Record.class }),
159        ARRAY(new Class<?>[] { Collection.class }),
160        STRING(new Class<?>[] { String.class, Object.class }),
161        BYTES(new Class<?>[] { byte[].class, Byte[].class }),
162        INT(new Class<?>[] { Integer.class }),
163        LONG(new Class<?>[] { Long.class }),
164        FLOAT(new Class<?>[] { Float.class }),
165        DOUBLE(new Class<?>[] { Double.class }),
166        BOOLEAN(new Class<?>[] { Boolean.class }),
167        DATETIME(new Class<?>[] { Long.class, Date.class, Temporal.class }),
168        DECIMAL(new Class<?>[] { BigDecimal.class });
169
170        /**
171         * All compatibles Java classes
172         */
173        private final Class<?>[] classes;
174
175        Type(final Class<?>[] classes) {
176            this.classes = classes;
177        }
178
179        /**
180         * Check if input can be affected to an entry of this type.
181         *
182         * @param input : object.
183         *
184         * @return true if input is null or ok.
185         */
186        public boolean isCompatible(final Object input) {
187            if (input == null) {
188                return true;
189            }
190            for (final Class<?> clazz : classes) {
191                if (clazz.isInstance(input)) {
192                    return true;
193                }
194            }
195            return false;
196        }
197    }
198
199    interface Entry {
200
201        /**
202         * @return The name of this entry.
203         */
204        String getName();
205
206        /**
207         * @return The raw name of this entry.
208         */
209        String getRawName();
210
211        /**
212         * @return the raw name of this entry if exists, else return name.
213         */
214        String getOriginalFieldName();
215
216        /**
217         * @return Type of the entry, this determine which other fields are populated.
218         */
219        Type getType();
220
221        /**
222         * @return Is this entry nullable or always valued.
223         */
224        boolean isNullable();
225
226        /**
227         * @return true if this entry is for metadata; false for ordinary data.
228         */
229        boolean isMetadata();
230
231        /**
232         * @param <T> the default value type.
233         *
234         * @return Default value for this entry.
235         */
236        <T> T getDefaultValue();
237
238        /**
239         * @return For type == record, the element type.
240         */
241        Schema getElementSchema();
242
243        /**
244         * @return Allows to associate to this field a comment - for doc purposes, no use in the runtime.
245         */
246        String getComment();
247
248        /**
249         * @return the metadata props
250         */
251        Map<String, String> getProps();
252
253        /**
254         * @param property : property name.
255         *
256         * @return the requested metadata prop
257         */
258        String getProp(String property);
259
260        /**
261         * Get a property values from entry with its name.
262         *
263         * @param name : property's name.
264         *
265         * @return property's value.
266         */
267        default JsonValue getJsonProp(final String name) {
268            final String prop = this.getProp(name);
269            if (prop == null) {
270                return null;
271            }
272            try (final StringReader reader = new StringReader(prop);
273                    final JsonParser parser = Json.createParser(reader)) {
274                return parser.getValue();
275            } catch (RuntimeException ex) {
276                return Json.createValue(prop);
277            }
278        }
279
280        /**
281         * @return an {@link Entry.Builder} from this entry.
282         */
283        default Entry.Builder toBuilder() {
284            throw new UnsupportedOperationException("#toBuilder is not implemented");
285        }
286
287        /**
288         * Plain builder matching {@link Entry} structure.
289         */
290        interface Builder {
291
292            Builder withName(String name);
293
294            Builder withRawName(String rawName);
295
296            Builder withType(Type type);
297
298            Builder withNullable(boolean nullable);
299
300            Builder withMetadata(boolean metadata);
301
302            <T> Builder withDefaultValue(T value);
303
304            Builder withElementSchema(Schema schema);
305
306            Builder withComment(String comment);
307
308            Builder withProps(Map<String, String> props);
309
310            Builder withProp(String key, String value);
311
312            Entry build();
313        }
314    }
315
316    /**
317     * Allows to build a {@link Schema}.
318     */
319    interface Builder {
320
321        /**
322         * @param type schema type.
323         *
324         * @return this builder.
325         */
326        Builder withType(Type type);
327
328        /**
329         * @param entry element for either an array or record type.
330         *
331         * @return this builder.
332         */
333        Builder withEntry(Entry entry);
334
335        /**
336         * Insert the entry after the specified entry.
337         *
338         * @param after the entry name reference
339         * @param entry the entry name
340         *
341         * @return this builder
342         */
343        default Builder withEntryAfter(String after, Entry entry) {
344            throw new UnsupportedOperationException("#withEntryAfter is not implemented");
345        }
346
347        /**
348         * Insert the entry before the specified entry.
349         *
350         * @param before the entry name reference
351         * @param entry the entry name
352         *
353         * @return this builder
354         */
355        default Builder withEntryBefore(String before, Entry entry) {
356            throw new UnsupportedOperationException("#withEntryBefore is not implemented");
357        }
358
359        /**
360         * Remove entry from builder.
361         *
362         * @param name the entry name
363         *
364         * @return this builder
365         */
366        default Builder remove(String name) {
367            throw new UnsupportedOperationException("#remove is not implemented");
368        }
369
370        /**
371         * Remove entry from builder.
372         *
373         * @param entry the entry
374         *
375         * @return this builder
376         */
377        default Builder remove(Entry entry) {
378            throw new UnsupportedOperationException("#remove is not implemented");
379        }
380
381        /**
382         * Move an entry after another one.
383         *
384         * @param after the entry name reference
385         * @param name the entry name
386         */
387        default Builder moveAfter(final String after, final String name) {
388            throw new UnsupportedOperationException("#moveAfter is not implemented");
389        }
390
391        /**
392         * Move an entry before another one.
393         *
394         * @param before the entry name reference
395         * @param name the entry name
396         */
397        default Builder moveBefore(final String before, final String name) {
398            throw new UnsupportedOperationException("#moveBefore is not implemented");
399        }
400
401        /**
402         * Swap two entries.
403         *
404         * @param name the entry name
405         * @param with the other entry name
406         */
407        default Builder swap(final String name, final String with) {
408            throw new UnsupportedOperationException("#swap is not implemented");
409        }
410
411        /**
412         * @param schema nested element schema.
413         *
414         * @return this builder.
415         */
416        Builder withElementSchema(Schema schema);
417
418        /**
419         * @param props schema properties
420         *
421         * @return this builder
422         */
423        Builder withProps(Map<String, String> props);
424
425        /**
426         * @param key the prop key name
427         * @param value the prop value
428         *
429         * @return this builder
430         */
431        Builder withProp(String key, String value);
432
433        /**
434         * @return the described schema.
435         */
436        Schema build();
437
438        /**
439         * Same as {@link Builder#build()} but entries order is specified by {@code order}. This takes precedence on any
440         * previous defined order with builder and may void it.
441         *
442         * @param order the wanted order for entries.
443         * @return the described schema.
444         */
445        default Schema build(Comparator<Entry> order) {
446            throw new UnsupportedOperationException("#build(EntriesOrder) is not implemented");
447        }
448    }
449
450    /**
451     * Sanitize name to be avro compatible.
452     *
453     * @param name : original name.
454     *
455     * @return avro compatible name.
456     */
457    static String sanitizeConnectionName(final String name) {
458        if (name == null || name.isEmpty()) {
459            return name;
460        }
461
462        char current = name.charAt(0);
463        final CharsetEncoder ascii = Charset.forName(StandardCharsets.US_ASCII.name()).newEncoder();
464        final boolean skipFirstChar = ((!ascii.canEncode(current)) || (!Character.isLetter(current) && current != '_'))
465                && name.length() > 1 && (!Character.isDigit(name.charAt(1)));
466
467        final StringBuilder sanitizedBuilder = new StringBuilder();
468
469        if (!skipFirstChar) {
470            if (((!Character.isLetter(current)) && current != '_') || (!ascii.canEncode(current))) {
471                sanitizedBuilder.append('_');
472            } else {
473                sanitizedBuilder.append(current);
474            }
475        }
476        for (int i = 1; i < name.length(); i++) {
477            current = name.charAt(i);
478            if (!ascii.canEncode(current)) {
479                if (Character.isLowerCase(current) || Character.isUpperCase(current)) {
480                    sanitizedBuilder.append('_');
481                } else {
482                    final byte[] encoded =
483                            Base64.getEncoder().encode(name.substring(i, i + 1).getBytes(StandardCharsets.UTF_8));
484                    final String enc = new String(encoded);
485                    if (sanitizedBuilder.length() == 0 && Character.isDigit(enc.charAt(0))) {
486                        sanitizedBuilder.append('_');
487                    }
488                    for (int iter = 0; iter < enc.length(); iter++) {
489                        if (Character.isLetterOrDigit(enc.charAt(iter))) {
490                            sanitizedBuilder.append(enc.charAt(iter));
491                        } else {
492                            sanitizedBuilder.append('_');
493                        }
494                    }
495                }
496            } else if (Character.isLetterOrDigit(current)) {
497                sanitizedBuilder.append(current);
498            } else {
499                sanitizedBuilder.append('_');
500            }
501
502        }
503        return sanitizedBuilder.toString();
504    }
505
506    @RequiredArgsConstructor
507    @ToString
508    @EqualsAndHashCode
509    class EntriesOrder implements Comparator<Entry> {
510
511        private final OrderedMap<String> fieldsOrder;
512
513        // Keep comparator while no change occurs in fieldsOrder.
514        private Comparator<Entry> currentComparator = null;
515
516        /**
517         * Build an EntriesOrder according fields.
518         *
519         * @param fields the fields ordering. Each field should be {@code ,} separated.
520         *
521         * @return the order EntriesOrder
522         */
523        public static EntriesOrder of(final String fields) {
524            return new EntriesOrder(fields);
525        }
526
527        /**
528         * Build an EntriesOrder according fields.
529         *
530         * @param fields the fields ordering.
531         *
532         * @return the order EntriesOrder
533         */
534        public static EntriesOrder of(final Iterable<String> fields) {
535            final OrderedMap<String> orders = new OrderedMap<>(Function.identity(), fields);
536            return new EntriesOrder(orders);
537        }
538
539        public EntriesOrder(final String fields) {
540            if (fields == null || fields.isEmpty()) {
541                fieldsOrder = new OrderedMap<>(Function.identity());
542            } else {
543                final List<String> fieldList = Arrays.stream(fields.split(",")).collect(Collectors.toList());
544                fieldsOrder = new OrderedMap<>(Function.identity(), fieldList);
545            }
546        }
547
548        public EntriesOrder(final Iterable<String> fields) {
549            this(new OrderedMap<>(Function.identity(), fields));
550        }
551
552        public Stream<String> getFieldsOrder() {
553            return this.fieldsOrder.streams();
554        }
555
556        /**
557         * Move a field after another one.
558         *
559         * @param after the field name reference
560         * @param name the field name
561         *
562         * @return this EntriesOrder
563         */
564        public EntriesOrder moveAfter(final String after, final String name) {
565            this.currentComparator = null;
566            this.fieldsOrder.moveAfter(after, name);
567            return this;
568        }
569
570        /**
571         * Move a field before another one.
572         *
573         * @param before the field name reference
574         * @param name the field name
575         *
576         * @return this EntriesOrder
577         */
578        public EntriesOrder moveBefore(final String before, final String name) {
579            this.currentComparator = null;
580            this.fieldsOrder.moveBefore(before, name);
581            return this;
582        }
583
584        /**
585         * Swap two fields.
586         *
587         * @param name the field name
588         * @param with the other field
589         *
590         * @return this EntriesOrder
591         */
592        public EntriesOrder swap(final String name, final String with) {
593            this.currentComparator = null;
594            this.fieldsOrder.swap(name, with);
595            return this;
596        }
597
598        public String toFields() {
599            return this.fieldsOrder.streams().collect(Collectors.joining(","));
600        }
601
602        public Comparator<Entry> getComparator() {
603            if (this.currentComparator == null) {
604                final Map<String, Integer> entryPositions = new HashMap<>();
605                final AtomicInteger index = new AtomicInteger(1);
606                this.fieldsOrder.streams()
607                        .forEach(
608                                (final String name) -> entryPositions.put(name, index.getAndIncrement()));
609                this.currentComparator = new EntryComparator(entryPositions);
610            }
611            return this.currentComparator;
612        }
613
614        @Override
615        public int compare(final Entry e1, final Entry e2) {
616            return this.getComparator().compare(e1, e2);
617        }
618
619        @RequiredArgsConstructor
620        static class EntryComparator implements Comparator<Entry> {
621
622            private final Map<String, Integer> entryPositions;
623
624            @Override
625            public int compare(final Entry e1, final Entry e2) {
626                final int index1 = this.entryPositions.getOrDefault(e1.getName(), Integer.MAX_VALUE);
627                final int index2 = this.entryPositions.getOrDefault(e2.getName(), Integer.MAX_VALUE);
628                if (index1 >= 0 && index2 >= 0) {
629                    return index1 - index2;
630                }
631                if (index1 >= 0) {
632                    return -1;
633                }
634                if (index2 >= 0) {
635                    return 1;
636                }
637                return 0;
638            }
639        }
640    }
641
642    // use new avoid collision with entry getter.
643    @Deprecated
644    static Schema.Entry avoidCollision(final Schema.Entry newEntry,
645            final Supplier<Stream<Schema.Entry>> allEntriesSupplier,
646            final BiConsumer<String, Entry> replaceFunction) {
647        final Function<String, Entry> entryGetter = (String name) -> allEntriesSupplier //
648                .get() //
649                .filter((final Entry field) -> field.getName().equals(name))
650                .findFirst()
651                .orElse(null);
652        return avoidCollision(newEntry, entryGetter, replaceFunction);
653    }
654
655    static Schema.Entry avoidCollision(final Schema.Entry newEntry,
656            final Function<String, Entry> entryGetter,
657            final BiConsumer<String, Entry> replaceFunction) {
658        final Optional<Entry> collisionedEntry = Optional.ofNullable(entryGetter //
659                .apply(newEntry.getName())) //
660                .filter((final Entry field) -> !Objects.equals(field, newEntry));
661        if (!collisionedEntry.isPresent()) {
662            // No collision, return new entry.
663            return newEntry;
664        }
665        final Entry matchedEntry = collisionedEntry.get();
666        final boolean matchedToChange = matchedEntry.getRawName() != null && !(matchedEntry.getRawName().isEmpty());
667        if (matchedToChange) {
668            // the rename has to be applied on entry already inside schema, so replace.
669            replaceFunction.accept(matchedEntry.getName(), newEntry);
670        } else if (newEntry.getRawName() == null || newEntry.getRawName().isEmpty()) {
671            // try to add exactly same raw, skip the add here.
672            return null;
673        }
674        final Entry fieldToChange = matchedToChange ? matchedEntry : newEntry;
675        int indexForAnticollision = 1;
676        final String baseName = Schema.sanitizeConnectionName(fieldToChange.getRawName()); // recalc primiti name.
677
678        String newName = baseName + "_" + indexForAnticollision;
679        while (entryGetter.apply(newName) != null) {
680            indexForAnticollision++;
681            newName = baseName + "_" + indexForAnticollision;
682        }
683        final Entry newFieldToAdd = fieldToChange.toBuilder().withName(newName).build();
684
685        return newFieldToAdd; // matchedToChange ? newFieldToAdd : newEntry;
686    }
687}