Big Data applications process huge amounts of information, often in real time. Naturally, such applications must be highly reliable so that no error in the code interferes with data processing. Achieving that reliability means keeping a close eye on the code quality of the projects developed in this field. The PVS-Studio static analyzer is one way to tackle this problem. Today, the Apache Flink project, developed by the Apache Software Foundation, one of the leaders of the Big Data software market, was chosen as the test subject for the analyzer.
So what is Apache Flink? It is an open-source framework for distributed processing of large amounts of data. It was developed in 2010 at the Technical University of Berlin as an alternative to Hadoop MapReduce. The framework is built around a distributed execution engine for batch and stream data processing applications. This engine is written in Java and Scala. Today, Apache Flink can be used in projects written in Java, Scala, Python, and even SQL.
Project analysis
After downloading the project's source code, I started the build with the "mvn clean package -DskipTests" command given in the instructions on GitHub. While the build was running, I used the CLOC utility to find out that the project contains 10838 Java files with about 1.3 million lines of code. Of those, 3833 are test Java files, which is more than 1/3 of all Java files. I also noticed that the project uses the FindBugs static code analyzer and the Cobertura utility, which reports code coverage by tests. With all that in mind, it is clear that the Apache Flink developers paid close attention to code quality and test coverage during development.
After a successful build, I opened the project in IntelliJ IDEA and started the analysis with the PVS-Studio for IDEA and Android Studio plugin. The analyzer warnings were distributed as follows:
- 183 High;
- 759 Medium;
- 545 Low.
About 2/3 of the PVS-Studio warnings were issued for test files. Given this fact and the size of the project's codebase, we can say that the Apache Flink developers have managed to keep the code quality at a high level.
Having studied the analyzer warnings in more detail, I picked the ones I found most interesting. So let's see what PVS-Studio managed to find in this project!
Just a little carelessness
V6001 There are identical sub-expressions 'processedData' to the left and to the right of the '==' operator. CheckpointStatistics.java(229)
@Override
public boolean equals(Object o)
{
....
CheckpointStatistics that = (CheckpointStatistics) o;
return id == that.id &&
savepoint == that.savepoint &&
triggerTimestamp == that.triggerTimestamp &&
latestAckTimestamp == that.latestAckTimestamp &&
stateSize == that.stateSize &&
duration == that.duration &&
alignmentBuffered == that.alignmentBuffered &&
processedData == processedData && // <=
persistedData == that.persistedData &&
numSubtasks == that.numSubtasks &&
numAckSubtasks == that.numAckSubtasks &&
status == that.status &&
Objects.equals(checkpointType, that.checkpointType) &&
Objects.equals(
checkpointStatisticsPerTask,
that.checkpointStatisticsPerTask);
}
Among the other expressions in the return statement, this error does not stand out much. When overriding the equals method of the CheckpointStatistics class, the programmer made a mistake in the processedData == processedData expression, which makes no sense because it is always true. As in the rest of the returned expression, the field of the current object this should have been compared with the field of the object that: processedData == that.processedData. This is one of the typical error patterns found in comparison functions, described in detail in the article "The Evil within the Comparison Functions". So "just a little carelessness" turned out to be enough to break the equality check for objects of the CheckpointStatistics class.
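The intended comparison can be sketched on a cut-down class. CheckpointStatsSketch below is a hypothetical stand-in with only three of the fields, not the real CheckpointStatistics class; it only illustrates that every field should be compared against the corresponding field of that.

```java
import java.util.Objects;

// Hypothetical, cut-down stand-in for CheckpointStatistics (three fields only).
final class CheckpointStatsSketch {
    private final long id;
    private final long processedData;
    private final long persistedData;

    CheckpointStatsSketch(long id, long processedData, long persistedData) {
        this.id = id;
        this.processedData = processedData;
        this.persistedData = persistedData;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        CheckpointStatsSketch that = (CheckpointStatsSketch) o;
        return id == that.id
            && processedData == that.processedData // compare with the other object's field
            && persistedData == that.persistedData;
    }

    @Override
    public int hashCode() {
        // Keep hashCode consistent with equals: same fields in both.
        return Objects.hash(id, processedData, persistedData);
    }
}
```

With this version, two objects that differ only in processedData are no longer reported as equal.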
Expression is always true
V6007 Expression 'input2.length > 0' is always true. Operator.java(283)
public static <T> Operator<T> createUnionCascade(Operator<T> input1,
Operator<T>... input2)
{
if (input2 == null || input2.length == 0)
{
return input1; // <=
}
else if (input2.length == 1 && input1 == null)
{
return input2[0];
}
....
if (input1 != null)
{
....
}
else if (input2.length > 0 && input2[0] != null) // <=
{
....
}
else
{
....
}
}
In this method, the analyzer turned out to be more attentive than a human and reported it in its own way: it says that the expression input2.length > 0 is always true. The reason is that if the length of the input2 array is 0, the condition input2 == null || input2.length == 0 of the first if in the method is true, and the method returns before execution reaches the line with the input2.length > 0 expression.
All-seeing analyzer
V6007 Expression 'slotSharingGroup == null' is always false. StreamGraphGenerator.java(510)
private <T> Collection<Integer> transformFeedback(....)
{
....
String slotSharingGroup = determineSlotSharingGroup(null, allFeedbackIds);
if (slotSharingGroup == null)
{
slotSharingGroup = "SlotSharingGroup-" + iterate.getId();
}
....
}
The analyzer reported that slotSharingGroup == null is always false. This suggests that the determineSlotSharingGroup method never returns null. Is the analyzer really smart enough to compute every value this method can return? Let's check it ourselves:
public class StreamGraphGenerator
{
....
public static final String DEFAULT_SLOT_SHARING_GROUP = "default";
....
private String determineSlotSharingGroup(String specifiedGroup,
Collection<Integer> inputIds)
{
if (specifiedGroup != null)
{
return specifiedGroup; // <= 1
}
else
{
String inputGroup = null;
for (int id: inputIds)
{
String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
if (inputGroup == null)
{
inputGroup = inputGroupCandidate;
}
else if (!inputGroup.equals(inputGroupCandidate))
{
return DEFAULT_SLOT_SHARING_GROUP; // <= 2
}
}
return inputGroup == null
? DEFAULT_SLOT_SHARING_GROUP
: inputGroup; // <= 3
}
}
....
}
Let's go through all the return statements in order and see what this method can return:
- The first return returns the specifiedGroup method argument, but only if it is not null.
- The second return, inside the for loop, returns the static final field DEFAULT_SLOT_SHARING_GROUP, which is initialized with a string literal;
- The last return returns inputGroup if it is not null. Otherwise, it returns the DEFAULT_SLOT_SHARING_GROUP field.
So the analyzer really was able to work out that determineSlotSharingGroup cannot return null, and it warned us about that by pointing out that the slotSharingGroup == null check is pointless. This situation is not an error, but this kind of extra vigilance lets the analyzer detect an error in other cases, for example when a method is required to return null under certain conditions.
Collect them all
V6007 Expression 'currentCount <= lastEnd' is always true. CountSlidingWindowAssigner.java(75)
V6007 Expression 'lastStart <= currentCount' is always true. CountSlidingWindowAssigner.java(75)
@Override
public Collection<CountWindow> assignWindows(....) throws IOException
{
Long countValue = count.value();
long currentCount = countValue == null ? 0L : countValue;
count.update(currentCount + 1);
long lastId = currentCount / windowSlide;
long lastStart = lastId * windowSlide;
long lastEnd = lastStart + windowSize - 1;
List<CountWindow> windows = new ArrayList<>();
while (lastId >= 0 &&
lastStart <= currentCount &&
currentCount <= lastEnd)
{
if (lastStart <= currentCount && currentCount <= lastEnd) // <=
{
windows.add(new CountWindow(lastId));
}
lastId--;
lastStart -= windowSlide;
lastEnd -= windowSlide;
}
return windows;
}
The analyzer warns that the expressions currentCount <= lastEnd and lastStart <= currentCount are always true. Indeed, the while loop condition contains exactly the same expressions. This means that inside the loop they are always true, so every CountWindow object created in the loop is added to the windows list. There are many possible explanations for this pointless check; the first that come to mind are a refactoring artifact or overcaution on the developer's part. But if something else was supposed to be checked here, this may well be an error...
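If the inner check is indeed just redundant (an assumption, since the original intent is unknown), the loop can be written without it and behave the same way. The sketch below is a hypothetical, self-contained variant that takes the counters as plain parameters and collects window ids instead of CountWindow objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone variant of the window assignment loop.
final class SlidingWindowSketch {
    // Returns the ids of all count windows that contain currentCount.
    static List<Long> assignWindowIds(long currentCount, long windowSize, long windowSlide) {
        long lastId = currentCount / windowSlide;
        long lastStart = lastId * windowSlide;
        long lastEnd = lastStart + windowSize - 1;
        List<Long> ids = new ArrayList<>();
        // The loop condition already guarantees lastStart <= currentCount <= lastEnd,
        // so no second check is needed inside the body.
        while (lastId >= 0 && lastStart <= currentCount && currentCount <= lastEnd) {
            ids.add(lastId);
            lastId--;
            lastStart -= windowSlide;
            lastEnd -= windowSlide;
        }
        return ids;
    }
}
```

For example, with currentCount = 5, windowSize = 4 and windowSlide = 2, the element falls into the windows with ids 2 and 1.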
Incorrect order of arguments
V6029 Possible incorrect order of arguments passed to method: 'hasBufferForReleasedChannel', 'hasBufferForRemovedChannel'. NettyMessageClientDecoderDelegateTest.java(165), NettyMessageClientDecoderDelegateTest.java(166)
private void testNettyMessageClientDecoding(
boolean hasEmptyBuffer,
boolean hasBufferForReleasedChannel,
boolean hasBufferForRemovedChannel) throws Exception
{
....
List<BufferResponse> messages = createMessageList (
hasEmptyBuffer,
hasBufferForReleasedChannel,
hasBufferForRemovedChannel);
....
}
Java has no way to call a method with named parameters, and that sometimes plays a cruel joke on developers. This is exactly what happened with the call to the createMessageList method that the analyzer pointed at. The definition of this method makes it clear that the hasBufferForRemovedChannel parameter must be passed before the hasBufferForReleasedChannel parameter:
private List<BufferResponse> createMessageList(
boolean hasEmptyBuffer,
boolean hasBufferForRemovedChannel,
boolean hasBufferForReleasedChannel)
{
....
if (hasBufferForReleasedChannel) {
addBufferResponse(messages,
releasedInputChannelId,
Buffer.DataType.DATA_BUFFER,
BUFFER_SIZE,
seqNumber++);
}
if (hasBufferForRemovedChannel) {
addBufferResponse(messages,
new InputChannelID(),
Buffer.DataType.DATA_BUFFER,
BUFFER_SIZE,
seqNumber++);
}
....
return messages;
}
However, when calling the method, the developer mixed up the order of these arguments, so the logic of the createMessageList method breaks whenever the values of the two swapped arguments differ.
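A minimal sketch of the corrected call order follows. The describe method is a hypothetical stand-in for createMessageList that only reports what it received; the parameter names and their order are taken from the listings above.

```java
// Hypothetical stand-in that mirrors the parameter order of createMessageList.
final class ArgumentOrderSketch {
    // Declared order: empty, removed, released.
    static String describe(
            boolean hasEmptyBuffer,
            boolean hasBufferForRemovedChannel,
            boolean hasBufferForReleasedChannel) {
        return "empty=" + hasEmptyBuffer
            + ", removed=" + hasBufferForRemovedChannel
            + ", released=" + hasBufferForReleasedChannel;
    }

    // The caller declares its parameters as (empty, released, removed),
    // so it has to reorder them to match the callee's declaration.
    static String callInDeclaredOrder(
            boolean hasEmptyBuffer,
            boolean hasBufferForReleasedChannel,
            boolean hasBufferForRemovedChannel) {
        return describe(
            hasEmptyBuffer,
            hasBufferForRemovedChannel,
            hasBufferForReleasedChannel);
    }
}
```

An alternative that avoids the problem altogether is to keep the parameter order identical in both methods, so that adjacent boolean arguments cannot be swapped silently.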
Oh, this copy-paste
V6032 It is odd that the body of method 'seekToFirst' is fully equivalent to the body of another method 'seekToLast'. RocksIteratorWrapper.java(53), RocksIteratorWrapper.java(59)
public class RocksIteratorWrapper implements RocksIteratorInterface, Closeable {
....
private RocksIterator iterator;
....
@Override
public void seekToFirst() {
iterator.seekToFirst(); // <=
status();
}
@Override
public void seekToLast() {
iterator.seekToFirst(); // <=
status();
}
....
}
The bodies of the seekToFirst and seekToLast methods are identical. What's more, both methods are used in the code.
Something is off here! Indeed, once you look at the methods of the iterator object, it becomes clear which error the analyzer helped to find:
public class RocksIterator extends AbstractRocksIterator<RocksDB>
{
....
}
public abstract class AbstractRocksIterator<....> extends ....
{
....
public void seekToFirst() // <=
{
assert this.isOwningHandle();
this.seekToFirst0(this.nativeHandle_);
}
public void seekToLast() // <=
{
assert this.isOwningHandle();
this.seekToLast0(this.nativeHandle_);
}
....
}
It turns out that the seekToLast method of the RocksIteratorWrapper class was created by copy-pasting the seekToFirst method of the same class. For some reason, though, the developer forgot to replace the call to the iterator's seekToFirst method with seekToLast.
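The intended delegation can be sketched without the RocksDB dependency. SeekableIterator below is a hypothetical interface standing in for RocksIterator, and the status() call from the original wrapper is left out for brevity.

```java
// Hypothetical wrapper sketch; SeekableIterator stands in for RocksIterator.
final class IteratorWrapperSketch {
    interface SeekableIterator {
        void seekToFirst();
        void seekToLast();
    }

    private final SeekableIterator iterator;

    IteratorWrapperSketch(SeekableIterator iterator) {
        this.iterator = iterator;
    }

    void seekToFirst() {
        iterator.seekToFirst();
    }

    void seekToLast() {
        iterator.seekToLast(); // delegate to the matching method, not seekToFirst
    }
}
```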
Confusion with format strings
V6046 Incorrect format. A different number of format items is expected. Arguments not used: 1. UnsignedTypeConversionITCase.java(102)
public static void prepareMariaDB() throws IllegalStateException {
....
if (!initDbSuccess) {
throw new IllegalStateException(
String.format(
"Initialize MySQL database instance failed after {} attempts," + // <=
" please open an issue.", INITIALIZE_DB_MAX_RETRY));
}
}
The format strings of the String.format method and of Java loggers are different. In a String.format format string, argument placeholders are written with the '%' character, whereas logger format strings use the '{}' character pair instead. That confusion caused this error. The string passed to String.format as the format string was most likely copied from another place where it was used with a logger. As a result, the value of the INITIALIZE_DB_MAX_RETRY field is not substituted for '{}' in the IllegalStateException message, and whoever catches or logs this exception will never learn how many attempts were made to connect to the database.
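The difference is easy to reproduce in isolation. In the sketch below, the value 3 for INITIALIZE_DB_MAX_RETRY is an assumption made for illustration only; the real value is not shown in the article.

```java
// Sketch of the two format-string styles passed to String.format.
final class FormatSketch {
    static final int INITIALIZE_DB_MAX_RETRY = 3; // assumed value, for illustration only

    // Logger-style placeholder: String.format ignores the extra argument
    // and leaves "{}" in the message.
    static String brokenMessage() {
        return String.format(
            "Initialize MySQL database instance failed after {} attempts,"
                + " please open an issue.", INITIALIZE_DB_MAX_RETRY);
    }

    // Formatter-style placeholder: %d is replaced with the integer argument.
    static String fixedMessage() {
        return String.format(
            "Initialize MySQL database instance failed after %d attempts,"
                + " please open an issue.", INITIALIZE_DB_MAX_RETRY);
    }
}
```

Note that String.format does not complain about unused arguments, which is why this kind of mistake surfaces only when someone reads the message.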
Abnormal distribution
V6048 This expression can be simplified. Operand 'index' in the operation equals 0. CollectionUtil.java(76)
public static <T> Collection<List<T>> partition(Collection<T> elements,
int numBuckets)
{
Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
int initialCapacity = elements.size() / numBuckets;
int index = 0;
for (T element : elements)
{
int bucket = index % numBuckets; // <=
buckets.computeIfAbsent(bucket,
key -> new ArrayList<>(initialCapacity))
.add(element);
}
return buckets.values();
}
The partition method splits the elements of the elements collection into several segments and then returns those segments. However, because of the error the analyzer pointed out, no splitting takes place. The expression index % numBuckets, which determines the segment number, is always 0 because index is always 0. At first I thought the method had been refactored and the increment of the index variable in the for loop had been forgotten along the way. But after looking at the commit in which the method was added, it turned out that the error arrived together with the method. Here is the fixed version of the code:
public static <T> Collection<List<T>> partition(Collection<T> elements,
int numBuckets)
{
Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
int initialCapacity = elements.size() / numBuckets;
int index = 0;
for (T element : elements)
{
int bucket = index % numBuckets;
buckets.computeIfAbsent(bucket,
key -> new ArrayList<>(initialCapacity))
.add(element);
index++;
}
return buckets.values();
}
Incompatible types
V6066 The type of object passed as argument is incompatible with the type of collection: String, ListStateDescriptor<NextTransactionalIdHint>. FlinkKafkaProducer.java(1083)
public interface OperatorStateStore
{
Set<String> getRegisteredStateNames();
}
public class FlinkKafkaProducer<IN> extends ....
{
....
private static final
ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>
NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR = ....;
@Override
public void initializeState(FunctionInitializationContext context)....
{
....
if (context.getOperatorStateStore()
.getRegisteredStateNames()
.contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR)) // <=
{
migrateNextTransactionalIdHindState(context);
}
....
}
}
The expression the analyzer points at is always false, which means the migrateNextTransactionalIdHindState method is never called. How did it happen that someone searches a collection of type Set<String> for an element of a completely different type, ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>? Without the analyzer's help, an error like this could live in the code for a very long time, because it does not catch the eye and cannot be found without thoroughly checking this method.
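One plausible fix is to look up the descriptor's name rather than the descriptor itself. The sketch below uses a hypothetical Descriptor class with a getName() accessor; whether this matches the fix the Flink developers chose is an assumption.

```java
import java.util.Set;

// Sketch: searching a Set<String> by descriptor vs. by descriptor name.
final class StateLookupSketch {
    // Hypothetical stand-in for a state descriptor that carries a name.
    static final class Descriptor {
        private final String name;

        Descriptor(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    // Compiles because Set.contains accepts Object, but a Descriptor never
    // equals a String, so the result is always false.
    static boolean brokenLookup(Set<String> registeredNames, Descriptor descriptor) {
        return registeredNames.contains(descriptor);
    }

    // Compares like with like: a String name against a set of String names.
    static boolean isRegistered(Set<String> registeredNames, Descriptor descriptor) {
        return registeredNames.contains(descriptor.getName());
    }
}
```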
Non-atomic change of the variable
V6074 Non-atomic modification of volatile variable. Inspect 'currentNumAcknowledgedSubtasks'. PendingCheckpointStats.java(131)
boolean reportSubtaskStats(JobVertexID jobVertexId, SubtaskStateStats subtask) {
TaskStateStats taskStateStats = taskStats.get(jobVertexId);
if (taskStateStats != null && taskStateStats.reportSubtaskStats(subtask)) {
currentNumAcknowledgedSubtasks++; // <=
latestAcknowledgedSubtask = subtask;
currentStateSize += subtask.getStateSize(); // <=
long processedData = subtask.getProcessedData();
if (processedData > 0) {
currentProcessedData += processedData; // <=
}
long persistedData = subtask.getPersistedData();
if (persistedData > 0) {
currentPersistedData += persistedData; // <=
}
return true;
} else {
return false;
}
}
Plus three more analyzer warnings in the same method:
- V6074 Non-atomic modification of volatile variable. Inspect 'currentStateSize'. PendingCheckpointStats.java(134)
- V6074 Non-atomic modification of volatile variable. Inspect 'currentProcessedData'. PendingCheckpointStats.java(138)
- V6074 Non-atomic modification of volatile variable. Inspect 'currentPersistedData'. PendingCheckpointStats.java(143)
The analyzer suggests that as many as four volatile fields in the method are modified non-atomically. And, as usual, the analyzer is right, because the ++ and += operations are in fact a sequence of several operations: read, modify, write. As you know, the value of a volatile field is visible to all threads, which means that in a race condition some of the changes to the field may be lost. You can read more about this in the description of the diagnostic.
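One common way to make such updates atomic is to replace the volatile field with an AtomicLong. The sketch below is a hypothetical counter, not the PendingCheckpointStats class, and it assumes the field really is updated from several threads; guarding the whole method with synchronized would be another option.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical counter sketch: atomic increments instead of "volatile long x; x++".
final class AckCounterSketch {
    private final AtomicLong currentNumAcknowledgedSubtasks = new AtomicLong();

    void reportSubtask() {
        // incrementAndGet performs the read-modify-write as one atomic step.
        currentNumAcknowledgedSubtasks.incrementAndGet();
    }

    long acknowledged() {
        return currentNumAcknowledgedSubtasks.get();
    }

    // Runs the given number of threads, each reporting reportsPerThread times,
    // and returns the final counter value.
    static long countConcurrently(int threads, int reportsPerThread) {
        AckCounterSketch counter = new AckCounterSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < reportsPerThread; j++) {
                    counter.reportSubtask();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.acknowledged();
    }
}
```

With atomic increments no update is lost, so four threads reporting 10000 times each always end up with a total of 40000.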
Conclusion
In Big Data projects, reliability is one of the key requirements, so the quality of the code in them must be watched closely. The Apache Flink developers had several tools to help them with this, and they also wrote a considerable number of tests. Yet even under these conditions, the PVS-Studio analyzer was able to find errors. It is impossible to get rid of errors completely, but regular use of various static code analysis tools lets you get closer to that ideal. Yes, exactly: regular use. Static analysis shows its effectiveness only when it is used regularly, as described in more detail in this article.
If you want to share this article with an English-speaking audience, please use the translation link: Valery Komarov. Big / Bug Data: Analyzing the Apache Flink Source Code.