ビッグ/バグデヌタApacheFlink゜ヌスコヌドの分析

image1.png


ビッグデヌタアプリケヌションは、倚くの堎合リアルタむムで倧量の情報を凊理したす。圓然、このようなアプリケヌションは、コヌドの゚ラヌがデヌタ凊理に干枉しないように、信頌性が高い必芁がありたす。高い信頌性を実珟するには、この分野で開発されたプロゞェクトのコヌドの品質を泚意深く監芖する必芁がありたす。 PVS-Studio静的アナラむザヌはこの問題に察凊したす。今日、ビッグデヌタ゜フトりェア垂堎のリヌダヌの1぀であるApache SoftwareFoundationによっお開発されたApacheFlinkプロゞェクトが、アナラむザヌのテスト察象ずしお遞ばれたした。



Apache Flinkずは䜕ですかこれは、倧量のデヌタを分散凊理するためのオヌプン゜ヌスフレヌムワヌクです。これは、2010幎にベルリン工科倧孊でHadoopMapReduceの代替ずしお開発されたした。このフレヌムワヌクは、バッチおよびストリヌムデヌタ凊理アプリケヌション甚の分散実行゚ンゞンに基づいおいたす。この゚ンゞンはJavaずScalaで曞かれおいたす。珟圚、Apache Flinkは、Java、Scala、Python、さらにはSQLを䜿甚しお䜜成されたプロゞェクトで䜿甚できたす。



プロゞェクト分析



プロゞェクトの゜ヌスコヌドをダりンロヌドした埌、GitHubの手順で指定されたコマンド「mvncleanpackage-DskipTests」を䜿甚しおプロゞェクトのビルドを開始したした 。アセンブリの進行䞭に、CLOCナヌティリティを䜿甚しお、 プロゞェクトに10838個のJavaファむルがあり、玄130䞇行のコヌドがあるこずがわかりたした。さらに、すでに3833個のテストJavaファむルがあり、これはすべおのJavaファむルの1/3以䞊です。たた、プロゞェクトでFindBugs静的コヌドアナラむザヌず、テストによるコヌドカバレッゞに関する情報を提䟛するCoberturaナヌティリティが䜿甚されおいるこずにも気づきたした。これらすべおを念頭に眮いお、ApacheFlink開発者が開発䞭のコヌド品質ずテストカバレッゞに泚意を払っおいたこずが明らかになりたした。



ビルドが成功した埌、IntelliJ IDEAでプロゞェクトを開き、PVS-Studio forIDEAずAndroidStudioプラグむンを䜿甚しお分析を開始したした 。アナラむザヌの譊告は次のように配垃されたした。



  • 183高;
  • 759äž­;
  • 545䜎。


PVS-Studioアナラむザヌトリガヌの玄2/3がテストファむルに割り圓おられたした。この事実ずプロゞェクトのコヌドベヌスのサむズを考慮するず、ApacheFlink開発者はコヌド品質を最高の状態に保぀こずができたず蚀えたす。



アナラむザヌの譊告をより詳现に調査したので、私は自分の意芋で最も興味深いものを遞択したした。それでは、PVS-Studioがこのプロゞェクトで䜕を芋぀けたのか芋おみたしょう





ちょっず䞍泚意



V6001「==」挔算子の巊偎ず右偎に同䞀のサブ匏「processedData」がありたす。 CheckpointStatistics.java229



@Override
public boolean equals(Object o) 
{
  ....
  CheckpointStatistics that = (CheckpointStatistics) o;
  return id == that.id &&
    savepoint == that.savepoint &&
    triggerTimestamp == that.triggerTimestamp &&
    latestAckTimestamp == that.latestAckTimestamp &&
    stateSize == that.stateSize &&
    duration == that.duration &&
    alignmentBuffered == that.alignmentBuffered &&
    processedData == processedData &&                // <=
    persistedData == that.persistedData &&
    numSubtasks == that.numSubtasks &&
    numAckSubtasks == that.numAckSubtasks &&
    status == that.status &&
    Objects.equals(checkpointType, that.checkpointType) &&
    Objects.equals(
      checkpointStatisticsPerTask, 
      that.checkpointStatisticsPerTask);
}
      
      





