Skip to content

Commit

Permalink
Allow project to build with Java 17 and 21
Browse files Browse the repository at this point in the history
The formatting changes and a result of the new version
of Google Java Format being used, which in turn
is needed for spotless to build on Java 17 and 21.

This change makes no guarantee that project actually
runs on 17 or 21, so it's only a very small first step
  • Loading branch information
geoand committed Dec 4, 2024
1 parent 2f33802 commit ecdd129
Show file tree
Hide file tree
Showing 42 changed files with 114 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
class BucketScanStatus {
private long offset; // last consumed position
private long highWatermark; // the high watermark from last fetch

// TODO add resetStrategy and nextAllowedRetryTimeMs.

public BucketScanStatus() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,7 @@ private void insertInSequenceOrder(Deque<WriteBatch> deque, WriteBatch batch) {
public static final class RecordAppendResult {
public final boolean batchIsFull;
public final boolean newBatchCreated;

/** Whether this record was abort because the new batch created in record accumulator. */
public final boolean abortRecordForNewBatch;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class WriterClient {
private static final Logger LOG = LoggerFactory.getLogger(WriterClient.class);

public static final String SENDER_THREAD_PREFIX = "fluss-write-sender";

/**
* {@link ConfigOptions#CLIENT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET} should be less than or
* equal to this value when idempotence producer enabled to ensure message ordering.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,9 @@ void testLimitScanPrimaryTable() throws Exception {
expectedRows.set(i, new Object[] {expectedRows.get(i)[1]});
}
actualRows =
table.limitScan(new TableBucket(tableId, 0), limitSize, projectedFields).get()
table
.limitScan(new TableBucket(tableId, 0), limitSize, projectedFields)
.get()
.stream()
.map(ScanRecord::getRow)
.collect(Collectors.toList());
Expand Down Expand Up @@ -298,7 +300,9 @@ void testLimitScanLogTable() throws Exception {
expectedRows.set(i, new Object[] {expectedRows.get(i)[1]});
}
actualRows =
table.limitScan(new TableBucket(tableId, 0), limitSize, projectedFields).get()
table
.limitScan(new TableBucket(tableId, 0), limitSize, projectedFields)
.get()
.stream()
.map(ScanRecord::getRow)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ public ServerNode getCoordinatorServer() {
return coordinatorServer;
}

/** @return The known set of alive tablet servers. */
/**
* @return The known set of alive tablet servers.
*/
public Map<Integer, ServerNode> getAliveTabletServers() {
return aliveTabletServersById;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ public interface MemorySegmentPool extends MemorySegmentSource {
*/
void returnAll(List<MemorySegment> memory);

/** @return Free page number. */
/**
* @return Free page number.
*/
int freePages();

void close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,16 @@ public class MeterView implements Meter, MetricView {

/** The underlying counter maintaining the count. */
private final Counter counter;

/** The time-span over which the average is calculated. */
private final int timeSpanInSeconds;

/** Circular array containing the history of values. */
private final long[] values;

/** The index in the array for the current time. */
private int time = 0;

/** The last rate we computed. */
private double currentRate = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,9 @@ public void append(short schemaId, BinaryRow row) throws IOException {
currentRecordNumber++;
}

/** @param valueBytes consisted of schema id and the row encoded in the value bytes */
/**
* @param valueBytes consisted of schema id and the row encoded in the value bytes
*/
public void append(byte[] valueBytes) throws IOException {
if (isClosed) {
throw new IllegalStateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ public class SendWritableOutput extends ByteBufWritableOutput {
/** The current reader index of the underlying {@link #buf} for building next {@link Send}. */
private int currentReaderIndex = 0;

/** @param buf The ByteBuf that has capacity of data size excluding zero-copy. */
/**
* @param buf The ByteBuf that has capacity of data size excluding zero-copy.
*/
public SendWritableOutput(ByteBuf buf) {
super(buf);
this.sends = new ArrayDeque<>(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,16 @@ public UUID remoteLogSegmentId() {
return remoteLogSegmentId;
}

/** @return Remote log start offset of this segment (inclusive). */
/**
* @return Remote log start offset of this segment (inclusive).
*/
public long remoteLogStartOffset() {
return remoteLogStartOffset;
}

/** @return Remote log end offset of this segment (inclusive). */
/**
* @return Remote log end offset of this segment (inclusive).
*/
public long remoteLogEndOffset() {
return remoteLogEndOffset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public static void checkArgument(
throw new IllegalArgumentException(format(errorMessageTemplate, errorMessageArgs));
}
}

// ------------------------------------------------------------------------
// Boolean Condition Checking (State)
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@
public class Projection {
/** the projection indexes including both selected fields and reordering them. */
final int[] projection;

/** the projection indexes that only select fields but not reordering them. */
final int[] projectionInOrder;

/** the indexes to reorder the fields of {@link #projectionInOrder} to {@link #projection}. */
final int[] reorderingIndexes;

/** the flag to indicate whether reordering is needed. */
final boolean reorderingNeeded;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,16 @@ public static <L, R> Either<L, R> right(R value) {
*/
public abstract R right() throws IllegalStateException;

/** @return true if this is a Left value, false if this is a Right value */
/**
* @return true if this is a Left value, false if this is a Right value
*/
public final boolean isLeft() {
return getClass() == Left.class;
}

/** @return true if this is a Right value, false if this is a Left value */
/**
* @return true if this is a Right value, false if this is a Left value
*/
public final boolean isRight() {
return getClass() == Right.class;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class Tuple2<T0, T1> extends Tuple {

/** Field 0 of the tuple. */
public T0 f0;

/** Field 1 of the tuple. */
public T1 f1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,10 @@ public List<CatalogPartitionSpec> listPartitions(ObjectPath objectPath)
@Override
public List<CatalogPartitionSpec> listPartitions(
ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec)
throws TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException, CatalogException {
throws TableNotExistException,
TableNotPartitionedException,
PartitionSpecInvalidException,
CatalogException {
throw new UnsupportedOperationException();
}

Expand Down Expand Up @@ -397,8 +399,10 @@ public void createPartition(
CatalogPartitionSpec catalogPartitionSpec,
CatalogPartition catalogPartition,
boolean b)
throws TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException, PartitionAlreadyExistsException,
throws TableNotExistException,
TableNotPartitionedException,
PartitionSpecInvalidException,
PartitionAlreadyExistsException,
CatalogException {
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
public class PaimonSnapshotSplitState extends SourceSplitState {

private final PaimonSnapshotSplit paimonSnapshotSplit;

/** The records to skip while reading a snapshot. */
private long recordsToSkip;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class FlinkRecordsWithSplitIds implements RecordsWithSplitIds<RecordAndPo

/** SplitId iterator. */
private final Iterator<String> splitIterator;

/** The table buckets of the split in splitIterator. */
private final Iterator<TableBucket> tableBucketIterator;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class HybridSnapshotLogSplitState extends SourceSplitState {

/** The records to skip while reading a snapshot. */
private long recordsToSkip;

/** Whether the snapshot reading is finished. */
private boolean snapshotFinished;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ public static long countLogTable(TablePath tablePath, Configuration flussConfig)
throw new FlussRuntimeException(e);
}
}

// ------------------------------------------------------------------------------------------

/** A structure represents a source field equal literal expression. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -875,12 +875,16 @@ public static class ListAllMyBucketsHandler extends AbstractHandler {

private Bucket currentBucket = null;

/** @return the buckets listed in the document. */
/**
* @return the buckets listed in the document.
*/
public List<Bucket> getBuckets() {
return buckets;
}

/** @return the owner of the buckets. */
/**
* @return the owner of the buckets.
*/
public Owner getOwner() {
return bucketsOwner;
}
Expand Down Expand Up @@ -937,7 +941,9 @@ public static class AccessControlListHandler extends AbstractHandler {
private Grantee currentGrantee = null;
private Permission currentPermission = null;

/** @return an object representing the ACL document. */
/**
* @return an object representing the ACL document.
*/
public AccessControlList getAccessControlList() {
return accessControlList;
}
Expand Down Expand Up @@ -1018,7 +1024,9 @@ public static class BucketLoggingConfigurationHandler extends AbstractHandler {
private final BucketLoggingConfiguration bucketLoggingConfiguration =
new BucketLoggingConfiguration();

/** @return an object representing the bucket's LoggingStatus document. */
/**
* @return an object representing the bucket's LoggingStatus document.
*/
public BucketLoggingConfiguration getBucketLoggingConfiguration() {
return bucketLoggingConfiguration;
}
Expand Down Expand Up @@ -1048,7 +1056,9 @@ public static class BucketLocationHandler extends AbstractHandler {

private String location = null;

/** @return the bucket's location. */
/**
* @return the bucket's location.
*/
public String getLocation() {
return location;
}
Expand Down Expand Up @@ -1527,6 +1537,7 @@ public static class CompleteMultipartUploadHandler extends AbstractSSEHandler
protected ServerSideEncryptionResult sseResult() {
return result;
}

/**
* @see com.amazonaws.services.s3.model.CompleteMultipartUploadResult#getExpirationTime()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class FlinkRecordsWithSplitIds implements RecordsWithSplitIds<MultiplexCd

/** SplitId iterator. */
private final Iterator<String> splitIterator;

/** The table buckets of the split in splitIterator. */
private final Iterator<TableBucket> tableBucketIterator;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ public void testBytes() throws Exception {

// test binary equals to protobuf
Bytes.B pbb =
Bytes.B
.newBuilder()
Bytes.B.newBuilder()
.setPayload(ByteString.copyFrom(new byte[] {1, 2, 3, 4, 5}))
.build();

Expand Down Expand Up @@ -254,8 +253,7 @@ public void testRepeatedBytes() throws Exception {
assertThat(lpb.getExtraItemAt(1)).isEqualTo(new byte[] {4, 5, 6, 7});

Bytes.B pbb =
Bytes.B
.newBuilder()
Bytes.B.newBuilder()
.addExtraItems(ByteString.copyFrom(new byte[] {1, 2, 3}))
.addExtraItems(ByteString.copyFrom(new byte[] {4, 5, 6, 7}))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ public void testStrings() throws Exception {
assertThat(lps.getNameAt(2)).isEqualTo("c");

Strings.S pbs =
Strings.S
.newBuilder()
Strings.S.newBuilder()
.setId("id")
.addNames("a")
.addNames("b")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
/** Utils of netty. */
public class NettyUtils {

/** @return an EventLoopGroup suitable for the current platform */
/**
* @return an EventLoopGroup suitable for the current platform
*/
public static EventLoopGroup newEventLoopGroup(int nThreads, String threadNamePrefix) {
if (Epoll.isAvailable()) {
// Regular Epoll based event loop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class KvSnapshotResource {

/** A scheduler to schedule kv snapshot. */
private final ScheduledExecutorService kvSnapshotScheduler;

/** Thread pool for async snapshot workers. */
private final ExecutorService asyncOperationsThreadPool;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class KvSnapshotHandle {

/** The shared file(like data file) handles of the kv snapshot. */
private final List<KvFileHandleAndLocalPath> sharedFileHandles;

/** The private file(like meta file) handles of the kv snapshot. */
private final List<KvFileHandleAndLocalPath> privateFileHandles;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public class PeriodicSnapshotManager implements Closeable {
private volatile boolean started = false;

private final long initialDelay;

/** The table bucket that the snapshot manager is for. */
private final TableBucket tableBucket;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ private static final class SharedKvEntry {

private final long createdBySnapshotID;
private long lastUsedSnapshotID;

/** The shared kv file handle. */
KvFileHandle kvFileHandle;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public abstract class AbstractIndex implements Closeable {

/** The maximum number of entries this index can hold. */
private volatile int maxEntries;

/** The number of entries in this index. */
private volatile int entries;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class ListOffsetsParam {
* from follower, it represents listing LocalLogStartOffset.
*/
public static final int EARLIEST_OFFSET_TYPE = 0;

/**
* Latest offset type. If the list offsets request come from client, it represents listing
* HighWatermark. otherwise, the request come from follower, it represents listing
Expand Down
Loading

0 comments on commit ecdd129

Please sign in to comment.