/**
 * Copyright (C) 2006-2022 Talend Inc. - www.talend.com
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.talend.sdk.component.api.record;

import static java.util.Collections.emptyList;

import java.io.StringReader;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;
import java.time.temporal.Temporal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.json.Json;
import javax.json.JsonValue;
import javax.json.bind.annotation.JsonbTransient;
import javax.json.stream.JsonParser;

import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

/**
 * Describes the structure of a record: its type, its data and metadata entries, the nested element schema
 * and free-form string properties.
 */
public interface Schema {

    /**
     * @return the type of this schema.
     */
    Type getType();

    /**
     * @return the nested element schema for arrays.
     */
    Schema getElementSchema();

    /**
     * @return the data entries for records (does not contain metadata entries).
     */
    List<Entry> getEntries();

    /**
     * @return the metadata entries for records (does not contain ordinary data entries).
     */
    List<Entry> getMetadata();

    /**
     * @return all entries, including data and metadata, of this schema.
     */
    Stream<Entry> getAllEntries();

    /**
     * Get a Builder from the current schema.
     *
     * @return a {@link Schema.Builder}
     *
     * @throws UnsupportedOperationException when the implementation does not override this method.
     */
    default Schema.Builder toBuilder() {
        throw new UnsupportedOperationException("#toBuilder is not implemented");
    }

    /**
     * Get all entries sorted by schema designed order, i.e. sorted with {@link #naturalOrder()}.
     *
     * @return all entries ordered
     */
    default List<Entry> getEntriesOrdered() {
        return getEntriesOrdered(naturalOrder());
    }

    /**
     * Get all entries sorted using a custom comparator.
     *
     * @param comparator the comparator
     *
     * @return all entries ordered with provided comparator
     */
    @JsonbTransient
    default List<Entry> getEntriesOrdered(final Comparator<Entry> comparator) {
        return getAllEntries().sorted(comparator).collect(Collectors.toList());
    }

    /**
     * Get the EntriesOrder defined with Builder.
     *
     * @return the EntriesOrder
     *
     * @throws UnsupportedOperationException when the implementation does not override this method.
     */
    default EntriesOrder naturalOrder() {
        throw new UnsupportedOperationException("#naturalOrder is not implemented");
    }

    /**
     * Look up an entry (data or metadata) by its name.
     *
     * @param name the entry name.
     *
     * @return the first entry of {@link #getAllEntries()} having this name, or {@code null} when there is none.
     */
    default Entry getEntry(final String name) {
        return getAllEntries() //
                .filter((Entry e) -> Objects.equals(e.getName(), name)) //
                .findFirst() //
                .orElse(null);
    }

    /**
     * @return the metadata props
     */
    Map<String, String> getProps();

    /**
     * @param property : property name.
     *
     * @return the requested metadata prop
     */
    String getProp(String property);

    /**
     * Get a property value from schema with its name.
     *
     * @param name : property's name.
     *
     * @return property's value parsed as JSON, the raw value wrapped in a JSON string when parsing fails,
     * or {@code null} when the property is not set.
     */
    default JsonValue getJsonProp(final String name) {
        final String prop = this.getProp(name);
        if (prop == null) {
            return null;
        }
        // The parser is closed once the value has been read.
        try (JsonParser parser = Json.createParser(new StringReader(prop))) {
            return parser.getValue();
        } catch (final RuntimeException ex) {
            // Not readable as JSON: expose the raw text as a JSON string instead of failing.
            return Json.createValue(prop);
        }
    }

    /**
     * Types supported by a schema or an entry, each one declaring the Java classes it accepts as value.
     */
    enum Type {

        RECORD(new Class<?>[] { Record.class }),
        ARRAY(new Class<?>[] { Collection.class }),
        STRING(new Class<?>[] { String.class }),
        BYTES(new Class<?>[] { byte[].class, Byte[].class }),
        INT(new Class<?>[] { Integer.class }),
        LONG(new Class<?>[] { Long.class }),
        FLOAT(new Class<?>[] { Float.class }),
        DOUBLE(new Class<?>[] { Double.class }),
        BOOLEAN(new Class<?>[] { Boolean.class }),
        DATETIME(new Class<?>[] { Long.class, Date.class, Temporal.class });

        /**
         * All compatible Java classes.
         */
        private final Class<?>[] classes;

        Type(final Class<?>[] classes) {
            this.classes = classes;
        }

        /**
         * Check if input can be assigned to an entry of this type.
         *
         * @param input : object.
         *
         * @return true if input is null or is an instance of one of the compatible classes.
         */
        public boolean isCompatible(final Object input) {
            if (input == null) {
                return true;
            }
            return Arrays.stream(classes).anyMatch(clazz -> clazz.isInstance(input));
        }
    }

