001/**
002 * Copyright (C) 2006-2021 Talend Inc. - www.talend.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.talend.sdk.component.api.record;
017
018import static java.util.Collections.emptyList;
019
020import java.io.StringReader;
021import java.nio.charset.Charset;
022import java.nio.charset.CharsetEncoder;
023import java.nio.charset.StandardCharsets;
024import java.time.temporal.Temporal;
025import java.util.Arrays;
026import java.util.Base64;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.Comparator;
030import java.util.Date;
031import java.util.List;
032import java.util.Map;
033import java.util.Objects;
034import java.util.Optional;
035import java.util.Set;
036import java.util.function.BiConsumer;
037import java.util.function.Supplier;
038import java.util.stream.Collectors;
039import java.util.stream.Stream;
040
041import javax.json.Json;
042import javax.json.JsonValue;
043import javax.json.bind.annotation.JsonbTransient;
044
045import lombok.AllArgsConstructor;
046import lombok.EqualsAndHashCode;
047import lombok.Getter;
048import lombok.ToString;
049
050public interface Schema {
051
052    /**
053     * @return the type of this schema.
054     */
055    Type getType();
056
057    /**
058     * @return the nested element schema for arrays.
059     */
060    Schema getElementSchema();
061
062    /**
063     * @return the data entries for records (not contains meta data entries).
064     */
065    List<Entry> getEntries();
066
067    /**
068     * @return the metadata entries for records (not contains ordinary data entries).
069     */
070    List<Entry> getMetadata();
071
072    /**
073     * @return All entries, including data and metadata, of this schema.
074     */
075    Stream<Entry> getAllEntries();
076
077    /**
078     * Get a Builder from the current schema.
079     *
080     * @return a {@link Schema.Builder}
081     */
082    Schema.Builder toBuilder();
083
084    /**
085     * Get all entries sorted by schema designed order.
086     *
087     * @return all entries ordered
088     */
089    default List<Entry> getEntriesOrdered() {
090        return getEntriesOrdered(naturalOrder());
091    }
092
093    /**
094     * Get all entries sorted using a custom comparator.
095     *
096     * @param comparator the comparator
097     *
098     * @return all entries ordered with provided comparator
099     */
100    @JsonbTransient
101    default List<Entry> getEntriesOrdered(final Comparator<Entry> comparator) {
102        return getAllEntries().sorted(comparator).collect(Collectors.toList());
103    }
104
105    /**
106     * Get the EntriesOrder defined with Builder.
107     *
108     * @return the EntriesOrder
109     */
110
111    EntriesOrder naturalOrder();
112
113    default Entry getEntry(final String name) {
114        return getAllEntries() //
115                .filter((Entry e) -> Objects.equals(e.getName(), name)) //
116                .findFirst() //
117                .orElse(null);
118    }
119
120    /**
121     * @return the metadata props
122     */
123    Map<String, String> getProps();
124
125    /**
126     * @param property : property name.
127     *
128     * @return the requested metadata prop
129     */
130    String getProp(String property);
131
132    /**
133     * Get a property values from schema with its name.
134     *
135     * @param name : property's name.
136     *
137     * @return property's value.
138     */
139    default JsonValue getJsonProp(final String name) {
140        final String prop = this.getProp(name);
141        if (prop == null) {
142            return null;
143        }
144        try {
145            return Json.createParser(new StringReader(prop)).getValue();
146        } catch (RuntimeException ex) {
147            return Json.createValue(prop);
148        }
149    }
150
151    enum Type {
152
153        RECORD(new Class<?>[] { Record.class }),
154        ARRAY(new Class<?>[] { Collection.class }),
155        STRING(new Class<?>[] { String.class }),
156        BYTES(new Class<?>[] { byte[].class, Byte[].class }),
157        INT(new Class<?>[] { Integer.class }),
158        LONG(new Class<?>[] { Long.class }),
159        FLOAT(new Class<?>[] { Float.class }),
160        DOUBLE(new Class<?>[] { Double.class }),
161        BOOLEAN(new Class<?>[] { Boolean.class }),
162        DATETIME(new Class<?>[] { Long.class, Date.class, Temporal.class });
163
164        /**
165         * All compatibles Java classes
166         */
167        private final Class<?>[] classes;
168
169        Type(final Class<?>[] classes) {
170            this.classes = classes;
171        }
172
173        /**
174         * Check if input can be affected to an entry of this type.
175         *
176         * @param input : object.
177         *
178         * @return true if input is null or ok.
179         */
180        public boolean isCompatible(final Object input) {
181            if (input == null) {
182                return true;
183            }
184            for (final Class<?> clazz : classes) {
185                if (clazz.isInstance(input)) {
186                    return true;
187                }
188            }
189            return false;
190        }
191    }
192
193    interface Entry {
194
195        /**
196         * @return The name of this entry.
197         */
198        String getName();
199
200        /**
201         * @return The raw name of this entry.
202         */
203        String getRawName();
204
205        /**
206         * @return the raw name of this entry if exists, else return name.
207         */
208        String getOriginalFieldName();
209
210        /**
211         * @return Type of the entry, this determine which other fields are populated.
212         */
213        Type getType();
214
215        /**
216         * @return Is this entry nullable or always valued.
217         */
218        boolean isNullable();
219
220        /**
221         * @return true if this entry is for metadata; false for ordinary data.
222         */
223        boolean isMetadata();
224
225        /**
226         * @param <T> the default value type.
227         *
228         * @return Default value for this entry.
229         */
230        <T> T getDefaultValue();
231
232        /**
233         * @return For type == record, the element type.
234         */
235        Schema getElementSchema();
236
237        /**
238         * @return Allows to associate to this field a comment - for doc purposes, no use in the runtime.
239         */
240        String getComment();
241
242        /**
243         * @return the metadata props
244         */
245        Map<String, String> getProps();
246
247        /**
248         * @param property : property name.
249         *
250         * @return the requested metadata prop
251         */
252        String getProp(String property);
253
254        /**
255         * Get a property values from entry with its name.
256         *
257         * @param name : property's name.
258         *
259         * @return property's value.
260         */
261        default JsonValue getJsonProp(final String name) {
262            final String prop = this.getProp(name);
263            if (prop == null) {
264                return null;
265            }
266            try {
267                return Json.createParser(new StringReader(prop)).getValue();
268            } catch (RuntimeException ex) {
269                return Json.createValue(prop);
270            }
271        }
272
273        /**
274         * @return an {@link Entry.Builder} from this entry.
275         */
276        Entry.Builder toBuilder();
277
278        /**
279         * Plain builder matching {@link Entry} structure.
280         */
281        interface Builder {
282
283            Builder withName(String name);
284
285            Builder withRawName(String rawName);
286
287            Builder withType(Type type);
288
289            Builder withNullable(boolean nullable);
290
291            Builder withMetadata(boolean metadata);
292
293            <T> Builder withDefaultValue(T value);
294
295            Builder withElementSchema(Schema schema);
296
297            Builder withComment(String comment);
298
299            Builder withProps(Map<String, String> props);
300
301            Builder withProp(String key, String value);
302
303            Entry build();
304        }
305    }
306
307    /**
308     * Allows to build a {@link Schema}.
309     */
310    interface Builder {
311
312        /**
313         * @param type schema type.
314         *
315         * @return this builder.
316         */
317        Builder withType(Type type);
318
319        /**
320         * @param entry element for either an array or record type.
321         *
322         * @return this builder.
323         */
324        Builder withEntry(Entry entry);
325
326        /**
327         * Insert the entry after the specified entry.
328         *
329         * @param after the entry name reference
330         * @param entry the entry name
331         *
332         * @return this builder
333         */
334        Builder withEntryAfter(String after, Entry entry);
335
336        /**
337         * Insert the entry before the specified entry.
338         *
339         * @param before the entry name reference
340         * @param entry the entry name
341         *
342         * @return this builder
343         */
344        Builder withEntryBefore(String before, Entry entry);
345
346        /**
347         * Remove entry from builder.
348         *
349         * @param name the entry name
350         *
351         * @return this builder
352         */
353        Builder remove(String name);
354
355        /**
356         * Remove entry from builder.
357         *
358         * @param entry the entry
359         *
360         * @return this builder
361         */
362        Builder remove(Entry entry);
363
364        /**
365         * Move an entry after another one.
366         *
367         * @param after the entry name reference
368         * @param name the entry name
369         */
370        Builder moveAfter(final String after, final String name);
371
372        /**
373         * Move an entry before another one.
374         *
375         * @param before the entry name reference
376         * @param name the entry name
377         */
378        Builder moveBefore(final String before, final String name);
379
380        /**
381         * Swap two entries.
382         *
383         * @param name the entry name
384         * @param with the other entry name
385         */
386        Builder swap(final String name, final String with);
387
388        /**
389         * @param schema nested element schema.
390         *
391         * @return this builder.
392         */
393        Builder withElementSchema(Schema schema);
394
395        /**
396         * @param props schema properties
397         *
398         * @return this builder
399         */
400        Builder withProps(Map<String, String> props);
401
402        /**
403         * @param key the prop key name
404         * @param value the prop value
405         *
406         * @return this builder
407         */
408        Builder withProp(String key, String value);
409
410        /**
411         * @return the described schema.
412         */
413        Schema build();
414    }
415
416    /**
417     * Sanitize name to be avro compatible.
418     *
419     * @param name : original name.
420     *
421     * @return avro compatible name.
422     */
423    static String sanitizeConnectionName(final String name) {
424        if (name == null || name.isEmpty()) {
425            return name;
426        }
427
428        char current = name.charAt(0);
429        final CharsetEncoder ascii = Charset.forName(StandardCharsets.US_ASCII.name()).newEncoder();
430        final boolean skipFirstChar = ((!ascii.canEncode(current)) || (!Character.isLetter(current) && current != '_'))
431                && name.length() > 1 && (!Character.isDigit(name.charAt(1)));
432
433        final StringBuilder sanitizedBuilder = new StringBuilder();
434
435        if (!skipFirstChar) {
436            if (((!Character.isLetter(current)) && current != '_') || (!ascii.canEncode(current))) {
437                sanitizedBuilder.append('_');
438            } else {
439                sanitizedBuilder.append(current);
440            }
441        }
442        for (int i = 1; i < name.length(); i++) {
443            current = name.charAt(i);
444            if (!ascii.canEncode(current)) {
445                if (Character.isLowerCase(current) || Character.isUpperCase(current)) {
446                    sanitizedBuilder.append('_');
447                } else {
448                    final byte[] encoded =
449                            Base64.getEncoder().encode(name.substring(i, i + 1).getBytes(StandardCharsets.UTF_8));
450                    final String enc = new String(encoded);
451                    if (sanitizedBuilder.length() == 0 && Character.isDigit(enc.charAt(0))) {
452                        sanitizedBuilder.append('_');
453                    }
454                    for (int iter = 0; iter < enc.length(); iter++) {
455                        if (Character.isLetterOrDigit(enc.charAt(iter))) {
456                            sanitizedBuilder.append(enc.charAt(iter));
457                        } else {
458                            sanitizedBuilder.append('_');
459                        }
460                    }
461                }
462            } else if (Character.isLetterOrDigit(current)) {
463                sanitizedBuilder.append(current);
464            } else {
465                sanitizedBuilder.append('_');
466            }
467
468        }
469        return sanitizedBuilder.toString();
470    }
471
472    @AllArgsConstructor
473    @ToString
474    @EqualsAndHashCode
475    class EntriesOrder implements Comparator<Entry> {
476
477        @Getter
478        private final List<String> fieldsOrder;
479
480        /**
481         * Build an EntriesOrder according fields.
482         *
483         * @param fields the fields ordering
484         *
485         * @return the order EntriesOrder
486         */
487        public static EntriesOrder of(final String fields) {
488            return new EntriesOrder(fields);
489        }
490
491        public EntriesOrder(final String fields) {
492            if (fields == null) {
493                fieldsOrder = emptyList();
494            } else {
495                fieldsOrder = Arrays.stream(fields.split(",")).collect(Collectors.toList());
496            }
497        }
498
499        /**
500         * Move a field after another one.
501         *
502         * @param after the field name reference
503         * @param name the field name
504         *
505         * @return this EntriesOrder
506         */
507        public EntriesOrder moveAfter(final String after, final String name) {
508            if (getFieldsOrder().indexOf(after) == -1) {
509                throw new IllegalArgumentException(String.format("%s not in schema", after));
510            }
511            getFieldsOrder().remove(name);
512            int destination = getFieldsOrder().indexOf(after);
513            if (!(destination + 1 == getFieldsOrder().size())) {
514                destination += 1;
515            }
516            getFieldsOrder().add(destination, name);
517            return this;
518        }
519
520        /**
521         * Move a field before another one.
522         *
523         * @param before the field name reference
524         * @param name the field name
525         *
526         * @return this EntriesOrder
527         */
528        public EntriesOrder moveBefore(final String before, final String name) {
529            if (getFieldsOrder().indexOf(before) == -1) {
530                throw new IllegalArgumentException(String.format("%s not in schema", before));
531            }
532            getFieldsOrder().remove(name);
533            getFieldsOrder().add(getFieldsOrder().indexOf(before), name);
534            return this;
535        }
536
537        /**
538         * Swap two fields.
539         *
540         * @param name the field name
541         * @param with the other field
542         *
543         * @return this EntriesOrder
544         */
545        public EntriesOrder swap(final String name, final String with) {
546            Collections.swap(getFieldsOrder(), getFieldsOrder().indexOf(name), getFieldsOrder().indexOf(with));
547            return this;
548        }
549
550        public String toFields() {
551            return getFieldsOrder().stream().collect(Collectors.joining(","));
552        }
553
554        @Override
555        public int compare(final Entry e1, final Entry e2) {
556            final int index1 = getFieldsOrder().indexOf(e1.getName());
557            final int index2 = getFieldsOrder().indexOf(e2.getName());
558            if (index1 >= 0 && index2 >= 0) {
559                return index1 - index2;
560            }
561            if (index1 >= 0) {
562                return -1;
563            }
564            if (index2 >= 0) {
565                return 1;
566            }
567            return 0;
568        }
569    }
570
571    static Schema.Entry avoidCollision(final Schema.Entry newEntry,
572            final Supplier<Stream<Schema.Entry>> allEntriesSupplier, final BiConsumer<String, Entry> replaceFunction) {
573        final Optional<Entry> collisionedEntry = allEntriesSupplier //
574                .get() //
575                .filter((final Entry field) -> field.getName().equals(newEntry.getName())
576                        && !Objects.equals(field, newEntry)) //
577                .findFirst();
578        if (!collisionedEntry.isPresent()) {
579            // No collision, return new entry.
580            return newEntry;
581        }
582        final Entry matchedEntry = collisionedEntry.get();
583        final boolean matchedToChange = matchedEntry.getRawName() != null && !(matchedEntry.getRawName().isEmpty());
584        if (matchedToChange) {
585            // the rename has to be applied on entry already inside schema, so replace.
586            replaceFunction.accept(matchedEntry.getName(), newEntry);
587        } else if (newEntry.getRawName() == null || newEntry.getRawName().isEmpty()) {
588            // try to add exactly same raw, skip the add here.
589            return null;
590        }
591        final Entry fieldToChange = matchedToChange ? matchedEntry : newEntry;
592        int indexForAnticollision = 1;
593        final String baseName = Schema.sanitizeConnectionName(fieldToChange.getRawName()); // recalc primiti name.
594
595        String newName = baseName + "_" + indexForAnticollision;
596        final Set<String> existingNames = allEntriesSupplier //
597                .get() //
598                .map(Entry::getName) //
599                .collect(Collectors.toSet());
600        while (existingNames.contains(newName)) {
601            indexForAnticollision++;
602            newName = baseName + "_" + indexForAnticollision;
603        }
604        final Entry newFieldToAdd = fieldToChange.toBuilder().withName(newName).build();
605
606        return newFieldToAdd; // matchedToChange ? newFieldToAdd : newEntry;
607    }
608}