Skip to content

Commit

Permalink
Merge pull request #41 from codecrafters-io/logs-before-boot
Browse files Browse the repository at this point in the history
Finish generating LogDirectory before booting up any executable
  • Loading branch information
ryan-gang authored Jan 7, 2025
2 parents 265a79f + df9178b commit d726367
Show file tree
Hide file tree
Showing 23 changed files with 386 additions and 381 deletions.
11 changes: 5 additions & 6 deletions internal/stage1.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@ import (

func testBindToPort(stageHarness *test_case_harness.TestCaseHarness) error {
b := kafka_executable.NewKafkaExecutable(stageHarness)
if err := b.Run(); err != nil {
err := serializer.GenerateLogDirs(logger.GetQuietLogger(""), true)
if err != nil {
return err
}

quietLogger := logger.GetQuietLogger("")
logger := stageHarness.Logger
err := serializer.GenerateLogDirs(quietLogger, true)
if err != nil {
stageLogger := stageHarness.Logger
if err := b.Run(); err != nil {
return err
}

Expand All @@ -26,5 +25,5 @@ func testBindToPort(stageHarness *test_case_harness.TestCaseHarness) error {
Retries: 15,
}

return bindTestCase.Run(b, logger)
return bindTestCase.Run(b, stageLogger)
}
35 changes: 18 additions & 17 deletions internal/stage2.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,23 @@ import (

func testHardcodedCorrelationId(stageHarness *test_case_harness.TestCaseHarness) error {
b := kafka_executable.NewKafkaExecutable(stageHarness)
if err := b.Run(); err != nil {
err := serializer.GenerateLogDirs(logger.GetQuietLogger(""), true)
if err != nil {
return err
}

quietLogger := logger.GetQuietLogger("")
logger := stageHarness.Logger
err := serializer.GenerateLogDirs(quietLogger, true)
if err != nil {
stageLogger := stageHarness.Logger
if err := b.Run(); err != nil {
return err
}

broker := protocol.NewBroker("localhost:9092")
if err := broker.ConnectWithRetries(b, logger); err != nil {
if err := broker.ConnectWithRetries(b, stageLogger); err != nil {
return err
}
defer broker.Close()
defer func(broker *protocol.Broker) {
_ = broker.Close()
}(broker)

correlationId := 7

Expand All @@ -50,8 +51,8 @@ func testHardcodedCorrelationId(stageHarness *test_case_harness.TestCaseHarness)
}

message := kafkaapi.EncodeApiVersionsRequest(&request)
logger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId)
logger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message))
stageLogger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId)
stageLogger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message))

err = broker.Send(message)
if err != nil {
Expand All @@ -61,13 +62,13 @@ func testHardcodedCorrelationId(stageHarness *test_case_harness.TestCaseHarness)
if err != nil {
return err
}
logger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response))
stageLogger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response))

decoder := realdecoder.RealDecoder{}
decoder.Init(response)
logger.UpdateSecondaryPrefix("Decoder")
stageLogger.UpdateSecondaryPrefix("Decoder")

logger.Debugf("- .Response")
stageLogger.Debugf("- .Response")
messageLength, err := decoder.GetInt32()
if err != nil {
if decodingErr, ok := err.(*errors.PacketDecodingError); ok {
Expand All @@ -76,9 +77,9 @@ func testHardcodedCorrelationId(stageHarness *test_case_harness.TestCaseHarness)
}
return err
}
protocol.LogWithIndentation(logger, 1, "- .message_length (%d)", messageLength)
protocol.LogWithIndentation(stageLogger, 1, "- .message_length (%d)", messageLength)

logger.Debugf("- .ResponseHeader")
stageLogger.Debugf("- .ResponseHeader")
responseCorrelationId, err := decoder.GetInt32()
if err != nil {
if decodingErr, ok := err.(*errors.PacketDecodingError); ok {
Expand All @@ -87,14 +88,14 @@ func testHardcodedCorrelationId(stageHarness *test_case_harness.TestCaseHarness)
}
return err
}
protocol.LogWithIndentation(logger, 1, "- .correlation_id (%d)", responseCorrelationId)
logger.ResetSecondaryPrefix()
protocol.LogWithIndentation(stageLogger, 1, "- .correlation_id (%d)", responseCorrelationId)
stageLogger.ResetSecondaryPrefix()

if responseCorrelationId != int32(correlationId) {
return fmt.Errorf("Expected Correlation ID to be %v, got %v", int32(correlationId), responseCorrelationId)
}

