/**
 * Copyright (C) 2006-2025 Talend Inc. - www.talend.com
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.talend.sdk.component.api.record;

import java.io.StringReader;
import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;
import java.time.temporal.Temporal;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.json.Json;
import javax.json.JsonValue;
import javax.json.bind.annotation.JsonbTransient;
import javax.json.stream.JsonParser;

import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import lombok.ToString;

/**
 * Description of the structure of a record: its type, its data and metadata entries and its properties.
 */
public interface Schema {

    /**
     * Name of the system property read to initialize {@link #SKIP_SANITIZE}.
     */
    String SKIP_SANITIZE_PROPERTY = "talend.component.record.skip.sanitize";

    /**
     * When {@code true}, {@link #sanitizeConnectionName(String)} returns names untouched and
     * {@link #avoidCollision(Entry, Function, BiConsumer)} returns the new entry untouched.
     * The value is read once from the system property, when this interface is initialized.
     */
    boolean SKIP_SANITIZE = Boolean.getBoolean(SKIP_SANITIZE_PROPERTY);

    /**
     * @return the type of this schema.
     */
    Type getType();

    /**
     * @return the nested element schema for arrays.
     */
    Schema getElementSchema();

    /**
     * @return the data entries for records (does not contain metadata entries).
     */
    List<Entry> getEntries();

    /**
     * @return the metadata entries for records (does not contain ordinary data entries).
     */
    List<Entry> getMetadata();

    /**
     * @return All entries, including data and metadata, of this schema.
     */
    Stream<Entry> getAllEntries();

    /**
     * Get the entries as a map; {@link #getEntry(String)} looks entries up in it by name.
     *
     * @return the entries map
     *
     * @throws UnsupportedOperationException when the implementation does not override this method.
     */
    default Map<String, Entry> getEntryMap() {
        throw new UnsupportedOperationException("#getEntryMap is not implemented");
    }

    /**
     * Get a Builder from the current schema.
     *
     * @return a {@link Schema.Builder}
     *
     * @throws UnsupportedOperationException when the implementation does not override this method.
     */
    default Schema.Builder toBuilder() {
        throw new UnsupportedOperationException("#toBuilder is not implemented");
    }

    /**
     * Get all entries sorted by schema designed order.
     *
     * @return all entries ordered
     */
    default List<Entry> getEntriesOrdered() {
        return getEntriesOrdered(naturalOrder());
    }

    /**
     * Get all entries sorted using a custom comparator.
     *
     * @param comparator the comparator
     *
     * @return all entries ordered with provided comparator
     */
    @JsonbTransient
    default List<Entry> getEntriesOrdered(final Comparator<Entry> comparator) {
        return getAllEntries().sorted(comparator).collect(Collectors.toList());
    }

    /**
     * Get the EntriesOrder defined with Builder.
     *
     * @return the EntriesOrder
     *
     * @throws UnsupportedOperationException when the implementation does not override this method.
     */
    default EntriesOrder naturalOrder() {
        throw new UnsupportedOperationException("#naturalOrder is not implemented");
    }

    /**
     * Look an entry up by name in {@link #getEntryMap()}.
     *
     * @param name the entry name.
     *
     * @return the value mapped to {@code name} in the entry map, {@code null} if there is none.
     */
    default Entry getEntry(final String name) {
        return getEntryMap().get(name);
    }

    /**
     * @return the metadata props
     */
    Map<String, String> getProps();

    /**
     * @param property : property name.
     *
     * @return the requested metadata prop
     */
    String getProp(String property);

    /**
     * Get a property value from schema with its name.
     * The raw property is parsed as JSON; if parsing fails with a runtime exception, the raw string is
     * returned wrapped as a JSON string value.
     *
     * @param name : property's name.
     *
     * @return property's value, {@code null} if the property is not set.
     */
    default JsonValue getJsonProp(final String name) {
        final String prop = this.getProp(name);
        if (prop == null) {
            return null;
        }
        try (final StringReader reader = new StringReader(prop);
                final JsonParser parser = Json.createParser(reader)) {
            return parser.getValue();
        } catch (RuntimeException ex) {
            // Not parseable as JSON: expose the raw text as a JSON string.
            return Json.createValue(prop);
        }
    }

