/**
 * Copyright (C) 2006-2021 Talend Inc. - www.talend.com
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.talend.sdk.component.api.record;

import static java.util.Collections.emptyList;

import java.io.StringReader;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;
import java.time.temporal.Temporal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.json.Json;
import javax.json.JsonValue;
import javax.json.bind.annotation.JsonbTransient;
import javax.json.stream.JsonParser;

import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

/**
 * Description of the structure of a piece of data: its {@link Type}, its entries (for records),
 * its element schema (for arrays) and free-form string properties.
 */
public interface Schema {

    /**
     * @return the type of this schema.
     */
    Type getType();

    /**
     * @return the nested element schema for arrays.
     */
    Schema getElementSchema();

    /**
     * @return the data entries for records (metadata entries are not included).
     */
    List<Entry> getEntries();

    /**
     * @return the metadata entries for records (ordinary data entries are not included).
     */
    List<Entry> getMetadata();

    /**
     * @return all entries, including data and metadata, of this schema.
     */
    Stream<Entry> getAllEntries();

    /**
     * Get a Builder from the current schema.
     *
     * @return a {@link Schema.Builder}
     */
    Schema.Builder toBuilder();

    /**
     * Get all entries sorted with the comparator returned by {@link #naturalOrder()}.
     *
     * @return all entries ordered
     */
    default List<Entry> getEntriesOrdered() {
        return getEntriesOrdered(naturalOrder());
    }

    /**
     * Get all entries sorted using a custom comparator.
     *
     * @param comparator the comparator
     *
     * @return all entries ordered with provided comparator
     */
    @JsonbTransient
    default List<Entry> getEntriesOrdered(final Comparator<Entry> comparator) {
        return getAllEntries().sorted(comparator).collect(Collectors.toList());
    }

    /**
     * Get the EntriesOrder defined with Builder.
     *
     * @return the EntriesOrder
     */
    EntriesOrder naturalOrder();

    /**
     * Look up an entry (data or metadata) by its name.
     *
     * @param name the entry name to search for.
     *
     * @return the first entry of {@link #getAllEntries()} having this name, or {@code null} if none matches.
     */
    default Entry getEntry(final String name) {
        return getAllEntries() //
                .filter((Entry e) -> Objects.equals(e.getName(), name)) //
                .findFirst() //
                .orElse(null);
    }

    /**
     * @return the metadata props
     */
    Map<String, String> getProps();

    /**
     * @param property : property name.
     *
     * @return the requested metadata prop
     */
    String getProp(String property);

    /**
     * Get a property value from the schema by its name, as a JSON value.
     * The raw property is handed to a JSON parser; if that fails with a runtime exception,
     * the raw text is returned wrapped as a JSON string value.
     *
     * @param name : property's name.
     *
     * @return property's value, or {@code null} when the property is not set.
     */
    default JsonValue getJsonProp(final String name) {
        final String prop = this.getProp(name);
        if (prop == null) {
            return null;
        }
        // try-with-resources: the parser is closed whatever the outcome.
        try (final JsonParser parser = Json.createParser(new StringReader(prop))) {
            return parser.getValue();
        } catch (final RuntimeException ex) {
            // not usable as JSON: expose the raw text as a JSON string.
            return Json.createValue(prop);
        }
    }

    /**
     * Supported schema/entry types, each bound to the Java classes accepted as values.
     */
    enum Type {

        RECORD(new Class<?>[] { Record.class }),
        ARRAY(new Class<?>[] { Collection.class }),
        STRING(new Class<?>[] { String.class }),
        BYTES(new Class<?>[] { byte[].class, Byte[].class }),
        INT(new Class<?>[] { Integer.class }),
        LONG(new Class<?>[] { Long.class }),
        FLOAT(new Class<?>[] { Float.class }),
        DOUBLE(new Class<?>[] { Double.class }),
        BOOLEAN(new Class<?>[] { Boolean.class }),
        DATETIME(new Class<?>[] { Long.class, Date.class, Temporal.class });

        /**
         * All compatible Java classes.
         */
        private final Class<?>[] classes;

        Type(final Class<?>[] classes) {
            this.classes = classes;
        }

