001/**
002 * Copyright (C) 2006-2022 Talend Inc. - www.talend.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.talend.sdk.component.api.record;
017
018import java.io.StringReader;
019import java.nio.charset.Charset;
020import java.nio.charset.CharsetEncoder;
021import java.nio.charset.StandardCharsets;
022import java.time.temporal.Temporal;
023import java.util.Arrays;
024import java.util.Base64;
025import java.util.Collection;
026import java.util.Comparator;
027import java.util.Date;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.Objects;
032import java.util.Optional;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.function.BiConsumer;
035import java.util.function.Function;
036import java.util.function.Supplier;
037import java.util.stream.Collectors;
038import java.util.stream.Stream;
039
040import javax.json.Json;
041import javax.json.JsonValue;
042import javax.json.bind.annotation.JsonbTransient;
043import javax.json.stream.JsonParser;
044
045import lombok.EqualsAndHashCode;
046import lombok.RequiredArgsConstructor;
047import lombok.ToString;
048
049public interface Schema {
050
    /**
     * Gives the {@link Type} of this schema.
     *
     * @return the type of this schema.
     */
    Type getType();
055
    /**
     * Gives the schema of the nested elements; documented as meaningful for array schemas.
     *
     * @return the nested element schema for arrays.
     */
    Schema getElementSchema();
060
    /**
     * Gives the ordinary data entries of a record schema.
     *
     * @return the data entries for records (metadata entries are not included).
     */
    List<Entry> getEntries();
065
    /**
     * Gives the metadata entries of a record schema.
     *
     * @return the metadata entries for records (ordinary data entries are not included).
     */
    List<Entry> getMetadata();
070
    /**
     * Streams every entry of this schema, data and metadata alike.
     *
     * @return all entries, including data and metadata, of this schema.
     */
    Stream<Entry> getAllEntries();
075
076    /**
077     * Get a Builder from the current schema.
078     *
079     * @return a {@link Schema.Builder}
080     */
081    default Schema.Builder toBuilder() {
082        throw new UnsupportedOperationException("#toBuilder is not implemented");
083    }
084
085    /**
086     * Get all entries sorted by schema designed order.
087     *
088     * @return all entries ordered
089     */
090    default List<Entry> getEntriesOrdered() {
091        return getEntriesOrdered(naturalOrder());
092    }
093
094    /**
095     * Get all entries sorted using a custom comparator.
096     *
097     * @param comparator the comparator
098     *
099     * @return all entries ordered with provided comparator
100     */
101    @JsonbTransient
102    default List<Entry> getEntriesOrdered(final Comparator<Entry> comparator) {
103        return getAllEntries().sorted(comparator).collect(Collectors.toList());
104    }
105
106    /**
107     * Get the EntriesOrder defined with Builder.
108     *
109     * @return the EntriesOrder
110     */
111
112    default EntriesOrder naturalOrder() {
113        throw new UnsupportedOperationException("#naturalOrder is not implemented");
114    }
115
116    default Entry getEntry(final String name) {
117        return getAllEntries() //
118                .filter((Entry e) -> Objects.equals(e.getName(), name)) //
119                .findFirst() //
120                .orElse(null);
121    }
122
    /**
     * Gives the metadata properties of this schema, keyed by property name.
     *
     * @return the metadata props
     */
    Map<String, String> getProps();
127
    /**
     * Gives a single metadata property of this schema.
     * <p>
     * {@link #getJsonProp(String)} treats a {@code null} result as "no such property".
     *
     * @param property : property name.
     *
     * @return the requested metadata prop
     */
    String getProp(String property);