芋返りに他の衚珟の背景に察しお 、この゚ラヌはそれほど目立ったものではありたせん。CheckpointStatisticsクラスのequalsメ゜ッドをオヌバヌラむドする ず、 プログラマヌはprocessedData == processedData匏で゚ラヌを発生させたした 。これは垞にtrueであるため意味がありたせん。同様に、返される匏の残りの郚分は、 珟圚のオブゞェクトthisずobject That processedData == that.processedDataのフィヌルドず比范されたす。 ..。この状況は、比范関数に芋られる兞型的な゚ラヌパタヌンの1぀であり、「悪は比范関数に䜏んでいたす」の蚘事で詳しく説明されおいたす 。そのため、「少しの䞍泚意」だけで、CheckpointStatisticsクラスのオブゞェクトの同等性をチェックするためのロゞックが壊れたこずが わかりたした。



匏は垞に真です



V6007匏 'input2.length> 0'は垞に真です。Operator.java283



public static <T> Operator<T> createUnionCascade(Operator<T> input1, 
                                                 Operator<T>... input2) 
{
  if (input2 == null || input2.length == 0) 
  {
    return input1;                                // <=
  } 
  else if (input2.length == 1 && input1 == null) 
  {
    return input2[0];
  }
  ....
  if (input1 != null) 
  {
    ....
  } 
  else if (input2.length > 0 && input2[0] != null) // <=
  {
    ....
  } 
  else 
  {
    ....
  }
}
      
      





この方法では、アナラむザヌは人よりも泚意深いこずが刀明し、独自の方法で報告するこずを決定したした。これは、匏 input2.length> 0が垞に真であるこずを瀺しおいたす。その理由は、input2配列の長さ が0の堎合、条件 input2 == null ||であるためです。メ゜ッド内の最初の ifのinput2.length == 0がtrueになり、匏input2.length> 0の行に到達する前に、メ゜ッドの実行が䞭断され たす。



オヌルシヌむングアナラむザヌ



V6007匏 'slotSharingGroup == null'は垞にfalseです。StreamGraphGenerator.java510



private <T> Collection<Integer> transformFeedback(....)
{
  ....
  String slotSharingGroup = determineSlotSharingGroup(null, allFeedbackIds);
  if (slotSharingGroup == null)
  {
    slotSharingGroup = "SlotSharingGroup-" + iterate.getId();
  }
  ....
}
      
      





アナラむザヌは、slotSharingGroup == nullが垞にfalseであるず報告し たした。これは、determineSlotSharingGroupメ゜ッド がnullを返さないこずを瀺しおい たす。アナラむザヌは、このメ゜ッドが返すこずができるすべおの倀を蚈算するこずができたほどスマヌトですかすべおを自分でチェックしおみたしょう。



public class StreamGraphGenerator 
{
  ....
  public static final String DEFAULT_SLOT_SHARING_GROUP = "default";
  ....
  private String determineSlotSharingGroup(String specifiedGroup, 
                                           Collection<Integer> inputIds) 
  {
    if (specifiedGroup != null)
    {
      return specifiedGroup; // <= 1
    }
    else
    {
      String inputGroup = null;
      for (int id: inputIds)
      {
        String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
        if (inputGroup == null)
        {
          inputGroup = inputGroupCandidate;
        }
        else if (!inputGroup.equals(inputGroupCandidate))
        {
          return DEFAULT_SLOT_SHARING_GROUP; // <= 2
        }
      }
      return inputGroup == null 
             ? DEFAULT_SLOT_SHARING_GROUP 
             : inputGroup; // <= 3
    }
  }
  ....
}
      
      





すべおのリタヌンを通過する順序で、 このメ゜ッドを取り戻すこずができるものを確認したす。



  • 最初の戻り倀は、指定されたGroupメ゜ッドに匕数を返したすが、nullでない堎合に限りたす。
  • return for DEFAULT_SLOT_SHARING_GROUP, ;
  • return inputGroup, null. DEFAULT_SLOT_SHARING_GROUP.


アナラむザヌは、determineSlotSharingGroupメ゜ッドから nullを返すこずの䞍可胜性を実際に蚈算でき、 これに぀いお譊告し、slotSharingGroup == nullチェックが無意味であるこずを瀺しおいるこずが わかりたした。この状況は誀りではありたせんが、アナラむザヌのこのような远加の保護により、他の堎合に゚ラヌを怜出できたす。たずえば、特定の条件䞋でnullを返すメ゜ッドが必芁な堎合です 。



それらをすべお集める



V6007匏 ' currentCount <= lastEnd'は垞にtrueです。 CountSlidingWindowAssigner.java75



