001/** 002 * Copyright (C) 2006-2022 Talend Inc. - www.talend.com 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.talend.sdk.component.api.record; 017 018import java.io.StringReader; 019import java.nio.charset.Charset; 020import java.nio.charset.CharsetEncoder; 021import java.nio.charset.StandardCharsets; 022import java.time.temporal.Temporal; 023import java.util.Arrays; 024import java.util.Base64; 025import java.util.Collection; 026import java.util.Comparator; 027import java.util.Date; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031import java.util.Objects; 032import java.util.Optional; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.function.BiConsumer; 035import java.util.function.Function; 036import java.util.function.Supplier; 037import java.util.stream.Collectors; 038import java.util.stream.Stream; 039 040import javax.json.Json; 041import javax.json.JsonValue; 042import javax.json.bind.annotation.JsonbTransient; 043 044import lombok.EqualsAndHashCode; 045import lombok.RequiredArgsConstructor; 046import lombok.ToString; 047 048public interface Schema { 049 050 /** 051 * @return the type of this schema. 052 */ 053 Type getType(); 054 055 /** 056 * @return the nested element schema for arrays. 057 */ 058 Schema getElementSchema(); 059 060 /** 061 * @return the data entries for records (not contains meta data entries). 062 */ 063 List<Entry> getEntries(); 064 065 /** 066 * @return the metadata entries for records (not contains ordinary data entries). 067 */ 068 List<Entry> getMetadata(); 069 070 /** 071 * @return All entries, including data and metadata, of this schema. 072 */ 073 Stream<Entry> getAllEntries(); 074 075 /** 076 * Get a Builder from the current schema. 077 * 078 * @return a {@link Schema.Builder} 079 */ 080 default Schema.Builder toBuilder() { 081 throw new UnsupportedOperationException("#toBuilder is not implemented"); 082 } 083 084 /** 085 * Get all entries sorted by schema designed order. 086 * 087 * @return all entries ordered 088 */ 089 default List<Entry> getEntriesOrdered() { 090 return getEntriesOrdered(naturalOrder()); 091 } 092 093 /** 094 * Get all entries sorted using a custom comparator. 095 * 096 * @param comparator the comparator 097 * 098 * @return all entries ordered with provided comparator 099 */ 100 @JsonbTransient 101 default List<Entry> getEntriesOrdered(final Comparator<Entry> comparator) { 102 return getAllEntries().sorted(comparator).collect(Collectors.toList()); 103 } 104 105 /** 106 * Get the EntriesOrder defined with Builder. 107 * 108 * @return the EntriesOrder 109 */ 110 111 default EntriesOrder naturalOrder() { 112 throw new UnsupportedOperationException("#naturalOrder is not implemented"); 113 } 114 115 default Entry getEntry(final String name) { 116 return getAllEntries() // 117 .filter((Entry e) -> Objects.equals(e.getName(), name)) // 118 .findFirst() // 119 .orElse(null); 120 } 121 122 /** 123 * @return the metadata props 124 */ 125 Map<String, String> getProps(); 126 127 /** 128 * @param property : property name. 129 * 130 * @return the requested metadata prop 131 */ 132 String getProp(String property); 133 134 /** 135 * Get a property values from schema with its name. 136 * 137 * @param name : property's name. 138 * 139 * @return property's value. 140 */ 141 default JsonValue getJsonProp(final String name) { 142 final String prop = this.getProp(name); 143 if (prop == null) { 144 return null; 145 } 146 try { 147 return Json.createParser(new StringReader(prop)).getValue(); 148 } catch (RuntimeException ex) { 149 return Json.createValue(prop); 150 } 151 } 152 153 enum Type { 154 155 RECORD(new Class<?>[] { Record.class }), 156 ARRAY(new Class<?>[] { Collection.class }), 157 STRING(new Class<?>[] { String.class }), 158 BYTES(new Class<?>[] { byte[].class, Byte[].class }), 159 INT(new Class<?>[] { Integer.class }), 160 LONG(new Class<?>[] { Long.class }), 161 FLOAT(new Class<?>[] { Float.class }), 162 DOUBLE(new Class<?>[] { Double.class }), 163 BOOLEAN(new Class<?>[] { Boolean.class }), 164 DATETIME(new Class<?>[] { Long.class, Date.class, Temporal.class }); 165 166 /** 167 * All compatibles Java classes 168 */ 169 private final Class<?>[] classes; 170 171 Type(final Class<?>[] classes) { 172 this.classes = classes; 173 } 174 175 /** 176 * Check if input can be affected to an entry of this type. 177 * 178 * @param input : object. 179 * 180 * @return true if input is null or ok. 181 */ 182 public boolean isCompatible(final Object input) { 183 if (input == null) { 184 return true; 185 } 186 for (final Class<?> clazz : classes) { 187 if (clazz.isInstance(input)) { 188 return true; 189 } 190 } 191 return false; 192 } 193 } 194 195 interface Entry { 196 197 /** 198 * @return The name of this entry. 199 */ 200 String getName(); 201 202 /** 203 * @return The raw name of this entry. 204 */ 205 String getRawName(); 206 207 /** 208 * @return the raw name of this entry if exists, else return name. 209 */ 210 String getOriginalFieldName(); 211 212 /** 213 * @return Type of the entry, this determine which other fields are populated. 214 */ 215 Type getType(); 216 217 /** 218 * @return Is this entry nullable or always valued. 219 */ 220 boolean isNullable(); 221 222 /** 223 * @return true if this entry is for metadata; false for ordinary data. 224 */ 225 boolean isMetadata(); 226 227 /** 228 * @param <T> the default value type. 229 * 230 * @return Default value for this entry. 231 */ 232 <T> T getDefaultValue(); 233 234 /** 235 * @return For type == record, the element type. 236 */ 237 Schema getElementSchema(); 238 239 /** 240 * @return Allows to associate to this field a comment - for doc purposes, no use in the runtime. 241 */ 242 String getComment(); 243 244 /** 245 * @return the metadata props 246 */ 247 Map<String, String> getProps(); 248 249 /** 250 * @param property : property name. 251 * 252 * @return the requested metadata prop 253 */ 254 String getProp(String property); 255 256 /** 257 * Get a property values from entry with its name. 258 * 259 * @param name : property's name. 260 * 261 * @return property's value. 262 */ 263 default JsonValue getJsonProp(final String name) { 264 final String prop = this.getProp(name); 265 if (prop == null) { 266 return null; 267 } 268 try { 269 return Json.createParser(new StringReader(prop)).getValue(); 270 } catch (RuntimeException ex) { 271 return Json.createValue(prop); 272 } 273 } 274 275 /** 276 * @return an {@link Entry.Builder} from this entry. 277 */ 278 default Entry.Builder toBuilder() { 279 throw new UnsupportedOperationException("#toBuilder is not implemented"); 280 } 281 282 /** 283 * Plain builder matching {@link Entry} structure. 284 */ 285 interface Builder { 286 287 Builder withName(String name); 288 289 Builder withRawName(String rawName); 290 291 Builder withType(Type type); 292 293 Builder withNullable(boolean nullable); 294 295 Builder withMetadata(boolean metadata); 296 297 <T> Builder withDefaultValue(T value); 298 299 Builder withElementSchema(Schema schema); 300 301 Builder withComment(String comment); 302 303 Builder withProps(Map<String, String> props); 304 305 Builder withProp(String key, String value); 306 307 Entry build(); 308 } 309 } 310 311 /** 312 * Allows to build a {@link Schema}. 313 */ 314 interface Builder { 315 316 /** 317 * @param type schema type. 318 * 319 * @return this builder. 320 */ 321 Builder withType(Type type); 322 323 /** 324 * @param entry element for either an array or record type. 325 * 326 * @return this builder. 327 */ 328 Builder withEntry(Entry entry); 329 330 /** 331 * Insert the entry after the specified entry. 332 * 333 * @param after the entry name reference 334 * @param entry the entry name 335 * 336 * @return this builder 337 */ 338 default Builder withEntryAfter(String after, Entry entry) { 339 throw new UnsupportedOperationException("#withEntryAfter is not implemented"); 340 } 341 342 /** 343 * Insert the entry before the specified entry. 344 * 345 * @param before the entry name reference 346 * @param entry the entry name 347 * 348 * @return this builder 349 */ 350 default Builder withEntryBefore(String before, Entry entry) { 351 throw new UnsupportedOperationException("#withEntryBefore is not implemented"); 352 } 353 354 /** 355 * Remove entry from builder. 356 * 357 * @param name the entry name 358 * 359 * @return this builder 360 */ 361 default Builder remove(String name) { 362 throw new UnsupportedOperationException("#remove is not implemented"); 363 } 364 365 /** 366 * Remove entry from builder. 367 * 368 * @param entry the entry 369 * 370 * @return this builder 371 */ 372 default Builder remove(Entry entry) { 373 throw new UnsupportedOperationException("#remove is not implemented"); 374 } 375 376 /** 377 * Move an entry after another one. 378 * 379 * @param after the entry name reference 380 * @param name the entry name 381 */ 382 default Builder moveAfter(final String after, final String name) { 383 throw new UnsupportedOperationException("#moveAfter is not implemented"); 384 } 385 386 /** 387 * Move an entry before another one. 388 * 389 * @param before the entry name reference 390 * @param name the entry name 391 */ 392 default Builder moveBefore(final String before, final String name) { 393 throw new UnsupportedOperationException("#moveBefore is not implemented"); 394 } 395 396 /** 397 * Swap two entries. 398 * 399 * @param name the entry name 400 * @param with the other entry name 401 */ 402 default Builder swap(final String name, final String with) { 403 throw new UnsupportedOperationException("#swap is not implemented"); 404 } 405 406 /** 407 * @param schema nested element schema. 408 * 409 * @return this builder. 410 */ 411 Builder withElementSchema(Schema schema); 412 413 /** 414 * @param props schema properties 415 * 416 * @return this builder 417 */ 418 Builder withProps(Map<String, String> props); 419 420 /** 421 * @param key the prop key name 422 * @param value the prop value 423 * 424 * @return this builder 425 */ 426 Builder withProp(String key, String value); 427 428 /** 429 * @return the described schema. 430 */ 431 Schema build(); 432 433 /** 434 * Same as {@link Builder#build()} but entries order is specified by {@code order}. This takes precedence on any 435 * previous defined order with builder and may void it. 436 * 437 * @param order the wanted order for entries. 438 * @return the described schema. 439 */ 440 default Schema build(Comparator<Entry> order) { 441 throw new UnsupportedOperationException("#build(EntriesOrder) is not implemented"); 442 } 443 } 444 445 /** 446 * Sanitize name to be avro compatible. 447 * 448 * @param name : original name. 449 * 450 * @return avro compatible name. 451 */ 452 static String sanitizeConnectionName(final String name) { 453 if (name == null || name.isEmpty()) { 454 return name; 455 } 456 457 char current = name.charAt(0); 458 final CharsetEncoder ascii = Charset.forName(StandardCharsets.US_ASCII.name()).newEncoder(); 459 final boolean skipFirstChar = ((!ascii.canEncode(current)) || (!Character.isLetter(current) && current != '_')) 460 && name.length() > 1 && (!Character.isDigit(name.charAt(1))); 461 462 final StringBuilder sanitizedBuilder = new StringBuilder(); 463 464 if (!skipFirstChar) { 465 if (((!Character.isLetter(current)) && current != '_') || (!ascii.canEncode(current))) { 466 sanitizedBuilder.append('_'); 467 } else { 468 sanitizedBuilder.append(current); 469 } 470 } 471 for (int i = 1; i < name.length(); i++) { 472 current = name.charAt(i); 473 if (!ascii.canEncode(current)) { 474 if (Character.isLowerCase(current) || Character.isUpperCase(current)) { 475 sanitizedBuilder.append('_'); 476 } else { 477 final byte[] encoded = 478 Base64.getEncoder().encode(name.substring(i, i + 1).getBytes(StandardCharsets.UTF_8)); 479 final String enc = new String(encoded); 480 if (sanitizedBuilder.length() == 0 && Character.isDigit(enc.charAt(0))) { 481 sanitizedBuilder.append('_'); 482 } 483 for (int iter = 0; iter < enc.length(); iter++) { 484 if (Character.isLetterOrDigit(enc.charAt(iter))) { 485 sanitizedBuilder.append(enc.charAt(iter)); 486 } else { 487 sanitizedBuilder.append('_'); 488 } 489 } 490 } 491 } else if (Character.isLetterOrDigit(current)) { 492 sanitizedBuilder.append(current); 493 } else { 494 sanitizedBuilder.append('_'); 495 } 496 497 } 498 return sanitizedBuilder.toString(); 499 } 500 501 @RequiredArgsConstructor 502 @ToString 503 @EqualsAndHashCode 504 class EntriesOrder implements Comparator<Entry> { 505 506 private final OrderedMap<String> fieldsOrder; 507 508 // Keep comparator while no change occurs in fieldsOrder. 509 private Comparator<Entry> currentComparator = null; 510 511 /** 512 * Build an EntriesOrder according fields. 513 * 514 * @param fields the fields ordering. Each field should be {@code ,} separated. 515 * 516 * @return the order EntriesOrder 517 */ 518 public static EntriesOrder of(final String fields) { 519 return new EntriesOrder(fields); 520 } 521 522 /** 523 * Build an EntriesOrder according fields. 524 * 525 * @param fields the fields ordering. 526 * 527 * @return the order EntriesOrder 528 */ 529 public static EntriesOrder of(final Iterable<String> fields) { 530 final OrderedMap<String> orders = new OrderedMap<>(Function.identity(), fields); 531 return new EntriesOrder(orders); 532 } 533 534 public EntriesOrder(final String fields) { 535 if (fields == null || fields.isEmpty()) { 536 fieldsOrder = new OrderedMap<>(Function.identity()); 537 } else { 538 final List<String> fieldList = Arrays.stream(fields.split(",")).collect(Collectors.toList()); 539 fieldsOrder = new OrderedMap<>(Function.identity(), fieldList); 540 } 541 } 542 543 public EntriesOrder(final Iterable<String> fields) { 544 this(new OrderedMap<>(Function.identity(), fields)); 545 } 546 547 public Stream<String> getFieldsOrder() { 548 return this.fieldsOrder.streams(); 549 } 550 551 /** 552 * Move a field after another one. 553 * 554 * @param after the field name reference 555 * @param name the field name 556 * 557 * @return this EntriesOrder 558 */ 559 public EntriesOrder moveAfter(final String after, final String name) { 560 this.currentComparator = null; 561 this.fieldsOrder.moveAfter(after, name); 562 return this; 563 } 564 565 /** 566 * Move a field before another one. 567 * 568 * @param before the field name reference 569 * @param name the field name 570 * 571 * @return this EntriesOrder 572 */ 573 public EntriesOrder moveBefore(final String before, final String name) { 574 this.currentComparator = null; 575 this.fieldsOrder.moveBefore(before, name); 576 return this; 577 } 578 579 /** 580 * Swap two fields. 581 * 582 * @param name the field name 583 * @param with the other field 584 * 585 * @return this EntriesOrder 586 */ 587 public EntriesOrder swap(final String name, final String with) { 588 this.currentComparator = null; 589 this.fieldsOrder.swap(name, with); 590 return this; 591 } 592 593 public String toFields() { 594 return this.fieldsOrder.streams().collect(Collectors.joining(",")); 595 } 596 597 public Comparator<Entry> getComparator() { 598 if (this.currentComparator == null) { 599 final Map<String, Integer> entryPositions = new HashMap<>(); 600 final AtomicInteger index = new AtomicInteger(1); 601 this.fieldsOrder.streams() 602 .forEach( 603 (final String name) -> entryPositions.put(name, index.getAndIncrement())); 604 this.currentComparator = new EntryComparator(entryPositions); 605 } 606 return this.currentComparator; 607 } 608 609 @Override 610 public int compare(final Entry e1, final Entry e2) { 611 return this.getComparator().compare(e1, e2); 612 } 613 614 @RequiredArgsConstructor 615 static class EntryComparator implements Comparator<Entry> { 616 617 private final Map<String, Integer> entryPositions; 618 619 @Override 620 public int compare(final Entry e1, final Entry e2) { 621 final int index1 = this.entryPositions.getOrDefault(e1.getName(), Integer.MAX_VALUE); 622 final int index2 = this.entryPositions.getOrDefault(e2.getName(), Integer.MAX_VALUE); 623 if (index1 >= 0 && index2 >= 0) { 624 return index1 - index2; 625 } 626 if (index1 >= 0) { 627 return -1; 628 } 629 if (index2 >= 0) { 630 return 1; 631 } 632 return 0; 633 } 634 } 635 } 636 637 // use new avoid collision with entry getter. 638 @Deprecated 639 static Schema.Entry avoidCollision(final Schema.Entry newEntry, 640 final Supplier<Stream<Schema.Entry>> allEntriesSupplier, 641 final BiConsumer<String, Entry> replaceFunction) { 642 final Function<String, Entry> entryGetter = (String name) -> allEntriesSupplier // 643 .get() // 644 .filter((final Entry field) -> field.getName().equals(name)) 645 .findFirst() 646 .orElse(null); 647 return avoidCollision(newEntry, entryGetter, replaceFunction); 648 } 649 650 static Schema.Entry avoidCollision(final Schema.Entry newEntry, 651 final Function<String, Entry> entryGetter, 652 final BiConsumer<String, Entry> replaceFunction) { 653 final Optional<Entry> collisionedEntry = Optional.ofNullable(entryGetter // 654 .apply(newEntry.getName())) // 655 .filter((final Entry field) -> !Objects.equals(field, newEntry)); 656 if (!collisionedEntry.isPresent()) { 657 // No collision, return new entry. 658 return newEntry; 659 } 660 final Entry matchedEntry = collisionedEntry.get(); 661 final boolean matchedToChange = matchedEntry.getRawName() != null && !(matchedEntry.getRawName().isEmpty()); 662 if (matchedToChange) { 663 // the rename has to be applied on entry already inside schema, so replace. 664 replaceFunction.accept(matchedEntry.getName(), newEntry); 665 } else if (newEntry.getRawName() == null || newEntry.getRawName().isEmpty()) { 666 // try to add exactly same raw, skip the add here. 667 return null; 668 } 669 final Entry fieldToChange = matchedToChange ? matchedEntry : newEntry; 670 int indexForAnticollision = 1; 671 final String baseName = Schema.sanitizeConnectionName(fieldToChange.getRawName()); // recalc primiti name. 672 673 String newName = baseName + "_" + indexForAnticollision; 674 while (entryGetter.apply(newName) != null) { 675 indexForAnticollision++; 676 newName = baseName + "_" + indexForAnticollision; 677 } 678 final Entry newFieldToAdd = fieldToChange.toBuilder().withName(newName).build(); 679 680 return newFieldToAdd; // matchedToChange ? newFieldToAdd : newEntry; 681 } 682}