    /**
     * Types supported by a schema or an entry, each with the Java classes accepted as value.
     */
    enum Type {

        RECORD(new Class<?>[] { Record.class }),
        ARRAY(new Class<?>[] { Collection.class }),
        STRING(new Class<?>[] { String.class, Object.class }),
        BYTES(new Class<?>[] { byte[].class, Byte[].class }),
        INT(new Class<?>[] { Integer.class }),
        LONG(new Class<?>[] { Long.class }),
        FLOAT(new Class<?>[] { Float.class }),
        DOUBLE(new Class<?>[] { Double.class }),
        BOOLEAN(new Class<?>[] { Boolean.class }),
        DATETIME(new Class<?>[] { Long.class, Date.class, Temporal.class }),
        DECIMAL(new Class<?>[] { BigDecimal.class });

        /**
         * All compatible Java classes.
         */
        private final Class<?>[] classes;

        Type(final Class<?>[] classes) {
            this.classes = classes;
        }

        /**
         * Check if input can be assigned to an entry of this type.
         *
         * @param input : object.
         *
         * @return true if input is null or is an instance of one of the compatible classes.
         */
        public boolean isCompatible(final Object input) {
            if (input == null) {
                return true;
            }
            for (final Class<?> clazz : classes) {
                if (clazz.isInstance(input)) {
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * Description of one field of a schema.
     */
    interface Entry {

        /**
         * @return The name of this entry.
         */
        String getName();

        /**
         * @return The raw name of this entry.
         */
        String getRawName();

        /**
         * @return the raw name of this entry if exists, else return name.
         */
        String getOriginalFieldName();

        /**
         * @return Type of the entry, this determines which other fields are populated.
         */
        Type getType();

        /**
         * @return Is this entry nullable or always valued.
         */
        boolean isNullable();

        /**
         * @return true if this entry is for metadata; false for ordinary data.
         */
        boolean isMetadata();

        /**
         * @return whether this entry can be in error.
         */
        boolean isErrorCapable();

        /**
         * @return true if the value of this entry is valid; false for invalid value.
         */
        boolean isValid();

        /**
         * @param <T> the default value type.
         *
         * @return Default value for this entry.
         */
        <T> T getDefaultValue();

        /**
         * NOTE(review): {@link Schema#getElementSchema()} documents the element schema as being for arrays
         * while this one mentions records - confirm which types populate it.
         *
         * @return For type == record, the element type.
         */
        Schema getElementSchema();

        /**
         * @return Allows to associate to this field a comment - for doc purposes, no use in the runtime.
         */
        String getComment();

        /**
         * @return the metadata props
         */
        Map<String, String> getProps();

        /**
         * @param property : property name.
         *
         * @return the requested metadata prop
         */
        String getProp(String property);

        /**
         * Get a property value from entry with its name.
         * The raw property is parsed as JSON; if parsing fails with a runtime exception, the raw string is
         * returned wrapped as a JSON string value.
         *
         * @param name : property's name.
         *
         * @return property's value, {@code null} if the property is not set.
         */
        default JsonValue getJsonProp(final String name) {
            final String prop = this.getProp(name);
            if (prop == null) {
                return null;
            }
            try (final StringReader reader = new StringReader(prop);
                    final JsonParser parser = Json.createParser(reader)) {
                return parser.getValue();
            } catch (RuntimeException ex) {
                // Not parseable as JSON: expose the raw text as a JSON string.
                return Json.createValue(prop);
            }
        }

        /**
         * @return the logical type property
         */
        default String getLogicalType() {
            return this.getProp(SchemaProperty.LOGICAL_TYPE);
        }

        /**
         * @return an {@link Entry.Builder} from this entry.
         *
         * @throws UnsupportedOperationException when the implementation does not override this method.
         */
        default Entry.Builder toBuilder() {
            throw new UnsupportedOperationException("#toBuilder is not implemented");
        }

        /**
         * @return the value of the {@link SchemaProperty#ENTRY_ERROR_MESSAGE} property.
         */
        default String getErrorMessage() {
            return getProp(SchemaProperty.ENTRY_ERROR_MESSAGE);
        }

        /**
         * @return the value of the {@link SchemaProperty#ENTRY_ERROR_FALLBACK_VALUE} property.
         */
        default String getErrorFallbackValue() {
            return getProp(SchemaProperty.ENTRY_ERROR_FALLBACK_VALUE);
        }

        /**
         * Plain builder matching {@link Entry} structure.
         */
        interface Builder {

            Builder withName(String name);

            Builder withRawName(String rawName);

            Builder withType(Type type);

            /**
             * @param logicalType the logical type.
             *
             * @return this builder.
             *
             * @throws UnsupportedOperationException when the implementation does not override this method.
             */
            default Builder withLogicalType(SchemaProperty.LogicalType logicalType) {
                throw new UnsupportedOperationException("#withLogicalType is not implemented");
            }

            /**
             * @param logicalType the logical type, as a string.
             *
             * @return this builder.
             *
             * @throws UnsupportedOperationException when the implementation does not override this method.
             */
            default Builder withLogicalType(String logicalType) {
                throw new UnsupportedOperationException("#withLogicalType is not implemented");
            }

            Builder withNullable(boolean nullable);

            Builder withErrorCapable(boolean errorCapable);

            Builder withMetadata(boolean metadata);

            <T> Builder withDefaultValue(T value);

            Builder withElementSchema(Schema schema);

            Builder withComment(String comment);

            Builder withProps(Map<String, String> props);

            Builder withProp(String key, String value);

            Entry build();
        }
    }

    /**
     * Allows to build a {@link Schema}.
     */
    interface Builder {

        /**
         * @param type schema type.
         *
         * @return this builder.
         */
        Builder withType(Type type);

        /**
         * @param entry element for either an array or record type.
         *
         * @return this builder.
         */
        Builder withEntry(Entry entry);

        /**
         * Insert the entry after the specified entry.
         *
         * @param after the entry name reference
         * @param entry the entry to insert
         *
         * @return this builder
         *
         * @throws UnsupportedOperationException when the implementation does not override this method.
         */
        default Builder withEntryAfter(String after, Entry entry) {
            throw new UnsupportedOperationException("#withEntryAfter is not implemented");
        }

        /**
         * Insert the entry before the specified entry.
         *
         * @param before the entry name reference
         * @param entry the entry to insert
         *
         * @return this builder
         *
         * @throws UnsupportedOperationException when the implementation does not override this method.
         */
        default Builder withEntryBefore(String before, Entry entry) {
            throw new UnsupportedOperationException("#withEntryBefore is not implemented");
        }

        /**
         * Remove entry from builder.
         *
         * @param name the entry name
         *
         * @return this builder
         *
         * @throws UnsupportedOperationException when the implementation does not override this method.
         */
        default Builder remove(String name) {
            throw new UnsupportedOperationException("#remove is not implemented");
        }

        /**
         * Remove entry from builder.
         *
         * @param entry the entry
         *
         * @return this builder
         *
         * @throws UnsupportedOperationException when the implementation does not override this method.
         */
        default Builder remove(Entry entry) {
            throw new UnsupportedOperationException("#remove is not implemented");
        }

        /**
         * Move an entry after another one.
         *
         * @param after the entry name reference
         * @param name the entry name
         *
         * @return this builder
         *
         * @throws UnsupportedOperationException when the implementation does not override this method.
         */
        default Builder moveAfter(final String after, final String name) {
            throw new UnsupportedOperationException("#moveAfter is not implemented");
        }

        /**
         * Move an entry before another one.
         *
         * @param before the entry name reference
         * @param name the entry name
         *
         * @return this builder
         *
         * @throws UnsupportedOperationException when the implementation does not override this method.
         */
        default Builder moveBefore(final String before, final String name) {
            throw new UnsupportedOperationException("#moveBefore is not implemented");
        }

        /**
         * Swap two entries.
         *
         * @param name the entry name
         * @param with the other entry name
         *
         * @return this builder
         *
         * @throws UnsupportedOperationException when the implementation does not override this method.
         */
        default Builder swap(final String name, final String with) {
            throw new UnsupportedOperationException("#swap is not implemented");
        }

        /**
         * @param schema nested element schema.
         *
         * @return this builder.
         */
        Builder withElementSchema(Schema schema);

        /**
         * @param props schema properties
         *
         * @return this builder
         */
        Builder withProps(Map<String, String> props);

        /**
         * @param key the prop key name
         * @param value the prop value
         *
         * @return this builder
         */
        Builder withProp(String key, String value);

        /**
         * @return the described schema.
         */
        Schema build();

        /**
         * Same as {@link Builder#build()} but entries order is specified by {@code order}. This takes precedence on any
         * previous defined order with builder and may void it.
         *
         * @param order the wanted order for entries.
         *
         * @return the described schema.
         *
         * @throws UnsupportedOperationException when the implementation does not override this method.
         */
        default Schema build(Comparator<Entry> order) {
            throw new UnsupportedOperationException("#build(EntriesOrder) is not implemented");
        }
    }

    /**
     * Sanitize name to be avro compatible.
     * <p>
     * The name is returned untouched when {@link #SKIP_SANITIZE} is set or when it is {@code null} or empty.
     * Otherwise every character that is not an ASCII letter or digit is replaced: non-ASCII letters that have a
     * case and ASCII symbols become {@code _}, any other non-ASCII character is replaced by the Base64 form of
     * its UTF-8 bytes (with non alphanumeric Base64 characters turned into {@code _}).
     * An invalid first character is dropped when it is followed by a non-digit character, else it is
     * replaced by {@code _}.
     *
     * @param name : original name.
     *
     * @return avro compatible name.
     */
    static String sanitizeConnectionName(final String name) {
        if (SKIP_SANITIZE || name == null || name.isEmpty()) {
            return name;
        }

        char current = name.charAt(0);
        final CharsetEncoder ascii = Charset.forName(StandardCharsets.US_ASCII.name()).newEncoder();
        // An invalid leading character is simply dropped when the next character cannot be a digit,
        // so that the result does not start with a digit.
        final boolean skipFirstChar = ((!ascii.canEncode(current)) || (!Character.isLetter(current) && current != '_'))
                && name.length() > 1 && (!Character.isDigit(name.charAt(1)));

        final StringBuilder sanitizedBuilder = new StringBuilder();

        if (!skipFirstChar) {
            if (((!Character.isLetter(current)) && current != '_') || (!ascii.canEncode(current))) {
                sanitizedBuilder.append('_');
            } else {
                sanitizedBuilder.append(current);
            }
        }
        for (int i = 1; i < name.length(); i++) {
            current = name.charAt(i);
            if (!ascii.canEncode(current)) {
                if (Character.isLowerCase(current) || Character.isUpperCase(current)) {
                    sanitizedBuilder.append('_');
                } else {
                    // encodeToString does not depend on the platform default charset,
                    // unlike new String(byte[]).
                    final String enc = Base64.getEncoder()
                            .encodeToString(name.substring(i, i + 1).getBytes(StandardCharsets.UTF_8));
                    if (sanitizedBuilder.length() == 0 && Character.isDigit(enc.charAt(0))) {
                        // Nothing emitted yet: do not let the result start with a digit.
                        sanitizedBuilder.append('_');
                    }
                    for (int iter = 0; iter < enc.length(); iter++) {
                        if (Character.isLetterOrDigit(enc.charAt(iter))) {
                            sanitizedBuilder.append(enc.charAt(iter));
                        } else {
                            sanitizedBuilder.append('_');
                        }
                    }
                }
            } else if (Character.isLetterOrDigit(current)) {
                sanitizedBuilder.append(current);
            } else {
                sanitizedBuilder.append('_');
            }

        }
        return sanitizedBuilder.toString();
    }

    /**
     * Mutable ordering of entries based on an ordered list of field names.
     * Equality, hash code and string form are based on the fields order only.
     */
    @RequiredArgsConstructor
    @ToString
    @EqualsAndHashCode
    class EntriesOrder implements Comparator<Entry> {

        private final OrderedMap<String> fieldsOrder;

        // Keep comparator while no change occurs in fieldsOrder.
        // This is a lazily built cache: it must not take part in equals/hashCode/toString, otherwise
        // calling getComparator() would change the identity of this instance.
        @EqualsAndHashCode.Exclude
        @ToString.Exclude
        private Comparator<Entry> currentComparator = null;

        /**
         * Build an EntriesOrder according fields.
         *
         * @param fields the fields ordering. Each field should be {@code ,} separated.
         *
         * @return the order EntriesOrder
         */
        public static EntriesOrder of(final String fields) {
            return new EntriesOrder(fields);
        }

        /**
         * Build an EntriesOrder according fields.
         *
         * @param fields the fields ordering.
         *
         * @return the order EntriesOrder
         */
        public static EntriesOrder of(final Iterable<String> fields) {
            final OrderedMap<String> orders = new OrderedMap<>(Function.identity(), fields);
            return new EntriesOrder(orders);
        }

        /**
         * Build an EntriesOrder according fields.
         *
         * @param fields the fields ordering, {@code ,} separated. {@code null} or empty gives an empty order.
         */
        public EntriesOrder(final String fields) {
            if (fields == null || fields.isEmpty()) {
                fieldsOrder = new OrderedMap<>(Function.identity());
            } else {
                final List<String> fieldList = Arrays.stream(fields.split(",")).collect(Collectors.toList());
                fieldsOrder = new OrderedMap<>(Function.identity(), fieldList);
            }
        }

        /**
         * Build an EntriesOrder according fields.
         *
         * @param fields the fields ordering.
         */
        public EntriesOrder(final Iterable<String> fields) {
            this(new OrderedMap<>(Function.identity(), fields));
        }

        /**
         * @return the field names, as streamed by the underlying ordered map.
         */
        public Stream<String> getFieldsOrder() {
            return this.fieldsOrder.streams();
        }

        /**
         * Move a field after another one.
         *
         * @param after the field name reference
         * @param name the field name
         *
         * @return this EntriesOrder
         */
        public EntriesOrder moveAfter(final String after, final String name) {
            this.currentComparator = null;
            this.fieldsOrder.moveAfter(after, name);
            return this;
        }

        /**
         * Move a field before another one.
         *
         * @param before the field name reference
         * @param name the field name
         *
         * @return this EntriesOrder
         */
        public EntriesOrder moveBefore(final String before, final String name) {
            this.currentComparator = null;
            this.fieldsOrder.moveBefore(before, name);
            return this;
        }

        /**
         * Swap two fields.
         *
         * @param name the field name
         * @param with the other field
         *
         * @return this EntriesOrder
         */
        public EntriesOrder swap(final String name, final String with) {
            this.currentComparator = null;
            this.fieldsOrder.swap(name, with);
            return this;
        }

        /**
         * @return the field names joined with {@code ,}.
         */
        public String toFields() {
            return this.fieldsOrder.streams().collect(Collectors.joining(","));
        }

        /**
         * Get the comparator matching the current fields order. It is built on first use and reused until
         * the order is changed by {@link #moveAfter}, {@link #moveBefore} or {@link #swap}.
         *
         * @return the entries comparator
         */
        public Comparator<Entry> getComparator() {
            if (this.currentComparator == null) {
                final Map<String, Integer> entryPositions = new HashMap<>();
                // Positions start at 1.
                final AtomicInteger index = new AtomicInteger(1);
                this.fieldsOrder.streams()
                        .forEach(
                                (final String name) -> entryPositions.put(name, index.getAndIncrement()));
                this.currentComparator = new EntryComparator(entryPositions);
            }
            return this.currentComparator;
        }

        @Override
        public int compare(final Entry e1, final Entry e2) {
            return this.getComparator().compare(e1, e2);
        }

        /**
         * Compares entries according to the position registered for their name.
         * Entries whose name has no position are given {@link Integer#MAX_VALUE}.
         */
        @RequiredArgsConstructor
        static class EntryComparator implements Comparator<Entry> {

            private final Map<String, Integer> entryPositions;

            @Override
            public int compare(final Entry e1, final Entry e2) {
                final int index1 = this.entryPositions.getOrDefault(e1.getName(), Integer.MAX_VALUE);
                final int index2 = this.entryPositions.getOrDefault(e2.getName(), Integer.MAX_VALUE);
                if (index1 >= 0 && index2 >= 0) {
                    return Integer.compare(index1, index2);
                }
                // A negative position sorts after any non-negative one; two negative positions are
                // equivalent. getComparator() never registers such positions (they start at 1).
                if (index1 >= 0) {
                    return -1;
                }
                if (index2 >= 0) {
                    return 1;
                }
                return 0;
            }
        }
    }

    /**
     * Resolve a name collision between an entry to add and the entry already registered under the same name.
     * <ul>
     * <li>When {@link #SKIP_SANITIZE} is set, or when no different entry is registered under the name of
     * {@code newEntry}, {@code newEntry} is returned as is.</li>
     * <li>When the registered entry has a non-empty raw name, it is replaced by {@code newEntry} through
     * {@code replaceFunction} and a renamed copy of the registered entry is returned.</li>
     * <li>Else, when {@code newEntry} has no raw name either, {@code null} is returned.</li>
     * <li>Else a renamed copy of {@code newEntry} is returned.</li>
     * </ul>
     * The new name is the sanitized raw name of the renamed entry, suffixed with {@code _} and the first
     * index, starting at 1, for which {@code entryGetter} finds no entry.
     *
     * @param newEntry the entry to add.
     * @param entryGetter finds an already registered entry from its name, {@code null} if none.
     * @param replaceFunction called with the name of the registered entry and the entry replacing it.
     *
     * @return the entry to add, or {@code null} when there is nothing to add.
     */
    static Schema.Entry avoidCollision(final Schema.Entry newEntry,
            final Function<String, Entry> entryGetter,
            final BiConsumer<String, Entry> replaceFunction) {
        if (SKIP_SANITIZE) {
            return newEntry;
        }
        final Optional<Entry> collisionedEntry = Optional.ofNullable(entryGetter //
                .apply(newEntry.getName())) //
                .filter((final Entry field) -> !Objects.equals(field, newEntry));
        if (!collisionedEntry.isPresent()) {
            // No collision, return new entry.
            return newEntry;
        }
        final Entry matchedEntry = collisionedEntry.get();
        final boolean matchedToChange = matchedEntry.getRawName() != null && !(matchedEntry.getRawName().isEmpty());
        if (matchedToChange) {
            // the rename has to be applied on entry already inside schema, so replace.
            replaceFunction.accept(matchedEntry.getName(), newEntry);
        } else if (newEntry.getRawName() == null || newEntry.getRawName().isEmpty()) {
            // try to add exactly same raw, skip the add here.
            return null;
        }
        final Entry fieldToChange = matchedToChange ? matchedEntry : newEntry;
        int indexForAnticollision = 1;
        // Recompute the primitive name from the raw name.
        final String baseName = Schema.sanitizeConnectionName(fieldToChange.getRawName());

        String newName = baseName + "_" + indexForAnticollision;
        while (entryGetter.apply(newName) != null) {
            indexForAnticollision++;
            newName = baseName + "_" + indexForAnticollision;
        }
        return fieldToChange.toBuilder().withName(newName).build();
    }
}