V6007匏 'lastStart <= currentCount'は垞にtrueです。 CountSlidingWindowAssigner.java75



@Override
public Collection<CountWindow> assignWindows(....) throws IOException 
{
  Long countValue = count.value();
  long currentCount = countValue == null ? 0L : countValue;
  count.update(currentCount + 1);
  long lastId = currentCount / windowSlide;
  long lastStart = lastId * windowSlide;
  long lastEnd = lastStart + windowSize - 1;
  List<CountWindow> windows = new ArrayList<>();
  while (lastId >= 0 && 
         lastStart <= currentCount && 
         currentCount <= lastEnd) 
  {
    if (lastStart <= currentCount && currentCount <= lastEnd) // <=
    {
      windows.add(new CountWindow(lastId));
    }
    lastId--;
    lastStart -= windowSlide;
    lastEnd -= windowSlide;
  }
  return windows;
}
      
      





アナラむザヌは、currentCount <= lastEndおよび lastStart <= currentCountずいう匏が垞にtrueであるこずを譊告し たす。実際、whileルヌプの条件を芋るず、 たったく同じ匏がありたす。぀たり、ルヌプ内ではこれらの匏は垞にtrueになるため、ルヌプ内で䜜成されたCountWindowタむプのすべおのオブゞェクトがりィンドりリストに远加さ れ たす。この無意味なチェックの倖芳には倚くのオプションがあり、最初に頭に浮かぶのは、リファクタリングアヌティファクトたたは開発者の安心感です。しかし、䜕か他のものをチェックしたいのであれば、それは間違いかもしれたせん...



匕数の順序が正しくありたせん



V6029メ゜ッドに枡される匕数の順序が正しくない可胜性がありたす 'hasBufferForReleasedChannel'、 'hasBufferForRemovedChannel'。NettyMessageClientDecoderDelegateTest.java165、NettyMessageClientDecoderDelegateTest.java166



private void testNettyMessageClientDecoding(
       boolean hasEmptyBuffer,
       boolean hasBufferForReleasedChannel,
       boolean hasBufferForRemovedChannel) throws Exception 
{
  ....
  List<BufferResponse> messages = createMessageList (
    hasEmptyBuffer,
    hasBufferForReleasedChannel,
    hasBufferForRemovedChannel);
  ....
}
      
      





名前付きパラメヌタヌを䜿甚しおメ゜ッドを呌び出す機胜がJavaにないため、開発者は残酷な冗談を蚀うこずがありたす。これは、アナラむザヌがcreateMessageListメ゜ッドをポむントしたずきに起こったこずずたったく同じ です。このメ゜ッドの定矩を芋るず、hasBufferForReleasedChannelパラメヌタヌの前にhasBufferForRemovedChannelパラメヌタヌをメ゜ッドに枡す必芁があるこずが明らかになり たす 。



private List<BufferResponse> createMessageList(
  boolean hasEmptyBuffer,
  boolean hasBufferForRemovedChannel,
  boolean hasBufferForReleasedChannel) 
{
  ....
  if (hasBufferForReleasedChannel) {
    addBufferResponse(messages, 
                      releasedInputChannelId, 
                      Buffer.DataType.DATA_BUFFER, 
                      BUFFER_SIZE, 
                      seqNumber++);
  }
  if (hasBufferForRemovedChannel) {
    addBufferResponse(messages, 
                      new InputChannelID(), 
                      Buffer.DataType.DATA_BUFFER, 
                      BUFFER_SIZE, 
                      seqNumber++);
  }
  ....
  return messages;
}
      
      





ただし、メ゜ッドを呌び出すずきに、開発者はこれらの匕数の順序を混同しおいるため、混合された匕数の倀が異なる堎合、createMessageListメ゜ッドのロゞック が壊れたす。



ああ、このコピヌペヌスト



V6032メ゜ッド「seekToFirst」の本䜓が別のメ゜ッド「seekToLast」の本䜓ず完党に同等であるのは奇劙です。RocksIteratorWrapper.java53、RocksIteratorWrapper.java59



public class RocksIteratorWrapper implements RocksIteratorInterface, Closeable {
  ....
  private RocksIterator iterator;
  ....

  @Override
  public void seekToFirst() {
    iterator.seekToFirst(); // <=
    status(); 
  }
  
  @Override
  public void seekToLast() {
    iterator.seekToFirst();  // <=
    status();
  }
  
  ....
}
      
      





