001/** 002 * Copyright (C) 2006-2024 Talend Inc. - www.talend.com 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.talend.sdk.component.api.record; 017 018import java.io.StringReader; 019import java.math.BigDecimal; 020import java.nio.charset.Charset; 021import java.nio.charset.CharsetEncoder; 022import java.nio.charset.StandardCharsets; 023import java.time.temporal.Temporal; 024import java.util.Arrays; 025import java.util.Base64; 026import java.util.Collection; 027import java.util.Comparator; 028import java.util.Date; 029import java.util.HashMap; 030import java.util.List; 031import java.util.Map; 032import java.util.Objects; 033import java.util.Optional; 034import java.util.concurrent.atomic.AtomicInteger; 035import java.util.function.BiConsumer; 036import java.util.function.Function; 037import java.util.function.Supplier; 038import java.util.stream.Collectors; 039import java.util.stream.Stream; 040 041import javax.json.Json; 042import javax.json.JsonValue; 043import javax.json.bind.annotation.JsonbTransient; 044import javax.json.stream.JsonParser; 045 046import lombok.EqualsAndHashCode; 047import lombok.RequiredArgsConstructor; 048import lombok.ToString; 049 050public interface Schema { 051 052 /** 053 * @return the type of this schema. 054 */ 055 Type getType(); 056 057 /** 058 * @return the nested element schema for arrays. 059 */ 060 Schema getElementSchema(); 061 062 /** 063 * @return the data entries for records (not contains meta data entries). 064 */ 065 List<Entry> getEntries(); 066 067 /** 068 * @return the metadata entries for records (not contains ordinary data entries). 069 */ 070 List<Entry> getMetadata(); 071 072 /** 073 * @return All entries, including data and metadata, of this schema. 074 */ 075 Stream<Entry> getAllEntries(); 076 077 default Map<String, Entry> getEntryMap() { 078 throw new UnsupportedOperationException("#getEntryMap is not implemented"); 079 } 080 081 /** 082 * Get a Builder from the current schema. 083 * 084 * @return a {@link Schema.Builder} 085 */ 086 default Schema.Builder toBuilder() { 087 throw new UnsupportedOperationException("#toBuilder is not implemented"); 088 } 089 090 /** 091 * Get all entries sorted by schema designed order. 092 * 093 * @return all entries ordered 094 */ 095 default List<Entry> getEntriesOrdered() { 096 return getEntriesOrdered(naturalOrder()); 097 } 098 099 /** 100 * Get all entries sorted using a custom comparator. 101 * 102 * @param comparator the comparator 103 * 104 * @return all entries ordered with provided comparator 105 */ 106 @JsonbTransient 107 default List<Entry> getEntriesOrdered(final Comparator<Entry> comparator) { 108 return getAllEntries().sorted(comparator).collect(Collectors.toList()); 109 } 110 111 /** 112 * Get the EntriesOrder defined with Builder. 113 * 114 * @return the EntriesOrder 115 */ 116 117 default EntriesOrder naturalOrder() { 118 throw new UnsupportedOperationException("#naturalOrder is not implemented"); 119 } 120 121 default Entry getEntry(final String name) { 122 return getEntryMap().get(name); 123 } 124 125 /** 126 * @return the metadata props 127 */ 128 Map<String, String> getProps(); 129 130 /** 131 * @param property : property name. 132 * 133 * @return the requested metadata prop 134 */ 135 String getProp(String property); 136 137 /** 138 * Get a property values from schema with its name. 139 * 140 * @param name : property's name. 141 * 142 * @return property's value. 143 */ 144 default JsonValue getJsonProp(final String name) { 145 final String prop = this.getProp(name); 146 if (prop == null) { 147 return null; 148 } 149 try (final StringReader reader = new StringReader(prop); 150 final JsonParser parser = Json.createParser(reader)) { 151 return parser.getValue(); 152 } catch (RuntimeException ex) { 153 return Json.createValue(prop); 154 } 155 } 156 157 enum Type { 158 159 RECORD(new Class<?>[] { Record.class }), 160 ARRAY(new Class<?>[] { Collection.class }), 161 STRING(new Class<?>[] { String.class, Object.class }), 162 BYTES(new Class<?>[] { byte[].class, Byte[].class }), 163 INT(new Class<?>[] { Integer.class }), 164 LONG(new Class<?>[] { Long.class }), 165 FLOAT(new Class<?>[] { Float.class }), 166 DOUBLE(new Class<?>[] { Double.class }), 167 BOOLEAN(new Class<?>[] { Boolean.class }), 168 DATETIME(new Class<?>[] { Long.class, Date.class, Temporal.class }), 169 DECIMAL(new Class<?>[] { BigDecimal.class }); 170 171 /** 172 * All compatibles Java classes 173 */ 174 private final Class<?>[] classes; 175 176 Type(final Class<?>[] classes) { 177 this.classes = classes; 178 } 179 180 /** 181 * Check if input can be affected to an entry of this type. 182 * 183 * @param input : object. 184 * 185 * @return true if input is null or ok. 186 */ 187 public boolean isCompatible(final Object input) { 188 if (input == null) { 189 return true; 190 } 191 for (final Class<?> clazz : classes) { 192 if (clazz.isInstance(input)) { 193 return true; 194 } 195 } 196 return false; 197 } 198 } 199 200 interface Entry { 201 202 /** 203 * @return The name of this entry. 204 */ 205 String getName(); 206 207 /** 208 * @return The raw name of this entry. 209 */ 210 String getRawName(); 211 212 /** 213 * @return the raw name of this entry if exists, else return name. 214 */ 215 String getOriginalFieldName(); 216 217 /** 218 * @return Type of the entry, this determine which other fields are populated. 219 */ 220 Type getType(); 221 222 /** 223 * @return Is this entry nullable or always valued. 224 */ 225 boolean isNullable(); 226 227 /** 228 * @return true if this entry is for metadata; false for ordinary data. 229 */ 230 boolean isMetadata(); 231 232 /** 233 * @param <T> the default value type. 234 * 235 * @return Default value for this entry. 236 */ 237 <T> T getDefaultValue(); 238 239 /** 240 * @return For type == record, the element type. 241 */ 242 Schema getElementSchema(); 243 244 /** 245 * @return Allows to associate to this field a comment - for doc purposes, no use in the runtime. 246 */ 247 String getComment(); 248 249 /** 250 * @return the metadata props 251 */ 252 Map<String, String> getProps(); 253 254 /** 255 * @param property : property name. 256 * 257 * @return the requested metadata prop 258 */ 259 String getProp(String property); 260 261 /** 262 * Get a property values from entry with its name. 263 * 264 * @param name : property's name. 265 * 266 * @return property's value. 267 */ 268 default JsonValue getJsonProp(final String name) { 269 final String prop = this.getProp(name); 270 if (prop == null) { 271 return null; 272 } 273 try (final StringReader reader = new StringReader(prop); 274 final JsonParser parser = Json.createParser(reader)) { 275 return parser.getValue(); 276 } catch (RuntimeException ex) { 277 return Json.createValue(prop); 278 } 279 } 280 281 /** 282 * 283 * @return the logical type property 284 */ 285 default String getLogicalType() { 286 return this.getProp(SchemaProperty.LOGICAL_TYPE); 287 } 288 289 /** 290 * @return an {@link Entry.Builder} from this entry. 291 */ 292 default Entry.Builder toBuilder() { 293 throw new UnsupportedOperationException("#toBuilder is not implemented"); 294 } 295 296 /** 297 * Plain builder matching {@link Entry} structure. 298 */ 299 interface Builder { 300 301 Builder withName(String name); 302 303 Builder withRawName(String rawName); 304 305 Builder withType(Type type); 306 307 default Builder withLogicalType(SchemaProperty.LogicalType logicalType) { 308 throw new UnsupportedOperationException("#withLogicalType is not implemented"); 309 } 310 311 default Builder withLogicalType(String logicalType) { 312 throw new UnsupportedOperationException("#withLogicalType is not implemented"); 313 } 314 315 Builder withNullable(boolean nullable); 316 317 Builder withMetadata(boolean metadata); 318 319 <T> Builder withDefaultValue(T value); 320 321 Builder withElementSchema(Schema schema); 322 323 Builder withComment(String comment); 324 325 Builder withProps(Map<String, String> props); 326 327 Builder withProp(String key, String value); 328 329 Entry build(); 330 } 331 } 332 333 /** 334 * Allows to build a {@link Schema}. 335 */ 336 interface Builder { 337 338 /** 339 * @param type schema type. 340 * 341 * @return this builder. 342 */ 343 Builder withType(Type type); 344 345 /** 346 * @param entry element for either an array or record type. 347 * 348 * @return this builder. 349 */ 350 Builder withEntry(Entry entry); 351 352 /** 353 * Insert the entry after the specified entry. 354 * 355 * @param after the entry name reference 356 * @param entry the entry name 357 * 358 * @return this builder 359 */ 360 default Builder withEntryAfter(String after, Entry entry) { 361 throw new UnsupportedOperationException("#withEntryAfter is not implemented"); 362 } 363 364 /** 365 * Insert the entry before the specified entry. 366 * 367 * @param before the entry name reference 368 * @param entry the entry name 369 * 370 * @return this builder 371 */ 372 default Builder withEntryBefore(String before, Entry entry) { 373 throw new UnsupportedOperationException("#withEntryBefore is not implemented"); 374 } 375 376 /** 377 * Remove entry from builder. 378 * 379 * @param name the entry name 380 * 381 * @return this builder 382 */ 383 default Builder remove(String name) { 384 throw new UnsupportedOperationException("#remove is not implemented"); 385 } 386 387 /** 388 * Remove entry from builder. 389 * 390 * @param entry the entry 391 * 392 * @return this builder 393 */ 394 default Builder remove(Entry entry) { 395 throw new UnsupportedOperationException("#remove is not implemented"); 396 } 397 398 /** 399 * Move an entry after another one. 400 * 401 * @param after the entry name reference 402 * @param name the entry name 403 */ 404 default Builder moveAfter(final String after, final String name) { 405 throw new UnsupportedOperationException("#moveAfter is not implemented"); 406 } 407 408 /** 409 * Move an entry before another one. 410 * 411 * @param before the entry name reference 412 * @param name the entry name 413 */ 414 default Builder moveBefore(final String before, final String name) { 415 throw new UnsupportedOperationException("#moveBefore is not implemented"); 416 } 417 418 /** 419 * Swap two entries. 420 * 421 * @param name the entry name 422 * @param with the other entry name 423 */ 424 default Builder swap(final String name, final String with) { 425 throw new UnsupportedOperationException("#swap is not implemented"); 426 } 427 428 /** 429 * @param schema nested element schema. 430 * 431 * @return this builder. 432 */ 433 Builder withElementSchema(Schema schema); 434 435 /** 436 * @param props schema properties 437 * 438 * @return this builder 439 */ 440 Builder withProps(Map<String, String> props); 441 442 /** 443 * @param key the prop key name 444 * @param value the prop value 445 * 446 * @return this builder 447 */ 448 Builder withProp(String key, String value); 449 450 /** 451 * @return the described schema. 452 */ 453 Schema build(); 454 455 /** 456 * Same as {@link Builder#build()} but entries order is specified by {@code order}. This takes precedence on any 457 * previous defined order with builder and may void it. 458 * 459 * @param order the wanted order for entries. 460 * @return the described schema. 461 */ 462 default Schema build(Comparator<Entry> order) { 463 throw new UnsupportedOperationException("#build(EntriesOrder) is not implemented"); 464 } 465 } 466 467 /** 468 * Sanitize name to be avro compatible. 469 * 470 * @param name : original name. 471 * 472 * @return avro compatible name. 473 */ 474 static String sanitizeConnectionName(final String name) { 475 if (name == null || name.isEmpty()) { 476 return name; 477 } 478 479 char current = name.charAt(0); 480 final CharsetEncoder ascii = Charset.forName(StandardCharsets.US_ASCII.name()).newEncoder(); 481 final boolean skipFirstChar = ((!ascii.canEncode(current)) || (!Character.isLetter(current) && current != '_')) 482 && name.length() > 1 && (!Character.isDigit(name.charAt(1))); 483 484 final StringBuilder sanitizedBuilder = new StringBuilder(); 485 486 if (!skipFirstChar) { 487 if (((!Character.isLetter(current)) && current != '_') || (!ascii.canEncode(current))) { 488 sanitizedBuilder.append('_'); 489 } else { 490 sanitizedBuilder.append(current); 491 } 492 } 493 for (int i = 1; i < name.length(); i++) { 494 current = name.charAt(i); 495 if (!ascii.canEncode(current)) { 496 if (Character.isLowerCase(current) || Character.isUpperCase(current)) { 497 sanitizedBuilder.append('_'); 498 } else { 499 final byte[] encoded = 500 Base64.getEncoder().encode(name.substring(i, i + 1).getBytes(StandardCharsets.UTF_8)); 501 final String enc = new String(encoded); 502 if (sanitizedBuilder.length() == 0 && Character.isDigit(enc.charAt(0))) { 503 sanitizedBuilder.append('_'); 504 } 505 for (int iter = 0; iter < enc.length(); iter++) { 506 if (Character.isLetterOrDigit(enc.charAt(iter))) { 507 sanitizedBuilder.append(enc.charAt(iter)); 508 } else { 509 sanitizedBuilder.append('_'); 510 } 511 } 512 } 513 } else if (Character.isLetterOrDigit(current)) { 514 sanitizedBuilder.append(current); 515 } else { 516 sanitizedBuilder.append('_'); 517 } 518 519 } 520 return sanitizedBuilder.toString(); 521 } 522 523 @RequiredArgsConstructor 524 @ToString 525 @EqualsAndHashCode 526 class EntriesOrder implements Comparator<Entry> { 527 528 private final OrderedMap<String> fieldsOrder; 529 530 // Keep comparator while no change occurs in fieldsOrder. 531 private Comparator<Entry> currentComparator = null; 532 533 /** 534 * Build an EntriesOrder according fields. 535 * 536 * @param fields the fields ordering. Each field should be {@code ,} separated. 537 * 538 * @return the order EntriesOrder 539 */ 540 public static EntriesOrder of(final String fields) { 541 return new EntriesOrder(fields); 542 } 543 544 /** 545 * Build an EntriesOrder according fields. 546 * 547 * @param fields the fields ordering. 548 * 549 * @return the order EntriesOrder 550 */ 551 public static EntriesOrder of(final Iterable<String> fields) { 552 final OrderedMap<String> orders = new OrderedMap<>(Function.identity(), fields); 553 return new EntriesOrder(orders); 554 } 555 556 public EntriesOrder(final String fields) { 557 if (fields == null || fields.isEmpty()) { 558 fieldsOrder = new OrderedMap<>(Function.identity()); 559 } else { 560 final List<String> fieldList = Arrays.stream(fields.split(",")).collect(Collectors.toList()); 561 fieldsOrder = new OrderedMap<>(Function.identity(), fieldList); 562 } 563 } 564 565 public EntriesOrder(final Iterable<String> fields) { 566 this(new OrderedMap<>(Function.identity(), fields)); 567 } 568 569 public Stream<String> getFieldsOrder() { 570 return this.fieldsOrder.streams(); 571 } 572 573 /** 574 * Move a field after another one. 575 * 576 * @param after the field name reference 577 * @param name the field name 578 * 579 * @return this EntriesOrder 580 */ 581 public EntriesOrder moveAfter(final String after, final String name) { 582 this.currentComparator = null; 583 this.fieldsOrder.moveAfter(after, name); 584 return this; 585 } 586 587 /** 588 * Move a field before another one. 589 * 590 * @param before the field name reference 591 * @param name the field name 592 * 593 * @return this EntriesOrder 594 */ 595 public EntriesOrder moveBefore(final String before, final String name) { 596 this.currentComparator = null; 597 this.fieldsOrder.moveBefore(before, name); 598 return this; 599 } 600 601 /** 602 * Swap two fields. 603 * 604 * @param name the field name 605 * @param with the other field 606 * 607 * @return this EntriesOrder 608 */ 609 public EntriesOrder swap(final String name, final String with) { 610 this.currentComparator = null; 611 this.fieldsOrder.swap(name, with); 612 return this; 613 } 614 615 public String toFields() { 616 return this.fieldsOrder.streams().collect(Collectors.joining(",")); 617 } 618 619 public Comparator<Entry> getComparator() { 620 if (this.currentComparator == null) { 621 final Map<String, Integer> entryPositions = new HashMap<>(); 622 final AtomicInteger index = new AtomicInteger(1); 623 this.fieldsOrder.streams() 624 .forEach( 625 (final String name) -> entryPositions.put(name, index.getAndIncrement())); 626 this.currentComparator = new EntryComparator(entryPositions); 627 } 628 return this.currentComparator; 629 } 630 631 @Override 632 public int compare(final Entry e1, final Entry e2) { 633 return this.getComparator().compare(e1, e2); 634 } 635 636 @RequiredArgsConstructor 637 static class EntryComparator implements Comparator<Entry> { 638 639 private final Map<String, Integer> entryPositions; 640 641 @Override 642 public int compare(final Entry e1, final Entry e2) { 643 final int index1 = this.entryPositions.getOrDefault(e1.getName(), Integer.MAX_VALUE); 644 final int index2 = this.entryPositions.getOrDefault(e2.getName(), Integer.MAX_VALUE); 645 if (index1 >= 0 && index2 >= 0) { 646 return index1 - index2; 647 } 648 if (index1 >= 0) { 649 return -1; 650 } 651 if (index2 >= 0) { 652 return 1; 653 } 654 return 0; 655 } 656 } 657 } 658 659 // use new avoid collision with entry getter. 660 @Deprecated 661 static Schema.Entry avoidCollision(final Schema.Entry newEntry, 662 final Supplier<Stream<Schema.Entry>> allEntriesSupplier, 663 final BiConsumer<String, Entry> replaceFunction) { 664 final Function<String, Entry> entryGetter = (String name) -> allEntriesSupplier // 665 .get() // 666 .filter((final Entry field) -> field.getName().equals(name)) 667 .findFirst() 668 .orElse(null); 669 return avoidCollision(newEntry, entryGetter, replaceFunction); 670 } 671 672 static Schema.Entry avoidCollision(final Schema.Entry newEntry, 673 final Function<String, Entry> entryGetter, 674 final BiConsumer<String, Entry> replaceFunction) { 675 final Optional<Entry> collisionedEntry = Optional.ofNullable(entryGetter // 676 .apply(newEntry.getName())) // 677 .filter((final Entry field) -> !Objects.equals(field, newEntry)); 678 if (!collisionedEntry.isPresent()) { 679 // No collision, return new entry. 680 return newEntry; 681 } 682 final Entry matchedEntry = collisionedEntry.get(); 683 final boolean matchedToChange = matchedEntry.getRawName() != null && !(matchedEntry.getRawName().isEmpty()); 684 if (matchedToChange) { 685 // the rename has to be applied on entry already inside schema, so replace. 686 replaceFunction.accept(matchedEntry.getName(), newEntry); 687 } else if (newEntry.getRawName() == null || newEntry.getRawName().isEmpty()) { 688 // try to add exactly same raw, skip the add here. 689 return null; 690 } 691 final Entry fieldToChange = matchedToChange ? matchedEntry : newEntry; 692 int indexForAnticollision = 1; 693 final String baseName = Schema.sanitizeConnectionName(fieldToChange.getRawName()); // recalc primiti name. 694 695 String newName = baseName + "_" + indexForAnticollision; 696 while (entryGetter.apply(newName) != null) { 697 indexForAnticollision++; 698 newName = baseName + "_" + indexForAnticollision; 699 } 700 final Entry newFieldToAdd = fieldToChange.toBuilder().withName(newName).build(); 701 702 return newFieldToAdd; // matchedToChange ? newFieldToAdd : newEntry; 703 } 704}