    /**
     * A single field of a schema.
     */
    interface Entry {

        /**
         * @return The name of this entry.
         */
        String getName();

        /**
         * @return The raw name of this entry.
         */
        String getRawName();

        /**
         * @return the raw name of this entry if exists, else return name.
         */
        String getOriginalFieldName();

        /**
         * @return Type of the entry, this determines which other fields are populated.
         */
        Type getType();

        /**
         * @return Is this entry nullable or always valued.
         */
        boolean isNullable();

        /**
         * @return true if this entry is for metadata; false for ordinary data.
         */
        boolean isMetadata();

        /**
         * @param <T> the default value type.
         *
         * @return Default value for this entry.
         */
        <T> T getDefaultValue();

        /**
         * @return For type == record, the element type.
         */
        Schema getElementSchema();

        /**
         * @return Allows to associate to this field a comment - for doc purposes, no use in the runtime.
         */
        String getComment();

        /**
         * @return the metadata props
         */
        Map<String, String> getProps();

        /**
         * @param property : property name.
         *
         * @return the requested metadata prop
         */
        String getProp(String property);

        /**
         * Get a property value from entry with its name.
         *
         * @param name : property's name.
         *
         * @return property's value parsed as JSON, the raw value wrapped in a JSON string when parsing fails,
         * or {@code null} when the property is not set.
         */
        default JsonValue getJsonProp(final String name) {
            final String prop = this.getProp(name);
            if (prop == null) {
                return null;
            }
            // The parser is closed once the value has been read.
            try (JsonParser parser = Json.createParser(new StringReader(prop))) {
                return parser.getValue();
            } catch (final RuntimeException ex) {
                // Not readable as JSON: expose the raw text as a JSON string instead of failing.
                return Json.createValue(prop);
            }
        }

        /**
         * @return an {@link Entry.Builder} from this entry.
         *
         * @throws UnsupportedOperationException when the implementation does not override this method.
         */
        default Entry.Builder toBuilder() {
            throw new UnsupportedOperationException("#toBuilder is not implemented");
        }

        /**
         * Plain builder matching {@link Entry} structure.
         */
        interface Builder {

            Builder withName(String name);

            Builder withRawName(String rawName);

            Builder withType(Type type);

            Builder withNullable(boolean nullable);

            Builder withMetadata(boolean metadata);

            <T> Builder withDefaultValue(T value);

            Builder withElementSchema(Schema schema);

            Builder withComment(String comment);

            Builder withProps(Map<String, String> props);

            Builder withProp(String key, String value);

            Entry build();
        }
    }

    /**
     * Allows to build a {@link Schema}.
     */
    interface Builder {

        /**
         * @param type schema type.
         *
         * @return this builder.
         */
        Builder withType(Type type);

        /**
         * @param entry element for either an array or record type.
         *
         * @return this builder.
         */
        Builder withEntry(Entry entry);

        /**
         * Insert the entry after the specified entry.
         *
         * @param after the entry name reference
         * @param entry the entry to insert
         *
         * @return this builder
         */
        default Builder withEntryAfter(String after, Entry entry) {
            throw new UnsupportedOperationException("#withEntryAfter is not implemented");
        }

        /**
         * Insert the entry before the specified entry.
         *
         * @param before the entry name reference
         * @param entry the entry to insert
         *
         * @return this builder
         */
        default Builder withEntryBefore(String before, Entry entry) {
            throw new UnsupportedOperationException("#withEntryBefore is not implemented");
        }

        /**
         * Remove entry from builder.
         *
         * @param name the entry name
         *
         * @return this builder
         */
        default Builder remove(String name) {
            throw new UnsupportedOperationException("#remove is not implemented");
        }

        /**
         * Remove entry from builder.
         *
         * @param entry the entry
         *
         * @return this builder
         */
        default Builder remove(Entry entry) {
            throw new UnsupportedOperationException("#remove is not implemented");
        }

        /**
         * Move an entry after another one.
         *
         * @param after the entry name reference
         * @param name the entry name
         *
         * @return this builder
         */
        default Builder moveAfter(final String after, final String name) {
            throw new UnsupportedOperationException("#moveAfter is not implemented");
        }

        /**
         * Move an entry before another one.
         *
         * @param before the entry name reference
         * @param name the entry name
         *
         * @return this builder
         */
        default Builder moveBefore(final String before, final String name) {
            throw new UnsupportedOperationException("#moveBefore is not implemented");
        }

        /**
         * Swap two entries.
         *
         * @param name the entry name
         * @param with the other entry name
         *
         * @return this builder
         */
        default Builder swap(final String name, final String with) {
            throw new UnsupportedOperationException("#swap is not implemented");
        }

