001/**
002 * Copyright (C) 2006-2022 Talend Inc. - www.talend.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.talend.sdk.component.api.record;
017
018import java.io.StringReader;
019import java.nio.charset.Charset;
020import java.nio.charset.CharsetEncoder;
021import java.nio.charset.StandardCharsets;
022import java.time.temporal.Temporal;
023import java.util.Arrays;
024import java.util.Base64;
025import java.util.Collection;
026import java.util.Comparator;
027import java.util.Date;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.Objects;
032import java.util.Optional;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.function.BiConsumer;
035import java.util.function.Function;
036import java.util.function.Supplier;
037import java.util.stream.Collectors;
038import java.util.stream.Stream;
039
040import javax.json.Json;
041import javax.json.JsonValue;
042import javax.json.bind.annotation.JsonbTransient;
043
044import lombok.EqualsAndHashCode;
045import lombok.RequiredArgsConstructor;
046import lombok.ToString;
047
048public interface Schema {
049
    /**
     * @return the type of this schema.
     */
    Type getType();

    /**
     * @return the nested element schema for arrays.
     */
    Schema getElementSchema();

    /**
     * @return the data entries for records (metadata entries are not included).
     */
    List<Entry> getEntries();

    /**
     * @return the metadata entries for records (ordinary data entries are not included).
     */
    List<Entry> getMetadata();

    /**
     * @return all entries of this schema, data and metadata alike.
     */
    Stream<Entry> getAllEntries();
074
    /**
     * Get a Builder from the current schema.
     * <p>
     * This default implementation always fails; it only works when overridden.
     *
     * @return a {@link Schema.Builder}
     *
     * @throws UnsupportedOperationException always, in this default implementation.
     */
    default Schema.Builder toBuilder() {
        throw new UnsupportedOperationException("#toBuilder is not implemented");
    }
083
    /**
     * Get all entries sorted by schema designed order.
     * <p>
     * Delegates to {@link #getEntriesOrdered(Comparator)} with {@link #naturalOrder()} as comparator, so it fails
     * the same way {@link #naturalOrder()} does when that method is not overridden.
     *
     * @return all entries ordered
     */
    default List<Entry> getEntriesOrdered() {
        return getEntriesOrdered(naturalOrder());
    }
092
093    /**
094     * Get all entries sorted using a custom comparator.
095     *
096     * @param comparator the comparator
097     *
098     * @return all entries ordered with provided comparator
099     */
100    @JsonbTransient
101    default List<Entry> getEntriesOrdered(final Comparator<Entry> comparator) {
102        return getAllEntries().sorted(comparator).collect(Collectors.toList());
103    }
104
    /**
     * Get the EntriesOrder defined with Builder.
     * <p>
     * This default implementation always fails; it only works when overridden.
     *
     * @return the EntriesOrder
     *
     * @throws UnsupportedOperationException always, in this default implementation.
     */
    default EntriesOrder naturalOrder() {
        throw new UnsupportedOperationException("#naturalOrder is not implemented");
    }
114
115    default Entry getEntry(final String name) {
116        return getAllEntries() //
117                .filter((Entry e) -> Objects.equals(e.getName(), name)) //
118                .findFirst() //
119                .orElse(null);
120    }
121
    /**
     * @return the metadata props of this schema, as raw strings keyed by property name.
     */
    Map<String, String> getProps();

    /**
     * @param property : property name.
     *
     * @return the requested metadata prop as a raw string.
     */
    String getProp(String property);