logger.Successf("✓ Correlation ID: %v", responseCorrelationId)
stageLogger.Successf("✓ Correlation ID: %v", responseCorrelationId)

return nil
}
37 changes: 19 additions & 18 deletions internal/stage3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"fmt"
"github.com/codecrafters-io/tester-utils/logger"

realdecoder "github.com/codecrafters-io/kafka-tester/protocol/decoder"

Expand All @@ -10,28 +11,28 @@ import (
kafkaapi "github.com/codecrafters-io/kafka-tester/protocol/api"
"github.com/codecrafters-io/kafka-tester/protocol/errors"
"github.com/codecrafters-io/kafka-tester/protocol/serializer"
"github.com/codecrafters-io/tester-utils/logger"
"github.com/codecrafters-io/tester-utils/test_case_harness"
)

func testCorrelationId(stageHarness *test_case_harness.TestCaseHarness) error {
b := kafka_executable.NewKafkaExecutable(stageHarness)
if err := b.Run(); err != nil {
err := serializer.GenerateLogDirs(logger.GetQuietLogger(""), true)
if err != nil {
return err
}

quietLogger := logger.GetQuietLogger("")
logger := stageHarness.Logger
err := serializer.GenerateLogDirs(quietLogger, true)
if err != nil {
stageLogger := stageHarness.Logger
if err := b.Run(); err != nil {
return err
}

broker := protocol.NewBroker("localhost:9092")
if err := broker.ConnectWithRetries(b, logger); err != nil {
if err := broker.ConnectWithRetries(b, stageLogger); err != nil {
return err
}
defer broker.Close()
defer func(broker *protocol.Broker) {
_ = broker.Close()
}(broker)

correlationId := getRandomCorrelationId()

Expand All @@ -50,8 +51,8 @@ func testCorrelationId(stageHarness *test_case_harness.TestCaseHarness) error {
}

message := kafkaapi.EncodeApiVersionsRequest(&request)
logger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId)
logger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message))
stageLogger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId)
stageLogger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message))

err = broker.Send(message)
if err != nil {
Expand All @@ -61,13 +62,13 @@ func testCorrelationId(stageHarness *test_case_harness.TestCaseHarness) error {
if err != nil {
return err
}
logger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response))
stageLogger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response))

decoder := realdecoder.RealDecoder{}
decoder.Init(response)
logger.UpdateSecondaryPrefix("Decoder")
stageLogger.UpdateSecondaryPrefix("Decoder")

logger.Debugf("- .Response")
stageLogger.Debugf("- .Response")
messageLength, err := decoder.GetInt32()
if err != nil {
if decodingErr, ok := err.(*errors.PacketDecodingError); ok {
Expand All @@ -76,9 +77,9 @@ func testCorrelationId(stageHarness *test_case_harness.TestCaseHarness) error {
}
return err
}
protocol.LogWithIndentation(logger, 1, "- .message_length (%d)", messageLength)
protocol.LogWithIndentation(stageLogger, 1, "- .message_length (%d)", messageLength)

logger.Debugf("- .ResponseHeader")
stageLogger.Debugf("- .ResponseHeader")
responseCorrelationId, err := decoder.GetInt32()
if err != nil {
if decodingErr, ok := err.(*errors.PacketDecodingError); ok {
Expand All @@ -87,14 +88,14 @@ func testCorrelationId(stageHarness *test_case_harness.TestCaseHarness) error {
}
return err
}
protocol.LogWithIndentation(logger, 1, "- .correlation_id (%d)", responseCorrelationId)
logger.ResetSecondaryPrefix()
protocol.LogWithIndentation(stageLogger, 1, "- .correlation_id (%d)", responseCorrelationId)
stageLogger.ResetSecondaryPrefix()

if responseCorrelationId != correlationId {
return fmt.Errorf("Expected Correlation ID to be %v, got %v", correlationId, responseCorrelationId)
}

logger.Successf("✓ Correlation ID: %v", responseCorrelationId)
stageLogger.Successf("✓ Correlation ID: %v", responseCorrelationId)

return nil
}
41 changes: 21 additions & 20 deletions internal/stage4.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,39 @@ package internal

import (
"fmt"
"github.com/codecrafters-io/tester-utils/logger"

"github.com/codecrafters-io/kafka-tester/internal/kafka_executable"
"github.com/codecrafters-io/kafka-tester/protocol"
kafkaapi "github.com/codecrafters-io/kafka-tester/protocol/api"
realdecoder "github.com/codecrafters-io/kafka-tester/protocol/decoder"
"github.com/codecrafters-io/kafka-tester/protocol/errors"
"github.com/codecrafters-io/kafka-tester/protocol/serializer"
"github.com/codecrafters-io/tester-utils/logger"
"github.com/codecrafters-io/tester-utils/test_case_harness"
)

func testAPIVersionErrorCase(stageHarness *test_case_harness.TestCaseHarness) error {
b := kafka_executable.NewKafkaExecutable(stageHarness)
if err := b.Run(); err != nil {
err := serializer.GenerateLogDirs(logger.GetQuietLogger(""), true)
if err != nil {
return err
}

quietLogger := logger.GetQuietLogger("")
logger := stageHarness.Logger
err := serializer.GenerateLogDirs(quietLogger, true)
if err != nil {
stageLogger := stageHarness.Logger
if err := b.Run(); err != nil {
return err
}

correlationId := getRandomCorrelationId()
apiVersion := getInvalidAPIVersion()

broker := protocol.NewBroker("localhost:9092")
if err := broker.ConnectWithRetries(b, logger); err != nil {
if err := broker.ConnectWithRetries(b, stageLogger); err != nil {
return err
}
defer broker.Close()
defer func(broker *protocol.Broker) {
_ = broker.Close()
}(broker)

request := kafkaapi.ApiVersionsRequest{
Header: kafkaapi.RequestHeader{
Expand All @@ -50,8 +51,8 @@ func testAPIVersionErrorCase(stageHarness *test_case_harness.TestCaseHarness) er
}

message := kafkaapi.EncodeApiVersionsRequest(&request)
logger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId)
logger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message))
stageLogger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId)
stageLogger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message))