        /**
         * @param schema nested element schema.
         *
         * @return this builder.
         */
        Builder withElementSchema(Schema schema);

        /**
         * @param props schema properties
         *
         * @return this builder
         */
        Builder withProps(Map<String, String> props);

        /**
         * @param key the prop key name
         * @param value the prop value
         *
         * @return this builder
         */
        Builder withProp(String key, String value);

        /**
         * @return the described schema.
         */
        Schema build();
    }

    /**
     * Sanitize name to be avro compatible.
     * <p>
     * ASCII letters and digits are kept (a leading character must be an ASCII letter or {@code _}), non-ASCII
     * cased letters and other ASCII characters become {@code _}, and any other non-ASCII character is replaced
     * by the Base64 form of its UTF-8 bytes, where non alphanumeric Base64 characters become {@code _}.
     *
     * @param name : original name.
     *
     * @return avro compatible name; {@code null} or empty input is returned unchanged.
     */
    static String sanitizeConnectionName(final String name) {
        if (name == null || name.isEmpty()) {
            return name;
        }

        char current = name.charAt(0);
        final CharsetEncoder ascii = StandardCharsets.US_ASCII.newEncoder();
        // A valid leading character is an ASCII letter or an underscore.
        final boolean validFirstChar = ascii.canEncode(current) && (Character.isLetter(current) || current == '_');
        // An invalid leading character is dropped, unless the name has a single character or the next
        // character is a digit (a digit cannot start the name, so a '_' placeholder is needed then).
        final boolean skipFirstChar = !validFirstChar && name.length() > 1 && !Character.isDigit(name.charAt(1));

        final StringBuilder sanitizedBuilder = new StringBuilder();

        if (!skipFirstChar) {
            sanitizedBuilder.append(validFirstChar ? current : '_');
        }
        for (int i = 1; i < name.length(); i++) {
            current = name.charAt(i);
            if (!ascii.canEncode(current)) {
                if (Character.isLowerCase(current) || Character.isUpperCase(current)) {
                    sanitizedBuilder.append('_');
                } else {
                    // Base64 output only holds ASCII characters, no platform charset is involved.
                    final String enc = Base64
                            .getEncoder()
                            .encodeToString(name.substring(i, i + 1).getBytes(StandardCharsets.UTF_8));
                    if (sanitizedBuilder.length() == 0 && Character.isDigit(enc.charAt(0))) {
                        // The result must not start with a digit.
                        sanitizedBuilder.append('_');
                    }
                    for (int iter = 0; iter < enc.length(); iter++) {
                        final char encodedChar = enc.charAt(iter);
                        sanitizedBuilder.append(Character.isLetterOrDigit(encodedChar) ? encodedChar : '_');
                    }
                }
            } else if (Character.isLetterOrDigit(current)) {
                sanitizedBuilder.append(current);
            } else {
                sanitizedBuilder.append('_');
            }
        }
        return sanitizedBuilder.toString();
    }

    /**
     * Ordering of the entries of a schema, stored as a list of field names and usable as a comparator of
     * {@link Entry}.
     */
    @AllArgsConstructor
    @ToString
    @EqualsAndHashCode
    class EntriesOrder implements Comparator<Entry> {

        @Getter
        private final List<String> fieldsOrder;

        /**
         * Build an EntriesOrder according fields.
         *
         * @param fields the fields ordering, as a comma separated list of names
         *
         * @return the order EntriesOrder
         */
        public static EntriesOrder of(final String fields) {
            return new EntriesOrder(fields);
        }

        /**
         * Build an EntriesOrder from a comma separated list of field names.
         *
         * @param fields the fields ordering; {@code null} or empty means no field.
         */
        public EntriesOrder(final String fields) {
            if (fields == null || fields.isEmpty()) {
                // Always a mutable list: the order is exposed through the getter and updated in place.
                // An empty string must not produce a single "" field name.
                fieldsOrder = new ArrayList<>();
            } else {
                fieldsOrder = Arrays.stream(fields.split(",")).collect(Collectors.toCollection(ArrayList::new));
            }
        }

        /**
         * Move a field after another one.
         *
         * @param after the field name reference
         * @param name the field name
         *
         * @return this EntriesOrder
         *
         * @throws IllegalArgumentException if {@code after} is not part of the order.
         */
        public EntriesOrder moveAfter(final String after, final String name) {
            final List<String> order = getFieldsOrder();
            if (!order.contains(after)) {
                throw new IllegalArgumentException(String.format("%s not in schema", after));
            }
            if (Objects.equals(after, name)) {
                // Moving a field relatively to itself changes nothing.
                return this;
            }
            order.remove(name);
            // List#add accepts index == size(), so this also works when the reference is the last field.
            order.add(order.indexOf(after) + 1, name);
            return this;
        }