134
135    /**
136     * Get a property values from schema with its name.
137     *
138     * @param name : property's name.
139     *
140     * @return property's value.
141     */
142    default JsonValue getJsonProp(final String name) {
143        final String prop = this.getProp(name);
144        if (prop == null) {
145            return null;
146        }
147        try (final StringReader reader = new StringReader(prop);
148                final JsonParser parser = Json.createParser(reader)) {
149            return parser.getValue();
150        } catch (RuntimeException ex) {
151            return Json.createValue(prop);
152        }
153    }
154
155    enum Type {
156
157        RECORD(new Class<?>[] { Record.class }),
158        ARRAY(new Class<?>[] { Collection.class }),
159        STRING(new Class<?>[] { String.class }),
160        BYTES(new Class<?>[] { byte[].class, Byte[].class }),
161        INT(new Class<?>[] { Integer.class }),
162        LONG(new Class<?>[] { Long.class }),
163        FLOAT(new Class<?>[] { Float.class }),
164        DOUBLE(new Class<?>[] { Double.class }),
165        BOOLEAN(new Class<?>[] { Boolean.class }),
166        DATETIME(new Class<?>[] { Long.class, Date.class, Temporal.class });
167
168        /**
169         * All compatibles Java classes
170         */
171        private final Class<?>[] classes;
172
173        Type(final Class<?>[] classes) {
174            this.classes = classes;
175        }
176
177        /**
178         * Check if input can be affected to an entry of this type.
179         *
180         * @param input : object.
181         *
182         * @return true if input is null or ok.
183         */
184        public boolean isCompatible(final Object input) {
185            if (input == null) {
186                return true;
187            }
188            for (final Class<?> clazz : classes) {
189                if (clazz.isInstance(input)) {
190                    return true;
191                }
192            }
193            return false;
194        }
195    }
196
197    interface Entry {
198
199        /**
200         * @return The name of this entry.
201         */
202        String getName();
203
204        /**
205         * @return The raw name of this entry.
206         */
207        String getRawName();
208
209        /**
210         * @return the raw name of this entry if exists, else return name.
211         */
212        String getOriginalFieldName();
213
214        /**
215         * @return Type of the entry, this determine which other fields are populated.
216         */
217        Type getType();
218
219        /**
220         * @return Is this entry nullable or always valued.
221         */
222        boolean isNullable();
223
224        /**
225         * @return true if this entry is for metadata; false for ordinary data.
226         */
227        boolean isMetadata();
228
229        /**
230         * @param <T> the default value type.
231         *
232         * @return Default value for this entry.
233         */
234        <T> T getDefaultValue();
235
236        /**
237         * @return For type == record, the element type.
238         */
239        Schema getElementSchema();
240
241        /**
242         * @return Allows to associate to this field a comment - for doc purposes, no use in the runtime.
243         */
244        String getComment();
245
246        /**
247         * @return the metadata props
248         */
249        Map<String, String> getProps();
250
251        /**
252         * @param property : property name.
253         *
254         * @return the requested metadata prop
255         */
256        String getProp(String property);
257
258        /**
259         * Get a property values from entry with its name.
260         *
261         * @param name : property's name.
262         *
263         * @return property's value.
264         */
265        default JsonValue getJsonProp(final String name) {
266            final String prop = this.getProp(name);
267            if (prop == null) {
268                return null;
269            }
270            try (final StringReader reader = new StringReader(prop);
271                    final JsonParser parser = Json.createParser(reader)) {
272                return parser.getValue();
273            } catch (RuntimeException ex) {
274                return Json.createValue(prop);
275            }
276        }
277
278        /**
279         * @return an {@link Entry.Builder} from this entry.
280         */
281        default Entry.Builder toBuilder() {
282            throw new UnsupportedOperationException("#toBuilder is not implemented");
283        }
284
285        /**
286         * Plain builder matching {@link Entry} structure.
287         */
288        interface Builder {
289
290            Builder withName(String name);
291
292            Builder withRawName(String rawName);
293
294            Builder withType(Type type);
295
296            Builder withNullable(boolean nullable);
297
298            Builder withMetadata(boolean metadata);
299
300            <T> Builder withDefaultValue(T value);
301
302            Builder withElementSchema(Schema schema);
303
304            Builder withComment(String comment);
305
306            Builder withProps(Map<String, String> props);
307
308            Builder withProp(String key, String value);
309
310            Entry build();
311        }
312    }
313
314    /**
315     * Allows to build a {@link Schema}.
316     */
317    interface Builder {
318
319        /**
320         * @param type schema type.
321         *
322         * @return this builder.
323         */
324        Builder withType(Type type);
325
326        /**
327         * @param entry element for either an array or record type.
328         *
329         * @return this builder.
330         */
331        Builder withEntry(Entry entry);
332
333        /**
334         * Insert the entry after the specified entry.
335         *
336         * @param after the entry name reference
337         * @param entry the entry name
338         *
339         * @return this builder
340         */
341        default Builder withEntryAfter(String after, Entry entry) {
342            throw new UnsupportedOperationException("#withEntryAfter is not implemented");
343        }
344
345        /**
346         * Insert the entry before the specified entry.
347         *
348         * @param before the entry name reference
349         * @param entry the entry name
350         *
351         * @return this builder
352         */
353        default Builder withEntryBefore(String before, Entry entry) {
354            throw new UnsupportedOperationException("#withEntryBefore is not implemented");
355        }
356
357        /**
358         * Remove entry from builder.
359         *
360         * @param name the entry name
361         *
362         * @return this builder
363         */
364        default Builder remove(String name) {
365            throw new UnsupportedOperationException("#remove is not implemented");
366        }
367
368        /**
369         * Remove entry from builder.
370         *
371         * @param entry the entry
372         *
373         * @return this builder
374         */
375        default Builder remove(Entry entry) {
376            throw new UnsupportedOperationException("#remove is not implemented");
377        }
378
379        /**
380         * Move an entry after another one.
381         *
382         * @param after the entry name reference
383         * @param name the entry name
384         */
385        default Builder moveAfter(final String after, final String name) {
386            throw new UnsupportedOperationException("#moveAfter is not implemented");
387        }
388
389        /**
390         * Move an entry before another one.
391         *
392         * @param before the entry name reference
393         * @param name the entry name
394         */
395        default Builder moveBefore(final String before, final String name) {
396            throw new UnsupportedOperationException("#moveBefore is not implemented");
397        }
398
399        /**
400         * Swap two entries.
401         *
402         * @param name the entry name
403         * @param with the other entry name
404         */
405        default Builder swap(final String name, final String with) {
406            throw new UnsupportedOperationException("#swap is not implemented");
407        }
408
409        /**
410         * @param schema nested element schema.
411         *
412         * @return this builder.
413         */
414        Builder withElementSchema(Schema schema);
415
416        /**
417         * @param props schema properties
418         *
419         * @return this builder
420         */
421        Builder withProps(Map<String, String> props);
422
423        /**
424         * @param key the prop key name
425         * @param value the prop value
426         *
427         * @return this builder
428         */
429        Builder withProp(String key, String value);
430
431        /**
432         * @return the described schema.
433         */
434        Schema build();
435
436        /**
437         * Same as {@link Builder#build()} but entries order is specified by {@code order}. This takes precedence on any
438         * previous defined order with builder and may void it.
439         *
440         * @param order the wanted order for entries.
441         * @return the described schema.
442         */
443        default Schema build(Comparator<Entry> order) {
444            throw new UnsupportedOperationException("#build(EntriesOrder) is not implemented");
445        }
446    }
447
448    /**
449     * Sanitize name to be avro compatible.
450     *
451     * @param name : original name.
452     *
453     * @return avro compatible name.
454     */
455    static String sanitizeConnectionName(final String name) {
456        if (name == null || name.isEmpty()) {
457            return name;
458        }
459
460        char current = name.charAt(0);
461        final CharsetEncoder ascii = Charset.forName(StandardCharsets.US_ASCII.name()).newEncoder();
462        final boolean skipFirstChar = ((!ascii.canEncode(current)) || (!Character.isLetter(current) && current != '_'))
463                && name.length() > 1 && (!Character.isDigit(name.charAt(1)));
464
465        final StringBuilder sanitizedBuilder = new StringBuilder();
466
467        if (!skipFirstChar) {
468            if (((!Character.isLetter(current)) && current != '_') || (!ascii.canEncode(current))) {
469                sanitizedBuilder.append('_');
470            } else {
471                sanitizedBuilder.append(current);
472            }
473        }
474        for (int i = 1; i < name.length(); i++) {
475            current = name.charAt(i);
476            if (!ascii.canEncode(current)) {
477                if (Character.isLowerCase(current) || Character.isUpperCase(current)) {
478                    sanitizedBuilder.append('_');
479                } else {
480                    final byte[] encoded =
481                            Base64.getEncoder().encode(name.substring(i, i + 1).getBytes(StandardCharsets.UTF_8));
482                    final String enc = new String(encoded);
483                    if (sanitizedBuilder.length() == 0 && Character.isDigit(enc.charAt(0))) {
484                        sanitizedBuilder.append('_');
485                    }
486                    for (int iter = 0; iter < enc.length(); iter++) {
487                        if (Character.isLetterOrDigit(enc.charAt(iter))) {
488                            sanitizedBuilder.append(enc.charAt(iter));
489                        } else {
490                            sanitizedBuilder.append('_');
491                        }
492                    }
493                }
494            } else if (Character.isLetterOrDigit(current)) {
495                sanitizedBuilder.append(current);
496            } else {
497                sanitizedBuilder.append('_');
498            }
499
500        }
501        return sanitizedBuilder.toString();
502    }
503
504    @RequiredArgsConstructor
505    @ToString
506    @EqualsAndHashCode
507    class EntriesOrder implements Comparator<Entry> {
508
509        private final OrderedMap<String> fieldsOrder;
510
511        // Keep comparator while no change occurs in fieldsOrder.
512        private Comparator<Entry> currentComparator = null;
513
514        /**
515         * Build an EntriesOrder according fields.
516         *
517         * @param fields the fields ordering. Each field should be {@code ,} separated.
518         *
519         * @return the order EntriesOrder
520         */
521        public static EntriesOrder of(final String fields) {
522            return new EntriesOrder(fields);
523        }
524
525        /**
526         * Build an EntriesOrder according fields.
527         *
528         * @param fields the fields ordering.
529         *
530         * @return the order EntriesOrder
531         */
532        public static EntriesOrder of(final Iterable<String> fields) {
533            final OrderedMap<String> orders = new OrderedMap<>(Function.identity(), fields);
534            return new EntriesOrder(orders);
535        }
536
537        public EntriesOrder(final String fields) {
538            if (fields == null || fields.isEmpty()) {
539                fieldsOrder = new OrderedMap<>(Function.identity());
540            } else {
541                final List<String> fieldList = Arrays.stream(fields.split(",")).collect(Collectors.toList());
542                fieldsOrder = new OrderedMap<>(Function.identity(), fieldList);
543            }
544        }
545
546        public EntriesOrder(final Iterable<String> fields) {
547            this(new OrderedMap<>(Function.identity(), fields));
548        }
549
550        public Stream<String> getFieldsOrder() {
551            return this.fieldsOrder.streams();
552        }
553
554        /**
555         * Move a field after another one.
556         *
557         * @param after the field name reference
558         * @param name the field name
559         *
560         * @return this EntriesOrder
561         */
562        public EntriesOrder moveAfter(final String after, final String name) {
563            this.currentComparator = null;
564            this.fieldsOrder.moveAfter(after, name);
565            return this;
566        }
567
568        /**
569         * Move a field before another one.
570         *
571         * @param before the field name reference
572         * @param name the field name
573         *
574         * @return this EntriesOrder
575         */
576        public EntriesOrder moveBefore(final String before, final String name) {
577            this.currentComparator = null;
578            this.fieldsOrder.moveBefore(before, name);
579            return this;
580        }
581
582        /**
583         * Swap two fields.
584         *
585         * @param name the field name
586         * @param with the other field
587         *
588         * @return this EntriesOrder
589         */
590        public EntriesOrder swap(final String name, final String with) {
591            this.currentComparator = null;
592            this.fieldsOrder.swap(name, with);
593            return this;
594        }
595
596        public String toFields() {
597            return this.fieldsOrder.streams().collect(Collectors.joining(","));
598        }
599
600        public Comparator<Entry> getComparator() {
601            if (this.currentComparator == null) {
602                final Map<String, Integer> entryPositions = new HashMap<>();
603                final AtomicInteger index = new AtomicInteger(1);
604                this.fieldsOrder.streams()
605                        .forEach(
606                                (final String name) -> entryPositions.put(name, index.getAndIncrement()));
607                this.currentComparator = new EntryComparator(entryPositions);
608            }
609            return this.currentComparator;
610        }
611
612        @Override
613        public int compare(final Entry e1, final Entry e2) {
614            return this.getComparator().compare(e1, e2);
615        }
616
617        @RequiredArgsConstructor
618        static class EntryComparator implements Comparator<Entry> {
619
620            private final Map<String, Integer> entryPositions;
621
622            @Override
623            public int compare(final Entry e1, final Entry e2) {
624                final int index1 = this.entryPositions.getOrDefault(e1.getName(), Integer.MAX_VALUE);
625                final int index2 = this.entryPositions.getOrDefault(e2.getName(), Integer.MAX_VALUE);
626                if (index1 >= 0 && index2 >= 0) {
627                    return index1 - index2;
628                }
629                if (index1 >= 0) {
630                    return -1;
631                }
632                if (index2 >= 0) {
633                    return 1;
634                }
635                return 0;
636            }
637        }
638    }
639
640    // use new avoid collision with entry getter.
641    @Deprecated
642    static Schema.Entry avoidCollision(final Schema.Entry newEntry,
643            final Supplier<Stream<Schema.Entry>> allEntriesSupplier,
644            final BiConsumer<String, Entry> replaceFunction) {
645        final Function<String, Entry> entryGetter = (String name) -> allEntriesSupplier //
646                .get() //
647                .filter((final Entry field) -> field.getName().equals(name))
648                .findFirst()
649                .orElse(null);
650        return avoidCollision(newEntry, entryGetter, replaceFunction);
651    }
652
653    static Schema.Entry avoidCollision(final Schema.Entry newEntry,
654            final Function<String, Entry> entryGetter,
655            final BiConsumer<String, Entry> replaceFunction) {
656        final Optional<Entry> collisionedEntry = Optional.ofNullable(entryGetter //
657                .apply(newEntry.getName())) //
658                .filter((final Entry field) -> !Objects.equals(field, newEntry));
659        if (!collisionedEntry.isPresent()) {
660            // No collision, return new entry.
661            return newEntry;
662        }
663        final Entry matchedEntry = collisionedEntry.get();
664        final boolean matchedToChange = matchedEntry.getRawName() != null && !(matchedEntry.getRawName().isEmpty());
665        if (matchedToChange) {
666            // the rename has to be applied on entry already inside schema, so replace.
667            replaceFunction.accept(matchedEntry.getName(), newEntry);
668        } else if (newEntry.getRawName() == null || newEntry.getRawName().isEmpty()) {
669            // try to add exactly same raw, skip the add here.
670            return null;
671        }
672        final Entry fieldToChange = matchedToChange ? matchedEntry : newEntry;
673        int indexForAnticollision = 1;
674        final String baseName = Schema.sanitizeConnectionName(fieldToChange.getRawName()); // recalc primiti name.
675
676        String newName = baseName + "_" + indexForAnticollision;
677        while (entryGetter.apply(newName) != null) {
678            indexForAnticollision++;
679            newName = baseName + "_" + indexForAnticollision;
680        }
681        final Entry newFieldToAdd = fieldToChange.toBuilder().withName(newName).build();
682
683        return newFieldToAdd; // matchedToChange ? newFieldToAdd : newEntry;
684    }
685}