133
134    /**
135     * Get a property values from schema with its name.
136     *
137     * @param name : property's name.
138     *
139     * @return property's value.
140     */
141    default JsonValue getJsonProp(final String name) {
142        final String prop = this.getProp(name);
143        if (prop == null) {
144            return null;
145        }
146        try {
147            return Json.createParser(new StringReader(prop)).getValue();
148        } catch (RuntimeException ex) {
149            return Json.createValue(prop);
150        }
151    }
152
153    enum Type {
154
155        RECORD(new Class<?>[] { Record.class }),
156        ARRAY(new Class<?>[] { Collection.class }),
157        STRING(new Class<?>[] { String.class }),
158        BYTES(new Class<?>[] { byte[].class, Byte[].class }),
159        INT(new Class<?>[] { Integer.class }),
160        LONG(new Class<?>[] { Long.class }),
161        FLOAT(new Class<?>[] { Float.class }),
162        DOUBLE(new Class<?>[] { Double.class }),
163        BOOLEAN(new Class<?>[] { Boolean.class }),
164        DATETIME(new Class<?>[] { Long.class, Date.class, Temporal.class });
165
166        /**
167         * All compatibles Java classes
168         */
169        private final Class<?>[] classes;
170
171        Type(final Class<?>[] classes) {
172            this.classes = classes;
173        }
174
175        /**
176         * Check if input can be affected to an entry of this type.
177         *
178         * @param input : object.
179         *
180         * @return true if input is null or ok.
181         */
182        public boolean isCompatible(final Object input) {
183            if (input == null) {
184                return true;
185            }
186            for (final Class<?> clazz : classes) {
187                if (clazz.isInstance(input)) {
188                    return true;
189                }
190            }
191            return false;
192        }
193    }
194
195    interface Entry {
196
197        /**
198         * @return The name of this entry.
199         */
200        String getName();
201
202        /**
203         * @return The raw name of this entry.
204         */
205        String getRawName();
206
207        /**
208         * @return the raw name of this entry if exists, else return name.
209         */
210        String getOriginalFieldName();
211
212        /**
213         * @return Type of the entry, this determine which other fields are populated.
214         */
215        Type getType();
216
217        /**
218         * @return Is this entry nullable or always valued.
219         */
220        boolean isNullable();
221
222        /**
223         * @return true if this entry is for metadata; false for ordinary data.
224         */
225        boolean isMetadata();
226
227        /**
228         * @param <T> the default value type.
229         *
230         * @return Default value for this entry.
231         */
232        <T> T getDefaultValue();
233
234        /**
235         * @return For type == record, the element type.
236         */
237        Schema getElementSchema();
238
239        /**
240         * @return Allows to associate to this field a comment - for doc purposes, no use in the runtime.
241         */
242        String getComment();
243
244        /**
245         * @return the metadata props
246         */
247        Map<String, String> getProps();
248
249        /**
250         * @param property : property name.
251         *
252         * @return the requested metadata prop
253         */
254        String getProp(String property);
255
256        /**
257         * Get a property values from entry with its name.
258         *
259         * @param name : property's name.
260         *
261         * @return property's value.
262         */
263        default JsonValue getJsonProp(final String name) {
264            final String prop = this.getProp(name);
265            if (prop == null) {
266                return null;
267            }
268            try {
269                return Json.createParser(new StringReader(prop)).getValue();
270            } catch (RuntimeException ex) {
271                return Json.createValue(prop);
272            }
273        }
274
275        /**
276         * @return an {@link Entry.Builder} from this entry.
277         */
278        default Entry.Builder toBuilder() {
279            throw new UnsupportedOperationException("#toBuilder is not implemented");
280        }
281
282        /**
283         * Plain builder matching {@link Entry} structure.
284         */
285        interface Builder {
286
287            Builder withName(String name);
288
289            Builder withRawName(String rawName);
290
291            Builder withType(Type type);
292
293            Builder withNullable(boolean nullable);
294
295            Builder withMetadata(boolean metadata);
296
297            <T> Builder withDefaultValue(T value);
298
299            Builder withElementSchema(Schema schema);
300
301            Builder withComment(String comment);
302
303            Builder withProps(Map<String, String> props);
304
305            Builder withProp(String key, String value);
306
307            Entry build();
308        }
309    }
310
    /**
     * Allows to build a {@link Schema}.
     * <p>
     * The ordering and removal operations are declared as default methods that always throw
     * {@link UnsupportedOperationException}; they only work on builders that override them.
     */
    interface Builder {

        /**
         * @param type schema type.
         *
         * @return this builder.
         */
        Builder withType(Type type);

        /**
         * @param entry element for either an array or record type.
         *
         * @return this builder.
         */
        Builder withEntry(Entry entry);

        /**
         * Insert the entry after the specified entry.
         *
         * @param after the entry name reference
         * @param entry the entry to insert
         *
         * @return this builder
         *
         * @throws UnsupportedOperationException always, in this default implementation.
         */
        default Builder withEntryAfter(String after, Entry entry) {
            throw new UnsupportedOperationException("#withEntryAfter is not implemented");
        }

        /**
         * Insert the entry before the specified entry.
         *
         * @param before the entry name reference
         * @param entry the entry to insert
         *
         * @return this builder
         *
         * @throws UnsupportedOperationException always, in this default implementation.
         */
        default Builder withEntryBefore(String before, Entry entry) {
            throw new UnsupportedOperationException("#withEntryBefore is not implemented");
        }

        /**
         * Remove entry from builder.
         *
         * @param name the entry name
         *
         * @return this builder
         *
         * @throws UnsupportedOperationException always, in this default implementation.
         */
        default Builder remove(String name) {
            throw new UnsupportedOperationException("#remove is not implemented");
        }

        /**
         * Remove entry from builder.
         *
         * @param entry the entry
         *
         * @return this builder
         *
         * @throws UnsupportedOperationException always, in this default implementation.
         */
        default Builder remove(Entry entry) {
            throw new UnsupportedOperationException("#remove is not implemented");
        }

        /**
         * Move an entry after another one.
         *
         * @param after the entry name reference
         * @param name the entry name
         *
         * @return this builder
         *
         * @throws UnsupportedOperationException always, in this default implementation.
         */
        default Builder moveAfter(final String after, final String name) {
            throw new UnsupportedOperationException("#moveAfter is not implemented");
        }

        /**
         * Move an entry before another one.
         *
         * @param before the entry name reference
         * @param name the entry name
         *
         * @return this builder
         *
         * @throws UnsupportedOperationException always, in this default implementation.
         */
        default Builder moveBefore(final String before, final String name) {
            throw new UnsupportedOperationException("#moveBefore is not implemented");
        }

        /**
         * Swap two entries.
         *
         * @param name the entry name
         * @param with the other entry name
         *
         * @return this builder
         *
         * @throws UnsupportedOperationException always, in this default implementation.
         */
        default Builder swap(final String name, final String with) {
            throw new UnsupportedOperationException("#swap is not implemented");
        }

        /**
         * @param schema nested element schema.
         *
         * @return this builder.
         */
        Builder withElementSchema(Schema schema);

        /**
         * @param props schema properties
         *
         * @return this builder
         */
        Builder withProps(Map<String, String> props);

        /**
         * @param key the prop key name
         * @param value the prop value
         *
         * @return this builder
         */
        Builder withProp(String key, String value);

        /**
         * @return the described schema.
         */
        Schema build();

        /**
         * Same as {@link Builder#build()} but entries order is specified by {@code order}. This takes precedence
         * over any order previously defined with the builder and may void it.
         *
         * @param order the wanted order for entries.
         *
         * @return the described schema.
         *
         * @throws UnsupportedOperationException always, in this default implementation.
         */
        default Schema build(Comparator<Entry> order) {
            throw new UnsupportedOperationException("#build(EntriesOrder) is not implemented");
        }
    }