        /**
         * Move a field before another one.
         *
         * @param before the field name reference
         * @param name the field name
         *
         * @return this EntriesOrder
         *
         * @throws IllegalArgumentException if {@code before} is not part of the order.
         */
        public EntriesOrder moveBefore(final String before, final String name) {
            final List<String> order = getFieldsOrder();
            if (!order.contains(before)) {
                throw new IllegalArgumentException(String.format("%s not in schema", before));
            }
            if (Objects.equals(before, name)) {
                // Moving a field relatively to itself changes nothing.
                return this;
            }
            order.remove(name);
            order.add(order.indexOf(before), name);
            return this;
        }

        /**
         * Swap two fields.
         *
         * @param name the field name
         * @param with the other field
         *
         * @return this EntriesOrder
         *
         * @throws IndexOutOfBoundsException if one of the fields is not part of the order.
         */
        public EntriesOrder swap(final String name, final String with) {
            final List<String> order = getFieldsOrder();
            Collections.swap(order, order.indexOf(name), order.indexOf(with));
            return this;
        }

        /**
         * @return the field names joined with a comma, in order.
         */
        public String toFields() {
            return String.join(",", getFieldsOrder());
        }

        /**
         * Compare two entries on the position of their names in the order; entries whose name is unknown come
         * after the known ones and are considered equal to each other.
         */
        @Override
        public int compare(final Entry e1, final Entry e2) {
            final List<String> order = getFieldsOrder();
            final int index1 = order.indexOf(e1.getName());
            final int index2 = order.indexOf(e2.getName());
            if (index1 >= 0 && index2 >= 0) {
                return Integer.compare(index1, index2);
            }
            if (index1 >= 0) {
                return -1;
            }
            if (index2 >= 0) {
                return 1;
            }
            return 0;
        }
    }

    /**
     * Resolve a name collision between an entry to add and the entries already present.
     * <p>
     * A collision exists when an existing entry has the same name as {@code newEntry} without being equal to
     * it. The entry to rename is the existing one when it has a non-empty raw name (it is first replaced by
     * {@code newEntry} through {@code replaceFunction}), else the new one. The renamed entry is called
     * {@code <sanitized raw name>_<n>}, {@code n} being the first integer starting at 1 that gives an unused
     * name.
     *
     * @param newEntry the entry to add.
     * @param allEntriesSupplier provides the entries already present; it is called more than once.
     * @param replaceFunction called with the existing entry name and {@code newEntry} when the existing entry
     * has to give its place.
     *
     * @return {@code newEntry} when there is no collision, {@code null} when neither the existing entry nor
     * the new one has a raw name (nothing to add), else the renamed entry to add.
     */
    static Schema.Entry avoidCollision(final Schema.Entry newEntry,
            final Supplier<Stream<Schema.Entry>> allEntriesSupplier, final BiConsumer<String, Entry> replaceFunction) {
        final Optional<Entry> collisionedEntry = allEntriesSupplier //
                .get() //
                .filter((final Entry field) -> Objects.equals(field.getName(), newEntry.getName())
                        && !Objects.equals(field, newEntry)) //
                .findFirst();
        if (!collisionedEntry.isPresent()) {
            // No collision, return new entry.
            return newEntry;
        }
        final Entry matchedEntry = collisionedEntry.get();
        final boolean matchedToChange = matchedEntry.getRawName() != null && !matchedEntry.getRawName().isEmpty();
        if (matchedToChange) {
            // The rename has to be applied on the entry already inside the schema, so replace it.
            replaceFunction.accept(matchedEntry.getName(), newEntry);
        } else if (newEntry.getRawName() == null || newEntry.getRawName().isEmpty()) {
            // Trying to add exactly the same raw name, skip the add here.
            return null;
        }
        final Entry fieldToChange = matchedToChange ? matchedEntry : newEntry;
        // Recalculate the primitive name from the raw name.
        final String baseName = Schema.sanitizeConnectionName(fieldToChange.getRawName());

        final Set<String> existingNames = allEntriesSupplier //
                .get() //
                .map(Entry::getName) //
                .collect(Collectors.toSet());
        int indexForAnticollision = 1;
        String newName = baseName + "_" + indexForAnticollision;
        while (existingNames.contains(newName)) {
            indexForAnticollision++;
            newName = baseName + "_" + indexForAnticollision;
        }
        return fieldToChange.toBuilder().withName(newName).build();
    }
}