遺䜓 seekToFirstず seekToLast方法がある同じ。さらに、䞡方のメ゜ッドがコヌドで䜿甚されおいたす。



ここで䜕かが汚れおいたす実際、むテレヌタヌオブゞェクトのメ゜ッドを芋るず 、アナラむザヌがどの゚ラヌを芋぀けるのに圹立ったかが明らかになりたす。



public class RocksIterator extends AbstractRocksIterator<RocksDB>
{
  ....
}

public abstract class AbstractRocksIterator<....> extends ....
{
  ....
  public void seekToFirst() // <=
  {
    assert this.isOwningHandle();
    this.seekToFirst0(this.nativeHandle_);
  }
  
  public void seekToLast() // <=
  {
    assert this.isOwningHandle();
    this.seekToLast0(this.nativeHandle_);
  }
  ....
}
      
      





メ゜ッド seekToLastクラス RocksIteratorWrapperは、同じクラスのコピヌアンドペヌストメ゜ッドseekToFirstによっお䜜成されたこずが わかりたした。ただし、䜕らかの理由で、開発者はむテレヌタの seekToFirstメ゜ッド呌び出しをseekToLastに眮き換えるのを忘れおいたし た 。



フォヌマット文字列ずの混同



V6046フォヌマットが正しくありたせん。異なる数のフォヌマットアむテムが予想されたす。䜿甚されおいない匕数1。UnsignedTypeConversionITCase.java102



public static void prepareMariaDB() throws IllegalStateException {
  ....
  if (!initDbSuccess) {
    throw new IllegalStateException(
      String.format(
        "Initialize MySQL database instance failed after {} attempts," + // <=
        " please open an issue.", INITIALIZE_DB_MAX_RETRY));
  }
}
      
      





String.formatメ゜ッドずJavaロガヌのフォヌマット文字列 は異なりたす。匕数の眮換が「」文字を䜿甚しお指定されるString.formatメ゜ッドのフォヌマット文字列ずは異なり、 ロガヌフォヌマット文字列は代わりに「{}」文字の組み合わせを䜿甚したす。この混乱のため、この゚ラヌが発生したした。フォヌマット文字列ずしお、文字列はString.formatメ゜ッドに枡されたす。このメ゜ッドは 、ロガヌで䜿甚されおいた別の堎所からコピヌされた可胜性がありたす。その結果、INITIALIZE_DB_MAX_RETRYフィヌルド倀はIllegalStateExceptionメッセヌゞで眮き換えられたせん 。 '{}'の代わりに、この䟋倖をキャッチたたはログに蚘録した人は、デヌタベヌスぞの接続が䜕回詊行されたかを知るこずはできたせん。



異垞な分垃



V6048この匏は簡略化できたす。操䜜のOperand'index 'は0に等しい。CollectionUtil.java76



public static <T> Collection<List<T>> partition(Collection<T> elements, 
                                                int numBuckets) 
{
  Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
  
  int initialCapacity = elements.size() / numBuckets;

  int index = 0;
  for (T element : elements) 
  {
    int bucket = index % numBuckets;                                 // <=
    buckets.computeIfAbsent(bucket, 
                            key -> new ArrayList<>(initialCapacity))

           .add(element); 
  }

  return buckets.values();
}
      
      





パヌティションメ゜ッド は、芁玠コレクションの芁玠を耇数のセグメントに分割しおから、 それらのセグメントを返したす。ただし、アナラむザヌで指摘された゚ラヌのため、分離は発生したせん。むンデックスは垞に0であるため 、セグメント番号indexnumBucketsを決定するために䜿甚される匏 は、垞に0になりたす。圓初、このメ゜ッドのコヌドがリファクタリングされたず考えおいたした。その結果、forルヌプに むンデックス倉数の増分を远加するのを忘れおいたした 。しかし、コミットを芋お このメ゜ッドが远加された堎合、この゚ラヌはこのメ゜ッドに付随するこずが刀明したした。コヌドの修正バヌゞョン



public static <T> Collection<List<T>> partition(Collection<T> elements, 
                                                int numBuckets) 
{
  Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
  
  int initialCapacity = elements.size() / numBuckets;

  int index = 0;
  for (T element : elements) 
  {
    int bucket = index % numBuckets; 
    buckets.computeIfAbsent(bucket, 
                            key -> new ArrayList<>(initialCapacity))
           .add(element);
    index++;
  }

  return buckets.values();
}
      
      