444
445    /**
446     * Sanitize name to be avro compatible.
447     *
448     * @param name : original name.
449     *
450     * @return avro compatible name.
451     */
452    static String sanitizeConnectionName(final String name) {
453        if (name == null || name.isEmpty()) {
454            return name;
455        }
456
457        char current = name.charAt(0);
458        final CharsetEncoder ascii = Charset.forName(StandardCharsets.US_ASCII.name()).newEncoder();
459        final boolean skipFirstChar = ((!ascii.canEncode(current)) || (!Character.isLetter(current) && current != '_'))
460                && name.length() > 1 && (!Character.isDigit(name.charAt(1)));
461
462        final StringBuilder sanitizedBuilder = new StringBuilder();
463
464        if (!skipFirstChar) {
465            if (((!Character.isLetter(current)) && current != '_') || (!ascii.canEncode(current))) {
466                sanitizedBuilder.append('_');
467            } else {
468                sanitizedBuilder.append(current);
469            }
470        }
471        for (int i = 1; i < name.length(); i++) {
472            current = name.charAt(i);
473            if (!ascii.canEncode(current)) {
474                if (Character.isLowerCase(current) || Character.isUpperCase(current)) {
475                    sanitizedBuilder.append('_');
476                } else {
477                    final byte[] encoded =
478                            Base64.getEncoder().encode(name.substring(i, i + 1).getBytes(StandardCharsets.UTF_8));
479                    final String enc = new String(encoded);
480                    if (sanitizedBuilder.length() == 0 && Character.isDigit(enc.charAt(0))) {
481                        sanitizedBuilder.append('_');
482                    }
483                    for (int iter = 0; iter < enc.length(); iter++) {
484                        if (Character.isLetterOrDigit(enc.charAt(iter))) {
485                            sanitizedBuilder.append(enc.charAt(iter));
486                        } else {
487                            sanitizedBuilder.append('_');
488                        }
489                    }
490                }
491            } else if (Character.isLetterOrDigit(current)) {
492                sanitizedBuilder.append(current);
493            } else {
494                sanitizedBuilder.append('_');
495            }
496
497        }
498        return sanitizedBuilder.toString();
499    }
500
501    @RequiredArgsConstructor
502    @ToString
503    @EqualsAndHashCode
504    class EntriesOrder implements Comparator<Entry> {
505
506        private final OrderedMap<String> fieldsOrder;
507
508        // Keep comparator while no change occurs in fieldsOrder.
509        private Comparator<Entry> currentComparator = null;
510
511        /**
512         * Build an EntriesOrder according fields.
513         *
514         * @param fields the fields ordering. Each field should be {@code ,} separated.
515         *
516         * @return the order EntriesOrder
517         */
518        public static EntriesOrder of(final String fields) {
519            return new EntriesOrder(fields);
520        }
521
522        /**
523         * Build an EntriesOrder according fields.
524         *
525         * @param fields the fields ordering.
526         *
527         * @return the order EntriesOrder
528         */
529        public static EntriesOrder of(final Iterable<String> fields) {
530            final OrderedMap<String> orders = new OrderedMap<>(Function.identity(), fields);
531            return new EntriesOrder(orders);
532        }
533
534        public EntriesOrder(final String fields) {
535            if (fields == null || fields.isEmpty()) {
536                fieldsOrder = new OrderedMap<>(Function.identity());
537            } else {
538                final List<String> fieldList = Arrays.stream(fields.split(",")).collect(Collectors.toList());
539                fieldsOrder = new OrderedMap<>(Function.identity(), fieldList);
540            }
541        }
542
543        public EntriesOrder(final Iterable<String> fields) {
544            this(new OrderedMap<>(Function.identity(), fields));
545        }
546
547        public Stream<String> getFieldsOrder() {
548            return this.fieldsOrder.streams();
549        }
550
551        /**
552         * Move a field after another one.
553         *
554         * @param after the field name reference
555         * @param name the field name
556         *
557         * @return this EntriesOrder
558         */
559        public EntriesOrder moveAfter(final String after, final String name) {
560            this.currentComparator = null;
561            this.fieldsOrder.moveAfter(after, name);
562            return this;
563        }
564
565        /**
566         * Move a field before another one.
567         *
568         * @param before the field name reference
569         * @param name the field name
570         *
571         * @return this EntriesOrder
572         */
573        public EntriesOrder moveBefore(final String before, final String name) {
574            this.currentComparator = null;
575            this.fieldsOrder.moveBefore(before, name);
576            return this;
577        }
578
579        /**
580         * Swap two fields.
581         *
582         * @param name the field name
583         * @param with the other field
584         *
585         * @return this EntriesOrder
586         */
587        public EntriesOrder swap(final String name, final String with) {
588            this.currentComparator = null;
589            this.fieldsOrder.swap(name, with);
590            return this;
591        }
592
593        public String toFields() {
594            return this.fieldsOrder.streams().collect(Collectors.joining(","));
595        }
596
597        public Comparator<Entry> getComparator() {
598            if (this.currentComparator == null) {
599                final Map<String, Integer> entryPositions = new HashMap<>();
600                final AtomicInteger index = new AtomicInteger(1);
601                this.fieldsOrder.streams()
602                        .forEach(
603                                (final String name) -> entryPositions.put(name, index.getAndIncrement()));
604                this.currentComparator = new EntryComparator(entryPositions);
605            }
606            return this.currentComparator;
607        }
608
609        @Override
610        public int compare(final Entry e1, final Entry e2) {
611            return this.getComparator().compare(e1, e2);
612        }
613
614        @RequiredArgsConstructor
615        static class EntryComparator implements Comparator<Entry> {
616
617            private final Map<String, Integer> entryPositions;
618
619            @Override
620            public int compare(final Entry e1, final Entry e2) {
621                final int index1 = this.entryPositions.getOrDefault(e1.getName(), Integer.MAX_VALUE);
622                final int index2 = this.entryPositions.getOrDefault(e2.getName(), Integer.MAX_VALUE);
623                if (index1 >= 0 && index2 >= 0) {
624                    return index1 - index2;
625                }
626                if (index1 >= 0) {
627                    return -1;
628                }
629                if (index2 >= 0) {
630                    return 1;
631                }
632                return 0;
633            }
634        }
635    }
636
637    // use new avoid collision with entry getter.
638    @Deprecated
639    static Schema.Entry avoidCollision(final Schema.Entry newEntry,
640            final Supplier<Stream<Schema.Entry>> allEntriesSupplier,
641            final BiConsumer<String, Entry> replaceFunction) {
642        final Function<String, Entry> entryGetter = (String name) -> allEntriesSupplier //
643                .get() //
644                .filter((final Entry field) -> field.getName().equals(name))
645                .findFirst()
646                .orElse(null);
647        return avoidCollision(newEntry, entryGetter, replaceFunction);
648    }
649
650    static Schema.Entry avoidCollision(final Schema.Entry newEntry,
651            final Function<String, Entry> entryGetter,
652            final BiConsumer<String, Entry> replaceFunction) {
653        final Optional<Entry> collisionedEntry = Optional.ofNullable(entryGetter //
654                .apply(newEntry.getName())) //
655                .filter((final Entry field) -> !Objects.equals(field, newEntry));
656        if (!collisionedEntry.isPresent()) {
657            // No collision, return new entry.
658            return newEntry;
659        }
660        final Entry matchedEntry = collisionedEntry.get();
661        final boolean matchedToChange = matchedEntry.getRawName() != null && !(matchedEntry.getRawName().isEmpty());
662        if (matchedToChange) {
663            // the rename has to be applied on entry already inside schema, so replace.
664            replaceFunction.accept(matchedEntry.getName(), newEntry);
665        } else if (newEntry.getRawName() == null || newEntry.getRawName().isEmpty()) {
666            // try to add exactly same raw, skip the add here.
667            return null;
668        }
669        final Entry fieldToChange = matchedToChange ? matchedEntry : newEntry;
670        int indexForAnticollision = 1;
671        final String baseName = Schema.sanitizeConnectionName(fieldToChange.getRawName()); // recalc primiti name.
672
673        String newName = baseName + "_" + indexForAnticollision;
674        while (entryGetter.apply(newName) != null) {
675            indexForAnticollision++;
676            newName = baseName + "_" + indexForAnticollision;
677        }
678        final Entry newFieldToAdd = fieldToChange.toBuilder().withName(newName).build();
679
680        return newFieldToAdd; // matchedToChange ? newFieldToAdd : newEntry;
681    }
682}