err = broker.Send(message)
if err != nil {
Expand All @@ -61,13 +62,13 @@ func testAPIVersionErrorCase(stageHarness *test_case_harness.TestCaseHarness) er
if err != nil {
return err
}
logger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response))
stageLogger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response))

decoder := realdecoder.RealDecoder{}
decoder.Init(response)
logger.UpdateSecondaryPrefix("Decoder")
stageLogger.UpdateSecondaryPrefix("Decoder")

logger.Debugf("- .Response")
stageLogger.Debugf("- .Response")
messageLength, err := decoder.GetInt32()
if err != nil {
if decodingErr, ok := err.(*errors.PacketDecodingError); ok {
Expand All @@ -76,9 +77,9 @@ func testAPIVersionErrorCase(stageHarness *test_case_harness.TestCaseHarness) er
}
return err
}
protocol.LogWithIndentation(logger, 1, "- .message_length (%d)", messageLength)
protocol.LogWithIndentation(stageLogger, 1, "- .message_length (%d)", messageLength)

logger.Debugf("- .ResponseHeader")
stageLogger.Debugf("- .ResponseHeader")
responseCorrelationId, err := decoder.GetInt32()
if err != nil {
if decodingErr, ok := err.(*errors.PacketDecodingError); ok {
Expand All @@ -87,7 +88,7 @@ func testAPIVersionErrorCase(stageHarness *test_case_harness.TestCaseHarness) er
}
return err
}
protocol.LogWithIndentation(logger, 1, "- .correlation_id (%d)", responseCorrelationId)
protocol.LogWithIndentation(stageLogger, 1, "- .correlation_id (%d)", responseCorrelationId)

errorCode, err := decoder.GetInt16()
if err != nil {
Expand All @@ -97,20 +98,20 @@ func testAPIVersionErrorCase(stageHarness *test_case_harness.TestCaseHarness) er
}
return err
}
protocol.LogWithIndentation(logger, 1, "- .error_code (%d)", errorCode)
logger.ResetSecondaryPrefix()
protocol.LogWithIndentation(stageLogger, 1, "- .error_code (%d)", errorCode)
stageLogger.ResetSecondaryPrefix()

if responseCorrelationId != correlationId {
return fmt.Errorf("Expected Correlation ID to be %v, got %v", correlationId, responseCorrelationId)
}

logger.Successf("✓ Correlation ID: %v", responseCorrelationId)
stageLogger.Successf("✓ Correlation ID: %v", responseCorrelationId)

if errorCode != 35 {
return fmt.Errorf("Expected Error code to be 35, got %v", errorCode)
}

logger.Successf("✓ Error code: 35 (UNSUPPORTED_VERSION)")
stageLogger.Successf("✓ Error code: 35 (UNSUPPORTED_VERSION)")

return nil
}
Loading

0 comments on commit d726367

Please sign in to comment.