Skip to content

Commit

Permalink
use new migration indexing table
Browse files Browse the repository at this point in the history
  • Loading branch information
LexLuthr committed Oct 17, 2024
1 parent d0ea1d7 commit ad00bf4
Showing 1 changed file with 49 additions and 25 deletions.
74 changes: 49 additions & 25 deletions cmd/migrate-curio/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,21 +306,33 @@ func migrateBoostDeals(ctx context.Context, activeSectors bitfield.BitField, mad
}

if !c {
var proof abi.RegisteredSealProof
err = tx.QueryRow(`SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof)
// Check if we can index and announce i.e. we have unsealed copy
var exists bool
err = tx.QueryRow(`SELECT EXISTS (SELECT 1 FROM sector_location WHERE miner_id = $1
AND sector_num = $2
AND sector_filetype = 1);`, mid, deal.SectorID).Scan(&exists)
if err != nil {
return false, fmt.Errorf("seal: %s: failed to get sector proof: %w", deal.DealUuid.String(), err)
return false, fmt.Errorf("seal: %s: failed to check if sector is unsealed: %w", deal.DealUuid.String(), err)
}

// Add deal to mk12 pipeline in Curio for indexing and announcement
_, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, started, piece_cid, piece_size, raw_size, offline,
after_commp, after_psd, after_find_deal, sector, reg_seal_proof, sector_offset,
sealed, should_index, indexing_created_at, announce)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (uuid) DO NOTHING`,
deal.DealUuid.String(), mid, true, prop.PieceCID.String(), prop.PieceSize, deal.NBytesReceived, deal.IsOffline,
true, true, true, deal.SectorID, proof, deal.Offset, true, true, time.Now(), true)
if err != nil {
return false, fmt.Errorf("deal: %s: failed to add deal to pipeline for indexing and announcing: %w", deal.DealUuid.String(), err)
if exists {
var proof abi.RegisteredSealProof
err = tx.QueryRow(`SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof)
if err != nil {
return false, fmt.Errorf("seal: %s: failed to get sector proof: %w", deal.DealUuid.String(), err)
}

// Add deal to mk12 pipeline in Curio for indexing and announcement
_, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline_migration (
uuid, sp_id, piece_cid, piece_size, raw_size, sector, reg_seal_proof, sector_offset, should_announce
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (uuid) DO NOTHING`,
deal.DealUuid.String(), mid, prop.PieceCID.String(), prop.PieceSize, deal.NBytesReceived,
deal.SectorID, proof, deal.Offset, deal.AnnounceToIPNI)
if err != nil {
return false, fmt.Errorf("deal: %s: failed to add deal to pipeline for indexing and announcing: %w", deal.DealUuid.String(), err)
}
} else {
llog.Infof("Skipping indexing as sector %d is not unsealed", deal.SectorID)
}
}
return true, nil
Expand Down Expand Up @@ -470,7 +482,7 @@ func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bit
if i > 0 && i%100 == 0 {
fmt.Printf("Migrating DDO Deals: %d / %d (%0.2f%%)\n", i, len(deals), float64(i)/float64(len(deals))*100)
}
llog := log.With("Boost Deal", deal.ID.String())
llog := log.With("DDO Deal", deal.ID.String())
if deal.Err != "" && deal.Retry == types.DealRetryFatal {
llog.Infow("Skipping as deal retry is fatal")
continue
Expand Down Expand Up @@ -553,21 +565,33 @@ func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bit

// TODO: Confirm if using the mk12 pipeline will have any impact for DDO deals
if !c {
var proof abi.RegisteredSealProof
err = tx.QueryRow(`SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof)
// Check if we can index and announce i.e. we have unsealed copy
var exists bool
err = tx.QueryRow(`SELECT EXISTS (SELECT 1 FROM sector_location WHERE miner_id = $1
AND sector_num = $2
AND sector_filetype = 1);`, mid, deal.SectorID).Scan(&exists)
if err != nil {
return false, fmt.Errorf("deal: %s: failed to get sector proof: %w", deal.ID.String(), err)
return false, fmt.Errorf("seal: %s: failed to check if sector is unsealed: %w", deal.ID.String(), err)
}

// Add deal to mk12 pipeline in Curio for indexing and announcement
_, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, started, piece_cid, piece_size, raw_size, offline,
after_commp, after_psd, after_find_deal, sector, reg_seal_proof, sector_offset,
sealed, should_index, indexing_created_at, announce)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (uuid) DO NOTHING`,
deal.ID.String(), mid, true, deal.PieceCID.String(), deal.PieceSize, deal.InboundFileSize, true,
true, true, true, deal.SectorID, proof, deal.Offset, true, true, time.Now(), true)
if err != nil {
return false, fmt.Errorf("deal: %s: failed to add DDO deal to pipeline for indexing and announcing: %w", deal.ID.String(), err)
if exists {
var proof abi.RegisteredSealProof
err = tx.QueryRow(`SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof)
if err != nil {
return false, fmt.Errorf("deal: %s: failed to get sector proof: %w", deal.ID.String(), err)
}

// Add deal to mk12 pipeline in Curio for indexing and announcement
_, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline_migration (
uuid, sp_id, piece_cid, piece_size, raw_size, sector, reg_seal_proof, sector_offset, should_announce
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (uuid) DO NOTHING`,
deal.ID.String(), mid, deal.PieceCID.String(), deal.PieceSize, deal.InboundFileSize,
deal.SectorID, proof, deal.Offset, true)
if err != nil {
return false, fmt.Errorf("deal: %s: failed to add DDO deal to pipeline for indexing and announcing: %w", deal.ID.String(), err)
}
} else {
llog.Infof("Skipping indexing as sector %d is not unsealed", deal.SectorID)
}
}
return true, nil
Expand Down

0 comments on commit ad00bf4

Please sign in to comment.