001/** 002 * Copyright (C) 2006-2025 Talend Inc. - www.talend.com 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.talend.sdk.component.api.record; 017 018import java.io.StringReader; 019import java.math.BigDecimal; 020import java.time.temporal.Temporal; 021import java.util.Arrays; 022import java.util.Collection; 023import java.util.Comparator; 024import java.util.Date; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.atomic.AtomicInteger; 029import java.util.function.BiConsumer; 030import java.util.function.Function; 031import java.util.stream.Collectors; 032import java.util.stream.Stream; 033 034import javax.json.Json; 035import javax.json.JsonValue; 036import javax.json.bind.annotation.JsonbTransient; 037import javax.json.stream.JsonParser; 038 039import lombok.EqualsAndHashCode; 040import lombok.RequiredArgsConstructor; 041import lombok.ToString; 042 043public interface Schema { 044 045 String SKIP_SANITIZE_PROPERTY = "talend.component.record.skip.sanitize"; 046 047 boolean SKIP_SANITIZE = Boolean.getBoolean(SKIP_SANITIZE_PROPERTY); 048 049 /** 050 * @return the type of this schema. 051 */ 052 Type getType(); 053 054 /** 055 * @return the nested element schema for arrays. 056 */ 057 Schema getElementSchema(); 058 059 /** 060 * @return the data entries for records (not contains meta data entries). 061 */ 062 List<Entry> getEntries(); 063 064 /** 065 * @return the metadata entries for records (not contains ordinary data entries). 066 */ 067 List<Entry> getMetadata(); 068 069 /** 070 * @return All entries, including data and metadata, of this schema. 071 */ 072 Stream<Entry> getAllEntries(); 073 074 default Map<String, Entry> getEntryMap() { 075 throw new UnsupportedOperationException("#getEntryMap is not implemented"); 076 } 077 078 /** 079 * Get a Builder from the current schema. 080 * 081 * @return a {@link Schema.Builder} 082 */ 083 default Schema.Builder toBuilder() { 084 throw new UnsupportedOperationException("#toBuilder is not implemented"); 085 } 086 087 /** 088 * Get all entries sorted by schema designed order. 089 * 090 * @return all entries ordered 091 */ 092 default List<Entry> getEntriesOrdered() { 093 return getEntriesOrdered(naturalOrder()); 094 } 095 096 /** 097 * Get all entries sorted using a custom comparator. 098 * 099 * @param comparator the comparator 100 * 101 * @return all entries ordered with provided comparator 102 */ 103 @JsonbTransient 104 default List<Entry> getEntriesOrdered(final Comparator<Entry> comparator) { 105 return getAllEntries().sorted(comparator).collect(Collectors.toList()); 106 } 107 108 /** 109 * Get the EntriesOrder defined with Builder. 110 * 111 * @return the EntriesOrder 112 */ 113 114 default EntriesOrder naturalOrder() { 115 throw new UnsupportedOperationException("#naturalOrder is not implemented"); 116 } 117 118 default Entry getEntry(final String name) { 119 return getEntryMap().get(name); 120 } 121 122 /** 123 * @return the metadata props 124 */ 125 Map<String, String> getProps(); 126 127 /** 128 * @param property : property name. 129 * 130 * @return the requested metadata prop 131 */ 132 String getProp(String property); 133 134 /** 135 * Get a property values from schema with its name. 136 * 137 * @param name : property's name. 138 * 139 * @return property's value. 140 */ 141 default JsonValue getJsonProp(final String name) { 142 final String prop = this.getProp(name); 143 if (prop == null) { 144 return null; 145 } 146 try (final StringReader reader = new StringReader(prop); 147 final JsonParser parser = Json.createParser(reader)) { 148 return parser.getValue(); 149 } catch (RuntimeException ex) { 150 return Json.createValue(prop); 151 } 152 } 153 154 enum Type { 155 156 RECORD(new Class<?>[] { Record.class }), 157 ARRAY(new Class<?>[] { Collection.class }), 158 STRING(new Class<?>[] { String.class, Object.class }), 159 BYTES(new Class<?>[] { byte[].class, Byte[].class }), 160 INT(new Class<?>[] { Integer.class }), 161 LONG(new Class<?>[] { Long.class }), 162 FLOAT(new Class<?>[] { Float.class }), 163 DOUBLE(new Class<?>[] { Double.class }), 164 BOOLEAN(new Class<?>[] { Boolean.class }), 165 DATETIME(new Class<?>[] { Long.class, Date.class, Temporal.class }), 166 DECIMAL(new Class<?>[] { BigDecimal.class }); 167 168 /** 169 * All compatibles Java classes 170 */ 171 private final Class<?>[] classes; 172 173 Type(final Class<?>[] classes) { 174 this.classes = classes; 175 } 176 177 /** 178 * Check if input can be affected to an entry of this type. 179 * 180 * @param input : object. 181 * 182 * @return true if input is null or ok. 183 */ 184 public boolean isCompatible(final Object input) { 185 if (input == null) { 186 return true; 187 } 188 for (final Class<?> clazz : classes) { 189 if (clazz.isInstance(input)) { 190 return true; 191 } 192 } 193 return false; 194 } 195 } 196 197 interface Entry { 198 199 /** 200 * @return The name of this entry. 201 */ 202 String getName(); 203 204 /** 205 * @return The raw name of this entry. 206 */ 207 String getRawName(); 208 209 /** 210 * @return the raw name of this entry if exists, else return name. 211 */ 212 String getOriginalFieldName(); 213 214 /** 215 * @return Type of the entry, this determine which other fields are populated. 216 */ 217 Type getType(); 218 219 /** 220 * @return Is this entry nullable or always valued. 221 */ 222 boolean isNullable(); 223 224 /** 225 * @return true if this entry is for metadata; false for ordinary data. 226 */ 227 boolean isMetadata(); 228 229 /** 230 * @return Is this entry can be in error. 231 */ 232 boolean isErrorCapable(); 233 234 /** 235 * @return true if the value of this entry is valid; false for invalid value. 236 */ 237 boolean isValid(); 238 239 /** 240 * @param <T> the default value type. 241 * 242 * @return Default value for this entry. 243 */ 244 <T> T getDefaultValue(); 245 246 /** 247 * @return For type == record, the element type. 248 */ 249 Schema getElementSchema(); 250 251 /** 252 * @return Allows to associate to this field a comment - for doc purposes, no use in the runtime. 253 */ 254 String getComment(); 255 256 /** 257 * @return the metadata props 258 */ 259 Map<String, String> getProps(); 260 261 /** 262 * @param property : property name. 263 * 264 * @return the requested metadata prop 265 */ 266 String getProp(String property); 267 268 /** 269 * Get a property values from entry with its name. 270 * 271 * @param name : property's name. 272 * 273 * @return property's value. 274 */ 275 default JsonValue getJsonProp(final String name) { 276 final String prop = this.getProp(name); 277 if (prop == null) { 278 return null; 279 } 280 try (final StringReader reader = new StringReader(prop); 281 final JsonParser parser = Json.createParser(reader)) { 282 return parser.getValue(); 283 } catch (RuntimeException ex) { 284 return Json.createValue(prop); 285 } 286 } 287 288 /** 289 * 290 * @return the logical type property 291 */ 292 default String getLogicalType() { 293 return this.getProp(SchemaProperty.LOGICAL_TYPE); 294 } 295 296 /** 297 * @return an {@link Entry.Builder} from this entry. 298 */ 299 default Entry.Builder toBuilder() { 300 throw new UnsupportedOperationException("#toBuilder is not implemented"); 301 } 302 303 default String getErrorMessage() { 304 return getProp(SchemaProperty.ENTRY_ERROR_MESSAGE); 305 } 306 307 default String getErrorFallbackValue() { 308 return getProp(SchemaProperty.ENTRY_ERROR_FALLBACK_VALUE); 309 } 310 311 /** 312 * Plain builder matching {@link Entry} structure. 313 */ 314 interface Builder { 315 316 Builder withName(String name); 317 318 Builder withRawName(String rawName); 319 320 Builder withType(Type type); 321 322 default Builder withLogicalType(SchemaProperty.LogicalType logicalType) { 323 throw new UnsupportedOperationException("#withLogicalType is not implemented"); 324 } 325 326 default Builder withLogicalType(String logicalType) { 327 throw new UnsupportedOperationException("#withLogicalType is not implemented"); 328 } 329 330 Builder withNullable(boolean nullable); 331 332 Builder withErrorCapable(boolean errorCapable); 333 334 Builder withMetadata(boolean metadata); 335 336 <T> Builder withDefaultValue(T value); 337 338 Builder withElementSchema(Schema schema); 339 340 Builder withComment(String comment); 341 342 Builder withProps(Map<String, String> props); 343 344 Builder withProp(String key, String value); 345 346 Entry build(); 347 } 348 } 349 350 /** 351 * Allows to build a {@link Schema}. 352 */ 353 interface Builder { 354 355 /** 356 * @param type schema type. 357 * 358 * @return this builder. 359 */ 360 Builder withType(Type type); 361 362 /** 363 * @param entry element for either an array or record type. 364 * 365 * @return this builder. 366 */ 367 Builder withEntry(Entry entry); 368 369 /** 370 * Insert the entry after the specified entry. 371 * 372 * @param after the entry name reference 373 * @param entry the entry name 374 * 375 * @return this builder 376 */ 377 default Builder withEntryAfter(String after, Entry entry) { 378 throw new UnsupportedOperationException("#withEntryAfter is not implemented"); 379 } 380 381 /** 382 * Insert the entry before the specified entry. 383 * 384 * @param before the entry name reference 385 * @param entry the entry name 386 * 387 * @return this builder 388 */ 389 default Builder withEntryBefore(String before, Entry entry) { 390 throw new UnsupportedOperationException("#withEntryBefore is not implemented"); 391 } 392 393 /** 394 * Remove entry from builder. 395 * 396 * @param name the entry name 397 * 398 * @return this builder 399 */ 400 default Builder remove(String name) { 401 throw new UnsupportedOperationException("#remove is not implemented"); 402 } 403 404 /** 405 * Remove entry from builder. 406 * 407 * @param entry the entry 408 * 409 * @return this builder 410 */ 411 default Builder remove(Entry entry) { 412 throw new UnsupportedOperationException("#remove is not implemented"); 413 } 414 415 /** 416 * Move an entry after another one. 417 * 418 * @param after the entry name reference 419 * @param name the entry name 420 */ 421 default Builder moveAfter(final String after, final String name) { 422 throw new UnsupportedOperationException("#moveAfter is not implemented"); 423 } 424 425 /** 426 * Move an entry before another one. 427 * 428 * @param before the entry name reference 429 * @param name the entry name 430 */ 431 default Builder moveBefore(final String before, final String name) { 432 throw new UnsupportedOperationException("#moveBefore is not implemented"); 433 } 434 435 /** 436 * Swap two entries. 437 * 438 * @param name the entry name 439 * @param with the other entry name 440 */ 441 default Builder swap(final String name, final String with) { 442 throw new UnsupportedOperationException("#swap is not implemented"); 443 } 444 445 /** 446 * @param schema nested element schema. 447 * 448 * @return this builder. 449 */ 450 Builder withElementSchema(Schema schema); 451 452 /** 453 * @param props schema properties 454 * 455 * @return this builder 456 */ 457 Builder withProps(Map<String, String> props); 458 459 /** 460 * @param key the prop key name 461 * @param value the prop value 462 * 463 * @return this builder 464 */ 465 Builder withProp(String key, String value); 466 467 /** 468 * @return the described schema. 469 */ 470 Schema build(); 471 472 /** 473 * Same as {@link Builder#build()} but entries order is specified by {@code order}. This takes precedence on any 474 * previous defined order with builder and may void it. 475 * 476 * @param order the wanted order for entries. 477 * @return the described schema. 478 */ 479 default Schema build(Comparator<Entry> order) { 480 throw new UnsupportedOperationException("#build(EntriesOrder) is not implemented"); 481 } 482 } 483 484 /** 485 * Sanitize name to be avro compatible. 486 * 487 * @param name : original name. 488 * 489 * @return avro compatible name. 490 */ 491 @Deprecated 492 static String sanitizeConnectionName(final String name) { 493 return SchemaCompanionUtil.sanitizeName(name); 494 } 495 496 @RequiredArgsConstructor 497 @ToString 498 @EqualsAndHashCode 499 class EntriesOrder implements Comparator<Entry> { 500 501 private final OrderedMap<String> fieldsOrder; 502 503 // Keep comparator while no change occurs in fieldsOrder. 504 private Comparator<Entry> currentComparator = null; 505 506 /** 507 * Build an EntriesOrder according fields. 508 * 509 * @param fields the fields ordering. Each field should be {@code ,} separated. 510 * 511 * @return the order EntriesOrder 512 */ 513 public static EntriesOrder of(final String fields) { 514 return new EntriesOrder(fields); 515 } 516 517 /** 518 * Build an EntriesOrder according fields. 519 * 520 * @param fields the fields ordering. 521 * 522 * @return the order EntriesOrder 523 */ 524 public static EntriesOrder of(final Iterable<String> fields) { 525 final OrderedMap<String> orders = new OrderedMap<>(Function.identity(), fields); 526 return new EntriesOrder(orders); 527 } 528 529 public EntriesOrder(final String fields) { 530 if (fields == null || fields.isEmpty()) { 531 fieldsOrder = new OrderedMap<>(Function.identity()); 532 } else { 533 final List<String> fieldList = Arrays.stream(fields.split(",")).collect(Collectors.toList()); 534 fieldsOrder = new OrderedMap<>(Function.identity(), fieldList); 535 } 536 } 537 538 public EntriesOrder(final Iterable<String> fields) { 539 this(new OrderedMap<>(Function.identity(), fields)); 540 } 541 542 public Stream<String> getFieldsOrder() { 543 return this.fieldsOrder.streams(); 544 } 545 546 /** 547 * Move a field after another one. 548 * 549 * @param after the field name reference 550 * @param name the field name 551 * 552 * @return this EntriesOrder 553 */ 554 public EntriesOrder moveAfter(final String after, final String name) { 555 this.currentComparator = null; 556 this.fieldsOrder.moveAfter(after, name); 557 return this; 558 } 559 560 /** 561 * Move a field before another one. 562 * 563 * @param before the field name reference 564 * @param name the field name 565 * 566 * @return this EntriesOrder 567 */ 568 public EntriesOrder moveBefore(final String before, final String name) { 569 this.currentComparator = null; 570 this.fieldsOrder.moveBefore(before, name); 571 return this; 572 } 573 574 /** 575 * Swap two fields. 576 * 577 * @param name the field name 578 * @param with the other field 579 * 580 * @return this EntriesOrder 581 */ 582 public EntriesOrder swap(final String name, final String with) { 583 this.currentComparator = null; 584 this.fieldsOrder.swap(name, with); 585 return this; 586 } 587 588 public String toFields() { 589 return this.fieldsOrder.streams().collect(Collectors.joining(",")); 590 } 591 592 public Comparator<Entry> getComparator() { 593 if (this.currentComparator == null) { 594 final Map<String, Integer> entryPositions = new HashMap<>(); 595 final AtomicInteger index = new AtomicInteger(1); 596 this.fieldsOrder.streams() 597 .forEach( 598 (final String name) -> entryPositions.put(name, index.getAndIncrement())); 599 this.currentComparator = new EntryComparator(entryPositions); 600 } 601 return this.currentComparator; 602 } 603 604 @Override 605 public int compare(final Entry e1, final Entry e2) { 606 return this.getComparator().compare(e1, e2); 607 } 608 609 @RequiredArgsConstructor 610 static class EntryComparator implements Comparator<Entry> { 611 612 private final Map<String, Integer> entryPositions; 613 614 @Override 615 public int compare(final Entry e1, final Entry e2) { 616 final int index1 = this.entryPositions.getOrDefault(e1.getName(), Integer.MAX_VALUE); 617 final int index2 = this.entryPositions.getOrDefault(e2.getName(), Integer.MAX_VALUE); 618 if (index1 >= 0 && index2 >= 0) { 619 return index1 - index2; 620 } 621 if (index1 >= 0) { 622 return -1; 623 } 624 if (index2 >= 0) { 625 return 1; 626 } 627 return 0; 628 } 629 } 630 } 631 632 /** 633 * Use instead {@since SchemaCompanionUtil#avoidCollision(Schema.Entry, Function, BiConsumer)} 634 */ 635 @Deprecated 636 static Schema.Entry avoidCollision(final Schema.Entry newEntry, 637 final Function<String, Entry> entryGetter, 638 final BiConsumer<String, Entry> replaceFunction) { 639 return SchemaCompanionUtil.avoidCollision(newEntry, entryGetter, replaceFunction); 640 } 641}