001/** 002 * Copyright (C) 2006-2025 Talend Inc. - www.talend.com 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.talend.sdk.component.api.record; 017 018import java.io.StringReader; 019import java.math.BigDecimal; 020import java.time.temporal.Temporal; 021import java.util.Arrays; 022import java.util.Collection; 023import java.util.Comparator; 024import java.util.Date; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.atomic.AtomicInteger; 029import java.util.function.BiConsumer; 030import java.util.function.Function; 031import java.util.stream.Collectors; 032import java.util.stream.Stream; 033 034import javax.json.Json; 035import javax.json.JsonValue; 036import javax.json.bind.annotation.JsonbTransient; 037import javax.json.stream.JsonParser; 038 039import lombok.EqualsAndHashCode; 040import lombok.RequiredArgsConstructor; 041import lombok.ToString; 042 043public interface Schema { 044 045 String SKIP_SANITIZE_PROPERTY = "talend.component.record.skip.sanitize"; 046 047 boolean SKIP_SANITIZE = Boolean.getBoolean(SKIP_SANITIZE_PROPERTY); 048 049 /** 050 * @return the type of this schema. 051 */ 052 Type getType(); 053 054 /** 055 * @return the nested element schema for arrays. 056 */ 057 Schema getElementSchema(); 058 059 /** 060 * @return the data entries for records (not contains meta data entries). 061 */ 062 List<Entry> getEntries(); 063 064 /** 065 * @return the metadata entries for records (not contains ordinary data entries). 066 */ 067 List<Entry> getMetadata(); 068 069 /** 070 * @return All entries, including data and metadata, of this schema. 071 */ 072 Stream<Entry> getAllEntries(); 073 074 @JsonbTransient 075 default Map<String, Entry> getEntryMap() { 076 throw new UnsupportedOperationException("#getEntryMap is not implemented"); 077 } 078 079 /** 080 * Get a Builder from the current schema. 081 * 082 * @return a {@link Schema.Builder} 083 */ 084 default Schema.Builder toBuilder() { 085 throw new UnsupportedOperationException("#toBuilder is not implemented"); 086 } 087 088 /** 089 * Get all entries sorted by schema designed order. 090 * 091 * @return all entries ordered 092 */ 093 @JsonbTransient 094 default List<Entry> getEntriesOrdered() { 095 return getEntriesOrdered(naturalOrder()); 096 } 097 098 /** 099 * Get all entries sorted using a custom comparator. 100 * 101 * @param comparator the comparator 102 * 103 * @return all entries ordered with provided comparator 104 */ 105 @JsonbTransient 106 default List<Entry> getEntriesOrdered(final Comparator<Entry> comparator) { 107 return getAllEntries().sorted(comparator).collect(Collectors.toList()); 108 } 109 110 /** 111 * Get the EntriesOrder defined with Builder. 112 * 113 * @return the EntriesOrder 114 */ 115 116 default EntriesOrder naturalOrder() { 117 throw new UnsupportedOperationException("#naturalOrder is not implemented"); 118 } 119 120 default Entry getEntry(final String name) { 121 return getEntryMap().get(name); 122 } 123 124 /** 125 * @return the metadata props 126 */ 127 Map<String, String> getProps(); 128 129 /** 130 * @param property : property name. 131 * 132 * @return the requested metadata prop 133 */ 134 String getProp(String property); 135 136 /** 137 * Get a property values from schema with its name. 138 * 139 * @param name : property's name. 140 * 141 * @return property's value. 142 */ 143 default JsonValue getJsonProp(final String name) { 144 final String prop = this.getProp(name); 145 if (prop == null) { 146 return null; 147 } 148 try (final StringReader reader = new StringReader(prop); 149 final JsonParser parser = Json.createParser(reader)) { 150 return parser.getValue(); 151 } catch (RuntimeException ex) { 152 return Json.createValue(prop); 153 } 154 } 155 156 enum Type { 157 158 RECORD(new Class<?>[] { Record.class }), 159 ARRAY(new Class<?>[] { Collection.class }), 160 STRING(new Class<?>[] { String.class, Object.class }), 161 BYTES(new Class<?>[] { byte[].class, Byte[].class }), 162 INT(new Class<?>[] { Integer.class }), 163 LONG(new Class<?>[] { Long.class }), 164 FLOAT(new Class<?>[] { Float.class }), 165 DOUBLE(new Class<?>[] { Double.class }), 166 BOOLEAN(new Class<?>[] { Boolean.class }), 167 DATETIME(new Class<?>[] { Long.class, Date.class, Temporal.class }), 168 DECIMAL(new Class<?>[] { BigDecimal.class }); 169 170 /** 171 * All compatibles Java classes 172 */ 173 private final Class<?>[] classes; 174 175 Type(final Class<?>[] classes) { 176 this.classes = classes; 177 } 178 179 /** 180 * Check if input can be affected to an entry of this type. 181 * 182 * @param input : object. 183 * 184 * @return true if input is null or ok. 185 */ 186 public boolean isCompatible(final Object input) { 187 if (input == null) { 188 return true; 189 } 190 for (final Class<?> clazz : classes) { 191 if (clazz.isInstance(input)) { 192 return true; 193 } 194 } 195 return false; 196 } 197 } 198 199 interface Entry { 200 201 /** 202 * @return The name of this entry. 203 */ 204 String getName(); 205 206 /** 207 * @return The raw name of this entry. 208 */ 209 String getRawName(); 210 211 /** 212 * @return the raw name of this entry if exists, else return name. 213 */ 214 String getOriginalFieldName(); 215 216 /** 217 * @return Type of the entry, this determine which other fields are populated. 218 */ 219 Type getType(); 220 221 /** 222 * @return Is this entry nullable or always valued. 223 */ 224 boolean isNullable(); 225 226 /** 227 * @return true if this entry is for metadata; false for ordinary data. 228 */ 229 boolean isMetadata(); 230 231 /** 232 * @return Is this entry can be in error. 233 */ 234 boolean isErrorCapable(); 235 236 /** 237 * @return true if the value of this entry is valid; false for invalid value. 238 */ 239 boolean isValid(); 240 241 /** 242 * @param <T> the default value type. 243 * 244 * @return Default value for this entry. 245 */ 246 <T> T getDefaultValue(); 247 248 /** 249 * @return For type == record, the element type. 250 */ 251 Schema getElementSchema(); 252 253 /** 254 * @return Allows to associate to this field a comment - for doc purposes, no use in the runtime. 255 */ 256 String getComment(); 257 258 /** 259 * @return the metadata props 260 */ 261 Map<String, String> getProps(); 262 263 /** 264 * @param property : property name. 265 * 266 * @return the requested metadata prop 267 */ 268 String getProp(String property); 269 270 /** 271 * Get a property values from entry with its name. 272 * 273 * @param name : property's name. 274 * 275 * @return property's value. 276 */ 277 default JsonValue getJsonProp(final String name) { 278 final String prop = this.getProp(name); 279 if (prop == null) { 280 return null; 281 } 282 try (final StringReader reader = new StringReader(prop); 283 final JsonParser parser = Json.createParser(reader)) { 284 return parser.getValue(); 285 } catch (RuntimeException ex) { 286 return Json.createValue(prop); 287 } 288 } 289 290 /** 291 * 292 * @return the logical type property 293 */ 294 default String getLogicalType() { 295 return this.getProp(SchemaProperty.LOGICAL_TYPE); 296 } 297 298 /** 299 * @return an {@link Entry.Builder} from this entry. 300 */ 301 default Entry.Builder toBuilder() { 302 throw new UnsupportedOperationException("#toBuilder is not implemented"); 303 } 304 305 default String getErrorMessage() { 306 return getProp(SchemaProperty.ENTRY_ERROR_MESSAGE); 307 } 308 309 default String getErrorFallbackValue() { 310 return getProp(SchemaProperty.ENTRY_ERROR_FALLBACK_VALUE); 311 } 312 313 /** 314 * Plain builder matching {@link Entry} structure. 315 */ 316 interface Builder { 317 318 Builder withName(String name); 319 320 Builder withRawName(String rawName); 321 322 Builder withType(Type type); 323 324 default Builder withLogicalType(SchemaProperty.LogicalType logicalType) { 325 throw new UnsupportedOperationException("#withLogicalType is not implemented"); 326 } 327 328 default Builder withLogicalType(String logicalType) { 329 throw new UnsupportedOperationException("#withLogicalType is not implemented"); 330 } 331 332 Builder withNullable(boolean nullable); 333 334 Builder withErrorCapable(boolean errorCapable); 335 336 Builder withMetadata(boolean metadata); 337 338 <T> Builder withDefaultValue(T value); 339 340 Builder withElementSchema(Schema schema); 341 342 Builder withComment(String comment); 343 344 Builder withProps(Map<String, String> props); 345 346 Builder withProp(String key, String value); 347 348 Entry build(); 349 } 350 } 351 352 /** 353 * Allows to build a {@link Schema}. 354 */ 355 interface Builder { 356 357 /** 358 * @param type schema type. 359 * 360 * @return this builder. 361 */ 362 Builder withType(Type type); 363 364 /** 365 * @param entry element for either an array or record type. 366 * 367 * @return this builder. 368 */ 369 Builder withEntry(Entry entry); 370 371 /** 372 * Insert the entry after the specified entry. 373 * 374 * @param after the entry name reference 375 * @param entry the entry name 376 * 377 * @return this builder 378 */ 379 default Builder withEntryAfter(String after, Entry entry) { 380 throw new UnsupportedOperationException("#withEntryAfter is not implemented"); 381 } 382 383 /** 384 * Insert the entry before the specified entry. 385 * 386 * @param before the entry name reference 387 * @param entry the entry name 388 * 389 * @return this builder 390 */ 391 default Builder withEntryBefore(String before, Entry entry) { 392 throw new UnsupportedOperationException("#withEntryBefore is not implemented"); 393 } 394 395 /** 396 * Remove entry from builder. 397 * 398 * @param name the entry name 399 * 400 * @return this builder 401 */ 402 default Builder remove(String name) { 403 throw new UnsupportedOperationException("#remove is not implemented"); 404 } 405 406 /** 407 * Remove entry from builder. 408 * 409 * @param entry the entry 410 * 411 * @return this builder 412 */ 413 default Builder remove(Entry entry) { 414 throw new UnsupportedOperationException("#remove is not implemented"); 415 } 416 417 /** 418 * Move an entry after another one. 419 * 420 * @param after the entry name reference 421 * @param name the entry name 422 */ 423 default Builder moveAfter(final String after, final String name) { 424 throw new UnsupportedOperationException("#moveAfter is not implemented"); 425 } 426 427 /** 428 * Move an entry before another one. 429 * 430 * @param before the entry name reference 431 * @param name the entry name 432 */ 433 default Builder moveBefore(final String before, final String name) { 434 throw new UnsupportedOperationException("#moveBefore is not implemented"); 435 } 436 437 /** 438 * Swap two entries. 439 * 440 * @param name the entry name 441 * @param with the other entry name 442 */ 443 default Builder swap(final String name, final String with) { 444 throw new UnsupportedOperationException("#swap is not implemented"); 445 } 446 447 /** 448 * @param schema nested element schema. 449 * 450 * @return this builder. 451 */ 452 Builder withElementSchema(Schema schema); 453 454 /** 455 * @param props schema properties 456 * 457 * @return this builder 458 */ 459 Builder withProps(Map<String, String> props); 460 461 /** 462 * @param key the prop key name 463 * @param value the prop value 464 * 465 * @return this builder 466 */ 467 Builder withProp(String key, String value); 468 469 /** 470 * @return the described schema. 471 */ 472 Schema build(); 473 474 /** 475 * Same as {@link Builder#build()} but entries order is specified by {@code order}. This takes precedence on any 476 * previous defined order with builder and may void it. 477 * 478 * @param order the wanted order for entries. 479 * @return the described schema. 480 */ 481 default Schema build(Comparator<Entry> order) { 482 throw new UnsupportedOperationException("#build(EntriesOrder) is not implemented"); 483 } 484 } 485 486 /** 487 * Sanitize name to be avro compatible. 488 * 489 * @param name : original name. 490 * 491 * @return avro compatible name. 492 */ 493 @Deprecated 494 static String sanitizeConnectionName(final String name) { 495 return SchemaCompanionUtil.sanitizeName(name); 496 } 497 498 @RequiredArgsConstructor 499 @ToString 500 @EqualsAndHashCode 501 class EntriesOrder implements Comparator<Entry> { 502 503 private final OrderedMap<String> fieldsOrder; 504 505 // Keep comparator while no change occurs in fieldsOrder. 506 private Comparator<Entry> currentComparator = null; 507 508 /** 509 * Build an EntriesOrder according fields. 510 * 511 * @param fields the fields ordering. Each field should be {@code ,} separated. 512 * 513 * @return the order EntriesOrder 514 */ 515 public static EntriesOrder of(final String fields) { 516 return new EntriesOrder(fields); 517 } 518 519 /** 520 * Build an EntriesOrder according fields. 521 * 522 * @param fields the fields ordering. 523 * 524 * @return the order EntriesOrder 525 */ 526 public static EntriesOrder of(final Iterable<String> fields) { 527 final OrderedMap<String> orders = new OrderedMap<>(Function.identity(), fields); 528 return new EntriesOrder(orders); 529 } 530 531 public EntriesOrder(final String fields) { 532 if (fields == null || fields.isEmpty()) { 533 fieldsOrder = new OrderedMap<>(Function.identity()); 534 } else { 535 final List<String> fieldList = Arrays.stream(fields.split(",")).collect(Collectors.toList()); 536 fieldsOrder = new OrderedMap<>(Function.identity(), fieldList); 537 } 538 } 539 540 public EntriesOrder(final Iterable<String> fields) { 541 this(new OrderedMap<>(Function.identity(), fields)); 542 } 543 544 public Stream<String> getFieldsOrder() { 545 return this.fieldsOrder.streams(); 546 } 547 548 /** 549 * Move a field after another one. 550 * 551 * @param after the field name reference 552 * @param name the field name 553 * 554 * @return this EntriesOrder 555 */ 556 public EntriesOrder moveAfter(final String after, final String name) { 557 this.currentComparator = null; 558 this.fieldsOrder.moveAfter(after, name); 559 return this; 560 } 561 562 /** 563 * Move a field before another one. 564 * 565 * @param before the field name reference 566 * @param name the field name 567 * 568 * @return this EntriesOrder 569 */ 570 public EntriesOrder moveBefore(final String before, final String name) { 571 this.currentComparator = null; 572 this.fieldsOrder.moveBefore(before, name); 573 return this; 574 } 575 576 /** 577 * Swap two fields. 578 * 579 * @param name the field name 580 * @param with the other field 581 * 582 * @return this EntriesOrder 583 */ 584 public EntriesOrder swap(final String name, final String with) { 585 this.currentComparator = null; 586 this.fieldsOrder.swap(name, with); 587 return this; 588 } 589 590 public String toFields() { 591 return this.fieldsOrder.streams().collect(Collectors.joining(",")); 592 } 593 594 public Comparator<Entry> getComparator() { 595 if (this.currentComparator == null) { 596 final Map<String, Integer> entryPositions = new HashMap<>(); 597 final AtomicInteger index = new AtomicInteger(1); 598 this.fieldsOrder.streams() 599 .forEach( 600 (final String name) -> entryPositions.put(name, index.getAndIncrement())); 601 this.currentComparator = new EntryComparator(entryPositions); 602 } 603 return this.currentComparator; 604 } 605 606 @Override 607 public int compare(final Entry e1, final Entry e2) { 608 return this.getComparator().compare(e1, e2); 609 } 610 611 @RequiredArgsConstructor 612 static class EntryComparator implements Comparator<Entry> { 613 614 private final Map<String, Integer> entryPositions; 615 616 @Override 617 public int compare(final Entry e1, final Entry e2) { 618 final int index1 = this.entryPositions.getOrDefault(e1.getName(), Integer.MAX_VALUE); 619 final int index2 = this.entryPositions.getOrDefault(e2.getName(), Integer.MAX_VALUE); 620 if (index1 >= 0 && index2 >= 0) { 621 return index1 - index2; 622 } 623 if (index1 >= 0) { 624 return -1; 625 } 626 if (index2 >= 0) { 627 return 1; 628 } 629 return 0; 630 } 631 } 632 } 633 634 /** 635 * Use instead {@since SchemaCompanionUtil#avoidCollision(Schema.Entry, Function, BiConsumer)} 636 */ 637 @Deprecated 638 static Schema.Entry avoidCollision(final Schema.Entry newEntry, 639 final Function<String, Entry> entryGetter, 640 final BiConsumer<String, Entry> replaceFunction) { 641 return SchemaCompanionUtil.avoidCollision(newEntry, entryGetter, replaceFunction); 642 } 643}