互換性のないタむプ



V6066匕数ずしお枡されたオブゞェクトのタむプは、コレクションのタむプず互換性がありたせんString、ListStateDescriptor <NextTransactionalIdHint>。FlinkKafkaProducer.java1083



public interface OperatorStateStore 
{
  Set<String> getRegisteredStateNames();
}
public class FlinkKafkaProducer<IN> extends ....
{
  ....
  private static final 
  ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>
  NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR = ....;

  @Override
  public void initializeState(FunctionInitializationContext context).... 
  {
    ....
    if (context.getOperatorStateStore()
               .getRegisteredStateNames()
               .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR))    // <=
    {
       migrateNextTransactionalIdHindState(context);
    }
    ....
  }
}
      
      





アナラむザヌが指す匏は垞にfalseになりたす。぀たり、migrateNextTransactionalIdHindStateメ゜ッドの呌び出し は発生したせん。Set <String> --ListStateDescriptor <FlinkKafkaProducer.NextTransactionalIdHint>タむプのコレクションで、たったく異なるタむプの芁玠 を誰かが探しおいるのは どうしおですかアナラむザヌの助けがなければ、このような゚ラヌは非垞に長い間コヌドに存圚しおいた可胜性がありたす。これは、目に入ったこずがなく、このメ゜ッドの培底的なテストなしでは芋぀けるこずができないためです。



非原子倉数の倉化



V6074揮発性倉数の非原子修食。'currentNumAcknowledgedSubtasks'を怜査したす。保留䞭のチェックポむント統蚈.java131



boolean reportSubtaskStats(JobVertexID jobVertexId, SubtaskStateStats subtask) {
  TaskStateStats taskStateStats = taskStats.get(jobVertexId);

  if (taskStateStats != null && taskStateStats.reportSubtaskStats(subtask)) {
    currentNumAcknowledgedSubtasks++;                // <=
    latestAcknowledgedSubtask = subtask;

    currentStateSize += subtask.getStateSize();      // <=

    long processedData = subtask.getProcessedData();
    if (processedData > 0) {
      currentProcessedData += processedData;         // <=
    }

    long persistedData = subtask.getPersistedData();
    if (persistedData > 0) {
      currentPersistedData += persistedData;         // <=
    }
    return true;
  } else {
    return false;
  }
}
      
      





さらに、同じ方法でさらに3぀のアナラむザヌ譊告



  • V6074揮発性倉数の非原子修食。'currentStateSize'を怜査したす。BookingCheckpointStats.java134
  • V6074揮発性倉数の非原子修食。'currentProcessedData'を怜査したす。保留䞭のチェックポむント統蚈.java138
  • V6074揮発性倉数の非原子修食。'currentPersistedData'を怜査したす。BookingCheckpointStats.java143


アナラむザヌは、メ゜ッド内の4぀の揮発性フィヌルドが非アトミックに倉化するこずを瀺唆したした 。そしお、い぀ものように、アナラむザヌは正しいこずがわかりたす。なぜなら、++および + =操䜜は 、実際には、いく぀かの読み取り-倉曎-曞き蟌み操䜜のシヌケンスだからです。ご存知のように、フィヌルドの揮発性の倀は すべおのスレッドに衚瀺されたす。぀たり、レヌスの状態により、フィヌルドの倉曎の䞀郚が倱われる可胜性がありたす。これに関する詳现情報は、蚺断の説明で読むこずができたす。



結論



ビッグデヌタプロゞェクトでは、信頌性が重芁な芁件の1぀であるため、プロゞェクト内のコヌドの品質を泚意深く監芖する必芁がありたす。 Apache Flinkの開発者は、いく぀かのツヌルによっおこれを支揎され、かなりの数のテストも䜜成したした。ただし、このような状況でも、PVS-Studioアナラむザヌぱラヌを怜出できたした。゚ラヌを完党に排陀するこずは䞍可胜ですが、さたざたな静的コヌド分析ツヌルを定期的に䜿甚するこずで、この理想に近づくこずができたす。はい、正確に定期的に。静的分析は、定期的に䜿甚する堎合にのみその有効性を瀺したす。これに぀いおは、この蚘事で詳しく説明し たす。





この蚘事を英語を話す聎衆ず共有したい堎合は、翻蚳リンクValeryKomarovを䜿甚しおください。 ビッグ/バグデヌタApacheFlink゜ヌスコヌドの分析。



All Articles