001/** 002 * Copyright (C) 2006-2022 Talend Inc. - www.talend.com 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.talend.sdk.component.api.record; 017 018import java.io.StringReader; 019import java.nio.charset.Charset; 020import java.nio.charset.CharsetEncoder; 021import java.nio.charset.StandardCharsets; 022import java.time.temporal.Temporal; 023import java.util.Arrays; 024import java.util.Base64; 025import java.util.Collection; 026import java.util.Comparator; 027import java.util.Date; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031import java.util.Objects; 032import java.util.Optional; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.function.BiConsumer; 035import java.util.function.Function; 036import java.util.function.Supplier; 037import java.util.stream.Collectors; 038import java.util.stream.Stream; 039 040import javax.json.Json; 041import javax.json.JsonValue; 042import javax.json.bind.annotation.JsonbTransient; 043import javax.json.stream.JsonParser; 044 045import lombok.EqualsAndHashCode; 046import lombok.RequiredArgsConstructor; 047import lombok.ToString; 048 049public interface Schema { 050 051 /** 052 * @return the type of this schema. 053 */ 054 Type getType(); 055 056 /** 057 * @return the nested element schema for arrays. 058 */ 059 Schema getElementSchema(); 060 061 /** 062 * @return the data entries for records (not contains meta data entries). 063 */ 064 List<Entry> getEntries(); 065 066 /** 067 * @return the metadata entries for records (not contains ordinary data entries). 068 */ 069 List<Entry> getMetadata(); 070 071 /** 072 * @return All entries, including data and metadata, of this schema. 073 */ 074 Stream<Entry> getAllEntries(); 075 076 /** 077 * Get a Builder from the current schema. 078 * 079 * @return a {@link Schema.Builder} 080 */ 081 default Schema.Builder toBuilder() { 082 throw new UnsupportedOperationException("#toBuilder is not implemented"); 083 } 084 085 /** 086 * Get all entries sorted by schema designed order. 087 * 088 * @return all entries ordered 089 */ 090 default List<Entry> getEntriesOrdered() { 091 return getEntriesOrdered(naturalOrder()); 092 } 093 094 /** 095 * Get all entries sorted using a custom comparator. 096 * 097 * @param comparator the comparator 098 * 099 * @return all entries ordered with provided comparator 100 */ 101 @JsonbTransient 102 default List<Entry> getEntriesOrdered(final Comparator<Entry> comparator) { 103 return getAllEntries().sorted(comparator).collect(Collectors.toList()); 104 } 105 106 /** 107 * Get the EntriesOrder defined with Builder. 108 * 109 * @return the EntriesOrder 110 */ 111 112 default EntriesOrder naturalOrder() { 113 throw new UnsupportedOperationException("#naturalOrder is not implemented"); 114 } 115 116 default Entry getEntry(final String name) { 117 return getAllEntries() // 118 .filter((Entry e) -> Objects.equals(e.getName(), name)) // 119 .findFirst() // 120 .orElse(null); 121 } 122 123 /** 124 * @return the metadata props 125 */ 126 Map<String, String> getProps(); 127 128 /** 129 * @param property : property name. 130 * 131 * @return the requested metadata prop 132 */ 133 String getProp(String property); 134 135 /** 136 * Get a property values from schema with its name. 137 * 138 * @param name : property's name. 139 * 140 * @return property's value. 141 */ 142 default JsonValue getJsonProp(final String name) { 143 final String prop = this.getProp(name); 144 if (prop == null) { 145 return null; 146 } 147 try (final StringReader reader = new StringReader(prop); 148 final JsonParser parser = Json.createParser(reader)) { 149 return parser.getValue(); 150 } catch (RuntimeException ex) { 151 return Json.createValue(prop); 152 } 153 } 154 155 enum Type { 156 157 RECORD(new Class<?>[] { Record.class }), 158 ARRAY(new Class<?>[] { Collection.class }), 159 STRING(new Class<?>[] { String.class }), 160 BYTES(new Class<?>[] { byte[].class, Byte[].class }), 161 INT(new Class<?>[] { Integer.class }), 162 LONG(new Class<?>[] { Long.class }), 163 FLOAT(new Class<?>[] { Float.class }), 164 DOUBLE(new Class<?>[] { Double.class }), 165 BOOLEAN(new Class<?>[] { Boolean.class }), 166 DATETIME(new Class<?>[] { Long.class, Date.class, Temporal.class }); 167 168 /** 169 * All compatibles Java classes 170 */ 171 private final Class<?>[] classes; 172 173 Type(final Class<?>[] classes) { 174 this.classes = classes; 175 } 176 177 /** 178 * Check if input can be affected to an entry of this type. 179 * 180 * @param input : object. 181 * 182 * @return true if input is null or ok. 183 */ 184 public boolean isCompatible(final Object input) { 185 if (input == null) { 186 return true; 187 } 188 for (final Class<?> clazz : classes) { 189 if (clazz.isInstance(input)) { 190 return true; 191 } 192 } 193 return false; 194 } 195 } 196 197 interface Entry { 198 199 /** 200 * @return The name of this entry. 201 */ 202 String getName(); 203 204 /** 205 * @return The raw name of this entry. 206 */ 207 String getRawName(); 208 209 /** 210 * @return the raw name of this entry if exists, else return name. 211 */ 212 String getOriginalFieldName(); 213 214 /** 215 * @return Type of the entry, this determine which other fields are populated. 216 */ 217 Type getType(); 218 219 /** 220 * @return Is this entry nullable or always valued. 221 */ 222 boolean isNullable(); 223 224 /** 225 * @return true if this entry is for metadata; false for ordinary data. 226 */ 227 boolean isMetadata(); 228 229 /** 230 * @param <T> the default value type. 231 * 232 * @return Default value for this entry. 233 */ 234 <T> T getDefaultValue(); 235 236 /** 237 * @return For type == record, the element type. 238 */ 239 Schema getElementSchema(); 240 241 /** 242 * @return Allows to associate to this field a comment - for doc purposes, no use in the runtime. 243 */ 244 String getComment(); 245 246 /** 247 * @return the metadata props 248 */ 249 Map<String, String> getProps(); 250 251 /** 252 * @param property : property name. 253 * 254 * @return the requested metadata prop 255 */ 256 String getProp(String property); 257 258 /** 259 * Get a property values from entry with its name. 260 * 261 * @param name : property's name. 262 * 263 * @return property's value. 264 */ 265 default JsonValue getJsonProp(final String name) { 266 final String prop = this.getProp(name); 267 if (prop == null) { 268 return null; 269 } 270 try (final StringReader reader = new StringReader(prop); 271 final JsonParser parser = Json.createParser(reader)) { 272 return parser.getValue(); 273 } catch (RuntimeException ex) { 274 return Json.createValue(prop); 275 } 276 } 277 278 /** 279 * @return an {@link Entry.Builder} from this entry. 280 */ 281 default Entry.Builder toBuilder() { 282 throw new UnsupportedOperationException("#toBuilder is not implemented"); 283 } 284 285 /** 286 * Plain builder matching {@link Entry} structure. 287 */ 288 interface Builder { 289 290 Builder withName(String name); 291 292 Builder withRawName(String rawName); 293 294 Builder withType(Type type); 295 296 Builder withNullable(boolean nullable); 297 298 Builder withMetadata(boolean metadata); 299 300 <T> Builder withDefaultValue(T value); 301 302 Builder withElementSchema(Schema schema); 303 304 Builder withComment(String comment); 305 306 Builder withProps(Map<String, String> props); 307 308 Builder withProp(String key, String value); 309 310 Entry build(); 311 } 312 } 313 314 /** 315 * Allows to build a {@link Schema}. 316 */ 317 interface Builder { 318 319 /** 320 * @param type schema type. 321 * 322 * @return this builder. 323 */ 324 Builder withType(Type type); 325 326 /** 327 * @param entry element for either an array or record type. 328 * 329 * @return this builder. 330 */ 331 Builder withEntry(Entry entry); 332 333 /** 334 * Insert the entry after the specified entry. 335 * 336 * @param after the entry name reference 337 * @param entry the entry name 338 * 339 * @return this builder 340 */ 341 default Builder withEntryAfter(String after, Entry entry) { 342 throw new UnsupportedOperationException("#withEntryAfter is not implemented"); 343 } 344 345 /** 346 * Insert the entry before the specified entry. 347 * 348 * @param before the entry name reference 349 * @param entry the entry name 350 * 351 * @return this builder 352 */ 353 default Builder withEntryBefore(String before, Entry entry) { 354 throw new UnsupportedOperationException("#withEntryBefore is not implemented"); 355 } 356 357 /** 358 * Remove entry from builder. 359 * 360 * @param name the entry name 361 * 362 * @return this builder 363 */ 364 default Builder remove(String name) { 365 throw new UnsupportedOperationException("#remove is not implemented"); 366 } 367 368 /** 369 * Remove entry from builder. 370 * 371 * @param entry the entry 372 * 373 * @return this builder 374 */ 375 default Builder remove(Entry entry) { 376 throw new UnsupportedOperationException("#remove is not implemented"); 377 } 378 379 /** 380 * Move an entry after another one. 381 * 382 * @param after the entry name reference 383 * @param name the entry name 384 */ 385 default Builder moveAfter(final String after, final String name) { 386 throw new UnsupportedOperationException("#moveAfter is not implemented"); 387 } 388 389 /** 390 * Move an entry before another one. 391 * 392 * @param before the entry name reference 393 * @param name the entry name 394 */ 395 default Builder moveBefore(final String before, final String name) { 396 throw new UnsupportedOperationException("#moveBefore is not implemented"); 397 } 398 399 /** 400 * Swap two entries. 401 * 402 * @param name the entry name 403 * @param with the other entry name 404 */ 405 default Builder swap(final String name, final String with) { 406 throw new UnsupportedOperationException("#swap is not implemented"); 407 } 408 409 /** 410 * @param schema nested element schema. 411 * 412 * @return this builder. 413 */ 414 Builder withElementSchema(Schema schema); 415 416 /** 417 * @param props schema properties 418 * 419 * @return this builder 420 */ 421 Builder withProps(Map<String, String> props); 422 423 /** 424 * @param key the prop key name 425 * @param value the prop value 426 * 427 * @return this builder 428 */ 429 Builder withProp(String key, String value); 430 431 /** 432 * @return the described schema. 433 */ 434 Schema build(); 435 436 /** 437 * Same as {@link Builder#build()} but entries order is specified by {@code order}. This takes precedence on any 438 * previous defined order with builder and may void it. 439 * 440 * @param order the wanted order for entries. 441 * @return the described schema. 442 */ 443 default Schema build(Comparator<Entry> order) { 444 throw new UnsupportedOperationException("#build(EntriesOrder) is not implemented"); 445 } 446 } 447 448 /** 449 * Sanitize name to be avro compatible. 450 * 451 * @param name : original name. 452 * 453 * @return avro compatible name. 454 */ 455 static String sanitizeConnectionName(final String name) { 456 if (name == null || name.isEmpty()) { 457 return name; 458 } 459 460 char current = name.charAt(0); 461 final CharsetEncoder ascii = Charset.forName(StandardCharsets.US_ASCII.name()).newEncoder(); 462 final boolean skipFirstChar = ((!ascii.canEncode(current)) || (!Character.isLetter(current) && current != '_')) 463 && name.length() > 1 && (!Character.isDigit(name.charAt(1))); 464 465 final StringBuilder sanitizedBuilder = new StringBuilder(); 466 467 if (!skipFirstChar) { 468 if (((!Character.isLetter(current)) && current != '_') || (!ascii.canEncode(current))) { 469 sanitizedBuilder.append('_'); 470 } else { 471 sanitizedBuilder.append(current); 472 } 473 } 474 for (int i = 1; i < name.length(); i++) { 475 current = name.charAt(i); 476 if (!ascii.canEncode(current)) { 477 if (Character.isLowerCase(current) || Character.isUpperCase(current)) { 478 sanitizedBuilder.append('_'); 479 } else { 480 final byte[] encoded = 481 Base64.getEncoder().encode(name.substring(i, i + 1).getBytes(StandardCharsets.UTF_8)); 482 final String enc = new String(encoded); 483 if (sanitizedBuilder.length() == 0 && Character.isDigit(enc.charAt(0))) { 484 sanitizedBuilder.append('_'); 485 } 486 for (int iter = 0; iter < enc.length(); iter++) { 487 if (Character.isLetterOrDigit(enc.charAt(iter))) { 488 sanitizedBuilder.append(enc.charAt(iter)); 489 } else { 490 sanitizedBuilder.append('_'); 491 } 492 } 493 } 494 } else if (Character.isLetterOrDigit(current)) { 495 sanitizedBuilder.append(current); 496 } else { 497 sanitizedBuilder.append('_'); 498 } 499 500 } 501 return sanitizedBuilder.toString(); 502 } 503 504 @RequiredArgsConstructor 505 @ToString 506 @EqualsAndHashCode 507 class EntriesOrder implements Comparator<Entry> { 508 509 private final OrderedMap<String> fieldsOrder; 510 511 // Keep comparator while no change occurs in fieldsOrder. 512 private Comparator<Entry> currentComparator = null; 513 514 /** 515 * Build an EntriesOrder according fields. 516 * 517 * @param fields the fields ordering. Each field should be {@code ,} separated. 518 * 519 * @return the order EntriesOrder 520 */ 521 public static EntriesOrder of(final String fields) { 522 return new EntriesOrder(fields); 523 } 524 525 /** 526 * Build an EntriesOrder according fields. 527 * 528 * @param fields the fields ordering. 529 * 530 * @return the order EntriesOrder 531 */ 532 public static EntriesOrder of(final Iterable<String> fields) { 533 final OrderedMap<String> orders = new OrderedMap<>(Function.identity(), fields); 534 return new EntriesOrder(orders); 535 } 536 537 public EntriesOrder(final String fields) { 538 if (fields == null || fields.isEmpty()) { 539 fieldsOrder = new OrderedMap<>(Function.identity()); 540 } else { 541 final List<String> fieldList = Arrays.stream(fields.split(",")).collect(Collectors.toList()); 542 fieldsOrder = new OrderedMap<>(Function.identity(), fieldList); 543 } 544 } 545 546 public EntriesOrder(final Iterable<String> fields) { 547 this(new OrderedMap<>(Function.identity(), fields)); 548 } 549 550 public Stream<String> getFieldsOrder() { 551 return this.fieldsOrder.streams(); 552 } 553 554 /** 555 * Move a field after another one. 556 * 557 * @param after the field name reference 558 * @param name the field name 559 * 560 * @return this EntriesOrder 561 */ 562 public EntriesOrder moveAfter(final String after, final String name) { 563 this.currentComparator = null; 564 this.fieldsOrder.moveAfter(after, name); 565 return this; 566 } 567 568 /** 569 * Move a field before another one. 570 * 571 * @param before the field name reference 572 * @param name the field name 573 * 574 * @return this EntriesOrder 575 */ 576 public EntriesOrder moveBefore(final String before, final String name) { 577 this.currentComparator = null; 578 this.fieldsOrder.moveBefore(before, name); 579 return this; 580 } 581 582 /** 583 * Swap two fields. 584 * 585 * @param name the field name 586 * @param with the other field 587 * 588 * @return this EntriesOrder 589 */ 590 public EntriesOrder swap(final String name, final String with) { 591 this.currentComparator = null; 592 this.fieldsOrder.swap(name, with); 593 return this; 594 } 595 596 public String toFields() { 597 return this.fieldsOrder.streams().collect(Collectors.joining(",")); 598 } 599 600 public Comparator<Entry> getComparator() { 601 if (this.currentComparator == null) { 602 final Map<String, Integer> entryPositions = new HashMap<>(); 603 final AtomicInteger index = new AtomicInteger(1); 604 this.fieldsOrder.streams() 605 .forEach( 606 (final String name) -> entryPositions.put(name, index.getAndIncrement())); 607 this.currentComparator = new EntryComparator(entryPositions); 608 } 609 return this.currentComparator; 610 } 611 612 @Override 613 public int compare(final Entry e1, final Entry e2) { 614 return this.getComparator().compare(e1, e2); 615 } 616 617 @RequiredArgsConstructor 618 static class EntryComparator implements Comparator<Entry> { 619 620 private final Map<String, Integer> entryPositions; 621 622 @Override 623 public int compare(final Entry e1, final Entry e2) { 624 final int index1 = this.entryPositions.getOrDefault(e1.getName(), Integer.MAX_VALUE); 625 final int index2 = this.entryPositions.getOrDefault(e2.getName(), Integer.MAX_VALUE); 626 if (index1 >= 0 && index2 >= 0) { 627 return index1 - index2; 628 } 629 if (index1 >= 0) { 630 return -1; 631 } 632 if (index2 >= 0) { 633 return 1; 634 } 635 return 0; 636 } 637 } 638 } 639 640 // use new avoid collision with entry getter. 641 @Deprecated 642 static Schema.Entry avoidCollision(final Schema.Entry newEntry, 643 final Supplier<Stream<Schema.Entry>> allEntriesSupplier, 644 final BiConsumer<String, Entry> replaceFunction) { 645 final Function<String, Entry> entryGetter = (String name) -> allEntriesSupplier // 646 .get() // 647 .filter((final Entry field) -> field.getName().equals(name)) 648 .findFirst() 649 .orElse(null); 650 return avoidCollision(newEntry, entryGetter, replaceFunction); 651 } 652 653 static Schema.Entry avoidCollision(final Schema.Entry newEntry, 654 final Function<String, Entry> entryGetter, 655 final BiConsumer<String, Entry> replaceFunction) { 656 final Optional<Entry> collisionedEntry = Optional.ofNullable(entryGetter // 657 .apply(newEntry.getName())) // 658 .filter((final Entry field) -> !Objects.equals(field, newEntry)); 659 if (!collisionedEntry.isPresent()) { 660 // No collision, return new entry. 661 return newEntry; 662 } 663 final Entry matchedEntry = collisionedEntry.get(); 664 final boolean matchedToChange = matchedEntry.getRawName() != null && !(matchedEntry.getRawName().isEmpty()); 665 if (matchedToChange) { 666 // the rename has to be applied on entry already inside schema, so replace. 667 replaceFunction.accept(matchedEntry.getName(), newEntry); 668 } else if (newEntry.getRawName() == null || newEntry.getRawName().isEmpty()) { 669 // try to add exactly same raw, skip the add here. 670 return null; 671 } 672 final Entry fieldToChange = matchedToChange ? matchedEntry : newEntry; 673 int indexForAnticollision = 1; 674 final String baseName = Schema.sanitizeConnectionName(fieldToChange.getRawName()); // recalc primiti name. 675 676 String newName = baseName + "_" + indexForAnticollision; 677 while (entryGetter.apply(newName) != null) { 678 indexForAnticollision++; 679 newName = baseName + "_" + indexForAnticollision; 680 } 681 final Entry newFieldToAdd = fieldToChange.toBuilder().withName(newName).build(); 682 683 return newFieldToAdd; // matchedToChange ? newFieldToAdd : newEntry; 684 } 685}