001/** 002 * Copyright (C) 2006-2022 Talend Inc. - www.talend.com 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.talend.sdk.component.api.record; 017 018import java.io.StringReader; 019import java.math.BigDecimal; 020import java.nio.charset.Charset; 021import java.nio.charset.CharsetEncoder; 022import java.nio.charset.StandardCharsets; 023import java.time.temporal.Temporal; 024import java.util.Arrays; 025import java.util.Base64; 026import java.util.Collection; 027import java.util.Comparator; 028import java.util.Date; 029import java.util.HashMap; 030import java.util.List; 031import java.util.Map; 032import java.util.Objects; 033import java.util.Optional; 034import java.util.concurrent.atomic.AtomicInteger; 035import java.util.function.BiConsumer; 036import java.util.function.Function; 037import java.util.function.Supplier; 038import java.util.stream.Collectors; 039import java.util.stream.Stream; 040 041import javax.json.Json; 042import javax.json.JsonValue; 043import javax.json.bind.annotation.JsonbTransient; 044import javax.json.stream.JsonParser; 045 046import lombok.EqualsAndHashCode; 047import lombok.RequiredArgsConstructor; 048import lombok.ToString; 049 050public interface Schema { 051 052 /** 053 * @return the type of this schema. 054 */ 055 Type getType(); 056 057 /** 058 * @return the nested element schema for arrays. 059 */ 060 Schema getElementSchema(); 061 062 /** 063 * @return the data entries for records (not contains meta data entries). 064 */ 065 List<Entry> getEntries(); 066 067 /** 068 * @return the metadata entries for records (not contains ordinary data entries). 069 */ 070 List<Entry> getMetadata(); 071 072 /** 073 * @return All entries, including data and metadata, of this schema. 074 */ 075 Stream<Entry> getAllEntries(); 076 077 /** 078 * Get a Builder from the current schema. 079 * 080 * @return a {@link Schema.Builder} 081 */ 082 default Schema.Builder toBuilder() { 083 throw new UnsupportedOperationException("#toBuilder is not implemented"); 084 } 085 086 /** 087 * Get all entries sorted by schema designed order. 088 * 089 * @return all entries ordered 090 */ 091 default List<Entry> getEntriesOrdered() { 092 return getEntriesOrdered(naturalOrder()); 093 } 094 095 /** 096 * Get all entries sorted using a custom comparator. 097 * 098 * @param comparator the comparator 099 * 100 * @return all entries ordered with provided comparator 101 */ 102 @JsonbTransient 103 default List<Entry> getEntriesOrdered(final Comparator<Entry> comparator) { 104 return getAllEntries().sorted(comparator).collect(Collectors.toList()); 105 } 106 107 /** 108 * Get the EntriesOrder defined with Builder. 109 * 110 * @return the EntriesOrder 111 */ 112 113 default EntriesOrder naturalOrder() { 114 throw new UnsupportedOperationException("#naturalOrder is not implemented"); 115 } 116 117 default Entry getEntry(final String name) { 118 return getAllEntries() // 119 .filter((Entry e) -> Objects.equals(e.getName(), name)) // 120 .findFirst() // 121 .orElse(null); 122 } 123 124 /** 125 * @return the metadata props 126 */ 127 Map<String, String> getProps(); 128 129 /** 130 * @param property : property name. 131 * 132 * @return the requested metadata prop 133 */ 134 String getProp(String property); 135 136 /** 137 * Get a property values from schema with its name. 138 * 139 * @param name : property's name. 140 * 141 * @return property's value. 142 */ 143 default JsonValue getJsonProp(final String name) { 144 final String prop = this.getProp(name); 145 if (prop == null) { 146 return null; 147 } 148 try (final StringReader reader = new StringReader(prop); 149 final JsonParser parser = Json.createParser(reader)) { 150 return parser.getValue(); 151 } catch (RuntimeException ex) { 152 return Json.createValue(prop); 153 } 154 } 155 156 enum Type { 157 158 RECORD(new Class<?>[] { Record.class }), 159 ARRAY(new Class<?>[] { Collection.class }), 160 STRING(new Class<?>[] { String.class }), 161 BYTES(new Class<?>[] { byte[].class, Byte[].class }), 162 INT(new Class<?>[] { Integer.class }), 163 LONG(new Class<?>[] { Long.class }), 164 FLOAT(new Class<?>[] { Float.class }), 165 DOUBLE(new Class<?>[] { Double.class }), 166 BOOLEAN(new Class<?>[] { Boolean.class }), 167 DATETIME(new Class<?>[] { Long.class, Date.class, Temporal.class }), 168 DECIMAL(new Class<?>[] { BigDecimal.class }); 169 170 /** 171 * All compatibles Java classes 172 */ 173 private final Class<?>[] classes; 174 175 Type(final Class<?>[] classes) { 176 this.classes = classes; 177 } 178 179 /** 180 * Check if input can be affected to an entry of this type. 181 * 182 * @param input : object. 183 * 184 * @return true if input is null or ok. 185 */ 186 public boolean isCompatible(final Object input) { 187 if (input == null) { 188 return true; 189 } 190 for (final Class<?> clazz : classes) { 191 if (clazz.isInstance(input)) { 192 return true; 193 } 194 } 195 return false; 196 } 197 } 198 199 interface Entry { 200 201 /** 202 * @return The name of this entry. 203 */ 204 String getName(); 205 206 /** 207 * @return The raw name of this entry. 208 */ 209 String getRawName(); 210 211 /** 212 * @return the raw name of this entry if exists, else return name. 213 */ 214 String getOriginalFieldName(); 215 216 /** 217 * @return Type of the entry, this determine which other fields are populated. 218 */ 219 Type getType(); 220 221 /** 222 * @return Is this entry nullable or always valued. 223 */ 224 boolean isNullable(); 225 226 /** 227 * @return true if this entry is for metadata; false for ordinary data. 228 */ 229 boolean isMetadata(); 230 231 /** 232 * @param <T> the default value type. 233 * 234 * @return Default value for this entry. 235 */ 236 <T> T getDefaultValue(); 237 238 /** 239 * @return For type == record, the element type. 240 */ 241 Schema getElementSchema(); 242 243 /** 244 * @return Allows to associate to this field a comment - for doc purposes, no use in the runtime. 245 */ 246 String getComment(); 247 248 /** 249 * @return the metadata props 250 */ 251 Map<String, String> getProps(); 252 253 /** 254 * @param property : property name. 255 * 256 * @return the requested metadata prop 257 */ 258 String getProp(String property); 259 260 /** 261 * Get a property values from entry with its name. 262 * 263 * @param name : property's name. 264 * 265 * @return property's value. 266 */ 267 default JsonValue getJsonProp(final String name) { 268 final String prop = this.getProp(name); 269 if (prop == null) { 270 return null; 271 } 272 try (final StringReader reader = new StringReader(prop); 273 final JsonParser parser = Json.createParser(reader)) { 274 return parser.getValue(); 275 } catch (RuntimeException ex) { 276 return Json.createValue(prop); 277 } 278 } 279 280 /** 281 * @return an {@link Entry.Builder} from this entry. 282 */ 283 default Entry.Builder toBuilder() { 284 throw new UnsupportedOperationException("#toBuilder is not implemented"); 285 } 286 287 /** 288 * Plain builder matching {@link Entry} structure. 289 */ 290 interface Builder { 291 292 Builder withName(String name); 293 294 Builder withRawName(String rawName); 295 296 Builder withType(Type type); 297 298 Builder withNullable(boolean nullable); 299 300 Builder withMetadata(boolean metadata); 301 302 <T> Builder withDefaultValue(T value); 303 304 Builder withElementSchema(Schema schema); 305 306 Builder withComment(String comment); 307 308 Builder withProps(Map<String, String> props); 309 310 Builder withProp(String key, String value); 311 312 Entry build(); 313 } 314 } 315 316 /** 317 * Allows to build a {@link Schema}. 318 */ 319 interface Builder { 320 321 /** 322 * @param type schema type. 323 * 324 * @return this builder. 325 */ 326 Builder withType(Type type); 327 328 /** 329 * @param entry element for either an array or record type. 330 * 331 * @return this builder. 332 */ 333 Builder withEntry(Entry entry); 334 335 /** 336 * Insert the entry after the specified entry. 337 * 338 * @param after the entry name reference 339 * @param entry the entry name 340 * 341 * @return this builder 342 */ 343 default Builder withEntryAfter(String after, Entry entry) { 344 throw new UnsupportedOperationException("#withEntryAfter is not implemented"); 345 } 346 347 /** 348 * Insert the entry before the specified entry. 349 * 350 * @param before the entry name reference 351 * @param entry the entry name 352 * 353 * @return this builder 354 */ 355 default Builder withEntryBefore(String before, Entry entry) { 356 throw new UnsupportedOperationException("#withEntryBefore is not implemented"); 357 } 358 359 /** 360 * Remove entry from builder. 361 * 362 * @param name the entry name 363 * 364 * @return this builder 365 */ 366 default Builder remove(String name) { 367 throw new UnsupportedOperationException("#remove is not implemented"); 368 } 369 370 /** 371 * Remove entry from builder. 372 * 373 * @param entry the entry 374 * 375 * @return this builder 376 */ 377 default Builder remove(Entry entry) { 378 throw new UnsupportedOperationException("#remove is not implemented"); 379 } 380 381 /** 382 * Move an entry after another one. 383 * 384 * @param after the entry name reference 385 * @param name the entry name 386 */ 387 default Builder moveAfter(final String after, final String name) { 388 throw new UnsupportedOperationException("#moveAfter is not implemented"); 389 } 390 391 /** 392 * Move an entry before another one. 393 * 394 * @param before the entry name reference 395 * @param name the entry name 396 */ 397 default Builder moveBefore(final String before, final String name) { 398 throw new UnsupportedOperationException("#moveBefore is not implemented"); 399 } 400 401 /** 402 * Swap two entries. 403 * 404 * @param name the entry name 405 * @param with the other entry name 406 */ 407 default Builder swap(final String name, final String with) { 408 throw new UnsupportedOperationException("#swap is not implemented"); 409 } 410 411 /** 412 * @param schema nested element schema. 413 * 414 * @return this builder. 415 */ 416 Builder withElementSchema(Schema schema); 417 418 /** 419 * @param props schema properties 420 * 421 * @return this builder 422 */ 423 Builder withProps(Map<String, String> props); 424 425 /** 426 * @param key the prop key name 427 * @param value the prop value 428 * 429 * @return this builder 430 */ 431 Builder withProp(String key, String value); 432 433 /** 434 * @return the described schema. 435 */ 436 Schema build(); 437 438 /** 439 * Same as {@link Builder#build()} but entries order is specified by {@code order}. This takes precedence on any 440 * previous defined order with builder and may void it. 441 * 442 * @param order the wanted order for entries. 443 * @return the described schema. 444 */ 445 default Schema build(Comparator<Entry> order) { 446 throw new UnsupportedOperationException("#build(EntriesOrder) is not implemented"); 447 } 448 } 449 450 /** 451 * Sanitize name to be avro compatible. 452 * 453 * @param name : original name. 454 * 455 * @return avro compatible name. 456 */ 457 static String sanitizeConnectionName(final String name) { 458 if (name == null || name.isEmpty()) { 459 return name; 460 } 461 462 char current = name.charAt(0); 463 final CharsetEncoder ascii = Charset.forName(StandardCharsets.US_ASCII.name()).newEncoder(); 464 final boolean skipFirstChar = ((!ascii.canEncode(current)) || (!Character.isLetter(current) && current != '_')) 465 && name.length() > 1 && (!Character.isDigit(name.charAt(1))); 466 467 final StringBuilder sanitizedBuilder = new StringBuilder(); 468 469 if (!skipFirstChar) { 470 if (((!Character.isLetter(current)) && current != '_') || (!ascii.canEncode(current))) { 471 sanitizedBuilder.append('_'); 472 } else { 473 sanitizedBuilder.append(current); 474 } 475 } 476 for (int i = 1; i < name.length(); i++) { 477 current = name.charAt(i); 478 if (!ascii.canEncode(current)) { 479 if (Character.isLowerCase(current) || Character.isUpperCase(current)) { 480 sanitizedBuilder.append('_'); 481 } else { 482 final byte[] encoded = 483 Base64.getEncoder().encode(name.substring(i, i + 1).getBytes(StandardCharsets.UTF_8)); 484 final String enc = new String(encoded); 485 if (sanitizedBuilder.length() == 0 && Character.isDigit(enc.charAt(0))) { 486 sanitizedBuilder.append('_'); 487 } 488 for (int iter = 0; iter < enc.length(); iter++) { 489 if (Character.isLetterOrDigit(enc.charAt(iter))) { 490 sanitizedBuilder.append(enc.charAt(iter)); 491 } else { 492 sanitizedBuilder.append('_'); 493 } 494 } 495 } 496 } else if (Character.isLetterOrDigit(current)) { 497 sanitizedBuilder.append(current); 498 } else { 499 sanitizedBuilder.append('_'); 500 } 501 502 } 503 return sanitizedBuilder.toString(); 504 } 505 506 @RequiredArgsConstructor 507 @ToString 508 @EqualsAndHashCode 509 class EntriesOrder implements Comparator<Entry> { 510 511 private final OrderedMap<String> fieldsOrder; 512 513 // Keep comparator while no change occurs in fieldsOrder. 514 private Comparator<Entry> currentComparator = null; 515 516 /** 517 * Build an EntriesOrder according fields. 518 * 519 * @param fields the fields ordering. Each field should be {@code ,} separated. 520 * 521 * @return the order EntriesOrder 522 */ 523 public static EntriesOrder of(final String fields) { 524 return new EntriesOrder(fields); 525 } 526 527 /** 528 * Build an EntriesOrder according fields. 529 * 530 * @param fields the fields ordering. 531 * 532 * @return the order EntriesOrder 533 */ 534 public static EntriesOrder of(final Iterable<String> fields) { 535 final OrderedMap<String> orders = new OrderedMap<>(Function.identity(), fields); 536 return new EntriesOrder(orders); 537 } 538 539 public EntriesOrder(final String fields) { 540 if (fields == null || fields.isEmpty()) { 541 fieldsOrder = new OrderedMap<>(Function.identity()); 542 } else { 543 final List<String> fieldList = Arrays.stream(fields.split(",")).collect(Collectors.toList()); 544 fieldsOrder = new OrderedMap<>(Function.identity(), fieldList); 545 } 546 } 547 548 public EntriesOrder(final Iterable<String> fields) { 549 this(new OrderedMap<>(Function.identity(), fields)); 550 } 551 552 public Stream<String> getFieldsOrder() { 553 return this.fieldsOrder.streams(); 554 } 555 556 /** 557 * Move a field after another one. 558 * 559 * @param after the field name reference 560 * @param name the field name 561 * 562 * @return this EntriesOrder 563 */ 564 public EntriesOrder moveAfter(final String after, final String name) { 565 this.currentComparator = null; 566 this.fieldsOrder.moveAfter(after, name); 567 return this; 568 } 569 570 /** 571 * Move a field before another one. 572 * 573 * @param before the field name reference 574 * @param name the field name 575 * 576 * @return this EntriesOrder 577 */ 578 public EntriesOrder moveBefore(final String before, final String name) { 579 this.currentComparator = null; 580 this.fieldsOrder.moveBefore(before, name); 581 return this; 582 } 583 584 /** 585 * Swap two fields. 586 * 587 * @param name the field name 588 * @param with the other field 589 * 590 * @return this EntriesOrder 591 */ 592 public EntriesOrder swap(final String name, final String with) { 593 this.currentComparator = null; 594 this.fieldsOrder.swap(name, with); 595 return this; 596 } 597 598 public String toFields() { 599 return this.fieldsOrder.streams().collect(Collectors.joining(",")); 600 } 601 602 public Comparator<Entry> getComparator() { 603 if (this.currentComparator == null) { 604 final Map<String, Integer> entryPositions = new HashMap<>(); 605 final AtomicInteger index = new AtomicInteger(1); 606 this.fieldsOrder.streams() 607 .forEach( 608 (final String name) -> entryPositions.put(name, index.getAndIncrement())); 609 this.currentComparator = new EntryComparator(entryPositions); 610 } 611 return this.currentComparator; 612 } 613 614 @Override 615 public int compare(final Entry e1, final Entry e2) { 616 return this.getComparator().compare(e1, e2); 617 } 618 619 @RequiredArgsConstructor 620 static class EntryComparator implements Comparator<Entry> { 621 622 private final Map<String, Integer> entryPositions; 623 624 @Override 625 public int compare(final Entry e1, final Entry e2) { 626 final int index1 = this.entryPositions.getOrDefault(e1.getName(), Integer.MAX_VALUE); 627 final int index2 = this.entryPositions.getOrDefault(e2.getName(), Integer.MAX_VALUE); 628 if (index1 >= 0 && index2 >= 0) { 629 return index1 - index2; 630 } 631 if (index1 >= 0) { 632 return -1; 633 } 634 if (index2 >= 0) { 635 return 1; 636 } 637 return 0; 638 } 639 } 640 } 641 642 // use new avoid collision with entry getter. 643 @Deprecated 644 static Schema.Entry avoidCollision(final Schema.Entry newEntry, 645 final Supplier<Stream<Schema.Entry>> allEntriesSupplier, 646 final BiConsumer<String, Entry> replaceFunction) { 647 final Function<String, Entry> entryGetter = (String name) -> allEntriesSupplier // 648 .get() // 649 .filter((final Entry field) -> field.getName().equals(name)) 650 .findFirst() 651 .orElse(null); 652 return avoidCollision(newEntry, entryGetter, replaceFunction); 653 } 654 655 static Schema.Entry avoidCollision(final Schema.Entry newEntry, 656 final Function<String, Entry> entryGetter, 657 final BiConsumer<String, Entry> replaceFunction) { 658 final Optional<Entry> collisionedEntry = Optional.ofNullable(entryGetter // 659 .apply(newEntry.getName())) // 660 .filter((final Entry field) -> !Objects.equals(field, newEntry)); 661 if (!collisionedEntry.isPresent()) { 662 // No collision, return new entry. 663 return newEntry; 664 } 665 final Entry matchedEntry = collisionedEntry.get(); 666 final boolean matchedToChange = matchedEntry.getRawName() != null && !(matchedEntry.getRawName().isEmpty()); 667 if (matchedToChange) { 668 // the rename has to be applied on entry already inside schema, so replace. 669 replaceFunction.accept(matchedEntry.getName(), newEntry); 670 } else if (newEntry.getRawName() == null || newEntry.getRawName().isEmpty()) { 671 // try to add exactly same raw, skip the add here. 672 return null; 673 } 674 final Entry fieldToChange = matchedToChange ? matchedEntry : newEntry; 675 int indexForAnticollision = 1; 676 final String baseName = Schema.sanitizeConnectionName(fieldToChange.getRawName()); // recalc primiti name. 677 678 String newName = baseName + "_" + indexForAnticollision; 679 while (entryGetter.apply(newName) != null) { 680 indexForAnticollision++; 681 newName = baseName + "_" + indexForAnticollision; 682 } 683 final Entry newFieldToAdd = fieldToChange.toBuilder().withName(newName).build(); 684 685 return newFieldToAdd; // matchedToChange ? newFieldToAdd : newEntry; 686 } 687}