        /**
         * Check if input can be assigned to an entry of this type.
         *
         * @param input : object.
         *
         * @return true if input is null or is an instance of one of the compatible classes.
         */
        public boolean isCompatible(final Object input) {
            if (input == null) {
                return true;
            }
            for (final Class<?> clazz : classes) {
                if (clazz.isInstance(input)) {
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * A named field of a record schema.
     */
    interface Entry {

        /**
         * @return The name of this entry.
         */
        String getName();

        /**
         * @return The raw name of this entry.
         */
        String getRawName();

        /**
         * @return the raw name of this entry if it exists, else the name.
         */
        String getOriginalFieldName();

        /**
         * @return Type of the entry, this determines which other fields are populated.
         */
        Type getType();

        /**
         * @return Is this entry nullable or always valued.
         */
        boolean isNullable();

        /**
         * @return true if this entry is for metadata; false for ordinary data.
         */
        boolean isMetadata();

        /**
         * @param <T> the default value type.
         *
         * @return Default value for this entry.
         */
        <T> T getDefaultValue();

        /**
         * @return For type == record, the element type.
         */
        Schema getElementSchema();

        /**
         * @return Allows to associate to this field a comment - for doc purposes, no use in the runtime.
         */
        String getComment();

        /**
         * @return the metadata props
         */
        Map<String, String> getProps();

        /**
         * @param property : property name.
         *
         * @return the requested metadata prop
         */
        String getProp(String property);

        /**
         * Get a property value from the entry by its name, as a JSON value.
         * The raw property is handed to a JSON parser; if that fails with a runtime exception,
         * the raw text is returned wrapped as a JSON string value.
         *
         * @param name : property's name.
         *
         * @return property's value, or {@code null} when the property is not set.
         */
        default JsonValue getJsonProp(final String name) {
            final String prop = this.getProp(name);
            if (prop == null) {
                return null;
            }
            // try-with-resources: the parser is closed whatever the outcome.
            try (final JsonParser parser = Json.createParser(new StringReader(prop))) {
                return parser.getValue();
            } catch (final RuntimeException ex) {
                // not usable as JSON: expose the raw text as a JSON string.
                return Json.createValue(prop);
            }
        }

        /**
         * @return an {@link Entry.Builder} from this entry.
         */
        Entry.Builder toBuilder();

        /**
         * Plain builder matching {@link Entry} structure.
         */
        interface Builder {

            Builder withName(String name);

            Builder withRawName(String rawName);

            Builder withType(Type type);

            Builder withNullable(boolean nullable);

            Builder withMetadata(boolean metadata);

            <T> Builder withDefaultValue(T value);

            Builder withElementSchema(Schema schema);

            Builder withComment(String comment);

            Builder withProps(Map<String, String> props);

            Builder withProp(String key, String value);

            Entry build();
        }
    }

    /**
     * Allows to build a {@link Schema}.
     */
    interface Builder {

        /**
         * @param type schema type.
         *
         * @return this builder.
         */
        Builder withType(Type type);

        /**
         * @param entry element for either an array or record type.
         *
         * @return this builder.
         */
        Builder withEntry(Entry entry);

        /**
         * Insert the entry after the specified entry.
         *
         * @param after the entry name reference
         * @param entry the entry to insert
         *
         * @return this builder
         */
        Builder withEntryAfter(String after, Entry entry);

        /**
         * Insert the entry before the specified entry.
         *
         * @param before the entry name reference
         * @param entry the entry to insert
         *
         * @return this builder
         */
        Builder withEntryBefore(String before, Entry entry);

        /**
         * Remove entry from builder.
         *
         * @param name the entry name
         *
         * @return this builder
         */
        Builder remove(String name);

        /**
         * Remove entry from builder.
         *
         * @param entry the entry
         *
         * @return this builder
         */
        Builder remove(Entry entry);

        /**
         * Move an entry after another one.
         *
         * @param after the entry name reference
         * @param name the entry name
         *
         * @return this builder
         */
        Builder moveAfter(final String after, final String name);

        /**
         * Move an entry before another one.
         *
         * @param before the entry name reference
         * @param name the entry name
         *
         * @return this builder
         */
        Builder moveBefore(final String before, final String name);

        /**
         * Swap two entries.
         *
         * @param name the entry name
         * @param with the other entry name
         *
         * @return this builder
         */
        Builder swap(final String name, final String with);

        /**
         * @param schema nested element schema.
         *
         * @return this builder.
         */
        Builder withElementSchema(Schema schema);

        /**
         * @param props schema properties
         *
         * @return this builder
         */
        Builder withProps(Map<String, String> props);

        /**
         * @param key the prop key name
         * @param value the prop value
         *
         * @return this builder
         */
        Builder withProp(String key, String value);

        /**
         * @return the described schema.
         */
        Schema build();
    }

    /**
     * Sanitize name to be avro compatible.
     * <p>
     * ASCII letters and digits are kept. Other ASCII characters and non-ASCII letters become {@code '_'}.
     * Other non-ASCII characters are replaced by the Base64 form of their UTF-8 bytes, where every
     * non-alphanumeric Base64 character becomes {@code '_'}. An invalid first character is dropped
     * when the second character is not a digit, otherwise it is replaced by {@code '_'}.
     *
     * @param name : original name.
     *
     * @return avro compatible name; {@code null} or empty input is returned unchanged.
     */
    static String sanitizeConnectionName(final String name) {
        if (name == null || name.isEmpty()) {
            return name;
        }

        final Charset asciiCharset = StandardCharsets.US_ASCII;
        final CharsetEncoder ascii = asciiCharset.newEncoder();

        char current = name.charAt(0);
        // a valid leading character is an ASCII letter or an underscore.
        final boolean firstCharInvalid =
                !ascii.canEncode(current) || (!Character.isLetter(current) && current != '_');
        // an invalid leading character is dropped, unless that would leave a digit in first position.
        final boolean skipFirstChar =
                firstCharInvalid && name.length() > 1 && !Character.isDigit(name.charAt(1));

        final StringBuilder sanitizedBuilder = new StringBuilder();
        if (!skipFirstChar) {
            sanitizedBuilder.append(firstCharInvalid ? '_' : current);
        }

        for (int i = 1; i < name.length(); i++) {
            current = name.charAt(i);
            if (!ascii.canEncode(current)) {
                if (Character.isLowerCase(current) || Character.isUpperCase(current)) {
                    sanitizedBuilder.append('_');
                } else {
                    // encodeToString avoids decoding the Base64 bytes with the platform default charset.
                    final String enc = Base64
                            .getEncoder()
                            .encodeToString(name.substring(i, i + 1).getBytes(StandardCharsets.UTF_8));
                    // the result must not start with a digit.
                    if (sanitizedBuilder.length() == 0 && Character.isDigit(enc.charAt(0))) {
                        sanitizedBuilder.append('_');
                    }
                    for (int iter = 0; iter < enc.length(); iter++) {
                        final char encodedChar = enc.charAt(iter);
                        sanitizedBuilder.append(Character.isLetterOrDigit(encodedChar) ? encodedChar : '_');
                    }
                }
            } else if (Character.isLetterOrDigit(current)) {
                sanitizedBuilder.append(current);
            } else {
                sanitizedBuilder.append('_');
            }
        }
        return sanitizedBuilder.toString();
    }

    /**
     * Comparator ordering entries according to a list of field names.
     * Entries whose name is listed come first, in list order; entries that are not listed
     * compare as equal to each other and are placed after the listed ones.
     */
    @AllArgsConstructor
    @ToString
    @EqualsAndHashCode
    class EntriesOrder implements Comparator<Entry> {

        @Getter
        private final List<String> fieldsOrder;

        /**
         * Build an EntriesOrder according fields.
         *
         * @param fields the fields ordering, as a comma separated list of names
         *
         * @return the order EntriesOrder
         */
        public static EntriesOrder of(final String fields) {
            return new EntriesOrder(fields);
        }

        /**
         * Build an EntriesOrder from a comma separated list of field names.
         *
         * @param fields the comma separated field names; {@code null} or empty means no ordering defined.
         */
        public EntriesOrder(final String fields) {
            if (fields == null || fields.isEmpty()) {
                // mutable on purpose: the move/swap operations and getFieldsOrder() users modify the list.
                fieldsOrder = new ArrayList<>();
            } else {
                fieldsOrder = new ArrayList<>(Arrays.asList(fields.split(",")));
            }
        }

        /**
         * Move a field after another one. If {@code name} is not yet part of the order, it is inserted.
         *
         * @param after the field name reference
         * @param name the field name
         *
         * @return this EntriesOrder
         *
         * @throws IllegalArgumentException if {@code after} is not part of this order.
         */
        public EntriesOrder moveAfter(final String after, final String name) {
            if (!getFieldsOrder().contains(after)) {
                throw new IllegalArgumentException(String.format("%s not in schema", after));
            }
            if (Objects.equals(after, name)) {
                // moving a field relative to itself is a no-op (removing it first would lose the reference).
                return this;
            }
            getFieldsOrder().remove(name);
            // index == size is valid for List#add(int, E) and appends, so the last position needs no special case.
            getFieldsOrder().add(getFieldsOrder().indexOf(after) + 1, name);
            return this;
        }

        /**
         * Move a field before another one. If {@code name} is not yet part of the order, it is inserted.
         *
         * @param before the field name reference
         * @param name the field name
         *
         * @return this EntriesOrder
         *
         * @throws IllegalArgumentException if {@code before} is not part of this order.
         */
        public EntriesOrder moveBefore(final String before, final String name) {
            if (!getFieldsOrder().contains(before)) {
                throw new IllegalArgumentException(String.format("%s not in schema", before));
            }
            if (Objects.equals(before, name)) {
                // moving a field relative to itself is a no-op (removing it first would lose the reference).
                return this;
            }
            getFieldsOrder().remove(name);
            getFieldsOrder().add(getFieldsOrder().indexOf(before), name);
            return this;
        }

        /**
         * Swap two fields.
         *
         * @param name the field name
         * @param with the other field
         *
         * @return this EntriesOrder
         *
         * @throws IndexOutOfBoundsException if one of the fields is not part of this order.
         */
        public EntriesOrder swap(final String name, final String with) {
            Collections.swap(getFieldsOrder(), getFieldsOrder().indexOf(name), getFieldsOrder().indexOf(with));
            return this;
        }

        /**
         * @return the ordered field names joined with commas.
         */
        public String toFields() {
            return String.join(",", getFieldsOrder());
        }

        @Override
        public int compare(final Entry e1, final Entry e2) {
            final int index1 = getFieldsOrder().indexOf(e1.getName());
            final int index2 = getFieldsOrder().indexOf(e2.getName());
            if (index1 >= 0 && index2 >= 0) {
                // both are non-negative list indexes, the subtraction cannot overflow.
                return index1 - index2;
            }
            if (index1 >= 0) {
                return -1;
            }
            if (index2 >= 0) {
                return 1;
            }
            return 0;
        }
    }

    /**
     * Resolve a name collision between an entry about to be added and the entries already present.
     * <p>
     * A collision exists when a present entry has the same name as {@code newEntry} without being equal to it.
     * The entry to rename is the present one when it has a non-empty raw name, otherwise the new one.
     * Its new name is the sanitized raw name suffixed with {@code _N}, N being the first integer (from 1)
     * producing a name not already used.
     *
     * @param newEntry the entry to add.
     * @param allEntriesSupplier provides the entries already present; it may be called twice.
     * @param replaceFunction called with the colliding name and {@code newEntry} when the present entry
     * is the one to rename, so that {@code newEntry} takes its place.
     *
     * @return {@code newEntry} when there is no collision; {@code null} when there is a collision and neither
     * entry has a raw name (nothing to add); otherwise the renamed copy of the entry to change.
     */
    static Schema.Entry avoidCollision(final Schema.Entry newEntry,
            final Supplier<Stream<Schema.Entry>> allEntriesSupplier,
            final BiConsumer<String, Entry> replaceFunction) {
        final Optional<Entry> collisionedEntry = allEntriesSupplier //
                .get() //
                .filter((final Entry field) -> field.getName().equals(newEntry.getName())
                        && !Objects.equals(field, newEntry)) //
                .findFirst();
        if (!collisionedEntry.isPresent()) {
            // No collision, return new entry.
            return newEntry;
        }
        final Entry matchedEntry = collisionedEntry.get();
        final boolean matchedToChange = matchedEntry.getRawName() != null && !matchedEntry.getRawName().isEmpty();
        if (matchedToChange) {
            // the rename has to be applied on entry already inside schema, so replace.
            replaceFunction.accept(matchedEntry.getName(), newEntry);
        } else if (newEntry.getRawName() == null || newEntry.getRawName().isEmpty()) {
            // try to add exactly same raw, skip the add here.
            return null;
        }
        final Entry fieldToChange = matchedToChange ? matchedEntry : newEntry;
        // recompute the primitive name from the raw name.
        final String baseName = Schema.sanitizeConnectionName(fieldToChange.getRawName());

        final Set<String> existingNames = allEntriesSupplier //
                .get() //
                .map(Entry::getName) //
                .collect(Collectors.toSet());
        int indexForAnticollision = 1;
        String newName = baseName + "_" + indexForAnticollision;
        while (existingNames.contains(newName)) {
            indexForAnticollision++;
            newName = baseName + "_" + indexForAnticollision;
        }
        return fieldToChange.toBuilder().withName(newName).build();
    }
}